148 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			148 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from io import BytesIO
 | 
						||
from typing import Optional
 | 
						||
from unittest import mock
 | 
						||
 | 
						||
from openpyxl.reader.excel import load_workbook
 | 
						||
from openpyxl.utils.cell import get_column_letter
 | 
						||
from sqlalchemy import select, exists
 | 
						||
from sqlalchemy.ext.asyncio import AsyncSession
 | 
						||
from sqlalchemy.orm import selectinload
 | 
						||
 | 
						||
from models import ProductBarcode, Product, ShippingWarehouse, BaseMarketplace
 | 
						||
from schemas.card import ParsedProductRowSchema, ParseCardsExcelResponse, ParsedCityBreakdownSchema, \
 | 
						||
    OptionalShippingWarehouseSchema
 | 
						||
from schemas.marketplace import MarketplaceSchema, BaseMarketplaceSchema
 | 
						||
from schemas.product import ProductSchema
 | 
						||
 | 
						||
 | 
						||
class DealParser:
 | 
						||
    def __init__(self, session: AsyncSession):
 | 
						||
        self._session = session
 | 
						||
        self._warehouses: dict[str, OptionalShippingWarehouseSchema] = {}
 | 
						||
        self._marketplaces: dict[str, BaseMarketplaceSchema] = {}
 | 
						||
        self._errors: list[str] = []
 | 
						||
        self._warehouses_and_marketplaces: dict[
 | 
						||
            int, dict[str, Optional[MarketplaceSchema] | OptionalShippingWarehouseSchema]
 | 
						||
        ] = {}
 | 
						||
 | 
						||
    async def _set_warehouses(self):
 | 
						||
        stmt = select(ShippingWarehouse)
 | 
						||
        warehouses = (await self._session.execute(stmt)).scalars().all()
 | 
						||
        for warehouse in warehouses:
 | 
						||
            self._warehouses[warehouse.name.lower()] = OptionalShippingWarehouseSchema.model_validate(warehouse)
 | 
						||
 | 
						||
    async def _set_marketplaces(self):
 | 
						||
        stmt = select(BaseMarketplace)
 | 
						||
        marketplaces = (await self._session.execute(stmt)).scalars().all()
 | 
						||
        for marketplace in marketplaces:
 | 
						||
            self._marketplaces[marketplace.key] = BaseMarketplaceSchema.model_validate(marketplace)
 | 
						||
 | 
						||
    async def _parse_product_barcode(self, barcode_value: str) -> list[ProductSchema]:
 | 
						||
        products_stmt = (
 | 
						||
            select(Product)
 | 
						||
            .join(ProductBarcode)
 | 
						||
            .where(ProductBarcode.barcode == barcode_value)
 | 
						||
            .options(
 | 
						||
                selectinload(Product.barcodes)
 | 
						||
                .noload(ProductBarcode.product),
 | 
						||
                selectinload(Product.barcode_image),
 | 
						||
            )
 | 
						||
        )
 | 
						||
        products = (await self._session.execute(products_stmt)).scalars().all()
 | 
						||
 | 
						||
        return [ProductSchema.model_validate(product) for product in products]
 | 
						||
 | 
						||
    async def _parse_warehouse_and_marketplace_labels(self, value: str) -> dict[
 | 
						||
        str, Optional[MarketplaceSchema] | OptionalShippingWarehouseSchema
 | 
						||
    ]:
 | 
						||
        stripped = value.split()
 | 
						||
        base_mp_value, warehouse_value = stripped[0], " ".join(stripped[1:])
 | 
						||
        base_mp_lower, warehouse_lower = base_mp_value.lower(), warehouse_value.lower()
 | 
						||
 | 
						||
        base_mp = self._marketplaces.get(base_mp_lower)
 | 
						||
        warehouse = self._warehouses.get(warehouse_lower)
 | 
						||
        if not warehouse:
 | 
						||
            warehouse = OptionalShippingWarehouseSchema(name=warehouse_value)
 | 
						||
 | 
						||
        return {
 | 
						||
            "marketplace": base_mp,
 | 
						||
            "warehouse": warehouse,
 | 
						||
        }
 | 
						||
 | 
						||
    async def _barcode_exists(self, barcode: str) -> bool:
 | 
						||
        stmt = select(exists(ProductBarcode).where(ProductBarcode.barcode == barcode))
 | 
						||
        barcode_exists = (await self._session.execute(stmt)).scalar()
 | 
						||
        return barcode_exists
 | 
						||
 | 
						||
    async def _parse_warehouses_and_marketplaces(self, ws):
 | 
						||
        mp_wh_row, mp_wh_start_col = 3, 9
 | 
						||
 | 
						||
        await self._set_warehouses()
 | 
						||
        await self._set_marketplaces()
 | 
						||
 | 
						||
        self._warehouses_and_marketplaces = {}
 | 
						||
        while True:
 | 
						||
            value = ws.cell(row=mp_wh_row, column=mp_wh_start_col).value
 | 
						||
            if not value:
 | 
						||
                break
 | 
						||
            mp_and_wh = await self._parse_warehouse_and_marketplace_labels(value)
 | 
						||
            if mp_and_wh["marketplace"]:
 | 
						||
                self._warehouses_and_marketplaces[mp_wh_start_col] = mp_and_wh
 | 
						||
            else:
 | 
						||
                self._errors.append(f"В ячейке {get_column_letter(mp_wh_start_col)}3 не распознан маркетплейс")
 | 
						||
            mp_wh_start_col += 1
 | 
						||
 | 
						||
    async def _parse_barcodes(self, ws) -> list[ParsedProductRowSchema]:
 | 
						||
        rows: list[ParsedProductRowSchema] = []
 | 
						||
        barcode_row, barcode_col, count_empty_rows = 4, 3, 0
 | 
						||
        barcode_col_name = get_column_letter(barcode_col)
 | 
						||
        max_empty_rows_between_barcodes = 6
 | 
						||
 | 
						||
        while True:
 | 
						||
            barcode: Optional[int] = ws.cell(row=barcode_row, column=barcode_col).value
 | 
						||
            if barcode and type(barcode) == int:
 | 
						||
                count_empty_rows = 0
 | 
						||
                str_barcode = str(barcode)
 | 
						||
                barcode_exists = await self._barcode_exists(str_barcode)
 | 
						||
                if barcode_exists:
 | 
						||
                    products: list[ProductSchema] = await self._parse_product_barcode(str_barcode)
 | 
						||
 | 
						||
                    breakdowns: list[ParsedCityBreakdownSchema] = []
 | 
						||
 | 
						||
                    for col, data in self._warehouses_and_marketplaces.items():
 | 
						||
                        quantity: Optional[str] = ws.cell(row=barcode_row, column=col).value
 | 
						||
                        if quantity:
 | 
						||
                            warehouse_excel_info = ParsedCityBreakdownSchema(
 | 
						||
                                base_marketplace=data["marketplace"],
 | 
						||
                                shippingWarehouse=data["warehouse"],
 | 
						||
                                quantity=int(quantity),
 | 
						||
                            )
 | 
						||
                            breakdowns.append(warehouse_excel_info)
 | 
						||
                    if len(products) > 0 and len(breakdowns) > 0:
 | 
						||
                        rows.append(
 | 
						||
                            ParsedProductRowSchema(products=products, breakdowns=breakdowns, barcode=str(barcode))
 | 
						||
                        )
 | 
						||
                else:
 | 
						||
                    self._errors.append(f"Штрихкод из ячейки {barcode_col_name}{barcode_row} не найден")
 | 
						||
            else:
 | 
						||
                count_empty_rows += 1
 | 
						||
                if count_empty_rows > max_empty_rows_between_barcodes:
 | 
						||
                    break
 | 
						||
 | 
						||
            barcode_row += 1
 | 
						||
 | 
						||
        return rows
 | 
						||
 | 
						||
    async def parse(self, file_bytes: bytes) -> ParseCardsExcelResponse:
 | 
						||
        p = mock.patch('openpyxl.styles.fonts.Font.family.max', new=100)
 | 
						||
        p.start()
 | 
						||
 | 
						||
        wb = load_workbook(filename=BytesIO(file_bytes), data_only=True)
 | 
						||
        ws = wb.worksheets[0]
 | 
						||
 | 
						||
        await self._parse_warehouses_and_marketplaces(ws)
 | 
						||
 | 
						||
        rows = await self._parse_barcodes(ws)
 | 
						||
 | 
						||
        return ParseCardsExcelResponse(rows=rows, errors=self._errors)
 |