refactor: enhance stock sending logic to include retry mechanism and handle rate limits
This commit is contained in:
@@ -47,32 +47,35 @@ class OzonMarketplaceApi(BaseMarketplaceApi):
|
|||||||
|
|
||||||
self.init_session()
|
self.init_session()
|
||||||
limiter = BatchLimiter()
|
limiter = BatchLimiter()
|
||||||
|
max_retries = 5
|
||||||
async def send_stock_chunk(chunk) -> bool:
|
while chunks:
|
||||||
|
current_retry = 0
|
||||||
|
chunk = chunks.pop()
|
||||||
|
while current_retry <= max_retries:
|
||||||
try:
|
try:
|
||||||
await limiter.acquire_ozon(self.limiter_key)
|
await limiter.acquire_ozon(self.limiter_key)
|
||||||
request_data = {'stocks': chunk}
|
request_data = {'stocks': chunk}
|
||||||
response = await self._method('POST', '/v2/products/stocks', data=request_data)
|
response = await self._method('POST', '/v2/products/stocks', data=request_data)
|
||||||
|
current_retry += 1
|
||||||
response = await response.json()
|
response = await response.json()
|
||||||
error_message = response.get('message')
|
error_message = response.get('message')
|
||||||
error_code = response.get('code')
|
error_code = response.get('code')
|
||||||
if error_message:
|
if error_message:
|
||||||
|
if error_code == 8:
|
||||||
|
logging.warning(f'Ozon rate limit exceeded for marketplace [{self.marketplace.id}]')
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
|
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
|
||||||
return False
|
break
|
||||||
return True
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(
|
logging.error(
|
||||||
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
|
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
|
||||||
return False
|
break
|
||||||
|
|
||||||
tasks = [send_stock_chunk(chunk) for chunk in chunks]
|
|
||||||
first_request = tasks[0]
|
|
||||||
first_response = await first_request
|
|
||||||
if not first_response:
|
|
||||||
logging.error(f'Skipping marketplace [{self.marketplace.id}] because first request was unsuccessful')
|
|
||||||
await self.session.close()
|
|
||||||
return
|
|
||||||
|
|
||||||
await asyncio.gather(*tasks[1:])
|
|
||||||
await self.session.close()
|
await self.session.close()
|
||||||
|
|||||||
@@ -50,16 +50,17 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi):
|
|||||||
return
|
return
|
||||||
self.init_session()
|
self.init_session()
|
||||||
limiter = BatchLimiter()
|
limiter = BatchLimiter()
|
||||||
|
max_retries = 5
|
||||||
|
while chunks:
|
||||||
|
current_retry = 0
|
||||||
chunk = chunks.pop()
|
chunk = chunks.pop()
|
||||||
while True:
|
while current_retry <= max_retries:
|
||||||
try:
|
try:
|
||||||
if not chunks:
|
|
||||||
break
|
|
||||||
await limiter.acquire_wildberries(self.limiter_key)
|
await limiter.acquire_wildberries(self.limiter_key)
|
||||||
request_data = {'stocks': chunk}
|
request_data = {'stocks': chunk}
|
||||||
response = await self._method('PUT', f'/api/v3/stocks/{self.marketplace.warehouse_id}',
|
response = await self._method('PUT', f'/api/v3/stocks/{self.marketplace.warehouse_id}',
|
||||||
data=request_data)
|
data=request_data)
|
||||||
|
current_retry += 1
|
||||||
if (response.status not in [204, 409]) and (response.status != 429):
|
if (response.status not in [204, 409]) and (response.status != 429):
|
||||||
response = await response.json()
|
response = await response.json()
|
||||||
error_message = response.get('message')
|
error_message = response.get('message')
|
||||||
@@ -68,22 +69,13 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi):
|
|||||||
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
|
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
|
||||||
break
|
break
|
||||||
if response.status == 429:
|
if response.status == 429:
|
||||||
|
logging.warning(f'WB rate limit exceeded for marketplace [{self.marketplace.id}]')
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
logging.warning(f'Rate limit exceeded for marketplace [{self.marketplace.id}]')
|
|
||||||
continue
|
continue
|
||||||
chunk = chunks.pop()
|
|
||||||
await asyncio.sleep(0.2)
|
await asyncio.sleep(0.2)
|
||||||
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(
|
logging.error(
|
||||||
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
|
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
|
||||||
# tasks = [send_stock_chunk(chunk) for chunk in chunks]
|
break
|
||||||
# first_request = tasks[0]
|
|
||||||
# first_response = await first_request
|
|
||||||
# if not first_response:
|
|
||||||
# logging.error(f'Skipping marketplace [{self.marketplace.id}] because first request was unsuccessful')
|
|
||||||
# await self.session.close()
|
|
||||||
# return
|
|
||||||
#
|
|
||||||
# await asyncio.gather(*tasks[1:])
|
|
||||||
await self.session.close()
|
await self.session.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user