rewritten crap
This commit is contained in:
@@ -16,29 +16,30 @@ class OzonStocksSender(BaseStocksSender):
|
|||||||
return 100
|
return 100
|
||||||
|
|
||||||
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||||
session ,response = await self.api.update_stocks(data=chunk)
|
session, response = await self.api.update_stocks(data=chunk)
|
||||||
try:
|
try:
|
||||||
status_code = response.status
|
async with response:
|
||||||
if status_code == 200:
|
status_code = response.status
|
||||||
return SendStockStatus.SUCCESS
|
if status_code == 200:
|
||||||
if response.content_type != JSONResponse.media_type:
|
return SendStockStatus.SUCCESS
|
||||||
|
if response.content_type != JSONResponse.media_type:
|
||||||
|
return SendStockStatus.ERROR
|
||||||
|
json_data = await response.json()
|
||||||
|
error_code = json_data.get('code')
|
||||||
|
error_message = json_data.get('message')
|
||||||
|
if error_code == 8:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
return SendStockStatus.SHOULD_RETRY
|
||||||
|
logging.error(f'[{self.updater.marketplace.id}]: {error_message}')
|
||||||
|
if status_code in [
|
||||||
|
404,
|
||||||
|
500,
|
||||||
|
]:
|
||||||
|
return SendStockStatus.SHOULD_RETRY
|
||||||
return SendStockStatus.ERROR
|
return SendStockStatus.ERROR
|
||||||
json_data = await response.json()
|
|
||||||
error_code = json_data.get('code')
|
|
||||||
error_message = json_data.get('message')
|
|
||||||
if error_code == 8:
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
return SendStockStatus.SHOULD_RETRY
|
|
||||||
logging.error(f'[{self.updater.marketplace.id}]: {error_message}')
|
|
||||||
if status_code in [
|
|
||||||
404,
|
|
||||||
500,
|
|
||||||
]:
|
|
||||||
return SendStockStatus.SHOULD_RETRY
|
|
||||||
return SendStockStatus.ERROR
|
|
||||||
finally:
|
finally:
|
||||||
await response.close()
|
|
||||||
await session.close()
|
await session.close()
|
||||||
|
|
||||||
async def after_chunk_processed(self):
|
async def after_chunk_processed(self):
|
||||||
return await asyncio.sleep(80 / 100)
|
return await asyncio.sleep(80 / 100)
|
||||||
|
|
||||||
|
|||||||
@@ -19,30 +19,31 @@ class WildberriesStocksSender(BaseStocksSender):
|
|||||||
return 5
|
return 5
|
||||||
|
|
||||||
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||||
session,response = await self.api.update_stocks(chunk)
|
session, response = await self.api.update_stocks(chunk)
|
||||||
try:
|
try:
|
||||||
headers = response.headers
|
async with response:
|
||||||
status_code = response.status
|
headers = response.headers
|
||||||
|
status_code = response.status
|
||||||
|
|
||||||
if status_code in [
|
if status_code in [
|
||||||
401, # Invalid token
|
401, # Invalid token
|
||||||
403, # Access denied
|
403, # Access denied
|
||||||
404, # Not found
|
404, # Not found
|
||||||
400, # Other
|
400, # Other
|
||||||
]:
|
]:
|
||||||
return SendStockStatus.ERROR
|
return SendStockStatus.ERROR
|
||||||
|
|
||||||
# If there is rate limit
|
# If there is rate limit
|
||||||
if status_code == 429:
|
if status_code == 429:
|
||||||
delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time))
|
delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time))
|
||||||
await asyncio.sleep(delay_time)
|
await asyncio.sleep(delay_time)
|
||||||
self.remaining = int(headers.get('X-Ratelimit-Limit', 1))
|
self.remaining = int(headers.get('X-Ratelimit-Limit', 1))
|
||||||
return SendStockStatus.SHOULD_RETRY
|
return SendStockStatus.SHOULD_RETRY
|
||||||
self.remaining = int(headers.get('X-Ratelimit-Remaining', 0))
|
self.remaining = int(headers.get('X-Ratelimit-Remaining', 0))
|
||||||
return SendStockStatus.SUCCESS
|
return SendStockStatus.SUCCESS
|
||||||
finally:
|
finally:
|
||||||
await response.close()
|
|
||||||
await session.close()
|
await session.close()
|
||||||
|
|
||||||
async def after_chunk_processed(self):
|
async def after_chunk_processed(self):
|
||||||
if self.remaining <= 0:
|
if self.remaining <= 0:
|
||||||
await asyncio.sleep(self.sleep_time)
|
await asyncio.sleep(self.sleep_time)
|
||||||
@@ -52,6 +53,7 @@ class WildberriesStocksSender(BaseStocksSender):
|
|||||||
if self.remaining <= 0:
|
if self.remaining <= 0:
|
||||||
await asyncio.sleep(self.sleep_time)
|
await asyncio.sleep(self.sleep_time)
|
||||||
self.remaining = 1
|
self.remaining = 1
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def chunk_size(self) -> int:
|
def chunk_size(self) -> int:
|
||||||
return 1000
|
return 1000
|
||||||
|
|||||||
@@ -28,19 +28,19 @@ class YandexmarketStocksSender(BaseStocksSender):
|
|||||||
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||||
session, response = await self.api.update_stocks(chunk)
|
session, response = await self.api.update_stocks(chunk)
|
||||||
try:
|
try:
|
||||||
status_code = response.status
|
async with response:
|
||||||
if status_code == 200:
|
status_code = response.status
|
||||||
self.total_stocks_sent += len(chunk)
|
if status_code == 200:
|
||||||
return SendStockStatus.SUCCESS
|
self.total_stocks_sent += len(chunk)
|
||||||
if status_code == 420:
|
return SendStockStatus.SUCCESS
|
||||||
time_to_sleep = 60 - (time.time() - self.start_time)
|
if status_code == 420:
|
||||||
await asyncio.sleep(time_to_sleep)
|
time_to_sleep = 60 - (time.time() - self.start_time)
|
||||||
return SendStockStatus.SHOULD_RETRY
|
await asyncio.sleep(time_to_sleep)
|
||||||
response_text = await response.text()
|
return SendStockStatus.SHOULD_RETRY
|
||||||
logging.error(f'[{self.updater.marketplace.id}]: {response_text}')
|
response_text = await response.text()
|
||||||
return SendStockStatus.ERROR
|
logging.error(f'[{self.updater.marketplace.id}]: {response_text}')
|
||||||
|
return SendStockStatus.ERROR
|
||||||
finally:
|
finally:
|
||||||
await response.close()
|
|
||||||
await session.close()
|
await session.close()
|
||||||
|
|
||||||
async def after_chunk_processed(self):
|
async def after_chunk_processed(self):
|
||||||
|
|||||||
@@ -73,7 +73,6 @@ class BaseMarketplaceUpdater(ABC):
|
|||||||
return {}
|
return {}
|
||||||
|
|
||||||
async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]:
|
async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]:
|
||||||
return stock_data_list
|
|
||||||
cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key())
|
cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key())
|
||||||
cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()}
|
cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()}
|
||||||
result = []
|
result = []
|
||||||
|
|||||||
Reference in New Issue
Block a user