feat: ozon sync

This commit is contained in:
2024-09-02 19:19:41 +03:00
parent 82236a7750
commit d24be8fb7d
8 changed files with 337 additions and 39 deletions

View File

@@ -0,0 +1,40 @@
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession
from external.marketplace.base import BaseMarketplaceApi
from models import Product, ProductBarcode, ProductImage
class BaseProductSynchronizer(ABC):
products: list[Product] = []
barcodes: list[ProductBarcode] = []
images: list[ProductImage] = []
marketplace_products: list = []
api: BaseMarketplaceApi
session: AsyncSession
def __init__(self, session, marketplace, api):
self.session = session
self.marketplace = marketplace
self.api = api
self._clear()
def _clear(self):
self.products = []
self.barcodes = []
self.images = []
self.marketplace_products = []
async def _write(self):
instances = self.products + self.marketplace_products + self.barcodes + self.images
self.session.add_all(instances)
await self.session.commit()
@abstractmethod
async def create_products(self):
pass
@abstractmethod
async def synchronize_products(self):
pass

View File

@@ -1,15 +1,61 @@
from typing import AsyncIterator
from external.marketplace.base.core import BaseMarketplaceApi from external.marketplace.base.core import BaseMarketplaceApi
from models import Marketplace from models import Marketplace
class OzonMarketplaceApi(BaseMarketplaceApi): class OzonMarketplaceApi(BaseMarketplaceApi):
def __init__(self, marketplace: Marketplace): def __init__(self, marketplace: Marketplace):
pass client_id = marketplace.auth_data.get('Client-Id')
api_key = marketplace.auth_data.get('Api-Key')
if not client_id or not api_key:
raise ValueError(
f"Client-Id or Api-Key is missing for Marketplace ID: {marketplace.id}. "
"Please check the marketplace credentials."
)
self.headers = marketplace.auth_data
self.marketplace = marketplace
@property @property
def get_headers(self) -> dict: def get_headers(self) -> dict:
return {} return self.headers
@property @property
def base_url(self) -> str: def base_url(self) -> str:
return "" return "https://api-seller.ozon.ru"
async def get_products(self, data: dict) -> dict:
method = '/v2/product/list'
response = await self._method('POST', method, json=data)
return response
async def get_all_products(self) -> AsyncIterator[dict]:
limit = 100
last_id = ''
while True:
data = {
'limit': limit,
'last_id': last_id,
}
response = await self.get_products(data)
if not response:
break
result = response.get('result')
if not result:
break
items = result.get('items')
if not items:
break
for item in items:
yield item
last_id = result.get('last_id')
if not last_id:
break
async def get_products_info(self, data: dict) -> dict:
method = '/v2/product/info/list'
response = await self._method('POST', method, json=data)
return response

View File

