331 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			331 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from io import BytesIO
 | 
						||
from typing import Dict, Tuple
 | 
						||
 | 
						||
from sqlalchemy import select, update, insert
 | 
						||
from sqlalchemy.orm import selectinload, joinedload
 | 
						||
 | 
						||
from barcodes.attributes import AttributeWriterFactory
 | 
						||
from barcodes.generator.default_generator import DefaultBarcodeGenerator
 | 
						||
from models import BarcodeTemplate, BarcodeTemplateAttribute, barcode_template_attribute_link, Product, \
 | 
						||
    BarcodeTemplateAdditionalField, BarcodeTemplateSize, Deal, DealProduct
 | 
						||
from schemas.barcode import *
 | 
						||
from services.base import BaseService
 | 
						||
 | 
						||
 | 
						||
class BarcodeService(BaseService):
 | 
						||
 | 
						||
    # region Barcode
 | 
						||
    async def _get_barcode_template(self, request: GetProductBarcodeRequest, product: Product) -> BarcodeTemplate:
 | 
						||
        if request.barcode_template_id:
 | 
						||
            stmt = select(BarcodeTemplate).filter(BarcodeTemplate.id == request.barcode_template_id)
 | 
						||
            query = await self.session.execute(stmt)
 | 
						||
            barcode_template = query.scalar()
 | 
						||
            if not barcode_template:
 | 
						||
                raise ValueError('Шаблон не найден')
 | 
						||
        elif product.barcode_template:
 | 
						||
            barcode_template = product.barcode_template
 | 
						||
        elif product.client.barcode_template:
 | 
						||
            barcode_template = product.client.barcode_template
 | 
						||
        else:
 | 
						||
            stmt = select(BarcodeTemplate).filter(BarcodeTemplate.is_default == True)
 | 
						||
            query = await self.session.execute(stmt)
 | 
						||
            barcode_template = query.scalar()
 | 
						||
            if not barcode_template:
 | 
						||
                raise ValueError('Стандартный шаблон не найден')
 | 
						||
        barcode_template: BarcodeTemplate
 | 
						||
        return barcode_template
 | 
						||
 | 
						||
    async def get_barcode(self, request: GetProductBarcodeRequest) -> GetProductBarcodeResponse:
 | 
						||
        # get product by id
 | 
						||
        stmt = (
 | 
						||
            select(Product)
 | 
						||
            .options(
 | 
						||
                joinedload(Product.client)
 | 
						||
            )
 | 
						||
            .filter(Product.id == request.product_id)
 | 
						||
        )
 | 
						||
        query = await self.session.execute(stmt)
 | 
						||
        product: Product = query.scalar()
 | 
						||
        if not product:
 | 
						||
            raise ValueError('Товар не найден')
 | 
						||
        barcode_template = await self._get_barcode_template(request, product)
 | 
						||
        attributes_list = []
 | 
						||
        for attribute in barcode_template.attributes:
 | 
						||
            attribute_getter = AttributeWriterFactory.get_writer(attribute.key)
 | 
						||
            if not attribute_getter:
 | 
						||
                continue
 | 
						||
            value = attribute_getter.get_value(product)
 | 
						||
            if not value:
 | 
						||
                continue
 | 
						||
            attributes_list.append(
 | 
						||
                BarcodeAttributeSchema(
 | 
						||
                    name=attribute.name,
 | 
						||
                    value=value
 | 
						||
                )
 | 
						||
            )
 | 
						||
        for additional_attribute in barcode_template.additional_attributes:
 | 
						||
            attributes_list.append(
 | 
						||
                BarcodeAttributeSchema(
 | 
						||
                    name=additional_attribute.name,
 | 
						||
                    value=additional_attribute.value
 | 
						||
                )
 | 
						||
            )
 | 
						||
        barcode = BarcodeSchema(
 | 
						||
            barcode=request.barcode,
 | 
						||
            attributes=attributes_list,
 | 
						||
            additional_field=barcode_template.additional_field
 | 
						||
        )
 | 
						||
        return GetProductBarcodeResponse(barcode=barcode)
 | 
						||
 | 
						||
    async def get_barcode_pdf(self, request: GetProductBarcodePdfRequest) -> Tuple[str, BytesIO]:
 | 
						||
        stmt = (
 | 
						||
            select(Product)
 | 
						||
            .options(
 | 
						||
                joinedload(Product.client)
 | 
						||
            )
 | 
						||
            .filter(Product.id == request.product_id)
 | 
						||
        )
 | 
						||
        query = await self.session.execute(stmt)
 | 
						||
        product: Product = query.scalar()
 | 
						||
        if not product:
 | 
						||
            raise ValueError('Товар не найден')
 | 
						||
        barcode_template = await self._get_barcode_template(request, product)
 | 
						||
        default_generator = DefaultBarcodeGenerator()
 | 
						||
        filename = f'{product.id}_barcode.pdf'
 | 
						||
        pdf_buffer = default_generator.generate(
 | 
						||
            [{
 | 
						||
                "barcode": request.barcode,
 | 
						||
                "product": product,
 | 
						||
                "template": barcode_template,
 | 
						||
                "quantity": request.quantity
 | 
						||
            }]
 | 
						||
        )
 | 
						||
        return filename, pdf_buffer
 | 
						||
 | 
						||
    async def get_deal_barcodes_pdf(self, request: GetDealProductsBarcodesPdfRequest) -> Tuple[str, BytesIO]:
 | 
						||
        stmt = (
 | 
						||
            select(Deal)
 | 
						||
            .options(
 | 
						||
                selectinload(Deal.products).joinedload(DealProduct.product).selectinload(Product.client),
 | 
						||
                selectinload(Deal.products).joinedload(DealProduct.product).joinedload(Product.barcodes)
 | 
						||
            )
 | 
						||
            .filter(Deal.id == request.deal_id)
 | 
						||
        )
 | 
						||
        query = await self.session.execute(stmt)
 | 
						||
        deal: Deal = query.scalar()
 | 
						||
        if not deal:
 | 
						||
            raise ValueError('Сделка не найдена')
 | 
						||
 | 
						||
        barcodes_data: List[Dict[str, str | Product | BarcodeTemplate | int]] = []
 | 
						||
        for deal_product in deal.products:
 | 
						||
            product_request = GetProductBarcodeRequest(
 | 
						||
                product_id=deal_product.product_id,
 | 
						||
                barcode="",
 | 
						||
                barcode_template_id=deal_product.product.barcode_template_id,
 | 
						||
            )
 | 
						||
            barcode_template = await self._get_barcode_template(product_request, deal_product.product)
 | 
						||
            barcodes_data.append({
 | 
						||
                "barcode": deal_product.product.barcodes[0].barcode,
 | 
						||
                "product": deal_product.product,
 | 
						||
                "template": barcode_template,
 | 
						||
                "quantity": deal_product.quantity
 | 
						||
            })
 | 
						||
 | 
						||
        default_generator = DefaultBarcodeGenerator()
 | 
						||
        filename = f'{deal.id}_deal_barcodes.pdf'
 | 
						||
        pdf_buffer = default_generator.generate(barcodes_data)
 | 
						||
 | 
						||
        return filename, pdf_buffer
 | 
						||
 | 
						||
    # endregion
 | 
						||
 | 
						||
    # region Template
 | 
						||
    async def get_barcode_template_by_id(self,
 | 
						||
                                         request: GetBarcodeTemplateByIdRequest) -> GetBarcodeTemplateByIdResponse:
 | 
						||
        stmt = select(BarcodeTemplate).filter(BarcodeTemplate.id == request.id)
 | 
						||
        query = await self.session.execute(
 | 
						||
            stmt
 | 
						||
        )
 | 
						||
        return query.scalar()
 | 
						||
 | 
						||
    async def get_all_barcode_templates(self) -> GetAllBarcodeTemplatesResponse:
 | 
						||
        stmt = (
 | 
						||
            select(BarcodeTemplate)
 | 
						||
            .options(
 | 
						||
                selectinload(BarcodeTemplate.attributes)
 | 
						||
            )
 | 
						||
            .order_by(BarcodeTemplate.id))
 | 
						||
        query = await self.session.execute(stmt)
 | 
						||
        templates = query.scalars().all()
 | 
						||
        return GetAllBarcodeTemplatesResponse(templates=templates)
 | 
						||
 | 
						||
    async def create_barcode_template(self, request: BarcodeTemplateCreateRequest) -> BarcodeTemplateCreateResponse:
 | 
						||
        try:
 | 
						||
            if request.is_default:
 | 
						||
                stmt = select(BarcodeTemplate).filter(BarcodeTemplate.is_default == True)
 | 
						||
                query = await self.session.execute(stmt)
 | 
						||
                if query.scalar():
 | 
						||
                    raise ValueError('Стандартный шаблон уже существует')
 | 
						||
 | 
						||
            # prevent duplicate template
 | 
						||
            stmt = select(BarcodeTemplate).filter(BarcodeTemplate.name == request.name)
 | 
						||
            query = await self.session.execute(stmt)
 | 
						||
            if query.scalar():
 | 
						||
                raise ValueError('Шаблон с таким именем уже существует')
 | 
						||
 | 
						||
            # create template then add attributes
 | 
						||
            request_dict = request.dict()
 | 
						||
            del request_dict['attribute_ids']
 | 
						||
            del request_dict['additional_attributes']
 | 
						||
            del request_dict['size']
 | 
						||
            request_dict['size_id'] = request.size.id
 | 
						||
            template = BarcodeTemplate(**request_dict)
 | 
						||
            self.session.add(template)
 | 
						||
            await self.session.flush()
 | 
						||
            await self.session.refresh(template)
 | 
						||
            # get all attributes from database
 | 
						||
            stmt = select(BarcodeTemplateAttribute).filter(
 | 
						||
                BarcodeTemplateAttribute.id.in_(request.attribute_ids))
 | 
						||
            query = await self.session.execute(stmt)
 | 
						||
            attributes = query.scalars().all()
 | 
						||
 | 
						||
            # add attributes to template
 | 
						||
            for attribute in attributes:
 | 
						||
                template.attributes.append(attribute)
 | 
						||
            for additional_attribute in request.additional_attributes:
 | 
						||
                template.additional_attributes.append(
 | 
						||
                    BarcodeTemplateAdditionalField(**additional_attribute.dict(),
 | 
						||
                                                   barcode_template_id=template.id)
 | 
						||
                )
 | 
						||
            await self.session.commit()
 | 
						||
            return BarcodeTemplateCreateResponse(message='Шаблон успешно создан',
 | 
						||
                                                 ok=True,
 | 
						||
                                                 id=template.id)
 | 
						||
        except Exception as e:
 | 
						||
            await self.session.rollback()
 | 
						||
            return BarcodeTemplateCreateResponse(message=str(e),
 | 
						||
                                                 ok=False,
 | 
						||
                                                 id=-1)
 | 
						||
 | 
						||
    async def update_barcode_template(self, request: BarcodeTemplateUpdateRequest) -> BarcodeTemplateUpdateResponse:
 | 
						||
        try:
 | 
						||
            stmt = select(BarcodeTemplate).filter(BarcodeTemplate.id == request.id)
 | 
						||
            query = await self.session.execute(stmt)
 | 
						||
            template = query.scalar()
 | 
						||
            if not template:
 | 
						||
                raise ValueError('Шаблон не найден')
 | 
						||
 | 
						||
            if request.is_default:
 | 
						||
                stmt = select(BarcodeTemplate).filter(BarcodeTemplate.is_default == True)
 | 
						||
                query = await self.session.execute(stmt)
 | 
						||
                default_template = query.scalar()
 | 
						||
                if default_template and default_template.id != request.id:
 | 
						||
                    raise ValueError('Стандартный шаблон уже существует')
 | 
						||
 | 
						||
            # update template then add attributes with dict and value syntax
 | 
						||
            request_dict = request.dict()
 | 
						||
            del request_dict['id']
 | 
						||
            del request_dict['attribute_ids']
 | 
						||
            del request_dict['additional_attributes']
 | 
						||
            del request_dict['size']
 | 
						||
            request_dict['size_id'] = request.size.id
 | 
						||
 | 
						||
            await self.session.execute(
 | 
						||
                update(BarcodeTemplate)
 | 
						||
                .where(BarcodeTemplate.id == request.id)
 | 
						||
                .values(**request_dict)
 | 
						||
            )
 | 
						||
 | 
						||
            # difference deleted and new
 | 
						||
            template_attributes = set([attribute.id for attribute in template.attributes])
 | 
						||
            request_attributes = set(request.attribute_ids)
 | 
						||
            new_attribute_ids = request_attributes.difference(template_attributes)
 | 
						||
            deleted_attribute_ids = template_attributes.difference(request_attributes)
 | 
						||
 | 
						||
            # delete attributes
 | 
						||
            for attribute in template.attributes:
 | 
						||
                if attribute.id not in deleted_attribute_ids:
 | 
						||
                    continue
 | 
						||
                template.attributes.remove(attribute)
 | 
						||
            for new_attribute_id in new_attribute_ids:
 | 
						||
                stmt = insert(barcode_template_attribute_link).values({
 | 
						||
                    'barcode_template_id': template.id,
 | 
						||
                    'attribute_id': new_attribute_id
 | 
						||
                })
 | 
						||
                await self.session.execute(stmt)
 | 
						||
                await self.session.flush()
 | 
						||
 | 
						||
            # working with additional attributes
 | 
						||
            # deleting additioanl attributes that is deleted
 | 
						||
            template_additional_attributes = set([attribute.name for attribute in template.additional_attributes])
 | 
						||
            request_additional_attributes = set([attribute.name for attribute in request.additional_attributes])
 | 
						||
            new_additional_attributes = request_additional_attributes.difference(template_additional_attributes)
 | 
						||
            deleted_additional_attributes = template_additional_attributes.difference(request_additional_attributes)
 | 
						||
            for attribute in template.additional_attributes:
 | 
						||
                if attribute.name not in deleted_additional_attributes:
 | 
						||
                    continue
 | 
						||
                await self.session.delete(attribute)
 | 
						||
            for new_attribute in request.additional_attributes:
 | 
						||
                if new_attribute.name not in new_additional_attributes:
 | 
						||
                    continue
 | 
						||
                template.additional_attributes.append(
 | 
						||
                    BarcodeTemplateAdditionalField(**new_attribute.dict(),
 | 
						||
                                                   barcode_template_id=template.id)
 | 
						||
                )
 | 
						||
 | 
						||
            await self.session.commit()
 | 
						||
            return BarcodeTemplateUpdateResponse(message='Шаблон успешно обновлен',
 | 
						||
                                                 ok=True)
 | 
						||
        except Exception as e:
 | 
						||
            await self.session.rollback()
 | 
						||
            return BarcodeTemplateUpdateResponse(message=str(e),
 | 
						||
                                                 ok=False)
 | 
						||
 | 
						||
    async def delete_template(self, request: BarcodeTemplateDeleteRequest) -> BarcodeTemplateDeleteResponse:
 | 
						||
        try:
 | 
						||
            template = await self.session.get(BarcodeTemplate, request.id)
 | 
						||
            await self.session.delete(template)
 | 
						||
            await self.session.commit()
 | 
						||
            return BarcodeTemplateDeleteResponse(ok=True, message='Шаблон успешно удален')
 | 
						||
        except Exception as e:
 | 
						||
            return BarcodeTemplateDeleteResponse(ok=False, message=str(e))
 | 
						||
 | 
						||
    # endregion
 | 
						||
 | 
						||
    # region Template attributes
 | 
						||
    async def get_all_barcode_template_attributes(self) -> GetAllBarcodeTemplateAttributesResponse:
 | 
						||
        stmt = select(BarcodeTemplateAttribute).order_by(BarcodeTemplateAttribute.id)
 | 
						||
        query = await self.session.execute(stmt)
 | 
						||
        attributes = query.scalars().all()
 | 
						||
        return GetAllBarcodeTemplateAttributesResponse(attributes=attributes)
 | 
						||
 | 
						||
    async def create_barcode_template_attribute(self, request: CreateBarcodeTemplateAttributeRequest):
 | 
						||
        try:
 | 
						||
            # prevent duplicate attribute
 | 
						||
            stmt = select(BarcodeTemplateAttribute).filter(BarcodeTemplateAttribute.name == request.name)
 | 
						||
            query = await self.session.execute(stmt)
 | 
						||
            if query.scalar():
 | 
						||
                raise ValueError('Атрибут с таким именем уже существует')
 | 
						||
 | 
						||
            attribute = BarcodeTemplateAttribute(**request.dict())
 | 
						||
            self.session.add(attribute)
 | 
						||
            await self.session.commit()
 | 
						||
            return BarcodeTemplateCreateResponse(message='Атрибут успешно создан',
 | 
						||
                                                 ok=True,
 | 
						||
                                                 id=attribute.id)
 | 
						||
        except Exception as e:
 | 
						||
            await self.session.rollback()
 | 
						||
            return BarcodeTemplateCreateResponse(message=str(e),
 | 
						||
                                                 ok=False,
 | 
						||
                                                 id=-1)
 | 
						||
 | 
						||
    # endregion
 | 
						||
 | 
						||
    # region Template sizes
 | 
						||
    async def get_all_barcode_template_sizes(self) -> GetAllBarcodeTemplateSizesResponse:
 | 
						||
        stmt = select(BarcodeTemplateSize).order_by(BarcodeTemplateSize.id)
 | 
						||
        query = await self.session.execute(stmt)
 | 
						||
        sizes = query.scalars().all()
 | 
						||
        return GetAllBarcodeTemplateSizesResponse(sizes=sizes)
 | 
						||
    # endregion
 |