diff --git a/marketplaces/base.py b/marketplaces/base.py index 14c798b..a7eb058 100644 --- a/marketplaces/base.py +++ b/marketplaces/base.py @@ -15,7 +15,7 @@ class BaseMarketplaceApi(ABC): pass @abstractmethod - async def update_stocks(self, data: Union[list, dict]) -> ClientResponse: + async def update_stocks(self, data: Union[list, dict]) -> (ClientSession, ClientResponse): pass @abstractmethod @@ -29,13 +29,12 @@ class BaseMarketplaceApi(ABC): async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'], method: str, - data: dict) -> ClientResponse: - self.session = ClientSession() - response = await self.session.request( + data: dict) -> (ClientSession, ClientResponse): + session = ClientSession() + response = await session.request( http_method, f'{self.api_url}{method}', json=data, headers=self.get_headers() ) - await self.session.close() - return response + return session, response diff --git a/marketplaces/ozon.py b/marketplaces/ozon.py index f94711e..d3ee98e 100644 --- a/marketplaces/ozon.py +++ b/marketplaces/ozon.py @@ -2,7 +2,7 @@ import json import logging from typing import Union -from aiohttp import ClientResponse +from aiohttp import ClientResponse, ClientSession from database import Marketplace from marketplaces.base import BaseMarketplaceApi @@ -34,5 +34,5 @@ class OzonMarketplaceApi(BaseMarketplaceApi): def api_url(self): return 'https://api-seller.ozon.ru' - async def update_stocks(self, data: Union[list, dict]) -> ClientResponse: + async def update_stocks(self, data: Union[list, dict]) -> (ClientSession, ClientResponse): return await self._method('POST', '/v2/products/stocks', data={'stocks': data}) diff --git a/marketplaces/wildberries.py b/marketplaces/wildberries.py index b91b01c..14ce3ca 100644 --- a/marketplaces/wildberries.py +++ b/marketplaces/wildberries.py @@ -3,6 +3,7 @@ import logging from typing import Union import jwt +from aiohttp import ClientSession, ClientResponse from database import Marketplace from marketplaces.base import BaseMarketplaceApi @@ -36,7 +37,7 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi): def api_url(self): return 'https://marketplace-api.wildberries.ru' - async def update_stocks(self, data: Union[list, dict]): + async def update_stocks(self, data: Union[list, dict])-> (ClientSession, ClientResponse): warehouse_id = self.marketplace.warehouse_id return await self._method('PUT', f'/api/v3/stocks/{warehouse_id}', data={'stocks': data}) diff --git a/marketplaces/yandexmarket.py b/marketplaces/yandexmarket.py index 0fd0a8b..56031d8 100644 --- a/marketplaces/yandexmarket.py +++ b/marketplaces/yandexmarket.py @@ -3,6 +3,8 @@ import json import logging from typing import Union +from aiohttp import ClientResponse, ClientSession + from backend.config import YANDEX_CLIENT_ID from database import Marketplace from limiter import BatchLimiter @@ -43,7 +45,7 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi): def api_url(self): return 'https://api.partner.market.yandex.ru/v2' - async def update_stocks(self, data: Union[list, dict]): + async def update_stocks(self, data: Union[list, dict]) -> (ClientSession, ClientResponse): campaign_id = self.marketplace.campaign_id return await self._method('PUT', f'/campaigns/{campaign_id}/offers/stocks', diff --git a/sender/ozon.py b/sender/ozon.py index 5767a18..a723d6e 100644 --- a/sender/ozon.py +++ b/sender/ozon.py @@ -16,26 +16,28 @@ class OzonStocksSender(BaseStocksSender): return 100 async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: - response = await self.api.update_stocks(data=chunk) - status_code = response.status - if status_code == 200: - return SendStockStatus.SUCCESS - if response.content_type != JSONResponse.media_type: + session ,response = await self.api.update_stocks(data=chunk) + try: + status_code = response.status + if status_code == 200: + return SendStockStatus.SUCCESS + if response.content_type != JSONResponse.media_type: + return SendStockStatus.ERROR + json_data = await response.json() + error_code = json_data.get('code') + error_message = json_data.get('message') + if error_code == 8: + await asyncio.sleep(1) + return SendStockStatus.SHOULD_RETRY + logging.error(f'[{self.updater.marketplace.id}]: {error_message}') + if status_code in [ + 404, + 500, + ]: + return SendStockStatus.SHOULD_RETRY return SendStockStatus.ERROR - json_data = await response.json() - error_code = json_data.get('code') - error_message = json_data.get('message') - if error_code == 8: - await asyncio.sleep(1) - return SendStockStatus.SHOULD_RETRY - logging.error(f'[{self.updater.marketplace.id}]: {error_message}') - if status_code in [ - 404, - 500, - ]: - return SendStockStatus.SHOULD_RETRY - return SendStockStatus.ERROR - + finally: + await session.close() async def after_chunk_processed(self): return await asyncio.sleep(80 / 100) diff --git a/sender/wildberries.py b/sender/wildberries.py index fa9906f..47845f5 100644 --- a/sender/wildberries.py +++ b/sender/wildberries.py @@ -19,27 +19,29 @@ class WildberriesStocksSender(BaseStocksSender): return 5 async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: - response = await self.api.update_stocks(chunk) - headers = response.headers - status_code = response.status + session,response = await self.api.update_stocks(chunk) + try: + headers = response.headers + status_code = response.status - if status_code in [ - 401, # Invalid token - 403, # Access denied - 404, # Not found - 400, # Other - ]: - return SendStockStatus.ERROR - - # If there is rate limit - if status_code == 429: - delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time)) - await asyncio.sleep(delay_time) - self.remaining = int(headers.get('X-Ratelimit-Limit', 1)) - return SendStockStatus.SHOULD_RETRY - self.remaining = int(headers.get('X-Ratelimit-Remaining', 0)) - return SendStockStatus.SUCCESS + if status_code in [ + 401, # Invalid token + 403, # Access denied + 404, # Not found + 400, # Other + ]: + return SendStockStatus.ERROR + # If there is rate limit + if status_code == 429: + delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time)) + await asyncio.sleep(delay_time) + self.remaining = int(headers.get('X-Ratelimit-Limit', 1)) + return SendStockStatus.SHOULD_RETRY + self.remaining = int(headers.get('X-Ratelimit-Remaining', 0)) + return SendStockStatus.SUCCESS + finally: + await session.close() async def after_chunk_processed(self): if self.remaining <= 0: await asyncio.sleep(self.sleep_time) diff --git a/sender/yandexmarket.py b/sender/yandexmarket.py index 04d991e..172643f 100644 --- a/sender/yandexmarket.py +++ b/sender/yandexmarket.py @@ -26,18 +26,21 @@ class YandexmarketStocksSender(BaseStocksSender): return 2000 async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: - response = await self.api.update_stocks(chunk) - status_code = response.status - if status_code == 200: - self.total_stocks_sent += len(chunk) - return SendStockStatus.SUCCESS - if status_code == 420: - time_to_sleep = 60 - (time.time() - self.start_time) - await asyncio.sleep(time_to_sleep) - return SendStockStatus.SHOULD_RETRY - response_text = await response.text() - logging.error(f'[{self.updater.marketplace.id}]: {response_text}') - return SendStockStatus.ERROR + session, response = await self.api.update_stocks(chunk) + try: + status_code = response.status + if status_code == 200: + self.total_stocks_sent += len(chunk) + return SendStockStatus.SUCCESS + if status_code == 420: + time_to_sleep = 60 - (time.time() - self.start_time) + await asyncio.sleep(time_to_sleep) + return SendStockStatus.SHOULD_RETRY + response_text = await response.text() + logging.error(f'[{self.updater.marketplace.id}]: {response_text}') + return SendStockStatus.ERROR + finally: + await session.close() async def after_chunk_processed(self): time_delta = time.time() - self.start_time diff --git a/updaters/base.py b/updaters/base.py index 4db7015..8402c8e 100644 --- a/updaters/base.py +++ b/updaters/base.py @@ -73,6 +73,7 @@ class BaseMarketplaceUpdater(ABC): return {} async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]: + return stock_data_list cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key()) cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()} result = [] @@ -86,6 +87,8 @@ class BaseMarketplaceUpdater(ABC): async def after_sender_sent(self, stock_requests: list[StockRequest], invalid_product_ids: list[int]): stock_requests = list(filter(lambda stock: stock['product_id'] not in invalid_product_ids, stock_requests)) mapping = {stock['product_id']: stock['full_stock'] for stock in stock_requests} + if not mapping: + return await self.redis_client.hset(self.get_cache_key(), mapping=mapping) async def get_marketplace_updates(self, stock_data_list: list[StockData]) -> list[StockRequest]: