Files
Fulfillment-Backend/generators/deal_pdf_generator/generator.py

159 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from io import BytesIO
from typing import List, Dict, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload, joinedload
from weasyprint import HTML, CSS
from constants import DEAL_STATUS_STR, ENV, APP_PATH
from generators.deal_pdf_generator.deal_data import DealTechSpecProductData, DealTechSpecData
from models import Deal, DealProduct, DealService as DealServiceModel, Product, DealGroup
from utils.images_fetcher import fetch_images
def _gen_key_for_product(deal_product: DealProduct) -> str:
    """Return the key used to group deal products by article and services.

    The key is the product article, the literal separator " - ", and the
    comma-joined ``service_id`` values of the deal product's services in
    their existing order.
    """
    article_part = f"{deal_product.product.article} - "
    service_ids = [str(item.service_id) for item in deal_product.services]
    return article_part + ",".join(service_ids)
def _regen_key_for_product(product_data: DealTechSpecProductData) -> str:
    """Return the key used to merge grouped products across a deal group.

    The key combines the article and service IDs of the first deal product
    with a description of the products in the entry: the bare ``product_id``
    when the entry holds exactly one deal product, otherwise a comma-joined
    list of ``product_id-quantity`` pairs. An entry without deal products
    yields an empty string.
    """
    deal_products = product_data['deal_products']
    count = len(deal_products)
    if count == 0:
        return ""

    first = deal_products[0]
    article = first.product.article
    services_ids = ",".join(str(item.service_id) for item in first.services)

    # A single deal product is identified by its product ID alone; several
    # are identified by every (product ID, quantity) pair.
    products = first.product_id if count == 1 else ",".join(
        f"{entry.product_id}-{entry.quantity}" for entry in deal_products
    )
    return f"{article}+{services_ids}+{products}"
class DealTechSpecPdfGenerator:
    """Render the technical-specification PDF for a deal (or its deal group).

    The generator loads the deal through the given async session, groups its
    deal products into template rows, fetches one image per row and renders
    the "deal/deal-tech-spec.html" template, which is then converted to PDF.
    """

    def __init__(self, session: AsyncSession):
        """Store the session and create an empty template context."""
        self._session = session
        # Context passed to the template under the "data" key.
        self.deal_doc: DealTechSpecData = {
            "deals": [],
            "products": {},
            "product_images": (),
            "deal_ids_header": "",
            "deal_status_str": DEAL_STATUS_STR,
        }
        # Assigned in _create_deal_tech_spec_document_html once the deal is loaded.
        self.deal: Deal

    @staticmethod
    async def _group_deal_products_by_products(deal_products: List[DealProduct]) -> Dict[str, DealTechSpecProductData]:
        """Group deal products that share the same article and services.

        Quantities of grouped deal products are summed. ``additional_info``
        is taken from the first deal product in the group whose product has a
        non-empty value.
        """
        products: Dict[str, DealTechSpecProductData] = {}
        for deal_product in deal_products:
            # Group by article and services.
            key = _gen_key_for_product(deal_product)
            entry = products.get(key)
            if entry is None:
                products[key] = {
                    "deal": deal_product.deal,
                    "deal_products": [deal_product],
                    "quantity": deal_product.quantity,
                    "additional_info": deal_product.product.additional_info,
                    "warehouses_and_quantities": [],
                }
                continue
            entry["deal_products"].append(deal_product)
            entry["quantity"] += deal_product.quantity
            if not entry["additional_info"]:
                entry["additional_info"] = deal_product.product.additional_info
        return products

    async def _get_deal_by_id(self, deal_id: int) -> Optional[Deal]:
        """Load a deal with the relationships listed in the query options.

        Returns ``None`` when no deal has the given ID.
        """
        # NOTE(review): Product.images is read later but is not listed here;
        # presumably it is loaded eagerly by the model's own configuration.
        # Confirm, since implicit lazy loading does not work under AsyncSession.
        deal: Optional[Deal] = await self._session.scalar(
            select(Deal)
            .where(Deal.id == deal_id)
            .options(
                selectinload(Deal.products).selectinload(DealProduct.services),
                selectinload(Deal.products).selectinload(DealProduct.product).selectinload(Product.barcodes),
                selectinload(Deal.services).selectinload(DealServiceModel.service),
                selectinload(Deal.status_history),
                selectinload(Deal.group).selectinload(DealGroup.deals),
                joinedload(Deal.client),
                joinedload(Deal.shipping_warehouse),
            )
        )
        return deal

    async def _get_deals_by_group_id(self, group_id: int) -> List[Deal]:
        """Load every deal of a group with the same relationships as a single deal.

        Returns an empty list when the group does not exist.
        """
        group: Optional[DealGroup] = await self._session.scalar(
            select(DealGroup)
            .where(DealGroup.id == group_id)
            .options(
                selectinload(DealGroup.deals).selectinload(Deal.products).selectinload(DealProduct.services),
                selectinload(DealGroup.deals).selectinload(Deal.products)
                .selectinload(DealProduct.product).selectinload(Product.barcodes),
                selectinload(DealGroup.deals).selectinload(Deal.services).selectinload(DealServiceModel.service),
                selectinload(DealGroup.deals).selectinload(Deal.status_history),
                selectinload(DealGroup.deals).selectinload(Deal.group).selectinload(DealGroup.deals),
                selectinload(DealGroup.deals).joinedload(Deal.client),
                selectinload(DealGroup.deals).joinedload(Deal.shipping_warehouse),
            )
        )
        return group.deals if group else []

    def _set_deals_ids_header(self):
        """Set the header to the deal ID, or to all deal IDs of its group."""
        group = self.deal.group
        deal_ids = [d.id for d in group.deals] if group else [self.deal.id]
        self.deal_doc["deal_ids_header"] = "ID: " + ", ".join(str(deal_id) for deal_id in deal_ids)

    async def _collect_group_products(self, deals: List[Deal]) -> None:
        """Merge the grouped products of every deal in a group into the context.

        Rows with the same merge key have their quantities summed, and each
        contributing deal adds a (warehouse name, quantity) pair to the row.
        """
        merged = self.deal_doc["products"]
        for d in deals:
            self.deal_doc["deals"].append(d)
            grouped_products = await self._group_deal_products_by_products(d.products)
            for product in grouped_products.values():
                key = _regen_key_for_product(product)
                if key not in merged:
                    merged[key] = product
                else:
                    merged[key]["quantity"] += product["quantity"]
                # NOTE(review): raises AttributeError if a deal has no
                # shipping warehouse; confirm whether that can happen.
                merged[key]["warehouses_and_quantities"].append((
                    product["deal"].shipping_warehouse.name, product["quantity"],
                ))

    def _collect_product_image_urls(self) -> List[Optional[str]]:
        """Return one image URL (or ``None``) per product row, in row order."""
        product_urls: List[Optional[str]] = []
        for product in self.deal_doc["products"].values():
            # Only the first image of the first deal product in a row is used.
            images = product["deal_products"][0].product.images
            product_urls.append(images[0].image_url if images else None)
        return product_urls

    async def _create_deal_tech_spec_document_html(self, deal_id: int) -> str:
        """Render the tech-spec HTML for the deal, or "" if it does not exist.

        When the deal belongs to a group, the document covers every deal of
        that group; otherwise it covers the single deal.
        """
        deal = await self._get_deal_by_id(deal_id)
        if not deal:
            return ""
        self.deal = deal
        self._set_deals_ids_header()

        # Start from a clean slate: the group branch only appends and merges,
        # so leftovers from an earlier call on the same instance would
        # duplicate deals and inflate quantities.
        self.deal_doc["deals"] = []
        self.deal_doc["products"] = {}

        if deal.group:
            deals = await self._get_deals_by_group_id(deal.group.id)
            await self._collect_group_products(deals)
        else:
            self.deal_doc["deals"] = [deal]
            self.deal_doc["products"] = await self._group_deal_products_by_products(deal.products)

        product_urls = self._collect_product_image_urls()
        self.deal_doc["product_images"] = await fetch_images(product_urls)

        template = ENV.get_template("deal/deal-tech-spec.html")
        return template.render({"data": self.deal_doc, "sign_place_text": "_" * 22})

    async def create_deal_tech_spec_pdf(self, deal_id) -> BytesIO:
        """Render the tech-spec document for ``deal_id`` into an in-memory PDF.

        The returned buffer is positioned at its start, so it can be read or
        streamed directly.
        """
        # NOTE(review): for an unknown deal the HTML is an empty string and
        # the PDF is rendered from that; confirm this is what callers expect.
        doc = await self._create_deal_tech_spec_document_html(deal_id)
        pdf_file = BytesIO()
        HTML(string=doc).write_pdf(pdf_file, stylesheets=[CSS(APP_PATH + '/static/css/deal-tech-spec.css')])
        # write_pdf leaves the position at end-of-stream; rewind so that a
        # plain read() returns the document instead of b"".
        pdf_file.seek(0)
        return pdf_file