rewritten crap

This commit is contained in:
2025-05-11 08:00:23 +03:00
parent b5110ec69a
commit 2e3245c8f9
8 changed files with 72 additions and 60 deletions

View File

@@ -15,7 +15,7 @@ class BaseMarketplaceApi(ABC):
pass pass
@abstractmethod @abstractmethod
async def update_stocks(self, data: Union[list, dict]) -> ClientResponse: async def update_stocks(self, data: Union[list, dict]) -> (ClientSession, ClientResponse):
pass pass
@abstractmethod @abstractmethod
@@ -29,13 +29,12 @@ class BaseMarketplaceApi(ABC):
async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'], async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'],
method: str, method: str,
data: dict) -> ClientResponse: data: dict) -> (ClientSession, ClientResponse):
self.session = ClientSession() session = ClientSession()
response = await self.session.request( response = await session.request(
http_method, http_method,
f'{self.api_url}{method}', f'{self.api_url}{method}',
json=data, json=data,
headers=self.get_headers() headers=self.get_headers()
) )
await self.session.close() return session, response
return response

View File

@@ -2,7 +2,7 @@ import json
import logging import logging
from typing import Union from typing import Union
from aiohttp import ClientResponse from aiohttp import ClientResponse, ClientSession
from database import Marketplace from database import Marketplace
from marketplaces.base import BaseMarketplaceApi from marketplaces.base import BaseMarketplaceApi
@@ -34,5 +34,5 @@ class OzonMarketplaceApi(BaseMarketplaceApi):
def api_url(self): def api_url(self):
return 'https://api-seller.ozon.ru' return 'https://api-seller.ozon.ru'
async def update_stocks(self, data: Union[list, dict]) -> ClientResponse: async def update_stocks(self, data: Union[list, dict]) -> (ClientSession, ClientResponse):
return await self._method('POST', '/v2/products/stocks', data={'stocks': data}) return await self._method('POST', '/v2/products/stocks', data={'stocks': data})

View File

@@ -3,6 +3,7 @@ import logging
from typing import Union from typing import Union
import jwt import jwt
from aiohttp import ClientSession, ClientResponse
from database import Marketplace from database import Marketplace
from marketplaces.base import BaseMarketplaceApi from marketplaces.base import BaseMarketplaceApi
@@ -36,7 +37,7 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi):
def api_url(self): def api_url(self):
return 'https://marketplace-api.wildberries.ru' return 'https://marketplace-api.wildberries.ru'
async def update_stocks(self, data: Union[list, dict]): async def update_stocks(self, data: Union[list, dict])-> (ClientSession, ClientResponse):
warehouse_id = self.marketplace.warehouse_id warehouse_id = self.marketplace.warehouse_id
return await self._method('PUT', f'/api/v3/stocks/{warehouse_id}', data={'stocks': data}) return await self._method('PUT', f'/api/v3/stocks/{warehouse_id}', data={'stocks': data})

View File

