diff --git a/external/marketplace/base/product_synchronizer.py b/external/marketplace/base/product_synchronizer.py new file mode 100644 index 0000000..16ee4ab --- /dev/null +++ b/external/marketplace/base/product_synchronizer.py @@ -0,0 +1,40 @@ +from abc import ABC, abstractmethod + +from sqlalchemy.ext.asyncio import AsyncSession + +from external.marketplace.base import BaseMarketplaceApi +from models import Product, ProductBarcode, ProductImage + + +class BaseProductSynchronizer(ABC): + products: list[Product] = [] + barcodes: list[ProductBarcode] = [] + images: list[ProductImage] = [] + marketplace_products: list = [] + api: BaseMarketplaceApi + session: AsyncSession + + def __init__(self, session, marketplace, api): + self.session = session + self.marketplace = marketplace + self.api = api + self._clear() + + def _clear(self): + self.products = [] + self.barcodes = [] + self.images = [] + self.marketplace_products = [] + + async def _write(self): + instances = self.products + self.marketplace_products + self.barcodes + self.images + self.session.add_all(instances) + await self.session.commit() + + @abstractmethod + async def create_products(self): + pass + + @abstractmethod + async def synchronize_products(self): + pass diff --git a/external/marketplace/ozon/core.py b/external/marketplace/ozon/core.py index f51d011..2dd82d0 100644 --- a/external/marketplace/ozon/core.py +++ b/external/marketplace/ozon/core.py @@ -1,15 +1,61 @@ +from typing import AsyncIterator + from external.marketplace.base.core import BaseMarketplaceApi from models import Marketplace class OzonMarketplaceApi(BaseMarketplaceApi): def __init__(self, marketplace: Marketplace): - pass + client_id = marketplace.auth_data.get('Client-Id') + api_key = marketplace.auth_data.get('Api-Key') + if not client_id or not api_key: + raise ValueError( + f"Client-Id or Api-Key is missing for Marketplace ID: {marketplace.id}. " + "Please check the marketplace credentials." + ) + self.headers = marketplace.auth_data + self.marketplace = marketplace @property def get_headers(self) -> dict: - return {} + return self.headers @property def base_url(self) -> str: - return "" + return "https://api-seller.ozon.ru" + + async def get_products(self, data: dict) -> dict: + method = '/v2/product/list' + response = await self._method('POST', method, json=data) + return response + + async def get_all_products(self) -> AsyncIterator[dict]: + limit = 100 + last_id = '' + while True: + data = { + 'limit': limit, + 'last_id': last_id, + + } + response = await self.get_products(data) + if not response: + break + result = response.get('result') + if not result: + break + items = result.get('items') + if not items: + break + for item in items: + yield item + last_id = result.get('last_id') + if not last_id: + break + + async def get_products_info(self, data: dict) -> dict: + method = '/v2/product/info/list' + response = await self._method('POST', method, json=data) + return response + + \ No newline at end of file diff --git a/marketplaces/ozon/core.py b/marketplaces/ozon/core.py index e9468a8..cf1d58e 100644 --- a/marketplaces/ozon/core.py +++ b/marketplaces/ozon/core.py @@ -1,6 +1,190 @@ +from collections import defaultdict + +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from external.marketplace import OzonMarketplaceApi +from external.marketplace.base.product_synchronizer import BaseProductSynchronizer from marketplaces.base.core import BaseMarketplaceController +from models import OzonProduct, ProductImage, ProductBarcode, Product +from utils.list_utils import chunk_list + + +class OzonProductSynchronizer(BaseProductSynchronizer): + api: OzonMarketplaceApi + marketplace_products: list[OzonProduct] + + def _create_product(self, product_info): + return Product( + client_id=self.marketplace.client_id, + name=product_info['name'], + article=product_info['offer_id'], + ) + + def _create_barcodes(self, product, param): + barcodes = [] + for sku in param: + barcode = ProductBarcode( + product=product, + barcode=sku + ) + barcodes.append(barcode) + return barcodes + + def _create_images(self, product, product_info): + primary_image = product_info.get('primary_image') + if primary_image: + image = ProductImage( + product=product, + image_url=primary_image + ) + return [image] + product_images = [] + images = product_info.get('images') or [] + for image in images[:1]: + product_image = ProductImage( + product=product, + image_url=image + ) + product_images.append(product_image) + return product_images + + def _create_ozon_product(self, product, product_info): + return OzonProduct( + marketplace_id=self.marketplace.id, + product=product, + ozon_product_id=product_info['id'], + ) + + def _process_product(self, product_info): + product = self._create_product(product_info) + barcodes = self._create_barcodes(product, product_info['barcodes']) + images = self._create_images(product, product_info) + ozon_product = self._create_ozon_product(product, product_info) + + self.products.append(product) + self.barcodes.extend(barcodes) + self.images.extend(images) + self.marketplace_products.append(ozon_product) + + async def create_products(self): + self._clear() + product_ids = [] + + async for product in self.api.get_all_products(): + product_ids.append(product['product_id']) + if len(product_ids) > 100: + break + max_products = 1000 + for chunk in chunk_list(product_ids, max_products): + data = { + 'product_id': chunk + } + products_info = await self.api.get_products_info(data) + if not products_info: + continue + result = products_info.get('result') + if not result: + continue + items = result.get('items') + for product_info in items: + product = self._create_product(product_info) + self.products.append(product) + barcodes = self._create_barcodes(product, product_info['barcodes']) + self.barcodes.extend(barcodes) + images = self._create_images(product, product_info) + self.images.extend(images) + ozon_product = self._create_ozon_product(product, product_info) + self.marketplace_products.append(ozon_product) + + await self._write() + + async def synchronize_products(self): + self._clear() + synchronized_products_stmt = ( + select( + Product + ) + .join( + OzonProduct + ).options( + selectinload(Product.barcodes), + selectinload(Product.ozon_products) + ) + .where( + OzonProduct.marketplace_id == self.marketplace.id + ) + ) + synchronized_products = await self.session.execute(synchronized_products_stmt) + synchronized_products = synchronized_products.scalars().all() + synchronized_products_ozon_id_dict = defaultdict(list) + for product in synchronized_products: + for ozon_product in product.ozon_products: + synchronized_products_ozon_id_dict[ozon_product.ozon_product_id].append(product) + synchronized_ozon_ids = list(synchronized_products_ozon_id_dict.keys()) + max_products = 1000 + for chunk in chunk_list(synchronized_ozon_ids, max_products): + data = { + 'product_id': chunk + } + products_info = await self.api.get_products_info(data) + if not products_info: + continue + result = products_info.get('result') + if not result: + continue + items = result.get('items') + for product_info in items: + ozon_id = product_info['id'] + if ozon_id not in synchronized_ozon_ids: + continue + products = synchronized_products_ozon_id_dict.get(ozon_id) + if not products: + continue + product = products[0] + await self._update_product_info(product, product_info) + await self._write() + + async def _update_product_info(self, product, product_info): + product.name = product_info['name'] + product.article = product_info['offer_id'] + barcodes = self._update_barcodes(product, product_info['barcodes']) + self.barcodes.extend(barcodes) + images = self._update_images(product, product_info) + self.images.extend(images) + + def _update_barcodes(self, product, param): + existing_barcodes = {barcode.barcode for barcode in product.barcodes} + barcodes = [] + for sku in param: + if sku not in existing_barcodes: + barcode = ProductBarcode( + product=product, + barcode=sku + ) + barcodes.append(barcode) + return barcodes + + def _update_images(self, product, product_info): + existing_images = {image.image_url for image in product.images} + primary_image = product_info.get('primary_image') + if primary_image and primary_image not in existing_images: + image = ProductImage( + product=product, + image_url=primary_image + ) + return [image] + return [] + class OzonController(BaseMarketplaceController): + def __init__(self, session, marketplace): + super().__init__(session, marketplace) + self.product_synchronizer = OzonProductSynchronizer(session, marketplace, self.api) + + async def synchronize_products(self): + await self.product_synchronizer.synchronize_products() + async def create_products(self): - pass + await self.product_synchronizer.create_products() diff --git a/marketplaces/wildberries/core.py b/marketplaces/wildberries/core.py index d100bec..b2996c6 100644 --- a/marketplaces/wildberries/core.py +++ b/marketplaces/wildberries/core.py @@ -4,34 +4,18 @@ from collections import defaultdict from sqlalchemy import select from sqlalchemy.orm import selectinload +from external.marketplace.base.product_synchronizer import BaseProductSynchronizer from external.marketplace.wildberries.core import WildberriesMarketplaceApi from marketplaces.base.core import BaseMarketplaceController from models import Product, ProductBarcode, ProductImage, WildberriesProduct -class WildberriesProductSynchronizer: - products: list[Product] - barcodes: list[ProductBarcode] - images: list[ProductImage] - wildberries_products: list[WildberriesProduct] - - def __init__(self, session, marketplace, api): - self.session = session - self.marketplace = marketplace - self.api = api - self.products = [] - self.barcodes = [] - self.images = [] - self.wildberries_products = [] - - def _clear(self): - self.products = [] - self.barcodes = [] - self.images = [] - self.wildberries_products = [] +class WildberriesProductSynchronizer(BaseProductSynchronizer): + marketplace_products: list[WildberriesProduct] + api: WildberriesMarketplaceApi async def _write(self): - instances = self.products + self.wildberries_products + self.barcodes + self.images + instances = self.products + self.marketplace_products + self.barcodes + self.images start = time.time() self.session.add_all(instances) await self.session.commit() @@ -125,7 +109,7 @@ class WildberriesProductSynchronizer: self.products.append(product) self.barcodes.extend(barcodes) self.images.extend(images) - self.wildberries_products.append(wildberries_product) + self.marketplace_products.append(wildberries_product) async def create_products(self): self._clear() diff --git a/models/marketplace_products.py b/models/marketplace_products.py index 2539fde..3c73638 100644 --- a/models/marketplace_products.py +++ b/models/marketplace_products.py @@ -27,7 +27,7 @@ class OzonProduct(BaseModel): product_id: Mapped[int] = mapped_column(ForeignKey('products.id'), primary_key=True) product: Mapped["Product"] = relationship() - + ozon_product_id: Mapped[int] = mapped_column(nullable=False) class YandexProduct(BaseModel): __tablename__ = 'yandex_products' diff --git a/models/product.py b/models/product.py index 67671d4..902b794 100644 --- a/models/product.py +++ b/models/product.py @@ -40,6 +40,11 @@ class Product(BaseModel): cascade="all, delete-orphan", uselist=True) + ozon_products = relationship('OzonProduct', + back_populates='product', + cascade="all, delete-orphan", + uselist=True) + class ProductImage(BaseModel): __tablename__ = 'product_images' diff --git a/test.py b/test.py index 693cb1a..5ac04ff 100644 --- a/test.py +++ b/test.py @@ -1,18 +1,54 @@ -import asyncio -import datetime +# import asyncio +# +# from dict_hash import dict_hash +# from sqlalchemy.ext.asyncio import AsyncSession +# +# from backend.session import session_maker +# from marketplaces import MarketplaceControllerFactory +# from models import Marketplace +# import pickle +# pickle.dumps() +# +# async def main(): +# a = "example" +# b = "example" +# +# print(hash(a)) # Хэш для строки "example" +# print(hash(b)) # Хэш для строки "example", будет таким же как и у a +# +# return +# session: AsyncSession = session_maker() +# +# try: +# mp = await session.get(Marketplace, 2) +# if not mp: +# return +# c = MarketplaceControllerFactory.get_controller(session, mp) +# await c.synchronize_products() +# finally: +# await session.close() +# +# +# if __name__ == '__main__': +# loop = asyncio.get_event_loop() +# loop.run_until_complete(main()) +import hashlib +import time -from sqlalchemy import select, func -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload +from reportlab.rl_settings import autoGenerateMissingTTFName -import models -from backend.session import session_maker -from models import User, PaymentRecord +from decorators.locking import lock, redis_client -async def main(): - pass +@lock('synchronize_marketplace', include_args_in_key=True) +def test(marketplace_id: int): + print("test") + time.sleep(100) + + +def main(): + test(1) + if __name__ == '__main__': - loop = asyncio.get_event_loop() - loop.run_until_complete(main()) + main() diff --git a/utils/list_utils.py b/utils/list_utils.py new file mode 100644 index 0000000..2332ee2 --- /dev/null +++ b/utils/list_utils.py @@ -0,0 +1,3 @@ +def chunk_list(lst, n) -> list: + for i in range(0, len(lst), n): + yield lst[i:i + n]