feat: a lot of a lot

This commit is contained in:
2024-09-01 21:45:24 +03:00
parent 4ae03284a3
commit 6ad78df35d
12 changed files with 270 additions and 62 deletions

View File

@@ -1,20 +1,38 @@
import asyncio
import time import time
from random import randint from random import randint
from tracemalloc import Trace
from typing import Optional
from sqlalchemy import True_
from sqlalchemy.ext.asyncio import AsyncSession
from backend.session import session_maker
from background.celery_app import celery from background.celery_app import celery
from decorators.async_utils import async_to_sync
from decorators.locking import lock
from marketplaces import MarketplaceControllerFactory
from models import Marketplace
@celery.task(name='synchronize_marketplace') @celery.task(
def synchronize_marketplace(marketplace_id: int): name='synchronize_marketplace'
time.sleep(10) )
if randint(0,10) % 2 == 0: @lock(
return 1 'synchronize_marketplace',
else: include_args_in_key=True
raise Exception('Some error') )
# async with session_maker() as session: @async_to_sync
# session: AsyncSession async def synchronize_marketplace(marketplace_id: int):
# marketplace: Optional[Marketplace] = await session.get(Marketplace, marketplace_id) try:
# if not marketplace: async with session_maker() as session:
# return session: AsyncSession
# controller = MarketplaceControllerFactory.get_controller(session, marketplace) marketplace: Optional[Marketplace] = await session.get(Marketplace, marketplace_id)
# await controller.synchronize_products() if not marketplace:
return
controller = MarketplaceControllerFactory.get_controller(session, marketplace)
await controller.create_products()
await controller.synchronize_products()
return f"Marketplace {marketplace.id} synchronized"
except Exception as e:
return e

32
decorators/locking.py Normal file
View File

@@ -0,0 +1,32 @@
from functools import wraps
from typing import Any, Callable
import redis
import utils.hashing_utils
from backend import config
redis_client = redis.from_url(
config.CELERY_BROKER_URL,
)
def lock(lock_key: str, include_args_in_key=False) -> Callable:
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
key = lock_key
if include_args_in_key:
key += '_' + utils.hashing_utils.hash_args_and_kwargs_with_crc32(*args, **kwargs)
redis_lock = redis_client.lock(key)
acquired = redis_lock.acquire(blocking=False)
if not acquired:
raise Exception(f'Lock {key} is already acquired')
try:
return func(*args, **kwargs)
finally:
redis_lock.release()
return wrapper
return decorator

View File

@@ -21,5 +21,9 @@ class BaseMarketplaceController(ABC):
self.session = session self.session = session
@abstractmethod @abstractmethod
async def synchronize_products(self): async def create_products(self):
pass pass
@abstractmethod
async def synchronize_products(self):
pass

View File

@@ -2,5 +2,5 @@ from marketplaces.base.core import BaseMarketplaceController
class OzonController(BaseMarketplaceController): class OzonController(BaseMarketplaceController):
async def synchronize_products(self): async def create_products(self):
pass pass

View File