@@ -3,6 +3,8 @@ import json
import logging import logging
from typing import Union from typing import Union
from aiohttp import ClientResponse, ClientSession
from backend.config import YANDEX_CLIENT_ID from backend.config import YANDEX_CLIENT_ID
from database import Marketplace from database import Marketplace
from limiter import BatchLimiter from limiter import BatchLimiter
@@ -43,7 +45,7 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi):
def api_url(self): def api_url(self):
return 'https://api.partner.market.yandex.ru/v2' return 'https://api.partner.market.yandex.ru/v2'
async def update_stocks(self, data: Union[list, dict]): async def update_stocks(self, data: Union[list, dict]) -> (ClientSession, ClientResponse):
campaign_id = self.marketplace.campaign_id campaign_id = self.marketplace.campaign_id
return await self._method('PUT', return await self._method('PUT',
f'/campaigns/{campaign_id}/offers/stocks', f'/campaigns/{campaign_id}/offers/stocks',

View File

@@ -16,26 +16,28 @@ class OzonStocksSender(BaseStocksSender):
return 100 return 100
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
response = await self.api.update_stocks(data=chunk) session ,response = await self.api.update_stocks(data=chunk)
status_code = response.status try:
if status_code == 200: status_code = response.status
return SendStockStatus.SUCCESS if status_code == 200:
if response.content_type != JSONResponse.media_type: return SendStockStatus.SUCCESS
if response.content_type != JSONResponse.media_type:
return SendStockStatus.ERROR
json_data = await response.json()
error_code = json_data.get('code')
error_message = json_data.get('message')
if error_code == 8:
await asyncio.sleep(1)
return SendStockStatus.SHOULD_RETRY
logging.error(f'[{self.updater.marketplace.id}]: {error_message}')
if status_code in [
404,
500,
]:
return SendStockStatus.SHOULD_RETRY
return SendStockStatus.ERROR return SendStockStatus.ERROR
json_data = await response.json() finally:
error_code = json_data.get('code') await session.close()
error_message = json_data.get('message')
if error_code == 8:
await asyncio.sleep(1)
return SendStockStatus.SHOULD_RETRY
logging.error(f'[{self.updater.marketplace.id}]: {error_message}')
if status_code in [
404,
500,
]:
return SendStockStatus.SHOULD_RETRY
return SendStockStatus.ERROR
async def after_chunk_processed(self): async def after_chunk_processed(self):
return await asyncio.sleep(80 / 100) return await asyncio.sleep(80 / 100)

View File

@@ -19,27 +19,29 @@ class WildberriesStocksSender(BaseStocksSender):
return 5 return 5
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
response = await self.api.update_stocks(chunk) session,response = await self.api.update_stocks(chunk)
headers = response.headers try:
status_code = response.status headers = response.headers
status_code = response.status
if status_code in [ if status_code in [
401, # Invalid token 401, # Invalid token
403, # Access denied 403, # Access denied
404, # Not found 404, # Not found
400, # Other 400, # Other
]: ]:
return SendStockStatus.ERROR return SendStockStatus.ERROR
# If there is rate limit
if status_code == 429:
delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time))
await asyncio.sleep(delay_time)
self.remaining = int(headers.get('X-Ratelimit-Limit', 1))
return SendStockStatus.SHOULD_RETRY
self.remaining = int(headers.get('X-Ratelimit-Remaining', 0))
return SendStockStatus.SUCCESS
# If there is rate limit
if status_code == 429:
delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time))
await asyncio.sleep(delay_time)
self.remaining = int(headers.get('X-Ratelimit-Limit', 1))
return SendStockStatus.SHOULD_RETRY
self.remaining = int(headers.get('X-Ratelimit-Remaining', 0))
return SendStockStatus.SUCCESS
finally:
await session.close()
async def after_chunk_processed(self): async def after_chunk_processed(self):
if self.remaining <= 0: if self.remaining <= 0:
await asyncio.sleep(self.sleep_time) await asyncio.sleep(self.sleep_time)

View File

@@ -26,18 +26,21 @@ class YandexmarketStocksSender(BaseStocksSender):
return 2000 return 2000
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus: async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
response = await self.api.update_stocks(chunk) session, response = await self.api.update_stocks(chunk)
status_code = response.status try:
if status_code == 200: status_code = response.status
self.total_stocks_sent += len(chunk) if status_code == 200:
return SendStockStatus.SUCCESS self.total_stocks_sent += len(chunk)
if status_code == 420: return SendStockStatus.SUCCESS
time_to_sleep = 60 - (time.time() - self.start_time) if status_code == 420:
await asyncio.sleep(time_to_sleep) time_to_sleep = 60 - (time.time() - self.start_time)
return SendStockStatus.SHOULD_RETRY await asyncio.sleep(time_to_sleep)
response_text = await response.text() return SendStockStatus.SHOULD_RETRY
logging.error(f'[{self.updater.marketplace.id}]: {response_text}') response_text = await response.text()
return SendStockStatus.ERROR logging.error(f'[{self.updater.marketplace.id}]: {response_text}')
return SendStockStatus.ERROR
finally:
await session.close()
async def after_chunk_processed(self): async def after_chunk_processed(self):
time_delta = time.time() - self.start_time time_delta = time.time() - self.start_time

View File

@@ -73,6 +73,7 @@ class BaseMarketplaceUpdater(ABC):
return {} return {}
async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]: async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]:
return stock_data_list
cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key()) cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key())
cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()} cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()}
result = [] result = []
@@ -86,6 +87,8 @@ class BaseMarketplaceUpdater(ABC):
async def after_sender_sent(self, stock_requests: list[StockRequest], invalid_product_ids: list[int]): async def after_sender_sent(self, stock_requests: list[StockRequest], invalid_product_ids: list[int]):
stock_requests = list(filter(lambda stock: stock['product_id'] not in invalid_product_ids, stock_requests)) stock_requests = list(filter(lambda stock: stock['product_id'] not in invalid_product_ids, stock_requests))
mapping = {stock['product_id']: stock['full_stock'] for stock in stock_requests} mapping = {stock['product_id']: stock['full_stock'] for stock in stock_requests}
if not mapping:
return
await self.redis_client.hset(self.get_cache_key(), mapping=mapping) await self.redis_client.hset(self.get_cache_key(), mapping=mapping)
async def get_marketplace_updates(self, stock_data_list: list[StockData]) -> list[StockRequest]: async def get_marketplace_updates(self, stock_data_list: list[StockData]) -> list[StockRequest]: