Files
Fulfillment-Backend/external/marketplace/yandex/core.py
2025-04-13 13:50:17 +03:00

129 lines
4.0 KiB
Python

from typing import AsyncIterator, Optional
from external.marketplace.base.core import BaseMarketplaceApi
from models import Marketplace
from utils.list_utils import chunk_list
class YandexMarketplaceApi(BaseMarketplaceApi):
    """Client for the Yandex Market Partner API bound to one marketplace record.

    Credentials are read from ``marketplace.auth_data``: ``'Api-Key'`` is
    mandatory, ``'CampaignId'`` is required only by the product methods.
    HTTP requests are issued through ``self._method``, which is not defined
    in this class (presumably inherited from ``BaseMarketplaceApi``).
    """

    def __init__(self, marketplace: Marketplace):
        """Store the marketplace and build the authorization header.

        Args:
            marketplace: Record whose ``auth_data`` mapping holds the API key.

        Raises:
            ValueError: If ``auth_data`` is empty or has no ``'Api-Key'`` value.
        """
        # A missing auth_data mapping is reported the same way as a missing key,
        # instead of surfacing as an opaque AttributeError.
        auth_data = marketplace.auth_data or {}
        token = auth_data.get('Api-Key')
        if not token:
            raise ValueError(
                f"Authorization token is missing for Marketplace ID: {marketplace.id}. "
                "Please check the marketplace credentials."
            )
        self.token = token
        self.headers = {'Api-Key': token}
        self.marketplace = marketplace

    @property
    def get_headers(self) -> dict:
        """Return the header dict built in ``__init__`` (the ``'Api-Key'`` header)."""
        return self.headers

    @property
    def base_url(self) -> str:
        """Return the root URL that request paths are relative to."""
        return 'https://api.partner.market.yandex.ru'

    def _get_campaign_id(self) -> Optional[int]:
        """Return ``auth_data['CampaignId']`` as an int, or ``None`` if unusable.

        ``None`` is returned when the value is missing, falsy, or not made up
        solely of decimal digits.
        """
        raw_value = (self.marketplace.auth_data or {}).get('CampaignId')
        if not raw_value:
            return None
        text = str(raw_value)
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscripts ('²') that int() refuses to parse.
        if not text.isdecimal():
            return None
        return int(text)

    async def _resolve_business_id(self) -> Optional[int]:
        """Return the business ID for the configured campaign, or ``None``."""
        campaign_id = self._get_campaign_id()
        if not campaign_id:
            return None
        return await self.get_business_id(campaign_id)

    async def get_campaigns(self) -> AsyncIterator[dict]:
        """Yield every campaign returned by ``GET /campaigns``, page by page.

        Iteration stops on the first falsy response, on a page without
        campaigns, when pager information is missing, or once the current
        page number reaches ``pager['pagesCount']``.
        """
        method = '/campaigns'
        page = 1
        while True:
            response = await self._method('GET', method, params={'page': page})
            if not response:
                break
            campaigns = response.get('campaigns')
            if not campaigns:
                break
            for campaign in campaigns:
                yield campaign
            pager = response.get('pager')
            if not pager:
                break
            pages_count = pager.get('pagesCount')
            if not pages_count or page >= pages_count:
                break
            page += 1

    async def get_business_id(self, campaign_id: int) -> Optional[int]:
        """Return the business ID of the campaign whose ``id`` equals *campaign_id*.

        Args:
            campaign_id: Campaign identifier to look for.

        Returns:
            The value of ``campaign['business']['id']`` for the first matching
            campaign, or ``None`` if no campaign matches or the matching entry
            carries no business ID.
        """
        async for campaign in self.get_campaigns():
            if campaign.get('id') != campaign_id:
                continue
            # Tolerate a malformed entry the same way the rest of the class
            # tolerates missing keys: report "not found" instead of raising.
            business = campaign.get('business') or {}
            return business.get('id')
        return None

    async def get_all_products(self) -> AsyncIterator[dict]:
        """Yield every offer mapping of the configured campaign's business.

        Nothing is yielded when the campaign ID is not configured or its
        business cannot be resolved. Pages are followed through
        ``result['paging']['nextPageToken']`` until no further token is given.
        """
        business_id = await self._resolve_business_id()
        if not business_id:
            return
        method = f'/businesses/{business_id}/offer-mappings'
        limit = 200
        page_token = ''
        while True:
            params = {'limit': limit}
            # The token is optional; send it only once the server has issued one.
            if page_token:
                params['page_token'] = page_token
            response = await self._method('POST', method, params=params)
            if not response:
                break
            result = response.get('result')
            if not result:
                break
            offers = result.get('offerMappings')
            if not offers:
                break
            for offer in offers:
                yield offer
            paging = result.get('paging')
            if not paging:
                break
            next_page_token = paging.get('nextPageToken')
            # An unchanged token would request the same page again forever.
            if not next_page_token or next_page_token == page_token:
                break
            page_token = next_page_token

    async def get_products_by_offer_ids(self, offer_ids: list[str]) -> AsyncIterator[dict]:
        """Yield the offer mappings for the given offer IDs.

        The IDs are requested in chunks of at most 200 via ``chunk_list``
        (assumed to split a list into consecutive sub-lists of the given
        size — its implementation is not visible here).

        Args:
            offer_ids: Offer identifiers to look up.
        """
        # Nothing to look up: skip the campaign/business round-trips entirely.
        if not offer_ids:
            return
        business_id = await self._resolve_business_id()
        if not business_id:
            return
        method = f'/businesses/{business_id}/offer-mappings'
        limit = 200
        for chunk in chunk_list(offer_ids, limit):
            # NOTE(review): the Partner API documents the ID filter as the JSON
            # body field `offerIds`; whether `_method` maps `params` onto the
            # request body/that name is not visible from this class — confirm.
            params = {
                'offer_ids': chunk,
            }
            response = await self._method('POST', method, params=params)
            if not response:
                break
            result = response.get('result')
            if not result:
                break
            offers = result.get('offerMappings')
            if not offers:
                # No matches in this chunk says nothing about the next chunk,
                # so keep going instead of abandoning the remaining IDs.
                continue
            for offer in offers:
                yield offer