@@ -1,22 +1,44 @@
import time import time
from collections import defaultdict
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import selectinload
from external.marketplace.wildberries.core import WildberriesMarketplaceApi from external.marketplace.wildberries.core import WildberriesMarketplaceApi
from marketplaces.base.core import BaseMarketplaceController from marketplaces.base.core import BaseMarketplaceController
from models import Product, ProductBarcode, ProductImage, WildberriesProduct from models import Product, ProductBarcode, ProductImage, WildberriesProduct
class WildberriesController(BaseMarketplaceController): class WildberriesProductSynchronizer:
api: WildberriesMarketplaceApi products: list[Product]
barcodes: list[ProductBarcode]
images: list[ProductImage]
wildberries_products: list[WildberriesProduct]
async def synchronize_products(self): def __init__(self, session, marketplace, api):
products = [] self.session = session
barcodes = [] self.marketplace = marketplace
images = [] self.api = api
wildberries_products = [] self.products = []
self.barcodes = []
self.images = []
self.wildberries_products = []
marketplace_id: int = self.marketplace.id def _clear(self):
self.products = []
self.barcodes = []
self.images = []
self.wildberries_products = []
async def _write(self):
instances = self.products + self.wildberries_products + self.barcodes + self.images
start = time.time()
self.session.add_all(instances)
await self.session.commit()
print(f'Write time: {time.time() - start}')
async def _get_synchronized_nm_uuids(self):
marketplace_id = self.marketplace.id
synchronized_nm_uuids = set( synchronized_nm_uuids = set(
( (
await self.session.scalars( await self.session.scalars(
@@ -29,48 +51,163 @@ class WildberriesController(BaseMarketplaceController):
) )
).all() ).all()
) )
return synchronized_nm_uuids
async def _create_product(self, card, size_value):
return Product(
client_id=self.marketplace.client_id,
name=card['title'],
article=card['vendorCode'],
size=size_value
)
async def _create_barcodes(self, product, skus):
barcodes = []
for sku in skus:
barcode = ProductBarcode(
product=product,
barcode=sku
)
barcodes.append(barcode)
return barcodes
async def _create_images(self, product, photos):
images = []
for photo in photos[:1]:
image = ProductImage(
product=product,
image_url=photo['big']
)
images.append(image)
return images
async def _create_wildberries_product(self, product, nm_uuid):
return WildberriesProduct(
marketplace_id=self.marketplace.id,
product=product,
nm_uuid=nm_uuid
)
async def _update_product_info(self, product, card):
product.name = card['title']
product.article = card['vendorCode']
async def _update_barcodes(self, product, skus):
existing_barcodes = {barcode.barcode for barcode in product.barcodes}
new_barcodes = []
for sku in skus:
if sku not in existing_barcodes:
barcode = ProductBarcode(
product=product,
barcode=sku
)
new_barcodes.append(barcode)
return new_barcodes
async def _update_images(self, product, photos):
existing_images = {image.image_url for image in product.images}
new_images = []
for photo in photos[:1]:
if photo['big'] not in existing_images:
image = ProductImage(
product=product,
image_url=photo['big']
)
new_images.append(image)
return new_images
async def _process_product(self, card, size_value, nm_uuid):
product = await self._create_product(card, size_value)
barcodes = await self._create_barcodes(product, card.get('sizes')[0].get('skus') or [])
images = await self._create_images(product, card.get('photos') or [])
wildberries_product = await self._create_wildberries_product(product, nm_uuid)
self.products.append(product)
self.barcodes.extend(barcodes)
self.images.extend(images)
self.wildberries_products.append(wildberries_product)
async def create_products(self):
self._clear()
synchronized_nm_uuids = await self._get_synchronized_nm_uuids()
async for card in self.api.get_all_products(): async for card in self.api.get_all_products():
nm_uuid = card['nmUUID'] nm_uuid = card['nmUUID']
if nm_uuid in synchronized_nm_uuids: if nm_uuid in synchronized_nm_uuids:
continue continue
sizes: list[dict] = card.get('sizes') or [] sizes = card.get('sizes') or []
for size in sizes: for size in sizes:
tech_size = size.get('techSize') tech_size = size.get('techSize')
wb_size = size.get('wbSize') wb_size = size.get('wbSize')
size_value = tech_size or wb_size size_value = tech_size or wb_size
product = Product( await self._process_product(
client_id=self.marketplace.client_id, card,
name=card['title'], size_value,
article=card['vendorCode'], nm_uuid
size=size_value
) )
skus = size.get('skus') or [] await self._write()
for sku in skus:
barcode = ProductBarcode( async def synchronize_products(self):
product=product, self._clear()
barcode=sku synchronized_products_stmt = (
select(Product)
.join(
WildberriesProduct
)
.options(
selectinload(Product.barcodes),
selectinload(Product.wildberries_products)
)
.where(WildberriesProduct.marketplace_id == self.marketplace.id)
)
synchronized_products = await self.session.execute(synchronized_products_stmt)
synchronized_products = synchronized_products.scalars().all()
synchronized_products_nm_id_dict = defaultdict(list)
for product in synchronized_products:
for wb_product in product.wildberries_products:
synchronized_products_nm_id_dict[wb_product.nm_uuid].append(product)
synchronized_nm_uuids = list(synchronized_products_nm_id_dict.keys())
async for card in self.api.get_all_products():
nm_uuid = card['nmUUID']
if nm_uuid not in synchronized_nm_uuids:
continue
products = synchronized_products_nm_id_dict[nm_uuid]
existing_sizes = {product.size for product in products}
size_product_dict = {product.size: product for product in products}
sizes = card.get('sizes') or []
for size in sizes:
tech_size = size.get('techSize')
wb_size = size.get('wbSize')
size_value = tech_size or wb_size
if size_value in existing_sizes:
product = size_product_dict[size_value]
await self._update_product_info(product, card)
self.barcodes.extend(
await self._update_barcodes(product, size.get('skus') or [])
) )
barcodes.append(barcode) self.images.extend(
photos = card.get('photos') or [] await self._update_images(product, card.get('photos') or [])
for photo in photos:
image = ProductImage(
product=product,
image_url=photo['big']
) )
images.append(image) continue
break await self._process_product(
wildberries_product = WildberriesProduct( card,
marketplace_id=self.marketplace.id, size_value,
product=product, nm_uuid
nm_uuid=nm_uuid
) )
wildberries_products.append( await self._write()
wildberries_product
)
products.append(product) class WildberriesController(
instances = products + wildberries_products + barcodes + images BaseMarketplaceController
start = time.time() ):
self.session.add_all(instances) api: WildberriesMarketplaceApi
await self.session.commit()
print(f'Add and commit elapsed: {time.time() - start}') def __init__(self, session, marketplace):
super().__init__(session, marketplace)
self.product_synchronizer = WildberriesProductSynchronizer(session, marketplace, self.api)
async def create_products(self):
await self.product_synchronizer.create_products()
async def synchronize_products(self):
await self.product_synchronizer.synchronize_products()

