115 lines
3.5 KiB
Python
115 lines
3.5 KiB
Python
import logging
|
|
from abc import ABC, abstractmethod
|
|
from enum import IntEnum
|
|
from typing import TYPE_CHECKING, TypedDict
|
|
|
|
from utils import chunk_list
|
|
|
|
if TYPE_CHECKING:
|
|
from marketplaces.base import BaseMarketplaceApi
|
|
from updaters.base import BaseMarketplaceUpdater
|
|
|
|
|
|
class SendStockStatus(IntEnum):
|
|
SUCCESS = 0
|
|
SHOULD_RETRY = 1
|
|
ERROR = 2
|
|
|
|
|
|
class StockRequest(TypedDict):
|
|
request_data: dict
|
|
product_id: int
|
|
full_stock: int
|
|
|
|
|
|
class BaseStocksSender(ABC):
|
|
updater: "BaseMarketplaceUpdater"
|
|
api: "BaseMarketplaceApi"
|
|
|
|
def __init__(self, updater: "BaseMarketplaceUpdater"):
|
|
self.updater = updater
|
|
self.api = updater.marketplace_api
|
|
|
|
@property
|
|
@abstractmethod
|
|
def max_retries(self) -> int:
|
|
raise NotImplementedError()
|
|
|
|
@property
|
|
@abstractmethod
|
|
def chunk_size(self) -> int:
|
|
raise NotImplementedError()
|
|
|
|
def get_lock(self):
|
|
lock = self.updater.redis_client.lock(self.updater.get_lock_key(), timeout=60 * 10, blocking=True)
|
|
return lock
|
|
|
|
@abstractmethod
|
|
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
|
raise NotImplementedError()
|
|
|
|
async def __process_chunk_with_retries(self, chunk: list[dict]) -> SendStockStatus:
|
|
for attempt in range(self.max_retries + 1):
|
|
status = await self._process_chunk(chunk)
|
|
if status != SendStockStatus.SHOULD_RETRY:
|
|
return status
|
|
return SendStockStatus.SHOULD_RETRY
|
|
|
|
async def process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
|
try:
|
|
return await self.__process_chunk_with_retries(chunk)
|
|
except Exception as e:
|
|
logging.error(f'[{self.updater.marketplace.id}] {e}')
|
|
return SendStockStatus.ERROR
|
|
|
|
@abstractmethod
|
|
async def after_chunk_processed(self):
|
|
return
|
|
|
|
@abstractmethod
|
|
async def before_chunk_processed(self):
|
|
return
|
|
|
|
async def __send(self, chunks: list[list[StockRequest]]) -> list[int]:
|
|
invalid_product_ids = []
|
|
is_global_error = False
|
|
try:
|
|
for chunk in chunks:
|
|
request = []
|
|
product_ids = []
|
|
for stock in chunk:
|
|
request.append(stock['request_data'])
|
|
product_ids.append(stock['product_id'])
|
|
|
|
await self.before_chunk_processed()
|
|
try:
|
|
status = await self.process_chunk(request)
|
|
if status == SendStockStatus.ERROR:
|
|
is_global_error = True
|
|
break
|
|
if status == SendStockStatus.SHOULD_RETRY:
|
|
invalid_product_ids.extend(product_ids)
|
|
except Exception as e:
|
|
invalid_product_ids.extend(product_ids)
|
|
logging.error(f'[{self.updater.marketplace.id}]: {e}')
|
|
continue
|
|
await self.after_chunk_processed()
|
|
except Exception as e:
|
|
logging.error(f'{self.updater.marketplace.id}: {e}')
|
|
|
|
if is_global_error:
|
|
invalid_product_ids = []
|
|
for chunk in chunks:
|
|
for stock in chunk:
|
|
invalid_product_ids.append(stock['product_id'])
|
|
return invalid_product_ids
|
|
|
|
async def send(self, stocks: list[StockRequest]) -> list[int]:
|
|
if not stocks:
|
|
return []
|
|
chunks = chunk_list(stocks, self.chunk_size)
|
|
lock = self.get_lock()
|
|
async with lock:
|
|
result = await self.__send(chunks)
|
|
return result
|