rewritten crap
This commit is contained in:
11
limiter/redis_client.py
Normal file
11
limiter/redis_client.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import redis.asyncio as redis
|
||||
|
||||
import backend.config
|
||||
|
||||
pool = redis.ConnectionPool.from_url(backend.config.REDIS_URL)
|
||||
|
||||
|
||||
def get_client() -> redis.Redis:
|
||||
global pool
|
||||
client = redis.Redis.from_pool(pool)
|
||||
return client
|
||||
@@ -8,13 +8,14 @@ from database import Marketplace
|
||||
|
||||
class BaseMarketplaceApi(ABC):
|
||||
session: ClientSession
|
||||
is_valid: bool
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, marketplace: Marketplace):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def update_stocks(self, data: Union[list, dict]):
|
||||
async def update_stocks(self, data: Union[list, dict]) -> ClientResponse:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
@@ -26,15 +27,15 @@ class BaseMarketplaceApi(ABC):
|
||||
def api_url(self):
|
||||
pass
|
||||
|
||||
def init_session(self):
|
||||
self.session = ClientSession()
|
||||
|
||||
async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'],
|
||||
method: str,
|
||||
data: dict) -> ClientResponse:
|
||||
return await self.session.request(
|
||||
self.session = ClientSession()
|
||||
response = await self.session.request(
|
||||
http_method,
|
||||
f'{self.api_url}{method}',
|
||||
json=data,
|
||||
headers=self.get_headers()
|
||||
)
|
||||
await self.session.close()
|
||||
return response
|
||||
|
||||
@@ -21,3 +21,4 @@ class MarketplaceApiFactory:
|
||||
return WildberriesMarketplaceApi(marketplace)
|
||||
case BaseMarketplace.YANDEX_MARKET:
|
||||
return YandexmarketMarketplaceApi(marketplace)
|
||||
raise ValueError()
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Union
|
||||
|
||||
import utils
|
||||
from aiohttp import ClientResponse
|
||||
|
||||
from database import Marketplace
|
||||
from limiter import BatchLimiter
|
||||
from marketplaces.base import BaseMarketplaceApi
|
||||
|
||||
|
||||
@@ -35,47 +34,5 @@ class OzonMarketplaceApi(BaseMarketplaceApi):
|
||||
def api_url(self):
|
||||
return 'https://api-seller.ozon.ru'
|
||||
|
||||
async def update_stocks(self, data: Union[list, dict]):
|
||||
if type(data) is not list:
|
||||
return
|
||||
if not self.is_valid:
|
||||
return
|
||||
max_stocks = 100
|
||||
chunks = utils.chunk_list(data, max_stocks)
|
||||
if not chunks:
|
||||
return
|
||||
|
||||
self.init_session()
|
||||
limiter = BatchLimiter()
|
||||
max_retries = 10
|
||||
while chunks:
|
||||
current_retry = 0
|
||||
chunk = chunks.pop()
|
||||
while current_retry <= max_retries:
|
||||
try:
|
||||
await limiter.acquire_ozon(self.limiter_key)
|
||||
request_data = {'stocks': chunk}
|
||||
response = await self._method('POST', '/v2/products/stocks', data=request_data)
|
||||
current_retry += 1
|
||||
response = await response.json()
|
||||
error_message = response.get('message')
|
||||
error_code = response.get('code')
|
||||
if error_message:
|
||||
if error_code == 8:
|
||||
logging.warning(f'Ozon rate limit exceeded for marketplace [{self.marketplace.id}]')
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
else:
|
||||
logging.warning(
|
||||
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
|
||||
break
|
||||
else:
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
|
||||
break
|
||||
|
||||
|
||||
await self.session.close()
|
||||
async def update_stocks(self, data: Union[list, dict]) -> ClientResponse:
|
||||
return await self._method('POST', '/v2/products/stocks', data={'stocks': data})
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Union
|
||||
|
||||
import jwt
|
||||
|
||||
import utils
|
||||
from database import Marketplace
|
||||
from limiter import BatchLimiter
|
||||
from marketplaces.base import BaseMarketplaceApi
|
||||
|
||||
|
||||
class WildberriesMarketplaceApi(BaseMarketplaceApi):
|
||||
|
||||
def __init__(self, marketplace: Marketplace):
|
||||
self.marketplace = marketplace
|
||||
auth_data = json.loads(marketplace.auth_data)
|
||||
@@ -38,61 +36,7 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi):
|
||||
def api_url(self):
|
||||
return 'https://marketplace-api.wildberries.ru'
|
||||
|
||||
def _filter_chunk_with_conflict(self, chunk: dict, response: list):
|
||||
if not isinstance(response, list):
|
||||
return chunk
|
||||
filter_skus = []
|
||||
for error in response:
|
||||
for sku in error.get('data', []):
|
||||
filter_skus.append(sku['sku'])
|
||||
return list(filter(lambda x: x['sku'] not in filter_skus, chunk))
|
||||
|
||||
async def update_stocks(self, data: Union[list, dict]):
|
||||
if type(data) is not list:
|
||||
return
|
||||
if not self.is_valid:
|
||||
logging.warning(f'Skipping marketplace [{self.marketplace.id}] because of invalid token')
|
||||
return
|
||||
max_stocks = 1000
|
||||
chunks = list(utils.chunk_list(data, max_stocks))
|
||||
if not chunks:
|
||||
return
|
||||
self.init_session()
|
||||
limiter = BatchLimiter()
|
||||
max_retries = 10
|
||||
while chunks:
|
||||
current_retry = 0
|
||||
chunk = chunks.pop()
|
||||
while current_retry <= max_retries:
|
||||
try:
|
||||
await limiter.acquire_wildberries(self.limiter_key)
|
||||
request_data = {'stocks': chunk}
|
||||
response = await self._method('PUT', f'/api/v3/stocks/{self.marketplace.warehouse_id}',
|
||||
data=request_data)
|
||||
current_retry += 1
|
||||
if (response.status not in [204, 409, 429]):
|
||||
response = await response.json()
|
||||
error_message = response.get('message')
|
||||
error_code = response.get('code')
|
||||
logging.warning(
|
||||
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
|
||||
break
|
||||
if response.status == 429:
|
||||
logging.warning(f'WB rate limit exceeded for marketplace [{self.marketplace.id}]')
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
if response.status == 409:
|
||||
response_data = await response.json()
|
||||
warehouse_id = self.marketplace.warehouse_id
|
||||
return await self._method('PUT', f'/api/v3/stocks/{warehouse_id}', data={'stocks': data})
|
||||
|
||||
logging.warning(
|
||||
f'Conflict occurred when sending stocks to [{self.marketplace.id}]')
|
||||
await asyncio.sleep(1)
|
||||
chunk = self._filter_chunk_with_conflict(chunk, response_data)
|
||||
continue
|
||||
await asyncio.sleep(0.2)
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
|
||||
break
|
||||
await self.session.close()
|
||||
|
||||
@@ -28,6 +28,9 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi):
|
||||
}
|
||||
else:
|
||||
access_token = auth_data.get('accessToken')
|
||||
if not access_token:
|
||||
self.is_valid = False
|
||||
return
|
||||
self.limiter_key = str(marketplace.company_id) + str(access_token) + str(self.marketplace.campaign_id)
|
||||
self.headers = {
|
||||
'Authorization': f'OAuth oauth_token="{access_token}", oauth_client_id="{YANDEX_CLIENT_ID}"'
|
||||
@@ -41,44 +44,7 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi):
|
||||
return 'https://api.partner.market.yandex.ru/v2'
|
||||
|
||||
async def update_stocks(self, data: Union[list, dict]):
|
||||
if type(data) is not list:
|
||||
return
|
||||
if not self.is_valid:
|
||||
return
|
||||
campaign_id = self.marketplace.campaign_id
|
||||
max_stocks = 2000
|
||||
chunks = chunk_list(data, max_stocks)
|
||||
if not chunks:
|
||||
return
|
||||
self.init_session()
|
||||
limiter = BatchLimiter()
|
||||
|
||||
async def send_stock_chunk(chunk):
|
||||
try:
|
||||
await limiter.acquire_yandexmarket(self.limiter_key)
|
||||
request_data = {
|
||||
'skus': chunk
|
||||
}
|
||||
response = await self._method('PUT',
|
||||
return await self._method('PUT',
|
||||
f'/campaigns/{campaign_id}/offers/stocks',
|
||||
data=request_data)
|
||||
if response.status != 200:
|
||||
logging.warning(
|
||||
f'Error occurred when sending stocks to [{self.marketplace.id}]')
|
||||
return False
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
|
||||
return False
|
||||
|
||||
tasks = [send_stock_chunk(chunk) for chunk in chunks]
|
||||
first_request = tasks[0]
|
||||
first_response = await first_request
|
||||
if not first_response:
|
||||
logging.error(f'Skipping marketplace [{self.marketplace.id}] because first request was unsuccessful')
|
||||
await self.session.close()
|
||||
return
|
||||
|
||||
await asyncio.gather(*tasks[1:])
|
||||
await self.session.close()
|
||||
data={'skus': data})
|
||||
|
||||
@@ -13,6 +13,7 @@ class StockData(TypedDict):
|
||||
full_stock: int
|
||||
article: Union[str, int]
|
||||
marketplace_product: MarketplaceProduct
|
||||
product_id: int
|
||||
|
||||
|
||||
def get_marketplace_suppliers_and_company_warehouses(marketplace: Marketplace):
|
||||
@@ -267,7 +268,7 @@ async def get_stocks_data(
|
||||
func.coalesce(slaves_stock_subquery.c.slaves_stock, 0).label('slaves_stock'),
|
||||
MarketplaceProduct.price_recommended.label('price_recommended'),
|
||||
MarketplaceProduct.is_archived.label('is_archived'),
|
||||
func.coalesce(fbo_stock_subquery.c.quantity,0).label('fbo_stock')
|
||||
func.coalesce(fbo_stock_subquery.c.quantity, 0).label('fbo_stock')
|
||||
)
|
||||
.select_from(
|
||||
MarketplaceProduct
|
||||
@@ -337,13 +338,16 @@ async def get_stocks_data(
|
||||
'article': denco_article,
|
||||
'full_stock': 0,
|
||||
'marketplace_product': marketplace_product,
|
||||
'product_id': marketplace_product.product_id
|
||||
})
|
||||
continue
|
||||
if fbo_stock> 0 and prefer_fbo_over_fbs:
|
||||
if fbo_stock > 0 and prefer_fbo_over_fbs:
|
||||
response.append({
|
||||
'article': denco_article,
|
||||
'full_stock': 0,
|
||||
'marketplace_product': marketplace_product,
|
||||
'product_id': marketplace_product.product_id
|
||||
|
||||
})
|
||||
continue
|
||||
is_mix = mix_stock is not None
|
||||
@@ -375,5 +379,7 @@ async def get_stocks_data(
|
||||
'article': denco_article,
|
||||
'full_stock': full_stock,
|
||||
'marketplace_product': marketplace_product,
|
||||
'product_id': marketplace_product.product_id
|
||||
|
||||
})
|
||||
return response
|
||||
|
||||
@@ -17,5 +17,7 @@ redis[hiredis]
|
||||
celery[librabbitmq]
|
||||
gevent
|
||||
|
||||
taskiq-aio-pika
|
||||
taskiq-fastapi
|
||||
taskiq-aio-pika==0.4.2
|
||||
taskiq-fastapi==0.3.5
|
||||
taskiq==0.11.17
|
||||
taskiq-pipelines
|
||||
0
sender/__init__.py
Normal file
0
sender/__init__.py
Normal file
114
sender/base.py
Normal file
114
sender/base.py
Normal file
@@ -0,0 +1,114 @@
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import IntEnum
|
||||
from typing import TYPE_CHECKING, TypedDict
|
||||
|
||||
from utils import chunk_list
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from marketplaces.base import BaseMarketplaceApi
|
||||
from updaters.base import BaseMarketplaceUpdater
|
||||
|
||||
|
||||
class SendStockStatus(IntEnum):
|
||||
SUCCESS = 0
|
||||
SHOULD_RETRY = 1
|
||||
ERROR = 2
|
||||
|
||||
|
||||
class StockRequest(TypedDict):
|
||||
request_data: dict
|
||||
product_id: int
|
||||
full_stock: int
|
||||
|
||||
|
||||
class BaseStocksSender(ABC):
|
||||
updater: "BaseMarketplaceUpdater"
|
||||
api: "BaseMarketplaceApi"
|
||||
|
||||
def __init__(self, updater: "BaseMarketplaceUpdater"):
|
||||
self.updater = updater
|
||||
self.api = updater.marketplace_api
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def max_retries(self) -> int:
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def chunk_size(self) -> int:
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_lock(self):
|
||||
lock = self.updater.redis_client.lock(self.updater.get_lock_key(), timeout=60 * 10, blocking=True)
|
||||
return lock
|
||||
|
||||
@abstractmethod
|
||||
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||
raise NotImplementedError()
|
||||
|
||||
async def __process_chunk_with_retries(self, chunk: list[dict]) -> SendStockStatus:
|
||||
for attempt in range(self.max_retries + 1):
|
||||
status = await self._process_chunk(chunk)
|
||||
if status != SendStockStatus.SHOULD_RETRY:
|
||||
return status
|
||||
return SendStockStatus.SHOULD_RETRY
|
||||
|
||||
async def process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||
try:
|
||||
return await self.__process_chunk_with_retries(chunk)
|
||||
except Exception as e:
|
||||
logging.error(f'[{self.updater.marketplace.id}] {e}')
|
||||
return SendStockStatus.ERROR
|
||||
|
||||
@abstractmethod
|
||||
async def after_chunk_processed(self):
|
||||
return
|
||||
|
||||
@abstractmethod
|
||||
async def before_chunk_processed(self):
|
||||
return
|
||||
|
||||
async def __send(self, chunks: list[list[StockRequest]]) -> list[int]:
|
||||
invalid_product_ids = []
|
||||
is_global_error = False
|
||||
try:
|
||||
for chunk in chunks:
|
||||
request = []
|
||||
product_ids = []
|
||||
for stock in chunk:
|
||||
request.append(stock['request_data'])
|
||||
product_ids.append(stock['product_id'])
|
||||
|
||||
await self.before_chunk_processed()
|
||||
try:
|
||||
status = await self.process_chunk(request)
|
||||
if status == SendStockStatus.ERROR:
|
||||
is_global_error = True
|
||||
break
|
||||
if status == SendStockStatus.SHOULD_RETRY:
|
||||
invalid_product_ids.extend(product_ids)
|
||||
except Exception as e:
|
||||
invalid_product_ids.extend(product_ids)
|
||||
logging.error(f'[{self.updater.marketplace.id}]: {e}')
|
||||
continue
|
||||
await self.after_chunk_processed()
|
||||
except Exception as e:
|
||||
logging.error(f'{self.updater.marketplace.id}: {e}')
|
||||
|
||||
if is_global_error:
|
||||
invalid_product_ids = []
|
||||
for chunk in chunks:
|
||||
for stock in chunk:
|
||||
invalid_product_ids.append(stock['product_id'])
|
||||
return invalid_product_ids
|
||||
|
||||
async def send(self, stocks: list[StockRequest]) -> list[int]:
|
||||
if not stocks:
|
||||
return []
|
||||
chunks = chunk_list(stocks, self.chunk_size)
|
||||
lock = self.get_lock()
|
||||
async with lock:
|
||||
result = await self.__send(chunks)
|
||||
return result
|
||||
26
sender/factory.py
Normal file
26
sender/factory.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from typing import TYPE_CHECKING, Union
|
||||
|
||||
from database.sipro.enums.general import BaseMarketplace
|
||||
from sender.ozon import OzonStocksSender
|
||||
from sender.wildberries import WildberriesStocksSender
|
||||
from sender.yandexmarket import YandexmarketStocksSender
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from updaters.base import BaseMarketplaceUpdater
|
||||
|
||||
|
||||
class SenderFactory:
|
||||
@staticmethod
|
||||
def get_sender(updater: "BaseMarketplaceUpdater") -> Union[
|
||||
WildberriesStocksSender,
|
||||
OzonStocksSender,
|
||||
YandexmarketStocksSender
|
||||
]:
|
||||
match updater.marketplace.base_marketplace:
|
||||
case BaseMarketplace.WILDBERRIES:
|
||||
return WildberriesStocksSender(updater)
|
||||
case BaseMarketplace.OZON:
|
||||
return OzonStocksSender(updater)
|
||||
case BaseMarketplace.YANDEX_MARKET:
|
||||
return YandexmarketStocksSender(updater)
|
||||
raise ValueError()
|
||||
43
sender/ozon.py
Normal file
43
sender/ozon.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from starlette.responses import JSONResponse
|
||||
|
||||
from sender.base import BaseStocksSender, SendStockStatus
|
||||
|
||||
|
||||
class OzonStocksSender(BaseStocksSender):
|
||||
@property
|
||||
def max_retries(self) -> int:
|
||||
return 5
|
||||
|
||||
@property
|
||||
def chunk_size(self) -> int:
|
||||
return 100
|
||||
|
||||
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||
response = await self.api.update_stocks(data=chunk)
|
||||
status_code = response.status
|
||||
if status_code == 200:
|
||||
return SendStockStatus.SUCCESS
|
||||
if response.content_type != JSONResponse.media_type:
|
||||
return SendStockStatus.ERROR
|
||||
json_data = await response.json()
|
||||
error_code = json_data.get('code')
|
||||
error_message = json_data.get('message')
|
||||
if error_code == 8:
|
||||
await asyncio.sleep(1)
|
||||
return SendStockStatus.SHOULD_RETRY
|
||||
logging.error(f'[{self.updater.marketplace.id}]: {error_message}')
|
||||
if status_code in [
|
||||
404,
|
||||
500,
|
||||
]:
|
||||
return SendStockStatus.SHOULD_RETRY
|
||||
return SendStockStatus.ERROR
|
||||
|
||||
async def after_chunk_processed(self):
|
||||
return await asyncio.sleep(80 / 100)
|
||||
|
||||
async def before_chunk_processed(self):
|
||||
pass
|
||||
54
sender/wildberries.py
Normal file
54
sender/wildberries.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from marketplaces import WildberriesMarketplaceApi
|
||||
from sender.base import SendStockStatus, BaseStocksSender
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from updaters.base import BaseMarketplaceUpdater
|
||||
|
||||
|
||||
class WildberriesStocksSender(BaseStocksSender):
|
||||
def __init__(self, updater: "BaseMarketplaceUpdater"):
|
||||
super().__init__(updater)
|
||||
self.remaining = 1
|
||||
self.sleep_time = 60 / 300
|
||||
|
||||
@property
|
||||
def max_retries(self) -> int:
|
||||
return 5
|
||||
|
||||
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||
response = await self.api.update_stocks(chunk)
|
||||
headers = response.headers
|
||||
status_code = response.status
|
||||
|
||||
if status_code in [
|
||||
401, # Invalid token
|
||||
403, # Access denied
|
||||
404, # Not found
|
||||
400, # Other
|
||||
]:
|
||||
return SendStockStatus.ERROR
|
||||
|
||||
# If there is rate limit
|
||||
if status_code == 429:
|
||||
delay_time = float(headers.get('X-Ratelimit-Reset', self.sleep_time))
|
||||
await asyncio.sleep(delay_time)
|
||||
self.remaining = int(headers.get('X-Ratelimit-Limit', 1))
|
||||
return SendStockStatus.SHOULD_RETRY
|
||||
self.remaining = int(headers.get('X-Ratelimit-Remaining', 0))
|
||||
return SendStockStatus.SUCCESS
|
||||
|
||||
async def after_chunk_processed(self):
|
||||
if self.remaining <= 0:
|
||||
await asyncio.sleep(self.sleep_time)
|
||||
self.remaining = 1
|
||||
|
||||
async def before_chunk_processed(self):
|
||||
if self.remaining <= 0:
|
||||
await asyncio.sleep(self.sleep_time)
|
||||
self.remaining = 1
|
||||
@property
|
||||
def chunk_size(self) -> int:
|
||||
return 1000
|
||||
51
sender/yandexmarket.py
Normal file
51
sender/yandexmarket.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sender.base import BaseStocksSender, SendStockStatus
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from updaters.base import BaseMarketplaceUpdater
|
||||
|
||||
|
||||
class YandexmarketStocksSender(BaseStocksSender):
|
||||
def __init__(self, updater: "BaseMarketplaceUpdater"):
|
||||
super().__init__(updater)
|
||||
|
||||
self.start_time = time.time()
|
||||
|
||||
self.total_stocks_sent = 0
|
||||
|
||||
@property
|
||||
def max_retries(self) -> int:
|
||||
return 5
|
||||
|
||||
@property
|
||||
def chunk_size(self) -> int:
|
||||
return 2000
|
||||
|
||||
async def _process_chunk(self, chunk: list[dict]) -> SendStockStatus:
|
||||
response = await self.api.update_stocks(chunk)
|
||||
status_code = response.status
|
||||
if status_code == 200:
|
||||
self.total_stocks_sent += len(chunk)
|
||||
return SendStockStatus.SUCCESS
|
||||
if status_code == 420:
|
||||
time_to_sleep = 60 - (time.time() - self.start_time)
|
||||
await asyncio.sleep(time_to_sleep)
|
||||
return SendStockStatus.SHOULD_RETRY
|
||||
response_text = await response.text()
|
||||
logging.error(f'[{self.updater.marketplace.id}]: {response_text}')
|
||||
return SendStockStatus.ERROR
|
||||
|
||||
async def after_chunk_processed(self):
|
||||
time_delta = time.time() - self.start_time
|
||||
if self.total_stocks_sent >= 100_000 and time_delta < 60:
|
||||
time_to_sleep = 60 - time_delta
|
||||
await asyncio.sleep(time_to_sleep)
|
||||
self.start_time = time.time()
|
||||
self.total_stocks_sent = 0
|
||||
|
||||
async def before_chunk_processed(self):
|
||||
pass
|
||||
@@ -54,7 +54,7 @@ fi
|
||||
|
||||
# Start the Taskiq worker
|
||||
log_info "Starting Taskiq worker..."
|
||||
taskiq worker background:taskiq_broker background.tasks --max-async-task 5000 --max-threadpool-threads 8 --max-prefetch 10000
|
||||
taskiq worker background:taskiq_broker background.tasks --max-async-task 5000 --max-threadpool-threads 8 --max-prefetch 10000 --workers 1
|
||||
|
||||
# Log when the Taskiq worker stops
|
||||
log_info "Taskiq worker stopped"
|
||||
111
updaters/base.py
111
updaters/base.py
@@ -1,65 +1,131 @@
|
||||
import time
|
||||
import json
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List
|
||||
from typing import List, Optional
|
||||
|
||||
import redis.asyncio
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
import queries.general
|
||||
from database import Marketplace
|
||||
from limiter import redis_client
|
||||
from marketplaces import MarketplaceApiFactory
|
||||
from marketplaces.base import BaseMarketplaceApi
|
||||
from queries.general import StockData
|
||||
from schemas.general import StockUpdate
|
||||
from sender.base import StockRequest
|
||||
from sender.factory import SenderFactory
|
||||
|
||||
|
||||
class BaseMarketplaceUpdater(ABC):
|
||||
marketplace: Marketplace
|
||||
marketplace_api: BaseMarketplaceApi
|
||||
session: AsyncSession
|
||||
lock_key: Optional[str]
|
||||
cache_key: Optional[str]
|
||||
redis_client: redis.asyncio.Redis
|
||||
|
||||
def __init__(self, marketplace: Marketplace, session: AsyncSession):
|
||||
self.marketplace = marketplace
|
||||
self.session = session
|
||||
self.marketplace_api = MarketplaceApiFactory.get_marketplace_api(marketplace)
|
||||
self.redis_client = redis_client.get_client()
|
||||
self.sender = SenderFactory.get_sender(self)
|
||||
|
||||
self.cache_key = None
|
||||
self.lock_key = None
|
||||
|
||||
def is_valid_updater(self) -> bool:
|
||||
if not self.marketplace_api:
|
||||
return False
|
||||
return self.marketplace_api.is_valid
|
||||
|
||||
@abstractmethod
|
||||
def get_update_for_marketplace(self,
|
||||
stock_data: StockData) -> dict:
|
||||
def get_stock_request(self,
|
||||
stock_data: StockData) -> StockRequest:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _get_identifier(self) -> str:
|
||||
raise NotImplementedError()
|
||||
|
||||
def __get_base_marketplace_key(self):
|
||||
base_marketplace = 'wb'
|
||||
if self.marketplace.base_marketplace == 1:
|
||||
base_marketplace = 'ozon'
|
||||
elif self.marketplace.base_marketplace == 2:
|
||||
base_marketplace = 'yandexmarket'
|
||||
return base_marketplace
|
||||
|
||||
def get_lock_key(self) -> str:
|
||||
identifier = self._get_identifier()
|
||||
base_marketplace = self.__get_base_marketplace_key()
|
||||
return f'{base_marketplace}_{identifier}_lock'
|
||||
|
||||
def get_cache_key(self):
|
||||
identifier = self._get_identifier()
|
||||
base_marketplace = self.__get_base_marketplace_key()
|
||||
return f'{base_marketplace}_{self.marketplace.warehouse_id}_{identifier}_cache'
|
||||
|
||||
def get_auth_data(self) -> dict:
|
||||
try:
|
||||
return json.loads(self.marketplace.auth_data)
|
||||
except Exception as e:
|
||||
return {}
|
||||
|
||||
async def filter_stocks_data(self, stock_data_list: list[StockData]) -> list[StockData]:
|
||||
cached_stocks: dict = await self.redis_client.hgetall(self.get_cache_key())
|
||||
cached_stocks = {int(k): int(v) for k, v in cached_stocks.items()}
|
||||
result = []
|
||||
for stock_data in stock_data_list:
|
||||
cached_stock = cached_stocks.get(stock_data['product_id'])
|
||||
if cached_stock is not None and cached_stock == stock_data['full_stock']:
|
||||
continue
|
||||
result.append(stock_data)
|
||||
return result
|
||||
|
||||
async def after_sender_sent(self, stock_requests: list[StockRequest], invalid_product_ids: list[int]):
|
||||
stock_requests = list(filter(lambda stock: stock['product_id'] not in invalid_product_ids, stock_requests))
|
||||
mapping = {stock['product_id']: stock['full_stock'] for stock in stock_requests}
|
||||
await self.redis_client.hset(self.get_cache_key(), mapping=mapping)
|
||||
|
||||
async def get_marketplace_updates(self, stock_data_list: list[StockData]) -> list[StockRequest]:
|
||||
marketplace_updates = []
|
||||
for stock_data in stock_data_list:
|
||||
marketplace_update = self.get_stock_request(stock_data)
|
||||
marketplace_updates.append(marketplace_update)
|
||||
return marketplace_updates
|
||||
|
||||
async def update(self, updates: List[StockUpdate]):
|
||||
if not self.marketplace_api:
|
||||
return
|
||||
product_ids = list(set([update.product_id for update in updates]))
|
||||
await self.update_products(product_ids)
|
||||
|
||||
async def update_products(self, product_ids: list[int]):
|
||||
if not self.is_valid_updater():
|
||||
return
|
||||
stock_data_list = await queries.general.get_stocks_data(
|
||||
session=self.session,
|
||||
marketplace=self.marketplace,
|
||||
product_ids=product_ids
|
||||
)
|
||||
marketplace_updates = []
|
||||
for stock_data in stock_data_list:
|
||||
marketplace_update = self.get_update_for_marketplace(stock_data)
|
||||
marketplace_updates.append(marketplace_update)
|
||||
await self.marketplace_api.update_stocks(marketplace_updates)
|
||||
stock_data_list = await self.filter_stocks_data(stock_data_list)
|
||||
stock_requests = await self.get_marketplace_updates(stock_data_list)
|
||||
invalid_product_ids = await self.sender.send(stock_requests)
|
||||
await self.after_sender_sent(stock_requests, invalid_product_ids)
|
||||
|
||||
async def update_all(self):
|
||||
if not self.marketplace_api:
|
||||
if not self.is_valid_updater():
|
||||
return
|
||||
stock_data_list = await queries.general.get_stocks_data(
|
||||
session=self.session,
|
||||
marketplace=self.marketplace,
|
||||
)
|
||||
marketplace_updates = []
|
||||
for stock_data in stock_data_list:
|
||||
marketplace_update = self.get_update_for_marketplace(stock_data)
|
||||
marketplace_updates.append(marketplace_update)
|
||||
await self.marketplace_api.update_stocks(marketplace_updates)
|
||||
stock_data_list = await self.filter_stocks_data(stock_data_list)
|
||||
stock_requests = await self.get_marketplace_updates(stock_data_list)
|
||||
invalid_product_ids = await self.sender.send(stock_requests)
|
||||
await self.after_sender_sent(stock_requests, invalid_product_ids)
|
||||
|
||||
async def get_all_stocks(self, only_available: bool) -> List[StockData]:
|
||||
if not self.marketplace_api:
|
||||
if not self.is_valid_updater():
|
||||
return []
|
||||
stock_data_list = await queries.general.get_stocks_data(
|
||||
session=self.session,
|
||||
@@ -74,14 +140,11 @@ class BaseMarketplaceUpdater(ABC):
|
||||
return stock_data_list
|
||||
|
||||
async def reset(self):
|
||||
if not self.marketplace_api:
|
||||
if not self.is_valid_updater():
|
||||
return
|
||||
stock_data_list = await queries.general.get_stocks_data(
|
||||
session=self.session,
|
||||
marketplace=self.marketplace
|
||||
)
|
||||
marketplace_updates = []
|
||||
for stock_data in stock_data_list:
|
||||
marketplace_update = self.get_update_for_marketplace(stock_data)
|
||||
marketplace_updates.append(marketplace_update)
|
||||
await self.marketplace_api.update_stocks(marketplace_updates)
|
||||
stock_requests = await self.get_marketplace_updates(stock_data_list)
|
||||
await self.sender.send(stock_requests)
|
||||
|
||||
@@ -23,3 +23,4 @@ class UpdaterFactory:
|
||||
return OzonUpdater(marketplace, session)
|
||||
case BaseMarketplace.YANDEX_MARKET:
|
||||
return YandexmarketUpdater(marketplace, session)
|
||||
raise ValueError()
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
from queries.general import StockData
|
||||
from sender.base import StockRequest
|
||||
from updaters.base import BaseMarketplaceUpdater
|
||||
|
||||
|
||||
class OzonUpdater(BaseMarketplaceUpdater):
|
||||
def get_update_for_marketplace(self, data: StockData) -> dict:
|
||||
|
||||
def _get_identifier(self) -> str:
|
||||
auth_data = self.get_auth_data()
|
||||
client_id = auth_data.get('clientId', -1)
|
||||
return str(client_id)
|
||||
|
||||
def get_stock_request(self, stock_data: StockData) -> StockRequest:
|
||||
return {
|
||||
'offer_id': str(data['article']),
|
||||
'stock': data['full_stock'],
|
||||
'request_data': {
|
||||
'offer_id': str(stock_data['article']),
|
||||
'stock': stock_data['full_stock'],
|
||||
'warehouse_id': self.marketplace.warehouse_id
|
||||
},
|
||||
'product_id': stock_data['product_id'],
|
||||
'full_stock': stock_data['full_stock']
|
||||
}
|
||||
|
||||
@@ -1,11 +1,32 @@
|
||||
import json
|
||||
|
||||
import jwt
|
||||
from celery.backends.database import retry
|
||||
|
||||
from queries.general import StockData
|
||||
from sender.base import StockRequest
|
||||
from updaters.base import BaseMarketplaceUpdater
|
||||
|
||||
|
||||
class WildberriesUpdater(BaseMarketplaceUpdater):
|
||||
|
||||
def get_update_for_marketplace(self, stock_data: StockData) -> dict:
|
||||
def _get_identifier(self) -> str:
|
||||
try:
|
||||
auth_data = self.get_auth_data()
|
||||
token = auth_data.get('token')
|
||||
decoded_token = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False})
|
||||
sid = decoded_token.get('sid')
|
||||
return str(sid)
|
||||
except Exception:
|
||||
return f'-1'
|
||||
|
||||
def get_stock_request(self, stock_data: StockData) -> StockRequest:
|
||||
return {
|
||||
'request_data': {
|
||||
'sku': stock_data['marketplace_product'].third_additional_article,
|
||||
'amount': stock_data['full_stock']
|
||||
},
|
||||
'product_id': stock_data['product_id'],
|
||||
|
||||
'full_stock': stock_data['full_stock']
|
||||
}
|
||||
|
||||
@@ -1,13 +1,23 @@
|
||||
from queries.general import StockData
|
||||
from sender.base import StockRequest
|
||||
from updaters.base import BaseMarketplaceUpdater
|
||||
|
||||
|
||||
class YandexmarketUpdater(BaseMarketplaceUpdater):
|
||||
def get_update_for_marketplace(self, stock_data: StockData) -> dict:
|
||||
|
||||
def _get_identifier(self) -> str:
|
||||
return str(self.marketplace.campaign_id)
|
||||
|
||||
def get_stock_request(self, stock_data: StockData) -> StockRequest:
|
||||
return {
|
||||
'request_data': {
|
||||
'sku': str(stock_data['article']),
|
||||
'warehouseId': self.marketplace.warehouse_id,
|
||||
'items': [{
|
||||
'count': stock_data['full_stock'],
|
||||
}]
|
||||
},
|
||||
'product_id': stock_data['product_id'],
|
||||
'full_stock': stock_data['full_stock']
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user