Files
Fulfillment-Backend/parsers/deal_parser.py

144 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from io import BytesIO
from typing import Optional
from openpyxl.reader.excel import load_workbook
from openpyxl.utils.cell import get_column_letter
from sqlalchemy import select, exists
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from models import ProductBarcode, Product, ShippingWarehouse, BaseMarketplace
from schemas.deal import ParsedProductRowSchema, ParseDealsExcelResponse, ParsedCityBreakdownSchema, \
OptionalShippingWarehouseSchema
from schemas.marketplace import MarketplaceSchema, BaseMarketplaceSchema
from schemas.product import ProductSchema
class DealParser:
    """Parse a deals Excel workbook into product rows with per-warehouse quantities.

    Only the first worksheet is read. The layout this parser expects:

    * header row 3, starting at column I (9) and moving right: labels of the
      form ``"<marketplace key> <warehouse name>"``; the first empty cell
      ends the header;
    * column C (3), starting at row 4 and moving down: integer barcodes;
      scanning stops once more than 6 consecutive rows carry no barcode;
    * the cell at (barcode row, header column) holds the quantity for that
      barcode / marketplace / warehouse combination.

    Problems found in the sheet are collected as human-readable messages and
    returned with the parsed rows instead of being raised.
    """

    # Worksheet layout (1-based openpyxl coordinates).
    _HEADER_ROW = 3
    _HEADER_START_COL = 9
    _BARCODE_START_ROW = 4
    _BARCODE_COL = 3
    # Scanning stops when MORE than this many consecutive rows have no barcode.
    _MAX_EMPTY_ROWS_BETWEEN_BARCODES = 6

    def __init__(self, session: AsyncSession):
        """Store the DB session and initialise empty lookup caches.

        Args:
            session: async SQLAlchemy session used for all lookups.
        """
        self._session = session
        # Shipping warehouses keyed by lower-cased name.
        self._warehouses: dict[str, OptionalShippingWarehouseSchema] = {}
        # Base marketplaces keyed by lower-cased key.
        self._marketplaces: dict[str, BaseMarketplaceSchema] = {}
        # Messages accumulated during the current ``parse`` call.
        self._errors: list[str] = []
        # Header column index -> {"marketplace": ..., "warehouse": ...}.
        self._warehouses_and_marketplaces: dict[
            int, dict[str, Optional[MarketplaceSchema] | OptionalShippingWarehouseSchema]
        ] = {}

    async def _set_warehouses(self):
        """Load every shipping warehouse into ``self._warehouses`` by lower-cased name."""
        stmt = select(ShippingWarehouse)
        warehouses = (await self._session.execute(stmt)).scalars().all()
        for warehouse in warehouses:
            self._warehouses[warehouse.name.lower()] = OptionalShippingWarehouseSchema.model_validate(warehouse)

    async def _set_marketplaces(self):
        """Load every base marketplace into ``self._marketplaces`` by lower-cased key."""
        stmt = select(BaseMarketplace)
        marketplaces = (await self._session.execute(stmt)).scalars().all()
        for marketplace in marketplaces:
            # Header labels are lower-cased before lookup, so the stored key
            # must be lower-cased too (same convention as the warehouses).
            self._marketplaces[marketplace.key.lower()] = BaseMarketplaceSchema.model_validate(marketplace)

    async def _parse_product_barcode(self, barcode_value: str) -> list[ProductSchema]:
        """Return all products that own the given barcode.

        Args:
            barcode_value: barcode to look up, as a string.

        Returns:
            Validated product schemas; empty when nothing matches.
        """
        products_stmt = (
            select(Product)
            .join(ProductBarcode)
            .where(ProductBarcode.barcode == barcode_value)
            .options(
                selectinload(Product.barcodes)
                .noload(ProductBarcode.product),
                selectinload(Product.barcode_image),
            )
        )
        products = (await self._session.execute(products_stmt)).scalars().all()
        return [ProductSchema.model_validate(product) for product in products]

    async def _parse_warehouse_and_marketplace_labels(self, value: str) -> dict[
        str, Optional[MarketplaceSchema] | OptionalShippingWarehouseSchema
    ]:
        """Split a header label into its marketplace and warehouse parts.

        The first whitespace-separated token is the marketplace key; the
        remaining tokens, joined by single spaces, are the warehouse name.
        Both are matched case-insensitively against the loaded caches.

        Args:
            value: raw header cell value.

        Returns:
            A dict with ``"marketplace"`` (``None`` when the key is unknown)
            and ``"warehouse"`` (a known warehouse, or a fresh schema that
            carries only the name from the label when it is unknown).
        """
        # str() guards against non-text header cells (e.g. numbers); a
        # whitespace-only label yields no tokens and must not raise.
        tokens = str(value).split()
        base_mp_value = tokens[0] if tokens else ""
        warehouse_value = " ".join(tokens[1:])

        base_mp = self._marketplaces.get(base_mp_value.lower())
        warehouse = self._warehouses.get(warehouse_value.lower())
        if not warehouse:
            warehouse = OptionalShippingWarehouseSchema(name=warehouse_value)
        return {
            "marketplace": base_mp,
            "warehouse": warehouse,
        }

    async def _barcode_exists(self, barcode: str) -> bool:
        """Return whether any ``ProductBarcode`` row has this barcode."""
        stmt = select(exists(ProductBarcode).where(ProductBarcode.barcode == barcode))
        return bool((await self._session.execute(stmt)).scalar())

    async def _parse_warehouses_and_marketplaces(self, ws):
        """Read the header row and map each column to its marketplace and warehouse.

        Refreshes the warehouse / marketplace caches, then walks the header
        row to the right until the first empty cell. Columns whose
        marketplace cannot be resolved are reported in ``self._errors`` and
        left out of the mapping.

        Args:
            ws: worksheet to read from.
        """
        mp_wh_row, mp_wh_col = self._HEADER_ROW, self._HEADER_START_COL
        await self._set_warehouses()
        await self._set_marketplaces()
        self._warehouses_and_marketplaces = {}
        while True:
            value = ws.cell(row=mp_wh_row, column=mp_wh_col).value
            if not value:
                break
            mp_and_wh = await self._parse_warehouse_and_marketplace_labels(value)
            if mp_and_wh["marketplace"]:
                self._warehouses_and_marketplaces[mp_wh_col] = mp_and_wh
            else:
                self._errors.append(
                    f"В ячейке {get_column_letter(mp_wh_col)}{mp_wh_row} не распознан маркетплейс"
                )
            mp_wh_col += 1

    def _parse_breakdowns(self, ws, row: int) -> list[ParsedCityBreakdownSchema]:
        """Collect the quantities of one barcode row across all recognised header columns.

        Empty and zero cells are skipped. A value that cannot be converted
        to an integer is reported in ``self._errors`` and skipped, so one
        bad cell does not abort the whole import.

        Args:
            ws: worksheet to read from.
            row: 1-based row index of the barcode being processed.

        Returns:
            One breakdown per column that has a usable quantity.
        """
        breakdowns: list[ParsedCityBreakdownSchema] = []
        for col, data in self._warehouses_and_marketplaces.items():
            raw_quantity = ws.cell(row=row, column=col).value
            if not raw_quantity:
                continue
            try:
                quantity = int(raw_quantity)
            except (TypeError, ValueError):
                self._errors.append(
                    f"В ячейке {get_column_letter(col)}{row} указано некорректное количество"
                )
                continue
            breakdowns.append(
                ParsedCityBreakdownSchema(
                    base_marketplace=data["marketplace"],
                    shippingWarehouse=data["warehouse"],
                    quantity=quantity,
                )
            )
        return breakdowns

    async def _parse_barcodes(self, ws) -> list[ParsedProductRowSchema]:
        """Walk the barcode column and build one parsed row per known barcode.

        A row is emitted only when the barcode resolves to at least one
        product and has at least one quantity. Barcodes missing from the
        database are reported in ``self._errors``.

        Args:
            ws: worksheet to read from.

        Returns:
            Parsed product rows in sheet order.
        """
        rows: list[ParsedProductRowSchema] = []
        barcode_col = self._BARCODE_COL
        barcode_col_name = get_column_letter(barcode_col)
        barcode_row = self._BARCODE_START_ROW
        count_empty_rows = 0

        while True:
            barcode = ws.cell(row=barcode_row, column=barcode_col).value
            # Only non-zero integer cells count as barcodes; bool is excluded
            # explicitly because it is a subclass of int.
            # NOTE(review): text-formatted or float barcodes are treated as
            # empty rows here - confirm whether they should be accepted.
            if barcode and isinstance(barcode, int) and not isinstance(barcode, bool):
                count_empty_rows = 0
                str_barcode = str(barcode)
                if await self._barcode_exists(str_barcode):
                    products = await self._parse_product_barcode(str_barcode)
                    breakdowns = self._parse_breakdowns(ws, barcode_row)
                    if products and breakdowns:
                        rows.append(
                            ParsedProductRowSchema(
                                products=products, breakdowns=breakdowns, barcode=str_barcode
                            )
                        )
                else:
                    self._errors.append(f"Штрихкод из ячейки {barcode_col_name}{barcode_row} не найден")
            else:
                count_empty_rows += 1
                if count_empty_rows > self._MAX_EMPTY_ROWS_BETWEEN_BARCODES:
                    break
            barcode_row += 1
        return rows

    async def parse(self, file_bytes: bytes) -> ParseDealsExcelResponse:
        """Parse an uploaded workbook.

        Args:
            file_bytes: raw content of the Excel file.

        Returns:
            The parsed rows together with every message collected while
            reading this file.
        """
        # Start from a clean slate so a reused parser does not report
        # problems that belong to a previously parsed file.
        self._errors = []
        wb = load_workbook(filename=BytesIO(file_bytes))
        ws = wb.worksheets[0]
        await self._parse_warehouses_and_marketplaces(ws)
        rows = await self._parse_barcodes(ws)
        return ParseDealsExcelResponse(rows=rows, errors=self._errors)