143 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			143 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from io import BytesIO
 | 
						||
from typing import List, Dict, Optional
 | 
						||
 | 
						||
from sqlalchemy import select
 | 
						||
from sqlalchemy.ext.asyncio import AsyncSession
 | 
						||
from sqlalchemy.orm import selectinload, joinedload
 | 
						||
from weasyprint import HTML, CSS
 | 
						||
 | 
						||
from constants import DEAL_STATUS_STR, ENV, APP_PATH
 | 
						||
from generators.deal_pdf_generator.deal_data import DealTechSpecProductData, DealTechSpecData
 | 
						||
from models import Deal, DealProduct, DealService as DealServiceModel, Product, DealGroup
 | 
						||
from services.deal_group import DealGroupService
 | 
						||
from utils.images_fetcher import fetch_images
 | 
						||
 | 
						||
 | 
						||
def _gen_key_for_product(deal_product: DealProduct) -> str:
    """Build the key used to group deal products by article and services.

    The key is the product article followed by the comma-separated ids of the
    services attached to the deal product. The ids are sorted (by their string
    form) so that two deal products carrying the same set of services produce
    the same key regardless of the order in which the services are iterated.

    Args:
        deal_product: Deal product exposing ``product.article`` and an iterable
            ``services`` whose items expose ``service_id``.

    Returns:
        A string of the form ``"<article> - <id>,<id>,..."``; the id list is
        empty when the deal product has no services.
    """
    # Sorting the string form keeps the key canonical and cannot fail on
    # mixed or missing id values.
    service_ids = sorted(str(service.service_id) for service in deal_product.services)
    return f"{deal_product.product.article} - " + ",".join(service_ids)
 | 
						||
 | 
						||
 | 
						||
def _regen_key_for_product(product_data: DealTechSpecProductData) -> str:
    """Build the key used to merge grouped products across the deals of a deal group.

    The key combines the article and the service ids of the first deal product
    with an identifier of the products in the group:

    * a single deal product contributes only its ``product_id`` (quantity is
      deliberately left out of the key in this case);
    * several deal products contribute comma-separated
      ``"<product_id>-<quantity>"`` pairs.

    Service ids and product pairs are sorted (by their string form) so that the
    key does not depend on the order in which they are iterated.

    Args:
        product_data: Grouped product entry; only its ``'deal_products'`` list
            is read.

    Returns:
        ``"<article>+<service ids>+<products>"``, or an empty string when the
        entry holds no deal products.
    """
    deal_products = product_data['deal_products']
    if not deal_products:
        return ""

    # Article and services are taken from the first deal product of the entry.
    first = deal_products[0]
    article = first.product.article
    services_ids = ",".join(sorted(str(service.service_id) for service in first.services))

    if len(deal_products) == 1:
        products = first.product_id
    else:
        products = ",".join(sorted(
            f"{deal_product.product_id}-{deal_product.quantity}" for deal_product in deal_products
        ))

    return f"{article}+{services_ids}+{products}"
 | 
						||
 | 
						||
 | 
						||
