diff --git a/background/tasks/marketplace.py b/background/tasks/marketplace.py index b4985dc..33766e5 100644 --- a/background/tasks/marketplace.py +++ b/background/tasks/marketplace.py @@ -1,20 +1,38 @@ +import asyncio import time from random import randint +from tracemalloc import Trace +from typing import Optional +from sqlalchemy import True_ +from sqlalchemy.ext.asyncio import AsyncSession + +from backend.session import session_maker from background.celery_app import celery +from decorators.async_utils import async_to_sync +from decorators.locking import lock +from marketplaces import MarketplaceControllerFactory +from models import Marketplace -@celery.task(name='synchronize_marketplace') -def synchronize_marketplace(marketplace_id: int): - time.sleep(10) - if randint(0,10) % 2 == 0: - return 1 - else: - raise Exception('Some error') - # async with session_maker() as session: - # session: AsyncSession - # marketplace: Optional[Marketplace] = await session.get(Marketplace, marketplace_id) - # if not marketplace: - # return - # controller = MarketplaceControllerFactory.get_controller(session, marketplace) - # await controller.synchronize_products() +@celery.task( + name='synchronize_marketplace' +) +@lock( + 'synchronize_marketplace', + include_args_in_key=True +) +@async_to_sync +async def synchronize_marketplace(marketplace_id: int): + try: + async with session_maker() as session: + session: AsyncSession + marketplace: Optional[Marketplace] = await session.get(Marketplace, marketplace_id) + if not marketplace: + return + controller = MarketplaceControllerFactory.get_controller(session, marketplace) + await controller.create_products() + await controller.synchronize_products() + return f"Marketplace {marketplace.id} synchronized" + except Exception as e: + return e diff --git a/decorators/locking.py b/decorators/locking.py new file mode 100644 index 0000000..1d23ba1 --- /dev/null +++ b/decorators/locking.py @@ -0,0 +1,32 @@ +from functools import wraps +from typing import Any, Callable + +import redis + +import utils.hashing_utils +from backend import config + +redis_client = redis.from_url( + config.CELERY_BROKER_URL, +) + + +def lock(lock_key: str, include_args_in_key=False) -> Callable: + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs) -> Any: + key = lock_key + if include_args_in_key: + key += '_' + utils.hashing_utils.hash_args_and_kwargs_with_crc32(*args, **kwargs) + redis_lock = redis_client.lock(key) + acquired = redis_lock.acquire(blocking=False) + if not acquired: + raise Exception(f'Lock {key} is already acquired') + try: + return func(*args, **kwargs) + finally: + redis_lock.release() + + return wrapper + + return decorator diff --git a/marketplaces/base/core.py b/marketplaces/base/core.py index f50f8e1..02d3b09 100644 --- a/marketplaces/base/core.py +++ b/marketplaces/base/core.py @@ -21,5 +21,9 @@ class BaseMarketplaceController(ABC): self.session = session @abstractmethod - async def synchronize_products(self): + async def create_products(self): pass + + @abstractmethod + async def synchronize_products(self): + pass \ No newline at end of file diff --git a/marketplaces/ozon/core.py b/marketplaces/ozon/core.py index d1c370f..e9468a8 100644 --- a/marketplaces/ozon/core.py +++ b/marketplaces/ozon/core.py @@ -2,5 +2,5 @@ from marketplaces.base.core import BaseMarketplaceController class OzonController(BaseMarketplaceController): - async def synchronize_products(self): + async def create_products(self): pass diff --git a/marketplaces/wildberries/core.py b/marketplaces/wildberries/core.py index d419e04..d100bec 100644 --- a/marketplaces/wildberries/core.py +++ b/marketplaces/wildberries/core.py @@ -1,22 +1,44 @@ import time +from collections import defaultdict from sqlalchemy import select +from sqlalchemy.orm import selectinload from external.marketplace.wildberries.core import WildberriesMarketplaceApi from marketplaces.base.core import BaseMarketplaceController from models import Product, ProductBarcode, ProductImage, WildberriesProduct -class WildberriesController(BaseMarketplaceController): - api: WildberriesMarketplaceApi +class WildberriesProductSynchronizer: + products: list[Product] + barcodes: list[ProductBarcode] + images: list[ProductImage] + wildberries_products: list[WildberriesProduct] - async def synchronize_products(self): - products = [] - barcodes = [] - images = [] - wildberries_products = [] + def __init__(self, session, marketplace, api): + self.session = session + self.marketplace = marketplace + self.api = api + self.products = [] + self.barcodes = [] + self.images = [] + self.wildberries_products = [] - marketplace_id: int = self.marketplace.id + def _clear(self): + self.products = [] + self.barcodes = [] + self.images = [] + self.wildberries_products = [] + + async def _write(self): + instances = self.products + self.wildberries_products + self.barcodes + self.images + start = time.time() + self.session.add_all(instances) + await self.session.commit() + print(f'Write time: {time.time() - start}') + + async def _get_synchronized_nm_uuids(self): + marketplace_id = self.marketplace.id synchronized_nm_uuids = set( ( await self.session.scalars( @@ -29,48 +51,163 @@ class WildberriesController(BaseMarketplaceController): ) ).all() ) + return synchronized_nm_uuids + async def _create_product(self, card, size_value): + return Product( + client_id=self.marketplace.client_id, + name=card['title'], + article=card['vendorCode'], + size=size_value + ) + + async def _create_barcodes(self, product, skus): + barcodes = [] + for sku in skus: + barcode = ProductBarcode( + product=product, + barcode=sku + ) + barcodes.append(barcode) + return barcodes + + async def _create_images(self, product, photos): + images = [] + for photo in photos[:1]: + image = ProductImage( + product=product, + image_url=photo['big'] + ) + images.append(image) + return images + + async def _create_wildberries_product(self, product, nm_uuid): + return WildberriesProduct( + marketplace_id=self.marketplace.id, + product=product, + nm_uuid=nm_uuid + ) + + async def _update_product_info(self, product, card): + product.name = card['title'] + product.article = card['vendorCode'] + + async def _update_barcodes(self, product, skus): + existing_barcodes = {barcode.barcode for barcode in product.barcodes} + new_barcodes = [] + for sku in skus: + if sku not in existing_barcodes: + barcode = ProductBarcode( + product=product, + barcode=sku + ) + new_barcodes.append(barcode) + return new_barcodes + + async def _update_images(self, product, photos): + existing_images = {image.image_url for image in product.images} + new_images = [] + for photo in photos[:1]: + if photo['big'] not in existing_images: + image = ProductImage( + product=product, + image_url=photo['big'] + ) + new_images.append(image) + return new_images + + async def _process_product(self, card, size_value, nm_uuid): + product = await self._create_product(card, size_value) + barcodes = await self._create_barcodes(product, card.get('sizes')[0].get('skus') or []) + images = await self._create_images(product, card.get('photos') or []) + wildberries_product = await self._create_wildberries_product(product, nm_uuid) + + self.products.append(product) + self.barcodes.extend(barcodes) + self.images.extend(images) + self.wildberries_products.append(wildberries_product) + + async def create_products(self): + self._clear() + synchronized_nm_uuids = await self._get_synchronized_nm_uuids() async for card in self.api.get_all_products(): nm_uuid = card['nmUUID'] if nm_uuid in synchronized_nm_uuids: continue - sizes: list[dict] = card.get('sizes') or [] + sizes = card.get('sizes') or [] for size in sizes: tech_size = size.get('techSize') wb_size = size.get('wbSize') size_value = tech_size or wb_size - product = Product( - client_id=self.marketplace.client_id, - name=card['title'], - article=card['vendorCode'], - size=size_value + await self._process_product( + card, + size_value, + nm_uuid ) - skus = size.get('skus') or [] - for sku in skus: - barcode = ProductBarcode( - product=product, - barcode=sku + await self._write() + + async def synchronize_products(self): + self._clear() + synchronized_products_stmt = ( + select(Product) + .join( + WildberriesProduct + ) + .options( + selectinload(Product.barcodes), + selectinload(Product.wildberries_products) + ) + .where(WildberriesProduct.marketplace_id == self.marketplace.id) + ) + synchronized_products = await self.session.execute(synchronized_products_stmt) + synchronized_products = synchronized_products.scalars().all() + synchronized_products_nm_id_dict = defaultdict(list) + for product in synchronized_products: + for wb_product in product.wildberries_products: + synchronized_products_nm_id_dict[wb_product.nm_uuid].append(product) + synchronized_nm_uuids = list(synchronized_products_nm_id_dict.keys()) + async for card in self.api.get_all_products(): + nm_uuid = card['nmUUID'] + if nm_uuid not in synchronized_nm_uuids: + continue + products = synchronized_products_nm_id_dict[nm_uuid] + existing_sizes = {product.size for product in products} + size_product_dict = {product.size: product for product in products} + + sizes = card.get('sizes') or [] + for size in sizes: + tech_size = size.get('techSize') + wb_size = size.get('wbSize') + size_value = tech_size or wb_size + if size_value in existing_sizes: + product = size_product_dict[size_value] + await self._update_product_info(product, card) + self.barcodes.extend( + await self._update_barcodes(product, size.get('skus') or []) ) - barcodes.append(barcode) - photos = card.get('photos') or [] - for photo in photos: - image = ProductImage( - product=product, - image_url=photo['big'] + self.images.extend( + await self._update_images(product, card.get('photos') or []) ) - images.append(image) - break - wildberries_product = WildberriesProduct( - marketplace_id=self.marketplace.id, - product=product, - nm_uuid=nm_uuid + continue + await self._process_product( + card, + size_value, + nm_uuid ) - wildberries_products.append( - wildberries_product - ) - products.append(product) - instances = products + wildberries_products + barcodes + images - start = time.time() - self.session.add_all(instances) - await self.session.commit() - print(f'Add and commit elapsed: {time.time() - start}') + await self._write() + + +class WildberriesController( + BaseMarketplaceController +): + api: WildberriesMarketplaceApi + + def __init__(self, session, marketplace): + super().__init__(session, marketplace) + self.product_synchronizer = WildberriesProductSynchronizer(session, marketplace, self.api) + + async def create_products(self): + await self.product_synchronizer.create_products() + + async def synchronize_products(self): + await self.product_synchronizer.synchronize_products() diff --git a/marketplaces/yandex/core.py b/marketplaces/yandex/core.py index 057b1e7..87f4090 100644 --- a/marketplaces/yandex/core.py +++ b/marketplaces/yandex/core.py @@ -2,5 +2,5 @@ from marketplaces.base.core import BaseMarketplaceController class YandexController(BaseMarketplaceController): - async def synchronize_products(self): + async def create_products(self): pass diff --git a/models/marketplace_products.py b/models/marketplace_products.py index ec2c5a3..2539fde 100644 --- a/models/marketplace_products.py +++ b/models/marketplace_products.py @@ -13,7 +13,7 @@ class WildberriesProduct(BaseModel): marketplace: Mapped["Marketplace"] = relationship() product_id: Mapped[int] = mapped_column(ForeignKey('products.id'), primary_key=True) - product: Mapped["Product"] = relationship() + product: Mapped["Product"] = relationship(back_populates='wildberries_products') nm_uuid: Mapped[str] = mapped_column(nullable=False) diff --git a/models/product.py b/models/product.py index a61a140..67671d4 100644 --- a/models/product.py +++ b/models/product.py @@ -35,6 +35,10 @@ class Product(BaseModel): lazy='selectin', cascade="all, delete-orphan") + wildberries_products = relationship('WildberriesProduct', + back_populates='product', + cascade="all, delete-orphan", + uselist=True) class ProductImage(BaseModel): diff --git a/routers/barcode.py b/routers/barcode.py index 0e9c918..4dd4e39 100644 --- a/routers/barcode.py +++ b/routers/barcode.py @@ -5,7 +5,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from backend.session import get_session from schemas.barcode import * -from services.auth import authorized_user, guest_user +from services.auth import guest_user from services.barcode import BarcodeService barcode_router = APIRouter( diff --git a/schemas/deal.py b/schemas/deal.py index eac731b..4eb1e02 100644 --- a/schemas/deal.py +++ b/schemas/deal.py @@ -34,7 +34,7 @@ class DealSummary(BaseSchema): base_marketplace: Optional[BaseMarketplaceSchema] = None shipment_warehouse_id: Optional[int] - + shipment_warehouse_name: Optional[str] class DealServiceSchema(BaseSchema): service: ServiceSchema diff --git a/services/deal.py b/services/deal.py index 28fb18a..df2eaf3 100644 --- a/services/deal.py +++ b/services/deal.py @@ -178,7 +178,8 @@ class DealService(BaseService): ) .options( selectinload(Deal.status_history), - joinedload(Deal.client) + joinedload(Deal.client), + joinedload(Deal.shipping_warehouse), ) .outerjoin( price_subquery, Deal.id == price_subquery.c.deal_id) @@ -202,6 +203,7 @@ class DealService(BaseService): base_marketplace = None if deal.base_marketplace: base_marketplace = BaseMarketplaceSchema.model_validate(deal.base_marketplace) + shipment_warehouse_name = deal.shipping_warehouse.name if deal.shipping_warehouse else None summaries.append( DealSummary( id=deal.id, @@ -214,7 +216,8 @@ class DealService(BaseService): rank=rank, base_marketplace=base_marketplace, created_at=deal.created_at, - shipment_warehouse_id=deal.shipping_warehouse_id + shipment_warehouse_id=deal.shipping_warehouse_id, + shipment_warehouse_name=shipment_warehouse_name ) ) return DealSummaryResponse(summaries=summaries) diff --git a/utils/hashing_utils.py b/utils/hashing_utils.py new file mode 100644 index 0000000..b800a0d --- /dev/null +++ b/utils/hashing_utils.py @@ -0,0 +1,10 @@ +import pickle + +import zlib + + +def hash_args_and_kwargs_with_crc32(*args, **kwargs): + # Сериализуем args и kwargs с помощью pickle + serialized_data = pickle.dumps((args, kwargs)) + # Генерируем хэш CRC32 + return format(zlib.crc32(serialized_data) & 0xffffffff, '08x').upper()