View File

@@ -2,5 +2,5 @@ from marketplaces.base.core import BaseMarketplaceController
class YandexController(BaseMarketplaceController): class YandexController(BaseMarketplaceController):
async def synchronize_products(self): async def create_products(self):
pass pass

View File

@@ -13,7 +13,7 @@ class WildberriesProduct(BaseModel):
marketplace: Mapped["Marketplace"] = relationship() marketplace: Mapped["Marketplace"] = relationship()
product_id: Mapped[int] = mapped_column(ForeignKey('products.id'), primary_key=True) product_id: Mapped[int] = mapped_column(ForeignKey('products.id'), primary_key=True)
product: Mapped["Product"] = relationship() product: Mapped["Product"] = relationship(back_populates='wildberries_products')
nm_uuid: Mapped[str] = mapped_column(nullable=False) nm_uuid: Mapped[str] = mapped_column(nullable=False)

View File

@@ -35,6 +35,10 @@ class Product(BaseModel):
lazy='selectin', lazy='selectin',
cascade="all, delete-orphan") cascade="all, delete-orphan")
wildberries_products = relationship('WildberriesProduct',
back_populates='product',
cascade="all, delete-orphan",
uselist=True)
class ProductImage(BaseModel): class ProductImage(BaseModel):

View File

@@ -5,7 +5,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from backend.session import get_session from backend.session import get_session
from schemas.barcode import * from schemas.barcode import *
from services.auth import authorized_user, guest_user from services.auth import guest_user
from services.barcode import BarcodeService from services.barcode import BarcodeService
barcode_router = APIRouter( barcode_router = APIRouter(

View File

@@ -34,7 +34,7 @@ class DealSummary(BaseSchema):
base_marketplace: Optional[BaseMarketplaceSchema] = None base_marketplace: Optional[BaseMarketplaceSchema] = None
shipment_warehouse_id: Optional[int] shipment_warehouse_id: Optional[int]
shipment_warehouse_name: Optional[str]
class DealServiceSchema(BaseSchema): class DealServiceSchema(BaseSchema):
service: ServiceSchema service: ServiceSchema

View File

@@ -178,7 +178,8 @@ class DealService(BaseService):
) )
.options( .options(
selectinload(Deal.status_history), selectinload(Deal.status_history),
joinedload(Deal.client) joinedload(Deal.client),
joinedload(Deal.shipping_warehouse),
) )
.outerjoin( .outerjoin(
price_subquery, Deal.id == price_subquery.c.deal_id) price_subquery, Deal.id == price_subquery.c.deal_id)
@@ -202,6 +203,7 @@ class DealService(BaseService):
base_marketplace = None base_marketplace = None
if deal.base_marketplace: if deal.base_marketplace:
base_marketplace = BaseMarketplaceSchema.model_validate(deal.base_marketplace) base_marketplace = BaseMarketplaceSchema.model_validate(deal.base_marketplace)
shipment_warehouse_name = deal.shipping_warehouse.name if deal.shipping_warehouse else None
summaries.append( summaries.append(
DealSummary( DealSummary(
id=deal.id, id=deal.id,
@@ -214,7 +216,8 @@ class DealService(BaseService):
rank=rank, rank=rank,
base_marketplace=base_marketplace, base_marketplace=base_marketplace,
created_at=deal.created_at, created_at=deal.created_at,
shipment_warehouse_id=deal.shipping_warehouse_id shipment_warehouse_id=deal.shipping_warehouse_id,
shipment_warehouse_name=shipment_warehouse_name
) )
) )
return DealSummaryResponse(summaries=summaries) return DealSummaryResponse(summaries=summaries)

10
utils/hashing_utils.py Normal file
View File

@@ -0,0 +1,10 @@
import pickle
import zlib
def hash_args_and_kwargs_with_crc32(*args, **kwargs):
# Сериализуем args и kwargs с помощью pickle
serialized_data = pickle.dumps((args, kwargs))
# Генерируем хэш CRC32
return format(zlib.crc32(serialized_data) & 0xffffffff, '08x').upper()