class DealTechSpecPdfGenerator:
    """Render the technical specification of a deal, or of its whole deal group, as a PDF.

    The generator loads the deal, groups its products, fetches one image per
    grouped product, renders the ``deal/deal-tech-spec.html`` template and
    converts the resulting HTML to PDF.
    """

    def __init__(self, session: AsyncSession):
        """Store the database session and prepare an empty template context.

        Args:
            session: Async SQLAlchemy session used for all queries.
        """
        self._session = session
        # Context passed to the template under the "data" name.
        self.deal_doc: DealTechSpecData = {
            "deals": [],
            "products": {},
            "product_images": (),
            "deal_ids_header": "",
            "deal_status_str": DEAL_STATUS_STR,
        }
        # Declared for type checkers only; assigned once a deal has been loaded.
        self.deal: Deal

    def _reset_deal_doc(self) -> None:
        """Clear the per-render parts of the template context.

        The deal-group branch of the renderer appends to ``deals`` and
        accumulates quantities in ``products``; without this reset a second
        render on the same generator would duplicate deals and inflate
        quantities. ``deal_status_str`` is left untouched.
        """
        self.deal_doc["deals"] = []
        self.deal_doc["products"] = {}
        self.deal_doc["product_images"] = ()
        self.deal_doc["deal_ids_header"] = ""

    @staticmethod
    async def _group_deal_products_by_products(deal_products: List[DealProduct]) -> Dict[str, DealTechSpecProductData]:
        """Group deal products that share an article and a set of services.

        Args:
            deal_products: Deal products to group.

        Returns:
            Mapping from the grouping key to an entry holding the deal of the
            first product seen, every deal product of the group, the summed
            quantity, the first non-empty ``additional_info`` and an empty
            ``warehouses_and_quantities`` list.
        """
        products: Dict[str, DealTechSpecProductData] = {}

        for deal_product in deal_products:
            # Products are grouped by article and services.
            key = _gen_key_for_product(deal_product)

            entry = products.get(key)
            if entry is None:
                products[key] = {
                    "deal": deal_product.deal,
                    "deal_products": [deal_product],
                    "quantity": deal_product.quantity,
                    "additional_info": deal_product.product.additional_info,
                    "warehouses_and_quantities": [],
                }
                continue

            entry["deal_products"].append(deal_product)
            entry["quantity"] += deal_product.quantity
            # Keep the first non-empty additional info found in the group.
            if not entry["additional_info"]:
                entry["additional_info"] = deal_product.product.additional_info

        return products

    async def _get_deal_by_id(self, deal_id: int) -> Optional[Deal]:
        """Load a deal with the relationships the document reads.

        Args:
            deal_id: Primary key of the deal.

        Returns:
            The deal, or ``None`` when no deal has this id.
        """
        # NOTE(review): Product.images is read while collecting image URLs but is
        # not eagerly loaded here; presumably the relationship is configured for
        # eager loading on the model - confirm, since implicit lazy loading is
        # not available under AsyncSession.
        deal: Deal | None = await self._session.scalar(
            select(Deal)
            .where(Deal.id == deal_id)
            .options(
                selectinload(Deal.products).selectinload(DealProduct.services),
                selectinload(Deal.products).selectinload(DealProduct.product).selectinload(Product.barcodes),
                selectinload(Deal.services).selectinload(DealServiceModel.service),
                selectinload(Deal.status_history),
                selectinload(Deal.group).selectinload(DealGroup.deals),
                joinedload(Deal.client),
                joinedload(Deal.shipping_warehouse),
            )
        )
        return deal

    def _set_deals_ids_header(self):
        """Write the "ID: ..." header: the ids of all deals in the group, or the deal's own id."""
        self.deal_doc["deal_ids_header"] = f"ID: {self.deal.id}"
        if self.deal.group:
            self.deal_doc["deal_ids_header"] = "ID: " + ", ".join(str(d.id) for d in self.deal.group.deals)

    def _merge_group_products(self, grouped_products: Dict[str, DealTechSpecProductData]) -> None:
        """Merge one deal's grouped products into the document-wide product map.

        Entries with the same regenerated key have their quantities summed. For
        every merged entry the warehouse name of its deal and its own quantity
        are recorded in ``warehouses_and_quantities``.
        """
        merged = self.deal_doc["products"]
        for product in grouped_products.values():
            key = _regen_key_for_product(product)
            if key not in merged:
                merged[key] = product
            else:
                merged[key]["quantity"] += product["quantity"]
            # NOTE(review): assumes every deal of the group has a shipping
            # warehouse; a deal without one raises AttributeError here.
            merged[key]["warehouses_and_quantities"].append((
                product["deal"].shipping_warehouse.name, product["quantity"],
            ))

    @staticmethod
    def _collect_product_image_urls(products: Dict[str, DealTechSpecProductData]) -> List[Optional[str]]:
        """Return, per grouped product, the URL of the first image of its first deal product.

        ``None`` is used as a placeholder for products without images so the
        result stays aligned with the iteration order of ``products``.
        """
        urls: List[Optional[str]] = []
        for product in products.values():
            images = product["deal_products"][0].product.images
            urls.append(images[0].image_url if images else None)
        return urls

    async def _create_deal_tech_spec_document_html(self, deal_id: int) -> str:
        """Render the technical specification HTML for a deal.

        When the deal belongs to a group, every deal of the group is included
        and their products are merged; otherwise only the deal itself is used.

        Args:
            deal_id: Primary key of the deal.

        Returns:
            The rendered HTML, or an empty string when the deal does not exist.
        """
        # Start from a clean context so repeated renders do not accumulate state.
        self._reset_deal_doc()

        deal = await self._get_deal_by_id(deal_id)
        if not deal:
            return ""
        self.deal = deal

        self._set_deals_ids_header()

        if deal.group:
            deals = await DealGroupService(self._session).get_deals_by_group_id(deal.group.id)
            for group_deal in deals:
                self.deal_doc["deals"].append(group_deal)
                grouped_products = await self._group_deal_products_by_products(group_deal.products)
                self._merge_group_products(grouped_products)
        else:
            self.deal_doc["deals"] = [deal]
            self.deal_doc["products"] = await self._group_deal_products_by_products(deal.products)

        product_urls = self._collect_product_image_urls(self.deal_doc["products"])
        self.deal_doc["product_images"] = await fetch_images(product_urls)

        template = ENV.get_template("deal/deal-tech-spec.html")

        return template.render({"data": self.deal_doc, "sign_place_text": "_" * 22})

    async def create_deal_tech_spec_pdf(self, deal_id: int) -> BytesIO:
        """Render the technical specification of a deal to an in-memory PDF.

        Args:
            deal_id: Primary key of the deal.

        Returns:
            The buffer the PDF was written to. It is not rewound, so read it
            with ``getvalue()`` or ``seek(0)`` first.
        """
        doc = await self._create_deal_tech_spec_document_html(deal_id)
        pdf_file = BytesIO()
        HTML(string=doc).write_pdf(pdf_file, stylesheets=[CSS(APP_PATH + '/static/css/deal-tech-spec.css')])
        return pdf_file
 |