@@ -1,6 +1,190 @@
from collections import defaultdict
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from external.marketplace import OzonMarketplaceApi
from external.marketplace.base.product_synchronizer import BaseProductSynchronizer
from marketplaces.base.core import BaseMarketplaceController from marketplaces.base.core import BaseMarketplaceController
from models import OzonProduct, ProductImage, ProductBarcode, Product
from utils.list_utils import chunk_list
class OzonProductSynchronizer(BaseProductSynchronizer):
api: OzonMarketplaceApi
marketplace_products: list[OzonProduct]
def _create_product(self, product_info):
return Product(
client_id=self.marketplace.client_id,
name=product_info['name'],
article=product_info['offer_id'],
)
def _create_barcodes(self, product, param):
barcodes = []
for sku in param:
barcode = ProductBarcode(
product=product,
barcode=sku
)
barcodes.append(barcode)
return barcodes
def _create_images(self, product, product_info):
primary_image = product_info.get('primary_image')
if primary_image:
image = ProductImage(
product=product,
image_url=primary_image
)
return [image]
product_images = []
images = product_info.get('images') or []
for image in images[:1]:
product_image = ProductImage(
product=product,
image_url=image
)
product_images.append(product_image)
return product_images
def _create_ozon_product(self, product, product_info):
return OzonProduct(
marketplace_id=self.marketplace.id,
product=product,
ozon_product_id=product_info['id'],
)
def _process_product(self, product_info):
product = self._create_product(product_info)
barcodes = self._create_barcodes(product, product_info['barcodes'])
images = self._create_images(product, product_info)
ozon_product = self._create_ozon_product(product, product_info)
self.products.append(product)
self.barcodes.extend(barcodes)
self.images.extend(images)
self.marketplace_products.append(ozon_product)
async def create_products(self):
self._clear()
product_ids = []
async for product in self.api.get_all_products():
product_ids.append(product['product_id'])
if len(product_ids) > 100:
break
max_products = 1000
for chunk in chunk_list(product_ids, max_products):
data = {
'product_id': chunk
}
products_info = await self.api.get_products_info(data)
if not products_info:
continue
result = products_info.get('result')
if not result:
continue
items = result.get('items')
for product_info in items:
product = self._create_product(product_info)
self.products.append(product)
barcodes = self._create_barcodes(product, product_info['barcodes'])
self.barcodes.extend(barcodes)
images = self._create_images(product, product_info)
self.images.extend(images)
ozon_product = self._create_ozon_product(product, product_info)
self.marketplace_products.append(ozon_product)
await self._write()
async def synchronize_products(self):
self._clear()
synchronized_products_stmt = (
select(
Product
)
.join(
OzonProduct
).options(
selectinload(Product.barcodes),
selectinload(Product.ozon_products)
)
.where(
OzonProduct.marketplace_id == self.marketplace.id
)
)
synchronized_products = await self.session.execute(synchronized_products_stmt)
synchronized_products = synchronized_products.scalars().all()
synchronized_products_ozon_id_dict = defaultdict(list)
for product in synchronized_products:
for ozon_product in product.ozon_products:
synchronized_products_ozon_id_dict[ozon_product.ozon_product_id].append(product)
synchronized_ozon_ids = list(synchronized_products_ozon_id_dict.keys())
max_products = 1000
for chunk in chunk_list(synchronized_ozon_ids, max_products):
data = {
'product_id': chunk
}
products_info = await self.api.get_products_info(data)
if not products_info:
continue
result = products_info.get('result')
if not result:
continue
items = result.get('items')
for product_info in items:
ozon_id = product_info['id']
if ozon_id not in synchronized_ozon_ids:
continue
products = synchronized_products_ozon_id_dict.get(ozon_id)
if not products:
continue
product = products[0]
await self._update_product_info(product, product_info)
await self._write()
async def _update_product_info(self, product, product_info):
product.name = product_info['name']
product.article = product_info['offer_id']
barcodes = self._update_barcodes(product, product_info['barcodes'])
self.barcodes.extend(barcodes)
images = self._update_images(product, product_info)
self.images.extend(images)
def _update_barcodes(self, product, param):
existing_barcodes = {barcode.barcode for barcode in product.barcodes}
barcodes = []
for sku in param:
if sku not in existing_barcodes:
barcode = ProductBarcode(
product=product,
barcode=sku
)
barcodes.append(barcode)
return barcodes
def _update_images(self, product, product_info):
existing_images = {image.image_url for image in product.images}
primary_image = product_info.get('primary_image')
if primary_image and primary_image not in existing_images:
image = ProductImage(
product=product,
image_url=primary_image
)
return [image]
return []
class OzonController(BaseMarketplaceController): class OzonController(BaseMarketplaceController):
def __init__(self, session, marketplace):
super().__init__(session, marketplace)
self.product_synchronizer = OzonProductSynchronizer(session, marketplace, self.api)
async def synchronize_products(self):
await self.product_synchronizer.synchronize_products()
async def create_products(self): async def create_products(self):
pass await self.product_synchronizer.create_products()

View File

