import asyncio import json import logging from typing import Union from backend.config import YANDEX_CLIENT_ID from database import Marketplace from limiter import BatchLimiter from marketplaces.base import BaseMarketplaceApi from utils import chunk_list class YandexmarketMarketplaceApi(BaseMarketplaceApi): def __init__(self, marketplace: Marketplace): self.marketplace = marketplace self.is_valid = True try: auth_data = json.loads(marketplace.auth_data) except Exception: logging.error(f"Couldn't load auth data for marketplace [{self.marketplace.id}]") self.is_valid = False return access_token = auth_data.get('accessToken') self.limiter_key = str(marketplace.company_id) + str(access_token) + str(self.marketplace.campaign_id) self.headers = { 'Authorization': f'OAuth oauth_token="{access_token}", oauth_client_id="{YANDEX_CLIENT_ID}"' } def get_headers(self): return self.headers @property def api_url(self): return 'https://api.partner.market.yandex.ru/v2' async def update_stocks(self, data: Union[list, dict]): if type(data) is not list: return if not self.is_valid: return campaign_id = self.marketplace.campaign_id max_stocks = 2000 chunks = chunk_list(data, max_stocks) if not chunks: return limiter = BatchLimiter() async def send_stock_chunk(chunk): try: await limiter.acquire_yandexmarket(self.limiter_key) request_data = { 'skus': chunk } response = await self._method('PUT', f'/campaigns/{campaign_id}/offers/stocks', data=request_data) if response.status != 200: logging.warning( f'Error occurred when sending stocks to [{self.marketplace.id}]') return False return True except Exception as e: logging.error( f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') return False tasks = [send_stock_chunk(chunk) for chunk in chunks] first_request = tasks[0] first_response = await first_request if not first_response: logging.error(f'Skipping marketplace [{self.marketplace.id}] because first request was unsuccessful') return await asyncio.gather(*tasks[1:])