from io import BytesIO from typing import Dict, Tuple from sqlalchemy import select, update, insert from sqlalchemy.orm import selectinload, joinedload from barcodes.attributes import AttributeWriterFactory from barcodes.generator.default_generator import DefaultBarcodeGenerator from barcodes.images_uploader import BarcodeImagesUploader from models import BarcodeTemplate, BarcodeTemplateAttribute, barcode_template_attribute_link, Product, \ BarcodeTemplateAdditionalField, BarcodeTemplateSize, Card, CardProduct from schemas.barcode import * from services.base import BaseService class BarcodeService(BaseService): # region Barcode async def _get_barcode_template(self, request: GetProductBarcodeRequest, product: Product) -> BarcodeTemplate: if request.barcode_template_id: stmt = select(BarcodeTemplate).filter(BarcodeTemplate.id == request.barcode_template_id) query = await self.session.execute(stmt) barcode_template = query.scalar() if not barcode_template: raise ValueError('Шаблон не найден') elif product.barcode_template: barcode_template = product.barcode_template elif product.client.barcode_template: barcode_template = product.client.barcode_template else: stmt = select(BarcodeTemplate).filter(BarcodeTemplate.is_default == True) query = await self.session.execute(stmt) barcode_template = query.scalar() if not barcode_template: raise ValueError('Стандартный шаблон не найден') barcode_template: BarcodeTemplate return barcode_template async def get_barcode(self, request: GetProductBarcodeRequest) -> GetProductBarcodeResponse: # get product by id stmt = ( select(Product) .options( joinedload(Product.client) ) .filter(Product.id == request.product_id) ) query = await self.session.execute(stmt) product: Product = query.scalar() if not product: raise ValueError('Товар не найден') barcode_template = await self._get_barcode_template(request, product) attributes_list = [] for attribute in barcode_template.attributes: attribute_getter = AttributeWriterFactory.get_writer(attribute.key) if not attribute_getter: continue value = attribute_getter.get_value(product) if not value: continue attributes_list.append( BarcodeAttributeSchema( name=attribute.name, value=value ) ) for additional_attribute in barcode_template.additional_attributes: attributes_list.append( BarcodeAttributeSchema( name=additional_attribute.name, value=additional_attribute.value ) ) barcode = BarcodeSchema( barcode=request.barcode, attributes=attributes_list, additional_field=barcode_template.additional_field ) return GetProductBarcodeResponse(barcode=barcode) async def get_barcode_pdf(self, request: GetProductBarcodePdfRequest) -> Tuple[str, BytesIO]: stmt = ( select(Product) .options( joinedload(Product.client) ) .filter(Product.id == request.product_id) ) query = await self.session.execute(stmt) product: Product = query.scalar() if not product: raise ValueError('Товар не найден') filename = f'{product.id}_barcode.pdf' default_generator = DefaultBarcodeGenerator() if product.barcode_image: uploader = BarcodeImagesUploader() pdf_buffer = default_generator.generate( [{ "barcode_image_url": uploader.get_abs_path(product.barcode_image.filename), "num_duplicates": request.quantity }] ) else: barcode_template = await self._get_barcode_template(request, product) pdf_buffer = default_generator.generate( [{ "barcode": request.barcode, "product": product, "template": barcode_template, "num_duplicates": request.quantity }] ) return filename, pdf_buffer async def get_card_barcodes_pdf(self, request: GetCardProductsBarcodesPdfRequest) -> Tuple[str, BytesIO]: stmt = ( select(Card) .options( selectinload(Card.products).joinedload(CardProduct.product).selectinload(Product.client), selectinload(Card.products).joinedload(CardProduct.product).joinedload(Product.barcodes), ) .filter(Card.id == request.card_id) ) query = await self.session.execute(stmt) card: Card = query.scalar() if not card: raise ValueError('Сделка не найдена') uploader = BarcodeImagesUploader() barcodes_data: List[Dict[str, str | Product | BarcodeTemplate | int]] = [] for card_product in card.products: if card_product.product.barcode_image: barcodes_data.append({ "barcode_image_url": uploader.get_abs_path(card_product.product.barcode_image.filename), "num_duplicates": card_product.quantity }) else: product_request = GetProductBarcodeRequest( product_id=card_product.product_id, barcode="", barcode_template_id=card_product.product.barcode_template_id, ) barcode_template = await self._get_barcode_template(product_request, card_product.product) barcodes_data.append({ "barcode": card_product.product.barcodes[0].barcode, "product": card_product.product, "template": barcode_template, "num_duplicates": card_product.quantity }) default_generator = DefaultBarcodeGenerator() filename = f'{card.id}_deal_barcodes.pdf' pdf_buffer = default_generator.generate(barcodes_data) return filename, pdf_buffer # endregion # region Template async def get_barcode_template_by_id(self, request: GetBarcodeTemplateByIdRequest) -> GetBarcodeTemplateByIdResponse: stmt = select(BarcodeTemplate).filter(BarcodeTemplate.id == request.id) query = await self.session.execute( stmt ) return query.scalar() async def get_all_barcode_templates(self) -> GetAllBarcodeTemplatesResponse: stmt = ( select(BarcodeTemplate) .options( selectinload(BarcodeTemplate.attributes) ) .order_by(BarcodeTemplate.id)) query = await self.session.execute(stmt) templates = query.scalars().all() return GetAllBarcodeTemplatesResponse(templates=templates) async def create_barcode_template(self, request: BarcodeTemplateCreateRequest) -> BarcodeTemplateCreateResponse: try: if request.is_default: stmt = select(BarcodeTemplate).filter(BarcodeTemplate.is_default == True) query = await self.session.execute(stmt) if query.scalar(): raise ValueError('Стандартный шаблон уже существует') # prevent duplicate template stmt = select(BarcodeTemplate).filter(BarcodeTemplate.name == request.name) query = await self.session.execute(stmt) if query.scalar(): raise ValueError('Шаблон с таким именем уже существует') # create template then add attributes request_dict = request.dict() del request_dict['attribute_ids'] del request_dict['additional_attributes'] del request_dict['size'] request_dict['size_id'] = request.size.id template = BarcodeTemplate(**request_dict) self.session.add(template) await self.session.flush() await self.session.refresh(template) # get all attributes from database stmt = select(BarcodeTemplateAttribute).filter( BarcodeTemplateAttribute.id.in_(request.attribute_ids)) query = await self.session.execute(stmt) attributes = query.scalars().all() # add attributes to template for attribute in attributes: template.attributes.append(attribute) for additional_attribute in request.additional_attributes: template.additional_attributes.append( BarcodeTemplateAdditionalField(**additional_attribute.dict(), barcode_template_id=template.id) ) await self.session.commit() return BarcodeTemplateCreateResponse(message='Шаблон успешно создан', ok=True, id=template.id) except Exception as e: await self.session.rollback() return BarcodeTemplateCreateResponse(message=str(e), ok=False, id=-1) async def update_barcode_template(self, request: BarcodeTemplateUpdateRequest) -> BarcodeTemplateUpdateResponse: try: stmt = select(BarcodeTemplate).filter(BarcodeTemplate.id == request.id) query = await self.session.execute(stmt) template = query.scalar() if not template: raise ValueError('Шаблон не найден') if request.is_default: stmt = select(BarcodeTemplate).filter(BarcodeTemplate.is_default == True) query = await self.session.execute(stmt) default_template = query.scalar() if default_template and default_template.id != request.id: raise ValueError('Стандартный шаблон уже существует') # update template then add attributes with dict and value syntax request_dict = request.dict() del request_dict['id'] del request_dict['attribute_ids'] del request_dict['additional_attributes'] del request_dict['size'] request_dict['size_id'] = request.size.id await self.session.execute( update(BarcodeTemplate) .where(BarcodeTemplate.id == request.id) .values(**request_dict) ) # difference deleted and new template_attributes = set([attribute.id for attribute in template.attributes]) request_attributes = set(request.attribute_ids) new_attribute_ids = request_attributes.difference(template_attributes) deleted_attribute_ids = template_attributes.difference(request_attributes) # delete attributes for attribute in template.attributes: if attribute.id not in deleted_attribute_ids: continue template.attributes.remove(attribute) for new_attribute_id in new_attribute_ids: stmt = insert(barcode_template_attribute_link).values({ 'barcode_template_id': template.id, 'attribute_id': new_attribute_id }) await self.session.execute(stmt) await self.session.flush() # working with additional attributes # deleting additioanl attributes that is deleted template_additional_attributes = set([attribute.name for attribute in template.additional_attributes]) request_additional_attributes = set([attribute.name for attribute in request.additional_attributes]) new_additional_attributes = request_additional_attributes.difference(template_additional_attributes) deleted_additional_attributes = template_additional_attributes.difference(request_additional_attributes) for attribute in template.additional_attributes: if attribute.name not in deleted_additional_attributes: continue await self.session.delete(attribute) for new_attribute in request.additional_attributes: if new_attribute.name not in new_additional_attributes: continue template.additional_attributes.append( BarcodeTemplateAdditionalField(**new_attribute.dict(), barcode_template_id=template.id) ) await self.session.commit() return BarcodeTemplateUpdateResponse(message='Шаблон успешно обновлен', ok=True) except Exception as e: await self.session.rollback() return BarcodeTemplateUpdateResponse(message=str(e), ok=False) async def delete_template(self, request: BarcodeTemplateDeleteRequest) -> BarcodeTemplateDeleteResponse: try: template = await self.session.get(BarcodeTemplate, request.id) await self.session.delete(template) await self.session.commit() return BarcodeTemplateDeleteResponse(ok=True, message='Шаблон успешно удален') except Exception as e: return BarcodeTemplateDeleteResponse(ok=False, message=str(e)) # endregion # region Template attributes async def get_all_barcode_template_attributes(self) -> GetAllBarcodeTemplateAttributesResponse: stmt = select(BarcodeTemplateAttribute).order_by(BarcodeTemplateAttribute.id) query = await self.session.execute(stmt) attributes = query.scalars().all() return GetAllBarcodeTemplateAttributesResponse(attributes=attributes) async def create_barcode_template_attribute(self, request: CreateBarcodeTemplateAttributeRequest): try: # prevent duplicate attribute stmt = select(BarcodeTemplateAttribute).filter(BarcodeTemplateAttribute.name == request.name) query = await self.session.execute(stmt) if query.scalar(): raise ValueError('Атрибут с таким именем уже существует') attribute = BarcodeTemplateAttribute(**request.dict()) self.session.add(attribute) await self.session.commit() return BarcodeTemplateCreateResponse(message='Атрибут успешно создан', ok=True, id=attribute.id) except Exception as e: await self.session.rollback() return BarcodeTemplateCreateResponse(message=str(e), ok=False, id=-1) # endregion # region Template sizes async def get_all_barcode_template_sizes(self) -> GetAllBarcodeTemplateSizesResponse: stmt = select(BarcodeTemplateSize).order_by(BarcodeTemplateSize.id) query = await self.session.execute(stmt) sizes = query.scalars().all() return GetAllBarcodeTemplateSizesResponse(sizes=sizes) # endregion