@@ -4,34 +4,18 @@ from collections import defaultdict
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from external.marketplace.base.product_synchronizer import BaseProductSynchronizer
from external.marketplace.wildberries.core import WildberriesMarketplaceApi from external.marketplace.wildberries.core import WildberriesMarketplaceApi
from marketplaces.base.core import BaseMarketplaceController from marketplaces.base.core import BaseMarketplaceController
from models import Product, ProductBarcode, ProductImage, WildberriesProduct from models import Product, ProductBarcode, ProductImage, WildberriesProduct
class WildberriesProductSynchronizer: class WildberriesProductSynchronizer(BaseProductSynchronizer):
products: list[Product] marketplace_products: list[WildberriesProduct]
barcodes: list[ProductBarcode] api: WildberriesMarketplaceApi
images: list[ProductImage]
wildberries_products: list[WildberriesProduct]
def __init__(self, session, marketplace, api):
self.session = session
self.marketplace = marketplace
self.api = api
self.products = []
self.barcodes = []
self.images = []
self.wildberries_products = []
def _clear(self):
self.products = []
self.barcodes = []
self.images = []
self.wildberries_products = []
async def _write(self): async def _write(self):
instances = self.products + self.wildberries_products + self.barcodes + self.images instances = self.products + self.marketplace_products + self.barcodes + self.images
start = time.time() start = time.time()
self.session.add_all(instances) self.session.add_all(instances)
await self.session.commit() await self.session.commit()
@@ -125,7 +109,7 @@ class WildberriesProductSynchronizer:
self.products.append(product) self.products.append(product)
self.barcodes.extend(barcodes) self.barcodes.extend(barcodes)
self.images.extend(images) self.images.extend(images)
self.wildberries_products.append(wildberries_product) self.marketplace_products.append(wildberries_product)
async def create_products(self): async def create_products(self):
self._clear() self._clear()

View File

@@ -27,7 +27,7 @@ class OzonProduct(BaseModel):
product_id: Mapped[int] = mapped_column(ForeignKey('products.id'), primary_key=True) product_id: Mapped[int] = mapped_column(ForeignKey('products.id'), primary_key=True)
product: Mapped["Product"] = relationship() product: Mapped["Product"] = relationship()
ozon_product_id: Mapped[int] = mapped_column(nullable=False)
class YandexProduct(BaseModel): class YandexProduct(BaseModel):
__tablename__ = 'yandex_products' __tablename__ = 'yandex_products'

View File

@@ -40,6 +40,11 @@ class Product(BaseModel):
cascade="all, delete-orphan", cascade="all, delete-orphan",
uselist=True) uselist=True)
ozon_products = relationship('OzonProduct',
back_populates='product',
cascade="all, delete-orphan",
uselist=True)
class ProductImage(BaseModel): class ProductImage(BaseModel):
__tablename__ = 'product_images' __tablename__ = 'product_images'

60
test.py
View File

@@ -1,18 +1,54 @@
import asyncio # import asyncio
import datetime #
# from dict_hash import dict_hash
# from sqlalchemy.ext.asyncio import AsyncSession
#
# from backend.session import session_maker
# from marketplaces import MarketplaceControllerFactory
# from models import Marketplace
# import pickle
# pickle.dumps()
#
# async def main():
# a = "example"
# b = "example"
#
# print(hash(a)) # Хэш для строки "example"
# print(hash(b)) # Хэш для строки "example", будет таким же как и у a
#
# return
# session: AsyncSession = session_maker()
#
# try:
# mp = await session.get(Marketplace, 2)
# if not mp:
# return
# c = MarketplaceControllerFactory.get_controller(session, mp)
# await c.synchronize_products()
# finally:
# await session.close()
#
#
# if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
import hashlib
import time
from sqlalchemy import select, func from reportlab.rl_settings import autoGenerateMissingTTFName
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
import models from decorators.locking import lock, redis_client
from backend.session import session_maker
from models import User, PaymentRecord
async def main(): @lock('synchronize_marketplace', include_args_in_key=True)
pass def test(marketplace_id: int):
print("test")
time.sleep(100)
def main():
test(1)
if __name__ == '__main__': if __name__ == '__main__':
loop = asyncio.get_event_loop() main()
loop.run_until_complete(main())

3
utils/list_utils.py Normal file
View File

@@ -0,0 +1,3 @@
def chunk_list(lst, n) -> list:
for i in range(0, len(lst), n):
yield lst[i:i + n]