From c08c2c04c4eaf56f2a9d9389fa1434f591c63975 Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 13 Apr 2025 13:50:02 +0300 Subject: [PATCH] feat: ym sync --- external/marketplace/yandex/core.py | 119 +++++++++++++++++- marketplaces/yandex/core.py | 183 +++++++++++++++++++++++++++- routers/marketplace.py | 11 ++ schemas/marketplace.py | 13 ++ services/marketplace.py | 23 +++- 5 files changed, 344 insertions(+), 5 deletions(-) diff --git a/external/marketplace/yandex/core.py b/external/marketplace/yandex/core.py index fc839fc..852e334 100644 --- a/external/marketplace/yandex/core.py +++ b/external/marketplace/yandex/core.py @@ -1,15 +1,128 @@ +from typing import AsyncIterator, Optional + from external.marketplace.base.core import BaseMarketplaceApi from models import Marketplace +from utils.list_utils import chunk_list class YandexMarketplaceApi(BaseMarketplaceApi): def __init__(self, marketplace: Marketplace): - pass + token = marketplace.auth_data.get('Api-Key') + if not token: + raise ValueError( + f"Authorization token is missing for Marketplace ID: {marketplace.id}. " + "Please check the marketplace credentials." + ) + self.token = token + self.headers = {'Api-Key': token} + self.marketplace = marketplace @property def get_headers(self) -> dict: - return {} + return self.headers @property def base_url(self) -> str: - return "" + return 'https://api.partner.market.yandex.ru' + + def _get_campaign_id(self) -> Optional[int]: + campaign_id: Optional[str] = self.marketplace.auth_data.get('CampaignId') + if not campaign_id: + return None + if not str(campaign_id).isdigit(): + return None + return int(campaign_id) + + async def get_campaigns(self) -> AsyncIterator[dict]: + method = '/campaigns' + page = 1 + while True: + params = { + 'page': page, + } + response = await self._method('GET', method, params=params) + if not response: + break + campaigns = response.get('campaigns') + if not campaigns: + break + for campaign in campaigns: + yield campaign + + pager = response.get('pager') + if not pager: + break + pages_count = pager.get('pagesCount') + if not pages_count: + break + if page >= pages_count: + break + page += 1 + + async def get_business_id(self, campaign_id: int) -> Optional[int]: + async for campaign in self.get_campaigns(): + if campaign['id'] == campaign_id: + return campaign['business']['id'] + return None + + async def get_all_products(self) -> AsyncIterator[dict]: + campaign_id = self._get_campaign_id() + if not campaign_id: + return + business_id = await self.get_business_id(campaign_id) + if not business_id: + return + + method = f'/businesses/{business_id}/offer-mappings' + limit = 200 + page_token = '' + while True: + params = { + 'limit': limit, + 'page_token': page_token, + } + response = await self._method('POST', method, params=params) + if not response: + break + response = response.get('result') + if not response: + break + offers = response.get('offerMappings') + if not offers: + break + for offer in offers: + yield offer + paging = response.get('paging') + if not paging: + break + + next_page_token = paging.get('nextPageToken') + if not next_page_token: + break + page_token = next_page_token + + async def get_products_by_offer_ids(self, offer_ids: list[str]) -> AsyncIterator[dict]: + campaign_id = self._get_campaign_id() + if not campaign_id: + return + business_id = await self.get_business_id(campaign_id) + if not business_id: + return + + method = f'/businesses/{business_id}/offer-mappings' + limit = 200 + for chunk in chunk_list(offer_ids, limit): + params = { + 'offer_ids': chunk, + } + response = await self._method('POST', method, params=params) + if not response: + break + response = response.get('result') + if not response: + break + offers = response.get('offerMappings') + if not offers: + break + for offer in offers: + yield offer diff --git a/marketplaces/yandex/core.py b/marketplaces/yandex/core.py index 87f4090..d32df60 100644 --- a/marketplaces/yandex/core.py +++ b/marketplaces/yandex/core.py @@ -1,6 +1,187 @@ +from typing import Optional + +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from external.marketplace import YandexMarketplaceApi +from external.marketplace.base.product_synchronizer import BaseProductSynchronizer from marketplaces.base.core import BaseMarketplaceController +from models import Product, YandexProduct, ProductBarcode, ProductImage + + +class YandexProductSynchronizer(BaseProductSynchronizer): + api: YandexMarketplaceApi + + def _try_get_param(self, offer: dict, param: str) -> Optional[str]: + params = offer.get('params') + if not params: + return None + for p in params: + if p['name'] == param: + return str(p['value']) + return None + + def _create_product(self, offer: dict) -> Product: + return Product( + client_id=self.marketplace.client_id, + name=offer['name'], + article=offer['offerId'], + brand=self._try_get_param(offer, 'Бренд'), + size=self._try_get_param(offer, 'Размер'), + color=self._try_get_param(offer, 'Цвет товара'), + composition=self._try_get_param(offer, 'Состав материала'), + ) + + def _create_barcodes(self, product: Product, offer: dict): + barcodes = [] + for sku in offer['barcodes']: + barcode = ProductBarcode( + product=product, + barcode=sku + ) + barcodes.append(barcode) + return barcodes + + def _create_images(self, product: Product, offer: dict): + product_images = [] + images = offer.get('pictures', []) + for image in images[:1]: + product_image = ProductImage( + product=product, + image_url=image + ) + product_images.append(product_image) + return product_images + + def _create_ym_product(self, product: Product): + return YandexProduct( + marketplace_id=self.marketplace.id, + product=product, + ) + + async def create_products(self): + self._clear() + + synchronized_articles = await self._get_synchronized_products() + async for product in self.api.get_all_products(): + try: + offer = product.get('offer') + if not offer: + continue + if offer['offerId'] in synchronized_articles: + continue + product = self._create_product(offer) + self.products.append(product) + barcodes = self._create_barcodes(product, offer) + product.barcodes.extend(barcodes) + images = self._create_images(product, offer) + product.images.extend(images) + ym_product = self._create_ym_product(product) + self.marketplace_products.append(ym_product) + except Exception as e: + print(e) + await self._write() + + def _update_barcodes(self, product: Product, offer: dict): + existing_barcodes = {barcode.barcode for barcode in product.barcodes} + new_barcodes = [] + for barcode in offer['barcodes']: + if barcode not in existing_barcodes: + barcode = ProductBarcode( + product=product, + barcode=barcode + ) + new_barcodes.append(barcode) + return new_barcodes + + def _update_images(self, product: Product, offer: dict): + existing_images = {image.image_url for image in product.images} + new_images = [] + images = offer.get('pictures', []) + for image in images[:1]: + if image not in existing_images: + product_image = ProductImage( + product=product, + image_url=image + ) + new_images.append(product_image) + return new_images + + async def _update_product(self, product: Product, offer: dict): + product.name = offer['name'] + product.brand = self._try_get_param(offer, 'Бренд') + product.size = self._try_get_param(offer, 'Размер') + product.color = self._try_get_param(offer, 'Цвет товара') + product.composition = self._try_get_param(offer, 'Состав материала') + + barcodes = self._update_barcodes(product, offer) + product.barcodes.extend(barcodes) + images = self._update_images(product, offer) + product.images.extend(images) + + async def synchronize_products(self): + self._clear() + synchronized_products = ( + select( + Product + ) + .options( + selectinload(Product.barcodes), + selectinload(Product.images), + ) + .select_from( + YandexProduct + ) + .join( + Product + ) + .where( + YandexProduct.marketplace_id == self.marketplace.id + ) + ) + result = await self.session.execute(synchronized_products) + synchronized_products = result.scalars().all() + synchronized_products_dict = {product.article: product for product in synchronized_products} + synchronized_articles = list(set(synchronized_products_dict.keys())) + async for product in self.api.get_products_by_offer_ids(synchronized_articles): + try: + offer = product.get('offer') + if not offer: + continue + article = offer['offerId'] + if article not in synchronized_articles: + continue + product = synchronized_products_dict[article] + await self._update_product(product, offer) + except Exception as e: + print(f'Error: {e}') + continue + await self._write() + + async def _get_synchronized_products(self): + stmt = ( + select( + Product.article + ) + .select_from( + YandexProduct + ) + .join( + Product + ) + ) + result = await self.session.execute(stmt) + return set(result.scalars().all()) class YandexController(BaseMarketplaceController): + + def __init__(self, session, marketplace): + super().__init__(session, marketplace) + self.synchronizer = YandexProductSynchronizer(session, marketplace, self.api) + + async def synchronize_products(self): + await self.synchronizer.synchronize_products() + async def create_products(self): - pass + await self.synchronizer.create_products() diff --git a/routers/marketplace.py b/routers/marketplace.py index bfe5f05..3b786ab 100644 --- a/routers/marketplace.py +++ b/routers/marketplace.py @@ -69,3 +69,14 @@ async def update( request: UpdateMarketplaceRequest ): return await MarketplaceService(session).update_marketplace(request) + +@marketplace_router.post( + '/yandex-market/get-campaigns', + operation_id='get_yandex_market_campaigns', + response_model=GetYandexMarketCampaignsResponse +) +async def get_yandex_market_campaigns( + session: SessionDependency, + request: GetYandexMarketCampaignsRequest +): + return await MarketplaceService(session).get_yandex_market_campaigns(request) diff --git a/schemas/marketplace.py b/schemas/marketplace.py index 3ef5ead..971924d 100644 --- a/schemas/marketplace.py +++ b/schemas/marketplace.py @@ -29,6 +29,11 @@ class MarketplaceSchema(MarketplaceMixin): id: int +class YandexMarketCampaignSchema(BaseSchema): + id: int + name: str + + # endregion # region Requests @@ -48,6 +53,10 @@ class UpdateMarketplaceRequest(BaseSchema): marketplace: MarketplaceSchema +class GetYandexMarketCampaignsRequest(BaseSchema): + api_key: str + + # endregion # region Responses @@ -66,7 +75,11 @@ class CreateMarketplaceResponse(OkMessageSchema): class DeleteMarketplaceResponse(OkMessageSchema): pass + class UpdateMarketplaceResponse(OkMessageSchema): pass + +class GetYandexMarketCampaignsResponse(BaseSchema): + campaigns: List[YandexMarketCampaignSchema] # endregion diff --git a/services/marketplace.py b/services/marketplace.py index c3d15ca..ab33474 100644 --- a/services/marketplace.py +++ b/services/marketplace.py @@ -1,7 +1,9 @@ -from pyexpat.errors import messages +from types import SimpleNamespace + from sqlalchemy import select, insert, delete, update from sqlalchemy.orm import joinedload +from external.marketplace import YandexMarketplaceApi from models import BaseMarketplace, Marketplace from schemas.marketplace import * from services.base import BaseService @@ -92,3 +94,22 @@ class MarketplaceService(BaseService): except Exception as e: await self.session.rollback() return UpdateMarketplaceResponse(ok=False, message=str(e)) + + async def get_yandex_market_campaigns(self, + request: GetYandexMarketCampaignsRequest) -> GetYandexMarketCampaignsResponse: + try: + marketplace = SimpleNamespace() + marketplace.auth_data = { + 'Api-Key': request.api_key + } + ym_api = YandexMarketplaceApi(marketplace) + campaigns = [] + async for campaign in ym_api.get_campaigns(): + campaigns.append({ + 'id': campaign.get('id'), + 'name': campaign.get('domain'), + }) + return GetYandexMarketCampaignsResponse(campaigns=campaigns) + except Exception as e: + print(e) + return GetYandexMarketCampaignsResponse(campaigns=[])