From 11c128a5a768cb9721422730a2a6f815f5ede8f4 Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 11 May 2025 08:12:26 +0300 Subject: [PATCH] rewritten crap --- sender/ozon.py | 39 ++++++++++++++++++++------------------- sender/wildberries.py | 40 +++++++++++++++++++++------------------- sender/yandexmarket.py | 24 ++++++++++++------------ updaters/base.py | 1 - 4 files changed, 53 insertions(+), 51 deletions(-) diff --git a/sender/ozon.py b/sender/ozon.py index 4e4a620..837d866 100644 --- a/sender/ozon.py +++ b/sender/ozon.py @@ -16,29 +16,30 @@ class OzonStocksSender(BaseStocksSender): return 100 async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: - session ,response = await self.api.update_stocks(data=chunk) + session, response = await self.api.update_stocks(data=chunk) try: - status_code = response.status - if status_code == 200: - return SendStockStatus.SUCCESS - if response.content_type != JSONResponse.media_type: + async with response: + status_code = response.status + if status_code == 200: + return SendStockStatus.SUCCESS + if response.content_type != JSONResponse.media_type: + return SendStockStatus.ERROR + json_data = await response.json() + error_code = json_data.get('code') + error_message = json_data.get('message') + if error_code == 8: + await asyncio.sleep(1) + return SendStockStatus.SHOULD_RETRY + logging.error(f'[{self.updater.marketplace.id}]: {error_message}') + if status_code in [ + 404, + 500, + ]: + return SendStockStatus.SHOULD_RETRY return SendStockStatus.ERROR - json_data = await response.json() - error_code = json_data.get('code') - error_message = json_data.get('message') - if error_code == 8: - await asyncio.sleep(1) - return SendStockStatus.SHOULD_RETRY - logging.error(f'[{self.updater.marketplace.id}]: {error_message}') - if status_code in [ - 404, - 500, - ]: - return SendStockStatus.SHOULD_RETRY - return SendStockStatus.ERROR finally: - await response.close() await session.close() + async def after_chunk_processed(self): return await asyncio.sleep(80 / 100) diff --git a/sender/wildberries.py b/sender/wildberries.py index 430f67e..d517614 100644 --- a/sender/wildberries.py +++ b/sender/wildberries.py @@ -19,30 +19,31 @@ class WildberriesStocksSender(BaseStocksSender): return 5 async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: - session,response = await self.api.update_stocks(chunk) + session, response = await self.api.update_stocks(chunk) try: - headers = response.headers - status_code = response.status + async with response: + headers = response.headers + status_code = response.status - if status_code in [ - 401, # Invalid token - 403, # Access denied - 404, # Not found - 400, # Other - ]: - return SendStockStatus.ERROR + if status_code in [ + 401, # Invalid token + 403, # Access denied + 404, # Not found + 400, # Other + ]: + return SendStockStatus.ERROR - # If there is rate limit - if status_code == 429: - delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time)) - await asyncio.sleep(delay_time) - self.remaining = int(headers.get('X-Ratelimit-Limit', 1)) - return SendStockStatus.SHOULD_RETRY - self.remaining = int(headers.get('X-Ratelimit-Remaining', 0)) - return SendStockStatus.SUCCESS + # If there is rate limit + if status_code == 429: + delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time)) + await asyncio.sleep(delay_time) + self.remaining = int(headers.get('X-Ratelimit-Limit', 1)) + return SendStockStatus.SHOULD_RETRY + self.remaining = int(headers.get('X-Ratelimit-Remaining', 0)) + return SendStockStatus.SUCCESS finally: - await response.close() await session.close() + async def after_chunk_processed(self): if self.remaining <= 0: await asyncio.sleep(self.sleep_time) @@ -52,6 +53,7 @@ class WildberriesStocksSender(BaseStocksSender): if self.remaining <= 0: await asyncio.sleep(self.sleep_time) self.remaining = 1 + @property def chunk_size(self) -> int: return 1000 diff --git a/sender/yandexmarket.py b/sender/yandexmarket.py index 1681d6d..4a2a544 100644 --- a/sender/yandexmarket.py +++ b/sender/yandexmarket.py @@ -28,19 +28,19 @@ class YandexmarketStocksSender(BaseStocksSender): async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: session, response = await self.api.update_stocks(chunk) try: - status_code = response.status - if status_code == 200: - self.total_stocks_sent += len(chunk) - return SendStockStatus.SUCCESS - if status_code == 420: - time_to_sleep = 60 - (time.time() - self.start_time) - await asyncio.sleep(time_to_sleep) - return SendStockStatus.SHOULD_RETRY - response_text = await response.text() - logging.error(f'[{self.updater.marketplace.id}]: {response_text}') - return SendStockStatus.ERROR + async with response: + status_code = response.status + if status_code == 200: + self.total_stocks_sent += len(chunk) + return SendStockStatus.SUCCESS + if status_code == 420: + time_to_sleep = 60 - (time.time() - self.start_time) + await asyncio.sleep(time_to_sleep) + return SendStockStatus.SHOULD_RETRY + response_text = await response.text() + logging.error(f'[{self.updater.marketplace.id}]: {response_text}') + return SendStockStatus.ERROR finally: - await response.close() await session.close() async def after_chunk_processed(self): diff --git a/updaters/base.py b/updaters/base.py index 8402c8e..a52c847 100644 --- a/updaters/base.py +++ b/updaters/base.py @@ -73,7 +73,6 @@ class BaseMarketplaceUpdater(ABC): return {} async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]: - return stock_data_list cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key()) cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()} result = []