From b5110ec69a8621962d4436fa6c6cfe593fe901af Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 11 May 2025 07:46:57 +0300 Subject: [PATCH] rewritten crap --- limiter/redis_client.py | 11 +++ marketplaces/base.py | 11 +-- marketplaces/factory.py | 1 + marketplaces/ozon.py | 51 ++------------ marketplaces/wildberries.py | 62 +---------------- marketplaces/yandexmarket.py | 46 ++----------- queries/general.py | 10 ++- requirements.txt | 6 +- sender/__init__.py | 0 sender/base.py | 114 +++++++++++++++++++++++++++++++ sender/factory.py | 26 +++++++ sender/ozon.py | 43 ++++++++++++ sender/wildberries.py | 54 +++++++++++++++ sender/yandexmarket.py | 51 ++++++++++++++ start_taskiq.sh | 2 +- updaters/base.py | 111 +++++++++++++++++++++++------- updaters/factory.py | 1 + updaters/ozon_updater.py | 19 ++++-- updaters/wildberries_updater.py | 27 +++++++- updaters/yandexmarket_updater.py | 22 ++++-- 20 files changed, 475 insertions(+), 193 deletions(-) create mode 100644 limiter/redis_client.py create mode 100644 sender/__init__.py create mode 100644 sender/base.py create mode 100644 sender/factory.py create mode 100644 sender/ozon.py create mode 100644 sender/wildberries.py create mode 100644 sender/yandexmarket.py diff --git a/limiter/redis_client.py b/limiter/redis_client.py new file mode 100644 index 0000000..497247d --- /dev/null +++ b/limiter/redis_client.py @@ -0,0 +1,11 @@ +import redis.asyncio as redis + +import backend.config + +pool = redis.ConnectionPool.from_url(backend.config.REDIS_URL) + + +def get_client() -> redis.Redis: + global pool + client = redis.Redis.from_pool(pool) + return client diff --git a/marketplaces/base.py b/marketplaces/base.py index 5726d73..14c798b 100644 --- a/marketplaces/base.py +++ b/marketplaces/base.py @@ -8,13 +8,14 @@ from database import Marketplace class BaseMarketplaceApi(ABC): session: ClientSession + is_valid: bool @abstractmethod def __init__(self, marketplace: Marketplace): pass @abstractmethod - async def update_stocks(self, data: Union[list, dict]): + async def update_stocks(self, data: Union[list, dict]) -> ClientResponse: pass @abstractmethod @@ -26,15 +27,15 @@ class BaseMarketplaceApi(ABC): def api_url(self): pass - def init_session(self): - self.session = ClientSession() - async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'], method: str, data: dict) -> ClientResponse: - return await self.session.request( + self.session = ClientSession() + response = await self.session.request( http_method, f'{self.api_url}{method}', json=data, headers=self.get_headers() ) + await self.session.close() + return response diff --git a/marketplaces/factory.py b/marketplaces/factory.py index b34a6ff..7f50cd6 100644 --- a/marketplaces/factory.py +++ b/marketplaces/factory.py @@ -21,3 +21,4 @@ class MarketplaceApiFactory: return WildberriesMarketplaceApi(marketplace) case BaseMarketplace.YANDEX_MARKET: return YandexmarketMarketplaceApi(marketplace) + raise ValueError() diff --git a/marketplaces/ozon.py b/marketplaces/ozon.py index 08ca46a..f94711e 100644 --- a/marketplaces/ozon.py +++ b/marketplaces/ozon.py @@ -1,11 +1,10 @@ -import asyncio import json import logging from typing import Union -import utils +from aiohttp import ClientResponse + from database import Marketplace -from limiter import BatchLimiter from marketplaces.base import BaseMarketplaceApi @@ -35,47 +34,5 @@ class OzonMarketplaceApi(BaseMarketplaceApi): def api_url(self): return 'https://api-seller.ozon.ru' - async def update_stocks(self, data: Union[list, dict]): - if type(data) is not list: - return - if not self.is_valid: - return - max_stocks = 100 - chunks = utils.chunk_list(data, max_stocks) - if not chunks: - return - - self.init_session() - limiter = BatchLimiter() - max_retries = 10 - while chunks: - current_retry = 0 - chunk = chunks.pop() - while current_retry <= max_retries: - try: - await limiter.acquire_ozon(self.limiter_key) - request_data = {'stocks': chunk} - response = await self._method('POST', '/v2/products/stocks', data=request_data) - current_retry += 1 - response = await response.json() - error_message = response.get('message') - error_code = response.get('code') - if error_message: - if error_code == 8: - logging.warning(f'Ozon rate limit exceeded for marketplace [{self.marketplace.id}]') - await asyncio.sleep(1) - continue - else: - logging.warning( - f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})') - break - else: - break - - except Exception as e: - logging.error( - f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') - break - - - await self.session.close() + async def update_stocks(self, data: Union[list, dict]) -> ClientResponse: + return await self._method('POST', '/v2/products/stocks', data={'stocks': data}) diff --git a/marketplaces/wildberries.py b/marketplaces/wildberries.py index 922349c..b91b01c 100644 --- a/marketplaces/wildberries.py +++ b/marketplaces/wildberries.py @@ -1,17 +1,15 @@ -import asyncio import json import logging from typing import Union import jwt -import utils from database import Marketplace -from limiter import BatchLimiter from marketplaces.base import BaseMarketplaceApi class WildberriesMarketplaceApi(BaseMarketplaceApi): + def __init__(self, marketplace: Marketplace): self.marketplace = marketplace auth_data = json.loads(marketplace.auth_data) @@ -38,61 +36,7 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi): def api_url(self): return 'https://marketplace-api.wildberries.ru' - def _filter_chunk_with_conflict(self, chunk: dict, response: list): - if not isinstance(response, list): - return chunk - filter_skus = [] - for error in response: - for sku in error.get('data', []): - filter_skus.append(sku['sku']) - return list(filter(lambda x: x['sku'] not in filter_skus, chunk)) - async def update_stocks(self, data: Union[list, dict]): - if type(data) is not list: - return - if not self.is_valid: - logging.warning(f'Skipping marketplace [{self.marketplace.id}] because of invalid token') - return - max_stocks = 1000 - chunks = list(utils.chunk_list(data, max_stocks)) - if not chunks: - return - self.init_session() - limiter = BatchLimiter() - max_retries = 10 - while chunks: - current_retry = 0 - chunk = chunks.pop() - while current_retry <= max_retries: - try: - await limiter.acquire_wildberries(self.limiter_key) - request_data = {'stocks': chunk} - response = await self._method('PUT', f'/api/v3/stocks/{self.marketplace.warehouse_id}', - data=request_data) - current_retry += 1 - if (response.status not in [204, 409, 429]): - response = await response.json() - error_message = response.get('message') - error_code = response.get('code') - logging.warning( - f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})') - break - if response.status == 429: - logging.warning(f'WB rate limit exceeded for marketplace [{self.marketplace.id}]') - await asyncio.sleep(1) - continue - if response.status == 409: - response_data = await response.json() + warehouse_id = self.marketplace.warehouse_id + return await self._method('PUT', f'/api/v3/stocks/{warehouse_id}', data={'stocks': data}) - logging.warning( - f'Conflict occurred when sending stocks to [{self.marketplace.id}]') - await asyncio.sleep(1) - chunk = self._filter_chunk_with_conflict(chunk, response_data) - continue - await asyncio.sleep(0.2) - break - except Exception as e: - logging.error( - f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') - break - await self.session.close() diff --git a/marketplaces/yandexmarket.py b/marketplaces/yandexmarket.py index 83ff969..0fd0a8b 100644 --- a/marketplaces/yandexmarket.py +++ b/marketplaces/yandexmarket.py @@ -28,6 +28,9 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi): } else: access_token = auth_data.get('accessToken') + if not access_token: + self.is_valid = False + return self.limiter_key = str(marketplace.company_id) + str(access_token) + str(self.marketplace.campaign_id) self.headers = { 'Authorization': f'OAuth oauth_token="{access_token}", oauth_client_id="{YANDEX_CLIENT_ID}"' @@ -41,44 +44,7 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi): return 'https://api.partner.market.yandex.ru/v2' async def update_stocks(self, data: Union[list, dict]): - if type(data) is not list: - return - if not self.is_valid: - return campaign_id = self.marketplace.campaign_id - max_stocks = 2000 - chunks = chunk_list(data, max_stocks) - if not chunks: - return - self.init_session() - limiter = BatchLimiter() - - async def send_stock_chunk(chunk): - try: - await limiter.acquire_yandexmarket(self.limiter_key) - request_data = { - 'skus': chunk - } - response = await self._method('PUT', - f'/campaigns/{campaign_id}/offers/stocks', - data=request_data) - if response.status != 200: - logging.warning( - f'Error occurred when sending stocks to [{self.marketplace.id}]') - return False - return True - except Exception as e: - logging.error( - f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') - return False - - tasks = [send_stock_chunk(chunk) for chunk in chunks] - first_request = tasks[0] - first_response = await first_request - if not first_response: - logging.error(f'Skipping marketplace [{self.marketplace.id}] because first request was unsuccessful') - await self.session.close() - return - - await asyncio.gather(*tasks[1:]) - await self.session.close() + return await self._method('PUT', + f'/campaigns/{campaign_id}/offers/stocks', + data={'skus': data}) diff --git a/queries/general.py b/queries/general.py index cde7134..bd4c3fc 100644 --- a/queries/general.py +++ b/queries/general.py @@ -13,6 +13,7 @@ class StockData(TypedDict): full_stock: int article: Union[str, int] marketplace_product: MarketplaceProduct + product_id: int def get_marketplace_suppliers_and_company_warehouses(marketplace: Marketplace): @@ -267,7 +268,7 @@ async def get_stocks_data( func.coalesce(slaves_stock_subquery.c.slaves_stock, 0).label('slaves_stock'), MarketplaceProduct.price_recommended.label('price_recommended'), MarketplaceProduct.is_archived.label('is_archived'), - func.coalesce(fbo_stock_subquery.c.quantity,0).label('fbo_stock') + func.coalesce(fbo_stock_subquery.c.quantity, 0).label('fbo_stock') ) .select_from( MarketplaceProduct @@ -337,13 +338,16 @@ async def get_stocks_data( 'article': denco_article, 'full_stock': 0, 'marketplace_product': marketplace_product, + 'product_id': marketplace_product.product_id }) continue - if fbo_stock> 0 and prefer_fbo_over_fbs: + if fbo_stock > 0 and prefer_fbo_over_fbs: response.append({ 'article': denco_article, 'full_stock': 0, 'marketplace_product': marketplace_product, + 'product_id': marketplace_product.product_id + }) continue is_mix = mix_stock is not None @@ -375,5 +379,7 @@ async def get_stocks_data( 'article': denco_article, 'full_stock': full_stock, 'marketplace_product': marketplace_product, + 'product_id': marketplace_product.product_id + }) return response diff --git a/requirements.txt b/requirements.txt index f53cee4..3f6981c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,5 +17,7 @@ redis[hiredis] celery[librabbitmq] gevent -taskiq-aio-pika -taskiq-fastapi \ No newline at end of file +taskiq-aio-pika==0.4.2 +taskiq-fastapi==0.3.5 +taskiq==0.11.17 +taskiq-pipelines \ No newline at end of file diff --git a/sender/__init__.py b/sender/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sender/base.py b/sender/base.py new file mode 100644 index 0000000..6193eb2 --- /dev/null +++ b/sender/base.py @@ -0,0 +1,114 @@ +import logging +from abc import ABC, abstractmethod +from enum import IntEnum +from typing import TYPE_CHECKING, TypedDict + +from utils import chunk_list + +if TYPE_CHECKING: + from marketplaces.base import BaseMarketplaceApi + from updaters.base import BaseMarketplaceUpdater + + +class SendStockStatus(IntEnum): + SUCCESS = 0 + SHOULD_RETRY = 1 + ERROR = 2 + + +class StockRequest(TypedDict): + request_data: dict + product_id: int + full_stock: int + + +class BaseStocksSender(ABC): + updater: "BaseMarketplaceUpdater" + api: "BaseMarketplaceApi" + + def __init__(self, updater: "BaseMarketplaceUpdater"): + self.updater = updater + self.api = updater.marketplace_api + + @property + @abstractmethod + def max_retries(self) -> int: + raise NotImplementedError() + + @property + @abstractmethod + def chunk_size(self) -> int: + raise NotImplementedError() + + def get_lock(self): + lock = self.updater.redis_client.lock(self.updater.get_lock_key(), timeout=60 * 10, blocking=True) + return lock + + @abstractmethod + async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: + raise NotImplementedError() + + async def __process_chunk_with_retries(self, chunk: list[dict]) -> SendStockStatus: + for attempt in range(self.max_retries + 1): + status = await self._process_chunk(chunk) + if status != SendStockStatus.SHOULD_RETRY: + return status + return SendStockStatus.SHOULD_RETRY + + async def process_chunk(self, chunk: list[dict]) -> SendStockStatus: + try: + return await self.__process_chunk_with_retries(chunk) + except Exception as e: + logging.error(f'[{self.updater.marketplace.id}] {e}') + return SendStockStatus.ERROR + + @abstractmethod + async def after_chunk_processed(self): + return + + @abstractmethod + async def before_chunk_processed(self): + return + + async def __send(self, chunks: list[list[StockRequest]]) -> list[int]: + invalid_product_ids = [] + is_global_error = False + try: + for chunk in chunks: + request = [] + product_ids = [] + for stock in chunk: + request.append(stock['request_data']) + product_ids.append(stock['product_id']) + + await self.before_chunk_processed() + try: + status = await self.process_chunk(request) + if status == SendStockStatus.ERROR: + is_global_error = True + break + if status == SendStockStatus.SHOULD_RETRY: + invalid_product_ids.extend(product_ids) + except Exception as e: + invalid_product_ids.extend(product_ids) + logging.error(f'[{self.updater.marketplace.id}]: {e}') + continue + await self.after_chunk_processed() + except Exception as e: + logging.error(f'{self.updater.marketplace.id}: {e}') + + if is_global_error: + invalid_product_ids = [] + for chunk in chunks: + for stock in chunk: + invalid_product_ids.append(stock['product_id']) + return invalid_product_ids + + async def send(self, stocks: list[StockRequest]) -> list[int]: + if not stocks: + return [] + chunks = chunk_list(stocks, self.chunk_size) + lock = self.get_lock() + async with lock: + result = await self.__send(chunks) + return result diff --git a/sender/factory.py b/sender/factory.py new file mode 100644 index 0000000..bbdc800 --- /dev/null +++ b/sender/factory.py @@ -0,0 +1,26 @@ +from typing import TYPE_CHECKING, Union + +from database.sipro.enums.general import BaseMarketplace +from sender.ozon import OzonStocksSender +from sender.wildberries import WildberriesStocksSender +from sender.yandexmarket import YandexmarketStocksSender + +if TYPE_CHECKING: + from updaters.base import BaseMarketplaceUpdater + + +class SenderFactory: + @staticmethod + def get_sender(updater: "BaseMarketplaceUpdater") -> Union[ + WildberriesStocksSender, + OzonStocksSender, + YandexmarketStocksSender + ]: + match updater.marketplace.base_marketplace: + case BaseMarketplace.WILDBERRIES: + return WildberriesStocksSender(updater) + case BaseMarketplace.OZON: + return OzonStocksSender(updater) + case BaseMarketplace.YANDEX_MARKET: + return YandexmarketStocksSender(updater) + raise ValueError() diff --git a/sender/ozon.py b/sender/ozon.py new file mode 100644 index 0000000..5767a18 --- /dev/null +++ b/sender/ozon.py @@ -0,0 +1,43 @@ +import asyncio +import logging + +from starlette.responses import JSONResponse + +from sender.base import BaseStocksSender, SendStockStatus + + +class OzonStocksSender(BaseStocksSender): + @property + def max_retries(self) -> int: + return 5 + + @property + def chunk_size(self) -> int: + return 100 + + async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: + response = await self.api.update_stocks(data=chunk) + status_code = response.status + if status_code == 200: + return SendStockStatus.SUCCESS + if response.content_type != JSONResponse.media_type: + return SendStockStatus.ERROR + json_data = await response.json() + error_code = json_data.get('code') + error_message = json_data.get('message') + if error_code == 8: + await asyncio.sleep(1) + return SendStockStatus.SHOULD_RETRY + logging.error(f'[{self.updater.marketplace.id}]: {error_message}') + if status_code in [ + 404, + 500, + ]: + return SendStockStatus.SHOULD_RETRY + return SendStockStatus.ERROR + + async def after_chunk_processed(self): + return await asyncio.sleep(80 / 100) + + async def before_chunk_processed(self): + pass diff --git a/sender/wildberries.py b/sender/wildberries.py new file mode 100644 index 0000000..fa9906f --- /dev/null +++ b/sender/wildberries.py @@ -0,0 +1,54 @@ +import asyncio +from typing import TYPE_CHECKING + +from marketplaces import WildberriesMarketplaceApi +from sender.base import SendStockStatus, BaseStocksSender + +if TYPE_CHECKING: + from updaters.base import BaseMarketplaceUpdater + + +class WildberriesStocksSender(BaseStocksSender): + def __init__(self, updater: "BaseMarketplaceUpdater"): + super().__init__(updater) + self.remaining = 1 + self.sleep_time = 60 / 300 + + @property + def max_retries(self) -> int: + return 5 + + async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: + response = await self.api.update_stocks(chunk) + headers = response.headers + status_code = response.status + + if status_code in [ + 401, # Invalid token + 403, # Access denied + 404, # Not found + 400, # Other + ]: + return SendStockStatus.ERROR + + # If there is rate limit + if status_code == 429: + delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time)) + await asyncio.sleep(delay_time) + self.remaining = int(headers.get('X-Ratelimit-Limit', 1)) + return SendStockStatus.SHOULD_RETRY + self.remaining = int(headers.get('X-Ratelimit-Remaining', 0)) + return SendStockStatus.SUCCESS + + async def after_chunk_processed(self): + if self.remaining <= 0: + await asyncio.sleep(self.sleep_time) + self.remaining = 1 + + async def before_chunk_processed(self): + if self.remaining <= 0: + await asyncio.sleep(self.sleep_time) + self.remaining = 1 + @property + def chunk_size(self) -> int: + return 1000 diff --git a/sender/yandexmarket.py b/sender/yandexmarket.py new file mode 100644 index 0000000..04d991e --- /dev/null +++ b/sender/yandexmarket.py @@ -0,0 +1,51 @@ +import asyncio +import logging +import time +from typing import TYPE_CHECKING + +from sender.base import BaseStocksSender, SendStockStatus + +if TYPE_CHECKING: + from updaters.base import BaseMarketplaceUpdater + + +class YandexmarketStocksSender(BaseStocksSender): + def __init__(self, updater: "BaseMarketplaceUpdater"): + super().__init__(updater) + + self.start_time = time.time() + + self.total_stocks_sent = 0 + + @property + def max_retries(self) -> int: + return 5 + + @property + def chunk_size(self) -> int: + return 2000 + + async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: + response = await self.api.update_stocks(chunk) + status_code = response.status + if status_code == 200: + self.total_stocks_sent += len(chunk) + return SendStockStatus.SUCCESS + if status_code == 420: + time_to_sleep = 60 - (time.time() - self.start_time) + await asyncio.sleep(time_to_sleep) + return SendStockStatus.SHOULD_RETRY + response_text = await response.text() + logging.error(f'[{self.updater.marketplace.id}]: {response_text}') + return SendStockStatus.ERROR + + async def after_chunk_processed(self): + time_delta = time.time() - self.start_time + if self.total_stocks_sent >= 100_000 and time_delta < 60: + time_to_sleep = 60 - time_delta + await asyncio.sleep(time_to_sleep) + self.start_time = time.time() + self.total_stocks_sent = 0 + + async def before_chunk_processed(self): + pass diff --git a/start_taskiq.sh b/start_taskiq.sh index 53b06e9..9499076 100755 --- a/start_taskiq.sh +++ b/start_taskiq.sh @@ -54,7 +54,7 @@ fi # Start the Taskiq worker log_info "Starting Taskiq worker..." -taskiq worker background:taskiq_broker background.tasks --max-async-task 5000 --max-threadpool-threads 8 --max-prefetch 10000 +taskiq worker background:taskiq_broker background.tasks --max-async-task 5000 --max-threadpool-threads 8 --max-prefetch 10000 --workers 1 # Log when the Taskiq worker stops log_info "Taskiq worker stopped" \ No newline at end of file diff --git a/updaters/base.py b/updaters/base.py index 967465e..4db7015 100644 --- a/updaters/base.py +++ b/updaters/base.py @@ -1,65 +1,131 @@ -import time +import json from abc import ABC, abstractmethod -from typing import List +from typing import List, Optional +import redis.asyncio from sqlalchemy.ext.asyncio import AsyncSession import queries.general from database import Marketplace +from limiter import redis_client from marketplaces import MarketplaceApiFactory from marketplaces.base import BaseMarketplaceApi from queries.general import StockData from schemas.general import StockUpdate +from sender.base import StockRequest +from sender.factory import SenderFactory class BaseMarketplaceUpdater(ABC): marketplace: Marketplace marketplace_api: BaseMarketplaceApi session: AsyncSession + lock_key: Optional[str] + cache_key: Optional[str] + redis_client: redis.asyncio.Redis def __init__(self, marketplace: Marketplace, session: AsyncSession): self.marketplace = marketplace self.session = session self.marketplace_api = MarketplaceApiFactory.get_marketplace_api(marketplace) + self.redis_client = redis_client.get_client() + self.sender = SenderFactory.get_sender(self) + + self.cache_key = None + self.lock_key = None + + def is_valid_updater(self) -> bool: + if not self.marketplace_api: + return False + return self.marketplace_api.is_valid @abstractmethod - def get_update_for_marketplace(self, - stock_data: StockData) -> dict: + def get_stock_request(self, + stock_data: StockData) -> StockRequest: pass + @abstractmethod + def _get_identifier(self) -> str: + raise NotImplementedError() + + def __get_base_marketplace_key(self): + base_marketplace = 'wb' + if self.marketplace.base_marketplace == 1: + base_marketplace = 'ozon' + elif self.marketplace.base_marketplace == 2: + base_marketplace = 'yandexmarket' + return base_marketplace + + def get_lock_key(self) -> str: + identifier = self._get_identifier() + base_marketplace = self.__get_base_marketplace_key() + return f'{base_marketplace}_{identifier}_lock' + + def get_cache_key(self): + identifier = self._get_identifier() + base_marketplace = self.__get_base_marketplace_key() + return f'{base_marketplace}_{self.marketplace.warehouse_id}_{identifier}_cache' + + def get_auth_data(self) -> dict: + try: + return json.loads(self.marketplace.auth_data) + except Exception as e: + return {} + + async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]: + cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key()) + cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()} + result = [] + for stock_data in stock_data_list: + cached_stock = cached_stocks.get(stock_data['product_id']) + if cached_stock is not None and cached_stock == stock_data['full_stock']: + continue + result.append(stock_data) + return result + + async def after_sender_sent(self, stock_requests: list[StockRequest], invalid_product_ids: list[int]): + stock_requests = list(filter(lambda stock: stock['product_id'] not in invalid_product_ids, stock_requests)) + mapping = {stock['product_id']: stock['full_stock'] for stock in stock_requests} + await self.redis_client.hset(self.get_cache_key(), mapping=mapping) + + async def get_marketplace_updates(self, stock_data_list: list[StockData]) -> list[StockRequest]: + marketplace_updates = [] + for stock_data in stock_data_list: + marketplace_update = self.get_stock_request(stock_data) + marketplace_updates.append(marketplace_update) + return marketplace_updates + async def update(self, updates: List[StockUpdate]): - if not self.marketplace_api: - return product_ids = list(set([update.product_id for update in updates])) await self.update_products(product_ids) async def update_products(self, product_ids: list[int]): + if not self.is_valid_updater(): + return stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace, product_ids=product_ids ) - marketplace_updates = [] - for stock_data in stock_data_list: - marketplace_update = self.get_update_for_marketplace(stock_data) - marketplace_updates.append(marketplace_update) - await self.marketplace_api.update_stocks(marketplace_updates) + stock_data_list = await self.filter_stocks_data(stock_data_list) + stock_requests = await self.get_marketplace_updates(stock_data_list) + invalid_product_ids = await self.sender.send(stock_requests) + await self.after_sender_sent(stock_requests, invalid_product_ids) async def update_all(self): - if not self.marketplace_api: + if not self.is_valid_updater(): return stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace, ) - marketplace_updates = [] - for stock_data in stock_data_list: - marketplace_update = self.get_update_for_marketplace(stock_data) - marketplace_updates.append(marketplace_update) - await self.marketplace_api.update_stocks(marketplace_updates) + stock_data_list = await self.filter_stocks_data(stock_data_list) + stock_requests = await self.get_marketplace_updates(stock_data_list) + invalid_product_ids = await self.sender.send(stock_requests) + await self.after_sender_sent(stock_requests, invalid_product_ids) async def get_all_stocks(self, only_available: bool) -> List[StockData]: - if not self.marketplace_api: + if not self.is_valid_updater(): return [] stock_data_list = await queries.general.get_stocks_data( session=self.session, @@ -74,14 +140,11 @@ class BaseMarketplaceUpdater(ABC): return stock_data_list async def reset(self): - if not self.marketplace_api: + if not self.is_valid_updater(): return stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace ) - marketplace_updates = [] - for stock_data in stock_data_list: - marketplace_update = self.get_update_for_marketplace(stock_data) - marketplace_updates.append(marketplace_update) - await self.marketplace_api.update_stocks(marketplace_updates) + stock_requests = await self.get_marketplace_updates(stock_data_list) + await self.sender.send(stock_requests) diff --git a/updaters/factory.py b/updaters/factory.py index 5c336d5..a13bb47 100644 --- a/updaters/factory.py +++ b/updaters/factory.py @@ -23,3 +23,4 @@ class UpdaterFactory: return OzonUpdater(marketplace, session) case BaseMarketplace.YANDEX_MARKET: return YandexmarketUpdater(marketplace, session) + raise ValueError() diff --git a/updaters/ozon_updater.py b/updaters/ozon_updater.py index c267461..c1bd9fa 100644 --- a/updaters/ozon_updater.py +++ b/updaters/ozon_updater.py @@ -1,11 +1,22 @@ from queries.general import StockData +from sender.base import StockRequest from updaters.base import BaseMarketplaceUpdater class OzonUpdater(BaseMarketplaceUpdater): - def get_update_for_marketplace(self, data: StockData) -> dict: + + def _get_identifier(self) -> str: + auth_data = self.get_auth_data() + client_id = auth_data.get('clientId', -1) + return str(client_id) + + def get_stock_request(self, stock_data: StockData) -> StockRequest: return { - 'offer_id': str(data['article']), - 'stock': data['full_stock'], - 'warehouse_id': self.marketplace.warehouse_id + 'request_data': { + 'offer_id': str(stock_data['article']), + 'stock': stock_data['full_stock'], + 'warehouse_id': self.marketplace.warehouse_id + }, + 'product_id': stock_data['product_id'], + 'full_stock': stock_data['full_stock'] } diff --git a/updaters/wildberries_updater.py b/updaters/wildberries_updater.py index a13298f..263d824 100644 --- a/updaters/wildberries_updater.py +++ b/updaters/wildberries_updater.py @@ -1,11 +1,32 @@ +import json + +import jwt +from celery.backends.database import retry + from queries.general import StockData +from sender.base import StockRequest from updaters.base import BaseMarketplaceUpdater class WildberriesUpdater(BaseMarketplaceUpdater): - def get_update_for_marketplace(self, stock_data: StockData) -> dict: + def _get_identifier(self) -> str: + try: + auth_data = self.get_auth_data() + token = auth_data.get('token') + decoded_token = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False}) + sid = decoded_token.get('sid') + return str(sid) + except Exception: + return f'-1' + + def get_stock_request(self, stock_data: StockData) -> StockRequest: return { - 'sku': stock_data['marketplace_product'].third_additional_article, - 'amount': stock_data['full_stock'] + 'request_data': { + 'sku': stock_data['marketplace_product'].third_additional_article, + 'amount': stock_data['full_stock'] + }, + 'product_id': stock_data['product_id'], + + 'full_stock': stock_data['full_stock'] } diff --git a/updaters/yandexmarket_updater.py b/updaters/yandexmarket_updater.py index 7e0e693..06bd278 100644 --- a/updaters/yandexmarket_updater.py +++ b/updaters/yandexmarket_updater.py @@ -1,13 +1,23 @@ from queries.general import StockData +from sender.base import StockRequest from updaters.base import BaseMarketplaceUpdater class YandexmarketUpdater(BaseMarketplaceUpdater): - def get_update_for_marketplace(self, stock_data: StockData) -> dict: + + def _get_identifier(self) -> str: + return str(self.marketplace.campaign_id) + + def get_stock_request(self, stock_data: StockData) -> StockRequest: return { - 'sku': str(stock_data['article']), - 'warehouseId': self.marketplace.warehouse_id, - 'items': [{ - 'count': stock_data['full_stock'], - }] + 'request_data': { + 'sku': str(stock_data['article']), + 'warehouseId': self.marketplace.warehouse_id, + 'items': [{ + 'count': stock_data['full_stock'], + }] + }, + 'product_id': stock_data['product_id'], + 'full_stock': stock_data['full_stock'] + }