Files
Sipro-Stocks/sender/base.py
2025-05-11 07:46:57 +03:00

115 lines
3.5 KiB
Python

import logging
from abc import ABC, abstractmethod
from enum import IntEnum
from typing import TYPE_CHECKING, TypedDict
from utils import chunk_list
if TYPE_CHECKING:
from marketplaces.base import BaseMarketplaceApi
from updaters.base import BaseMarketplaceUpdater
class SendStockStatus(IntEnum):
    """Outcome of one attempt to send a chunk of stock requests."""

    # The chunk needs neither a retry nor an abort.
    SUCCESS = 0
    # The attempt may be repeated; the sender retries the chunk while it
    # keeps getting this status.
    SHOULD_RETRY = 1
    # The sender stops processing further chunks when it sees this status.
    ERROR = 2
class StockRequest(TypedDict):
    """A single stock update queued for sending to a marketplace."""

    # Payload forwarded to the sender's chunk-processing method.
    request_data: dict
    # Identifier reported back to the caller when the update is not delivered.
    product_id: int
    # NOTE(review): not read anywhere in this module; presumably the total
    # stock quantity for the product -- confirm against the callers.
    full_stock: int
class BaseStocksSender(ABC):
    """Abstract base for sending stock updates to a marketplace in chunks.

    Subclasses supply ``max_retries``, ``chunk_size``, ``_process_chunk`` and
    the ``before_chunk_processed`` / ``after_chunk_processed`` hooks.
    ``send`` is the public entry point: it splits the stocks into chunks,
    takes the updater's lock and returns the ids of the products whose stock
    was not delivered.
    """

    updater: "BaseMarketplaceUpdater"
    api: "BaseMarketplaceApi"

    def __init__(self, updater: "BaseMarketplaceUpdater"):
        """Bind the sender to *updater* and to its ``marketplace_api``."""
        self.updater = updater
        self.api = updater.marketplace_api

    @property
    @abstractmethod
    def max_retries(self) -> int:
        """Number of extra attempts made after the first one for a chunk."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def chunk_size(self) -> int:
        """Chunk size passed to ``chunk_list`` when splitting the stocks."""
        raise NotImplementedError()

    def get_lock(self):
        """Return the lock that guards sending for this updater.

        The lock is created by ``updater.redis_client.lock`` under the key
        from ``updater.get_lock_key()`` with ``timeout=600`` and
        ``blocking=True``.
        NOTE(review): presumably a redis-py asyncio lock, where ``timeout``
        is the lock's maximum lifetime in seconds -- confirm.
        """
        return self.updater.redis_client.lock(
            self.updater.get_lock_key(),
            timeout=60 * 10,
            blocking=True,
        )

    @abstractmethod
    async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
        """Send one chunk of request payloads and report the outcome."""
        raise NotImplementedError()

    # Double underscore: the name is mangled, so subclasses cannot override
    # the retry loop by accident.
    async def __process_chunk_with_retries(self, chunk: list[dict]) -> SendStockStatus:
        """Call ``_process_chunk`` until it stops asking for a retry.

        At most ``max_retries + 1`` attempts are made. Returns the first
        status other than ``SHOULD_RETRY``, or ``SHOULD_RETRY`` when every
        attempt asked for another one.
        """
        for _ in range(self.max_retries + 1):
            status = await self._process_chunk(chunk)
            if status != SendStockStatus.SHOULD_RETRY:
                return status
        return SendStockStatus.SHOULD_RETRY

    async def process_chunk(self, chunk: list[dict]) -> SendStockStatus:
        """Send *chunk* with retries, turning any exception into ``ERROR``.

        The exception is logged together with its traceback.
        """
        try:
            return await self.__process_chunk_with_retries(chunk)
        except Exception as e:
            logging.exception(f'[{self.updater.marketplace.id}] {e}')
            return SendStockStatus.ERROR

    @abstractmethod
    async def after_chunk_processed(self):
        """Hook awaited after a chunk was sent without raising or ``ERROR``."""
        return

    @abstractmethod
    async def before_chunk_processed(self):
        """Hook awaited right before each chunk is sent."""
        return

    async def __send(self, chunks: list[list[StockRequest]]) -> list[int]:
        """Send every chunk in order and collect the undelivered product ids.

        Returns:
            * every product id when a chunk ends with ``ERROR`` (sending
              stops at that chunk);
            * otherwise the ids of the chunks that ended with
              ``SHOULD_RETRY`` or raised, plus the ids of the chunks that
              were never sent because a hook or request building raised.
        """
        # The chunks are walked a second time on failure, so a one-shot
        # iterable must be materialised first or that second pass would see
        # nothing and report every product as delivered.
        chunks = [list(chunk) for chunk in chunks]

        invalid_product_ids = []
        is_global_error = False
        # Number of leading chunks whose send outcome is already recorded.
        accounted = 0
        try:
            for index, chunk in enumerate(chunks):
                request = [stock['request_data'] for stock in chunk]
                product_ids = [stock['product_id'] for stock in chunk]

                await self.before_chunk_processed()
                try:
                    status = await self.process_chunk(request)
                except Exception as e:
                    invalid_product_ids.extend(product_ids)
                    logging.exception(f'[{self.updater.marketplace.id}]: {e}')
                    accounted = index + 1
                    # The "after" hook is skipped for a chunk that raised.
                    continue

                if status == SendStockStatus.ERROR:
                    is_global_error = True
                    break
                if status == SendStockStatus.SHOULD_RETRY:
                    invalid_product_ids.extend(product_ids)
                accounted = index + 1

                await self.after_chunk_processed()
        except Exception as e:
            logging.exception(f'{self.updater.marketplace.id}: {e}')
            # A hook or request building failed: the remaining chunks were
            # never sent, so they must not be reported as delivered.
            # Entries without a product id are skipped so that this handler
            # cannot raise on the malformed input that may have brought us
            # here.
            for chunk in chunks[accounted:]:
                invalid_product_ids.extend(
                    stock['product_id'] for stock in chunk if 'product_id' in stock
                )

        if is_global_error:
            # On ERROR every product is reported, including those from
            # chunks that were sent before the failure.
            invalid_product_ids = [
                stock['product_id'] for chunk in chunks for stock in chunk
            ]
        return invalid_product_ids

    async def send(self, stocks: list[StockRequest]) -> list[int]:
        """Send *stocks* in chunks while holding the updater's lock.

        Returns the ids of the products whose stock was not delivered, or an
        empty list when *stocks* is empty (the lock is not taken then).
        """
        if not stocks:
            return []
        chunks = chunk_list(stocks, self.chunk_size)
        async with self.get_lock():
            return await self.__send(chunks)