207 lines
		
	
	
		
			7.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			207 lines
		
	
	
		
			7.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from collections import defaultdict
 | 
						|
 | 
						|
from sqlalchemy import select
 | 
						|
from sqlalchemy.orm import selectinload
 | 
						|
 | 
						|
from external.marketplace import OzonMarketplaceApi
 | 
						|
from external.marketplace.base.product_synchronizer import BaseProductSynchronizer
 | 
						|
from marketplaces.base.core import BaseMarketplaceController
 | 
						|
from models import OzonProduct, ProductImage, ProductBarcode, Product
 | 
						|
from utils.list_utils import chunk_list
 | 
						|
 | 
						|
 | 
						|
class OzonProductSynchronizer(BaseProductSynchronizer):
    """Create and refresh local products from an Ozon marketplace account.

    Objects built here are accumulated in ``self.products``, ``self.barcodes``,
    ``self.images`` and ``self.marketplace_products``. Each public run starts
    with ``self._clear()`` and ends with ``await self._write()``; neither is
    defined in this class (presumably both come from the base synchronizer).

    The ``product_info`` dicts handled below are items returned by
    ``self.api.get_products_info`` and are read through the keys ``id``,
    ``name``, ``offer_id``, ``barcodes``, ``primary_image`` and ``images``.
    """

    api: OzonMarketplaceApi
    marketplace_products: list[OzonProduct]

    @staticmethod
    def _primary_image_url(product_info):
        """Return the primary image URL from ``product_info`` or ``None``.

        ``primary_image`` is accepted both as a list of URLs (the first entry
        is used) and as a plain string, so creation and update agree on how
        the field is interpreted.
        """
        primary_image = product_info.get('primary_image')
        if isinstance(primary_image, list):
            return primary_image[0] if primary_image else None
        return primary_image or None

    async def _iter_products_info(self, product_ids, batch_size=1000):
        """Yield product info items for ``product_ids``, fetched in batches.

        :param product_ids: Ozon product ids to request.
        :param batch_size: maximum number of ids sent per API call.
        """
        for chunk in chunk_list(product_ids, batch_size):
            products_info = await self.api.get_products_info({'product_id': chunk})
            if not products_info:
                continue
            # ``items`` may be missing or null in the response; treat as empty.
            for product_info in products_info.get('items') or []:
                yield product_info

    def _create_product(self, product_info):
        """Build a new ``Product`` for this marketplace's client."""
        return Product(
            client_id=self.marketplace.client_id,
            name=product_info['name'],
            article=product_info['offer_id'],
        )

    def _create_barcodes(self, product, param):
        """Build one ``ProductBarcode`` per value in ``param`` for ``product``."""
        return [ProductBarcode(product=product, barcode=sku) for sku in param]

    def _create_images(self, product, product_info):
        """Build at most one ``ProductImage`` for a newly created product.

        The primary image is preferred; when it is absent, the first entry of
        ``images`` is used instead. Returns a list with zero or one element.
        """
        primary_url = self._primary_image_url(product_info)
        if primary_url:
            return [ProductImage(product=product, image_url=primary_url)]
        images = product_info.get('images') or []
        return [
            ProductImage(product=product, image_url=image_url)
            for image_url in images[:1]
        ]

    def _create_ozon_product(self, product, product_info):
        """Build the ``OzonProduct`` linking ``product`` to its Ozon id."""
        return OzonProduct(
            marketplace_id=self.marketplace.id,
            product=product,
            ozon_product_id=product_info['id'],
        )

    def _process_product(self, product_info):
        """Create all objects for one new product and queue them for writing."""
        product = self._create_product(product_info)
        barcodes = self._create_barcodes(product, product_info['barcodes'])
        images = self._create_images(product, product_info)
        ozon_product = self._create_ozon_product(product, product_info)

        self.products.append(product)
        self.barcodes.extend(barcodes)
        self.images.extend(images)
        self.marketplace_products.append(ozon_product)

    async def create_products(self):
        """Create local products for Ozon products that are not stored yet.

        Ids already present for this marketplace are skipped; the remaining
        ids are resolved to full product info in batches and queued through
        ``_process_product`` before the final write.
        """
        self._clear()
        synchronized_product_ids = await self._get_synchronized_product_ids()

        product_ids = []
        async for product in self.api.get_all_products():
            product_id = product['product_id']
            if product_id not in synchronized_product_ids:
                product_ids.append(product_id)

        async for product_info in self._iter_products_info(product_ids):
            self._process_product(product_info)

        await self._write()

    async def synchronize_products(self):
        """Refresh name, article, barcodes and image of already stored products."""
        self._clear()
        synchronized_products_stmt = (
            select(Product)
            .join(OzonProduct)
            .options(
                selectinload(Product.barcodes),
                # ``_update_images`` reads ``product.images``; load it eagerly
                # so no implicit lazy load is attempted under the async session.
                selectinload(Product.images),
                selectinload(Product.ozon_products),
            )
            .where(OzonProduct.marketplace_id == self.marketplace.id)
        )
        result = await self.session.execute(synchronized_products_stmt)
        synchronized_products = result.scalars().all()

        # Map each Ozon id to the first product that carries it.
        products_by_ozon_id = {}
        for product in synchronized_products:
            for ozon_product in product.ozon_products:
                # The relationship is loaded unfiltered, so drop links that
                # belong to other marketplaces.
                if ozon_product.marketplace_id != self.marketplace.id:
                    continue
                products_by_ozon_id.setdefault(ozon_product.ozon_product_id, product)

        async for product_info in self._iter_products_info(list(products_by_ozon_id)):
            product = products_by_ozon_id.get(product_info['id'])
            if product is None:
                continue
            await self._update_product_info(product, product_info)

        await self._write()

    async def _update_product_info(self, product, product_info):
        """Copy fresh name/article onto ``product`` and queue new barcodes/images."""
        product.name = product_info['name']
        product.article = product_info['offer_id']
        self.barcodes.extend(self._update_barcodes(product, product_info['barcodes']))
        self.images.extend(self._update_images(product, product_info))

    def _update_barcodes(self, product, param):
        """Build ``ProductBarcode`` objects for values in ``param`` not yet on ``product``.

        A value repeated inside ``param`` is only created once.
        """
        known_barcodes = {barcode.barcode for barcode in product.barcodes}
        barcodes = []
        for sku in param:
            if sku in known_barcodes:
                continue
            known_barcodes.add(sku)
            barcodes.append(ProductBarcode(product=product, barcode=sku))
        return barcodes

    def _update_images(self, product, product_info):
        """Build a ``ProductImage`` for the primary image if ``product`` lacks it.

        Returns a list with zero or one element.
        """
        primary_url = self._primary_image_url(product_info)
        if not primary_url:
            return []
        existing_images = {image.image_url for image in product.images}
        if primary_url in existing_images:
            return []
        return [ProductImage(product=product, image_url=primary_url)]

    async def _get_synchronized_product_ids(self):
        """Return the set of Ozon product ids already stored for this marketplace."""
        synchronized_products_stmt = (
            select(OzonProduct.ozon_product_id)
            .where(OzonProduct.marketplace_id == self.marketplace.id)
        )
        synchronized_products = await self.session.execute(synchronized_products_stmt)
        return set(synchronized_products.scalars().all())
 | 
						|
 | 
						|
 | 
						|
class OzonController(BaseMarketplaceController):
    """Marketplace controller that delegates product work to ``OzonProductSynchronizer``."""

    def __init__(self, session, marketplace):
        """Initialise the base controller, then build the product synchronizer.

        ``self.api`` is read right after ``super().__init__`` and is not
        assigned in this class (presumably the base controller sets it).
        """
        super().__init__(session, marketplace)
        synchronizer = OzonProductSynchronizer(session, marketplace, self.api)
        self.product_synchronizer = synchronizer

    async def synchronize_products(self):
        """Refresh already stored products via the synchronizer."""
        synchronizer = self.product_synchronizer
        await synchronizer.synchronize_products()

    async def create_products(self):
        """Create products that are not stored yet via the synchronizer."""
        synchronizer = self.product_synchronizer
        await synchronizer.create_products()
 |