From 5ed42d99dc7538d5d766c90fec24aa6aa432b2cf Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Fri, 1 Nov 2024 17:23:58 +0400 Subject: [PATCH 1/2] feat: CRUD for product barcode images --- barcodes/images_uploader/__init__.py | 1 + barcodes/images_uploader/base.py | 18 +++++ barcodes/images_uploader/images_uploader.py | 46 +++++++++++++ barcodes/pdf/generator.py | 8 +++ constants.py | 2 + main.py | 3 +- models/product.py | 10 +++ requirements.txt | 1 + routers/product.py | 37 +++++++++++ schemas/product.py | 14 ++++ services/product.py | 74 ++++++++++++++++++++- 11 files changed, 210 insertions(+), 4 deletions(-) create mode 100644 barcodes/images_uploader/__init__.py create mode 100644 barcodes/images_uploader/base.py create mode 100644 barcodes/images_uploader/images_uploader.py diff --git a/barcodes/images_uploader/__init__.py b/barcodes/images_uploader/__init__.py new file mode 100644 index 0000000..dc25cb2 --- /dev/null +++ b/barcodes/images_uploader/__init__.py @@ -0,0 +1 @@ +from .images_uploader import BarcodeImagesUploader diff --git a/barcodes/images_uploader/base.py b/barcodes/images_uploader/base.py new file mode 100644 index 0000000..03b39f2 --- /dev/null +++ b/barcodes/images_uploader/base.py @@ -0,0 +1,18 @@ +from abc import abstractmethod + +from fastapi import UploadFile + + +class BaseImagesUploader: + + @abstractmethod + def get_url(self, filename: str) -> bytes: + pass + + @abstractmethod + def delete(self, filename: str): + pass + + @abstractmethod + async def upload(self, upload_file: UploadFile) -> str: + pass \ No newline at end of file diff --git a/barcodes/images_uploader/images_uploader.py b/barcodes/images_uploader/images_uploader.py new file mode 100644 index 0000000..3a97452 --- /dev/null +++ b/barcodes/images_uploader/images_uploader.py @@ -0,0 +1,46 @@ +from pathlib import Path +from uuid import uuid4 + +from aioshutil import copyfileobj +from fastapi import UploadFile + +from barcodes.images_uploader.base import BaseImagesUploader +from barcodes.pdf import PDFGenerator +from constants import APP_PATH, API_ROOT + + +class BarcodeImagesUploader(BaseImagesUploader): + def __init__(self): + self.relative_path = Path("static/images/product_barcodes") + self.storage_path = APP_PATH / self.relative_path + if not Path.exists(self.storage_path): + Path.mkdir(self.storage_path) + + def get_url(self, filename: str) -> str: + file_location = self.relative_path / filename + return f"{API_ROOT}/{file_location}" + + def delete(self, filename: str): + file_location = self.storage_path / filename + if file_location.exists(): + file_location.unlink() + + async def upload(self, upload_file: UploadFile) -> str: + # Create temp file in filesystem + temp_filename = str(uuid4()) + '.' + upload_file.filename.split('.')[-1] + temp_file_location = self.storage_path / temp_filename + with open(temp_file_location, 'wb') as buffer: + await copyfileobj(upload_file.file, buffer) + + # Generate PDF file and save it + res_filename = str(uuid4()) + '.pdf' + res_file_location = f"{self.storage_path}/{res_filename}" + temp_file_url = f"{self.relative_path}/{temp_filename}" + + pdf_gen = PDFGenerator() + pdf_gen.generate_barcode_image(temp_file_url, res_file_location) + + # Remove temp file + self.delete(temp_filename) + + return res_filename diff --git a/barcodes/pdf/generator.py b/barcodes/pdf/generator.py index f5f6b90..7c55371 100644 --- a/barcodes/pdf/generator.py +++ b/barcodes/pdf/generator.py @@ -7,6 +7,7 @@ from reportlab.lib.pagesizes import mm from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont +from reportlab.pdfgen import canvas from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak from constants import APP_PATH @@ -181,3 +182,10 @@ class PDFGenerator: buffer.seek(0) return buffer + + def generate_barcode_image(self, barcode_image_url: str, path_to_save_pdf: str): + print(type(path_to_save_pdf)) + print(path_to_save_pdf) + c = canvas.Canvas(path_to_save_pdf, pagesize=(self.page_width, self.page_height)) + c.drawImage(barcode_image_url, 0, 0, width=self.page_width, height=self.page_height) + c.save() diff --git a/constants.py b/constants.py index 8b69779..b3b7879 100644 --- a/constants.py +++ b/constants.py @@ -12,6 +12,8 @@ ENV.globals['now'] = datetime.now ENV.globals['encode128'] = encode128 ENV.globals['format_number'] = lambda x: '{:,}'.format(x).replace(',', ' ') +API_ROOT = "/api" + APP_PATH = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(__file__) allowed_telegram_ids = [ diff --git a/main.py b/main.py index 5c9395b..8e34220 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ import platform from starlette.staticfiles import StaticFiles import routers +from constants import API_ROOT origins = [ 'http://localhost:5173' @@ -17,7 +18,7 @@ if platform.system() == 'Linux': class Worker(uvicorn.workers.UvicornWorker): CONFIG_KWARGS = { - 'root_path': '/api' + 'root_path': API_ROOT } app.add_middleware( diff --git a/models/product.py b/models/product.py index 902b794..b908e96 100644 --- a/models/product.py +++ b/models/product.py @@ -23,6 +23,8 @@ class Product(BaseModel): barcode_template_id = Column(Integer, ForeignKey('barcode_templates.id'), nullable=True) barcode_template = relationship('BarcodeTemplate', lazy='joined') + barcode_image = relationship('ProductBarcodeImage', back_populates='product', lazy='joined', uselist=False) + # Attributes # TODO move to another table brand = Column(String, nullable=True, comment='Бренд') @@ -61,3 +63,11 @@ class ProductBarcode(BaseModel): product: Mapped["Product"] = relationship(back_populates='barcodes') barcode = Column(String, nullable=False, index=True, comment='ШК товара', primary_key=True) + + +class ProductBarcodeImage(BaseModel): + __tablename__ = 'product_barcode_images' + product_id = Column(Integer, ForeignKey('products.id'), primary_key=True, comment='ID товара') + product: Mapped["Product"] = relationship(back_populates='barcode_image') + + filename = Column(String, nullable=False) diff --git a/requirements.txt b/requirements.txt index 5bbddae..253f479 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,6 +25,7 @@ openpyxl lexorank-py celery[redis] celery +aioshutil # PDF reportlab weasyprint diff --git a/routers/product.py b/routers/product.py index 041946a..1696332 100644 --- a/routers/product.py +++ b/routers/product.py @@ -164,3 +164,40 @@ async def upload_product_image( ): file_bytes = upload_file.file.read() return await ProductService(session).upload_image(product_id, file_bytes) + + +@product_router.post( + '/barcode/upload-image/{product_id}', + response_model=ProductUploadBarcodeImageResponse, + operation_id='upload_product_barcode_image' +) +async def upload_product_barcode_image( + product_id: int, + upload_file: UploadFile, + session: Annotated[AsyncSession, Depends(get_session)] +): + return await ProductService(session).upload_barcode_image(product_id, upload_file) + + +@product_router.post( + '/barcode/delete-image/{product_id}', + response_model=ProductDeleteBarcodeImageResponse, + operation_id='delete_product_barcode_image' +) +async def delete_product_barcode_image( + product_id: int, + session: Annotated[AsyncSession, Depends(get_session)] +): + return await ProductService(session).delete_barcode_image(product_id) + + +@product_router.post( + '/barcode/image/{product_id}', + response_model=ProductGetBarcodeImageResponse, + operation_id='get_product_barcode_image' +) +async def get_product_barcode_image( + product_id: int, + session: Annotated[AsyncSession, Depends(get_session)] +): + return await ProductService(session).get_barcode_image(product_id) diff --git a/schemas/product.py b/schemas/product.py index c181372..1320294 100644 --- a/schemas/product.py +++ b/schemas/product.py @@ -115,4 +115,18 @@ class ProductExistsBarcodeResponse(BaseSchema): class ProductUploadImageResponse(OkMessageSchema): image_url: str | None = None + + +class ProductUploadBarcodeImageResponse(OkMessageSchema): + barcode_image_url: str | None = None + + +class ProductDeleteBarcodeImageResponse(OkMessageSchema): + pass + + +class ProductGetBarcodeImageResponse(BaseSchema): + barcode_image_url: str | None = None + + # endregion diff --git a/services/product.py b/services/product.py index 71f2d08..406ab56 100644 --- a/services/product.py +++ b/services/product.py @@ -1,11 +1,14 @@ -from fastapi import HTTPException +from typing import Optional + +from fastapi import HTTPException, UploadFile from sqlalchemy import select, func, Integer, update, or_ from sqlalchemy.orm import selectinload, Query import utils.barcodes from backend import config +from barcodes.images_uploader import BarcodeImagesUploader from external.s3_uploader.uploader import S3Uploader -from models.product import Product, ProductImage +from models.product import Product, ProductImage, ProductBarcodeImage from schemas.base import PaginationSchema from schemas.product import * from services.base import BaseService @@ -107,7 +110,8 @@ class ProductService(BaseService): ) .options( selectinload(Product.barcodes) - .noload(ProductBarcode.product) + .noload(ProductBarcode.product), + selectinload(Product.barcode_image), ) .where( Product.client_id == client_id @@ -257,4 +261,68 @@ class ProductService(BaseService): return ProductGenerateBarcodeResponse(ok=True, message='Штрих-код успешно сгенерирован', barcode=barcode) except Exception as e: return ProductGenerateBarcodeResponse(ok=False, message=str(e)) + + async def get_model_by_id(self, product_id: int) -> Product: + product: Optional[Product] = await self.session.get(Product, product_id) + if not product: + raise Exception('Не удалось найти товар с указанным ID') + return product + + async def delete_model_barcode_image(self, uploader: BarcodeImagesUploader, product_id: int) -> None: + barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id) + if barcode_image: + uploader.delete(barcode_image.filename) + await self.session.delete(barcode_image) + await self.session.commit() + + async def upload_barcode_image(self, product_id: int, upload_file: UploadFile) -> ProductUploadBarcodeImageResponse: + try: + await self.get_model_by_id(product_id) + + uploader = BarcodeImagesUploader() + await self.delete_model_barcode_image(uploader, product_id) + filename = await uploader.upload(upload_file) + barcode_image_url = uploader.get_url(filename) + + product_barcode_image = ProductBarcodeImage( + product_id=product_id, + filename=filename, + ) + self.session.add(product_barcode_image) + await self.session.commit() + return ProductUploadBarcodeImageResponse( + ok=True, + message='Штрих-код для товара успешно загружен', + barcode_image_url=barcode_image_url, + ) + except Exception as e: + print(e) + return ProductUploadBarcodeImageResponse(ok=False, message=str(e)) + + async def delete_barcode_image(self, product_id: int) -> ProductDeleteBarcodeImageResponse: + try: + await self.get_model_by_id(product_id) + + uploader = BarcodeImagesUploader() + await self.delete_model_barcode_image(uploader, product_id) + + return ProductDeleteBarcodeImageResponse( + ok=True, + message='Штрих-код для товара успешно удален', + ) + except Exception as e: + return ProductDeleteBarcodeImageResponse(ok=False, message=str(e)) + + async def get_barcode_image(self, product_id: int) -> ProductGetBarcodeImageResponse: + product: Optional[Product] = await self.session.get(Product, product_id) + if not product: + raise HTTPException(404, 'Не удалось найти товар с указанным ID') + barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id) + if not barcode_image: + return ProductGetBarcodeImageResponse(barcode_image_url="") + + uploader = BarcodeImagesUploader() + url = uploader.get_url(barcode_image.filename) + return ProductGetBarcodeImageResponse(barcode_image_url=url) + # endregion From 25060322a1b8cf23568c30e5803007af39042424 Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Sat, 2 Nov 2024 00:53:45 +0400 Subject: [PATCH 2/2] feat: pdf generation using product barcode images --- barcodes/generator/default_generator.py | 55 ++++---- barcodes/images_uploader/base.py | 4 + barcodes/images_uploader/images_uploader.py | 4 + barcodes/pdf/generator.py | 143 ++++++++++---------- barcodes/pdf/pdf_maker.py | 38 ++++++ barcodes/types.py | 21 +++ requirements.txt | 2 + schemas/barcode.py | 6 - services/barcode.py | 64 +++++---- 9 files changed, 212 insertions(+), 125 deletions(-) create mode 100644 barcodes/pdf/pdf_maker.py create mode 100644 barcodes/types.py diff --git a/barcodes/generator/default_generator.py b/barcodes/generator/default_generator.py index 70b9288..ad94194 100644 --- a/barcodes/generator/default_generator.py +++ b/barcodes/generator/default_generator.py @@ -1,43 +1,44 @@ from io import BytesIO -from typing import List, Dict +from typing import List from barcodes.attributes import AttributeWriterFactory from barcodes.generator.base import BaseBarcodeGenerator from barcodes.pdf import PDFGenerator -from models import Product, BarcodeTemplate -from schemas.barcode import PdfBarcodeGenData +from barcodes.types import BarcodeData, PdfBarcodeGenData, PdfBarcodeImageGenData class DefaultBarcodeGenerator(BaseBarcodeGenerator): - def generate(self, barcodes_data: List[Dict[str, str | Product | BarcodeTemplate | int]]) -> BytesIO: + def generate(self, barcodes_data: List[BarcodeData | PdfBarcodeImageGenData]) -> BytesIO: pdf_generator = PDFGenerator() - pdf_barcodes_gen_data: List[PdfBarcodeGenData] = [] + pdf_barcodes_gen_data: List[PdfBarcodeGenData | PdfBarcodeImageGenData] = [] for barcode_data in barcodes_data: - attributes = {} - for attribute in barcode_data["template"].attributes: - attribute_getter = AttributeWriterFactory.get_writer(attribute.key) - if not attribute_getter: - continue - value = attribute_getter.get_value(barcode_data["product"]) + if "barcode" in barcode_data: + attributes = {} + for attribute in barcode_data["template"].attributes: + attribute_getter = AttributeWriterFactory.get_writer(attribute.key) + if not attribute_getter: + continue + value = attribute_getter.get_value(barcode_data["product"]) - if not value or not value.strip(): - continue - attributes[attribute.name] = value - for additional_attribute in barcode_data["template"].additional_attributes: - value = additional_attribute.value - if not value: - continue - attributes[additional_attribute.name] = value - barcode_text = '
'.join([f'{key}: {value}' for key, value in attributes.items()]) + if not value or not value.strip(): + continue + attributes[attribute.name] = value + for additional_attribute in barcode_data["template"].additional_attributes: + value = additional_attribute.value + if not value: + continue + attributes[additional_attribute.name] = value + barcode_text = '
'.join([f'{key}: {value}' for key, value in attributes.items()]) - pdf_barcodes_gen_data.append( - PdfBarcodeGenData( - barcode_value=barcode_data["barcode"], - text=barcode_text, - num_duplicates=barcode_data["quantity"] - ) - ) + pdf_barcodes_gen_data.append({ + "barcode_value": barcode_data["barcode"], + "text": barcode_text, + "num_duplicates": barcode_data["num_duplicates"] + }) + else: + pdf_barcodes_gen_data.append(barcode_data) return pdf_generator.generate(pdf_barcodes_gen_data) + diff --git a/barcodes/images_uploader/base.py b/barcodes/images_uploader/base.py index 03b39f2..4f727bc 100644 --- a/barcodes/images_uploader/base.py +++ b/barcodes/images_uploader/base.py @@ -9,6 +9,10 @@ class BaseImagesUploader: def get_url(self, filename: str) -> bytes: pass + @abstractmethod + def get_abs_path(self, filename: str) -> bytes: + pass + @abstractmethod def delete(self, filename: str): pass diff --git a/barcodes/images_uploader/images_uploader.py b/barcodes/images_uploader/images_uploader.py index 3a97452..4b3c9ee 100644 --- a/barcodes/images_uploader/images_uploader.py +++ b/barcodes/images_uploader/images_uploader.py @@ -20,6 +20,10 @@ class BarcodeImagesUploader(BaseImagesUploader): file_location = self.relative_path / filename return f"{API_ROOT}/{file_location}" + def get_abs_path(self, filename: str) -> str: + file_location = self.storage_path / filename + return file_location + def delete(self, filename: str): file_location = self.storage_path / filename if file_location.exists(): diff --git a/barcodes/pdf/generator.py b/barcodes/pdf/generator.py index 7c55371..b25e27b 100644 --- a/barcodes/pdf/generator.py +++ b/barcodes/pdf/generator.py @@ -10,8 +10,9 @@ from reportlab.pdfbase.ttfonts import TTFont from reportlab.pdfgen import canvas from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak +from barcodes.pdf.pdf_maker import PdfMaker +from barcodes.types import PdfBarcodeImageGenData, PdfBarcodeGenData from constants import APP_PATH -from schemas.barcode import PdfBarcodeGenData class PDFGenerator: @@ -102,90 +103,94 @@ class PDFGenerator: bottomMargin=1 * mm ) - def generate(self, barcodes_data: List[PdfBarcodeGenData]) -> BytesIO: + def _generate_for_one_product(self, barcode_data: PdfBarcodeGenData) -> BytesIO: buffer = BytesIO() - - # Создаем документ с указанным размером страницы doc = self._create_doc(buffer) - # Список элементов для добавления в документ + # Создаем абзац с новым стилем + paragraph = Paragraph(barcode_data['text'], self.small_style) + + # Получаем ширину и высоту абзаца + paragraph_width, paragraph_height = paragraph.wrap(self.page_width - 2 * mm, self.page_height) + + # Рассчитываем доступное пространство для штрихкода + human_readable_height = 6 * mm # Высота human-readable текста + space_between_text_and_barcode = 4 * mm # Отступ между текстом и штрихкодом + barcode_height = self.page_height - paragraph_height - human_readable_height - space_between_text_and_barcode - 4 * mm # Учитываем поля и отступы + + # Создаем штрихкод + available_width = self.page_width - 2 * mm # Учитываем поля + + # Приблизительное количество элементов в штрихкоде Code 128 для средней длины + num_elements = 11 * len(barcode_data['barcode_value']) # Примерная оценка: 11 элементов на символ + + # Рассчитываем ширину штриха + bar_width = available_width / num_elements + barcode = code128.Code128( + barcode_data['barcode_value'], + barWidth=bar_width, + barHeight=barcode_height, + humanReadable=True + ) + + # Добавление штрихкодов в список элементов документа elements = [] - product_barcode_canvases = [] - - for barcode_data in barcodes_data: - # Создаем абзац с новым стилем - paragraph = Paragraph(barcode_data.text, self.small_style) - - # Получаем ширину и высоту абзаца - paragraph_width, paragraph_height = paragraph.wrap(self.page_width - 2 * mm, self.page_height) - - # Рассчитываем доступное пространство для штрихкода - human_readable_height = 6 * mm # Высота human-readable текста - space_between_text_and_barcode = 4 * mm # Отступ между текстом и штрихкодом - barcode_height = self.page_height - paragraph_height - human_readable_height - space_between_text_and_barcode - 4 * mm # Учитываем поля и отступы - - # Создаем штрихкод - available_width = self.page_width - 2 * mm # Учитываем поля - - # Приблизительное количество элементов в штрихкоде Code 128 для средней длины - num_elements = 11 * len(barcode_data.barcode_value) # Примерная оценка: 11 элементов на символ - - # Рассчитываем ширину штриха - bar_width = available_width / num_elements - barcode = code128.Code128( - barcode_data.barcode_value, - barWidth=bar_width, - barHeight=barcode_height, - humanReadable=True - ) - product_barcode_canvases.append(barcode) - - # Добавление штрихкодов в список элементов документа - for _ in range(barcode_data.num_duplicates): - elements.append(paragraph) - elements.append(Spacer(1, space_between_text_and_barcode)) # Отступ между текстом и штрихкодом - elements.append(PageBreak()) - - # Добавление спейсеров - for _ in range(self.number_of_spacing_pages): - elements.append(PageBreak()) - - # Удалить последние спейсеры - elements = elements[:-2] - - product_counter, product_barcodes_counter = 0, 0 + for _ in range(barcode_data['num_duplicates']): + elements.append(paragraph) + elements.append(Spacer(1, space_between_text_and_barcode)) # Отступ между текстом и штрихкодом + elements.append(PageBreak()) # Функция для отрисовки штрихкода на canvas def add_barcode(canvas, doc): - nonlocal product_barcodes_counter, product_counter - - barcode_canvas = product_barcode_canvases[product_counter] - - product_barcodes_counter += 1 - - # Если данная страница это спейсер, то оставить пустой - num_duplicates = barcodes_data[product_counter].num_duplicates - if product_barcodes_counter > num_duplicates: - if product_barcodes_counter >= num_duplicates + self.number_of_spacing_pages: - product_barcodes_counter = 0 - product_counter += 1 - return - - # Отрисовка штрихкода - barcode_width = barcode_canvas.width + barcode_width = barcode.width barcode_x = (self.page_width - barcode_width) / 2 # Центрируем штрихкод barcode_y = human_readable_height + 2 * mm # Размещаем штрихкод снизу с учетом отступа - barcode_canvas.drawOn(canvas, barcode_x, barcode_y) + barcode.drawOn(canvas, barcode_x, barcode_y) # Создаем документ - doc.build(elements[:-1], onFirstPage=add_barcode, onLaterPages=add_barcode) # Убираем последний PageBreak + doc.build(elements, onFirstPage=add_barcode, onLaterPages=add_barcode) # Убираем последний PageBreak buffer.seek(0) return buffer + def _generate_for_one_product_using_img(self, barcode_data: PdfBarcodeImageGenData) -> BytesIO: + with open(barcode_data["barcode_image_url"], 'rb') as pdf_file: + pdf_bytes = pdf_file.read() + + pdf_maker = PdfMaker((self.page_width, self.page_height)) + for _ in range(barcode_data['num_duplicates']): + pdf_maker.add_pdfs(BytesIO(pdf_bytes)) + + return pdf_maker.get_bytes() + + def _generate_spacers(self) -> BytesIO: + buffer = BytesIO() + doc = self._create_doc(buffer) + elements = [] + for _ in range(self.number_of_spacing_pages): + elements.append(PageBreak()) + doc.build(elements) + buffer.seek(0) + return buffer + + def generate(self, barcodes_data: List[PdfBarcodeGenData | PdfBarcodeImageGenData]) -> BytesIO: + pdf_maker = PdfMaker((self.page_width, self.page_height)) + + pdf_files: list[BytesIO] = [] + + for barcode_data in barcodes_data: + if "barcode_value" in barcode_data: + pdf_files.append(self._generate_for_one_product(barcode_data)) + else: + pdf_files.append(self._generate_for_one_product_using_img(barcode_data)) + pdf_files.append(self._generate_spacers()) + + for file in pdf_files[:-1]: + pdf_maker.add_pdfs(file) + + return pdf_maker.get_bytes() + def generate_barcode_image(self, barcode_image_url: str, path_to_save_pdf: str): - print(type(path_to_save_pdf)) - print(path_to_save_pdf) c = canvas.Canvas(path_to_save_pdf, pagesize=(self.page_width, self.page_height)) c.drawImage(barcode_image_url, 0, 0, width=self.page_width, height=self.page_height) c.save() diff --git a/barcodes/pdf/pdf_maker.py b/barcodes/pdf/pdf_maker.py new file mode 100644 index 0000000..f261bb6 --- /dev/null +++ b/barcodes/pdf/pdf_maker.py @@ -0,0 +1,38 @@ +from io import BytesIO + +from fpdf import FPDF +import pdfrw + +class PdfMaker: + def __init__(self, size: tuple): + self.size = size + + self.writer = pdfrw.PdfWriter() + + def clear(self): + del self.writer + self.writer = pdfrw.PdfWriter() + + def add_image(self, image_data): + size = self.size + + fpdf = FPDF(format=size, unit="pt") + width, height = self.size + fpdf.add_page() + fpdf.image(image_data, 0, 0, width, height) + fpdf_reader: pdfrw.PdfReader = pdfrw.PdfReader(fdata=bytes(fpdf.output())) + self.writer.addpage(fpdf_reader.getPage(0)) + + def add_pdf(self, pdf_data: BytesIO): + pdf_reader = pdfrw.PdfReader(fdata=bytes(pdf_data.read())) + self.writer.addpage(pdf_reader.getPage(0)) + + def add_pdfs(self, pdf_data: BytesIO): + pdf_reader = pdfrw.PdfReader(fdata=bytes(pdf_data.read())) + self.writer.addpages(pdf_reader.readpages(pdf_reader.Root)) + + def get_bytes(self): + result_io = BytesIO() + self.writer.write(result_io) + result_io.seek(0) + return result_io diff --git a/barcodes/types.py b/barcodes/types.py new file mode 100644 index 0000000..54d0b72 --- /dev/null +++ b/barcodes/types.py @@ -0,0 +1,21 @@ +from typing import TypedDict + +from models import BarcodeTemplate, Product + + +class BarcodeData(TypedDict): + barcode: str + template: BarcodeTemplate + product: Product + num_duplicates: int + + +class PdfBarcodeGenData(TypedDict): + barcode_value: str + text: str + num_duplicates: int + + +class PdfBarcodeImageGenData(TypedDict): + num_duplicates: int + barcode_image_url: str diff --git a/requirements.txt b/requirements.txt index 253f479..4b7bb1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,3 +30,5 @@ aioshutil reportlab weasyprint number_to_string +pdfrw +fpdf \ No newline at end of file diff --git a/schemas/barcode.py b/schemas/barcode.py index b886abb..d794ad6 100644 --- a/schemas/barcode.py +++ b/schemas/barcode.py @@ -47,12 +47,6 @@ class BarcodeSchema(BaseSchema): additional_field: str | None = None -class PdfBarcodeGenData(BaseSchema): - barcode_value: str - text: str - num_duplicates: int = 1 - - # endregion # region Requests diff --git a/services/barcode.py b/services/barcode.py index ec89bc7..e7253d5 100644 --- a/services/barcode.py +++ b/services/barcode.py @@ -6,6 +6,7 @@ from sqlalchemy.orm import selectinload, joinedload from barcodes.attributes import AttributeWriterFactory from barcodes.generator.default_generator import DefaultBarcodeGenerator +from barcodes.images_uploader import BarcodeImagesUploader from models import BarcodeTemplate, BarcodeTemplateAttribute, barcode_template_attribute_link, Product, \ BarcodeTemplateAdditionalField, BarcodeTemplateSize, Deal, DealProduct from schemas.barcode import * @@ -89,17 +90,27 @@ class BarcodeService(BaseService): product: Product = query.scalar() if not product: raise ValueError('Товар не найден') - barcode_template = await self._get_barcode_template(request, product) - default_generator = DefaultBarcodeGenerator() filename = f'{product.id}_barcode.pdf' - pdf_buffer = default_generator.generate( - [{ - "barcode": request.barcode, - "product": product, - "template": barcode_template, - "quantity": request.quantity - }] - ) + default_generator = DefaultBarcodeGenerator() + + if product.barcode_image: + uploader = BarcodeImagesUploader() + pdf_buffer = default_generator.generate( + [{ + "barcode_image_url": uploader.get_abs_path(product.barcode_image.filename), + "num_duplicates": request.quantity + }] + ) + else: + barcode_template = await self._get_barcode_template(request, product) + pdf_buffer = default_generator.generate( + [{ + "barcode": request.barcode, + "product": product, + "template": barcode_template, + "num_duplicates": request.quantity + }] + ) return filename, pdf_buffer async def get_deal_barcodes_pdf(self, request: GetDealProductsBarcodesPdfRequest) -> Tuple[str, BytesIO]: @@ -107,7 +118,7 @@ class BarcodeService(BaseService): select(Deal) .options( selectinload(Deal.products).joinedload(DealProduct.product).selectinload(Product.client), - selectinload(Deal.products).joinedload(DealProduct.product).joinedload(Product.barcodes) + selectinload(Deal.products).joinedload(DealProduct.product).joinedload(Product.barcodes), ) .filter(Deal.id == request.deal_id) ) @@ -116,20 +127,27 @@ class BarcodeService(BaseService): if not deal: raise ValueError('Сделка не найдена') + uploader = BarcodeImagesUploader() barcodes_data: List[Dict[str, str | Product | BarcodeTemplate | int]] = [] for deal_product in deal.products: - product_request = GetProductBarcodeRequest( - product_id=deal_product.product_id, - barcode="", - barcode_template_id=deal_product.product.barcode_template_id, - ) - barcode_template = await self._get_barcode_template(product_request, deal_product.product) - barcodes_data.append({ - "barcode": deal_product.product.barcodes[0].barcode, - "product": deal_product.product, - "template": barcode_template, - "quantity": deal_product.quantity - }) + if deal_product.product.barcode_image: + barcodes_data.append({ + "barcode_image_url": uploader.get_abs_path(deal_product.product.barcode_image.filename), + "num_duplicates": deal_product.quantity + }) + else: + product_request = GetProductBarcodeRequest( + product_id=deal_product.product_id, + barcode="", + barcode_template_id=deal_product.product.barcode_template_id, + ) + barcode_template = await self._get_barcode_template(product_request, deal_product.product) + barcodes_data.append({ + "barcode": deal_product.product.barcodes[0].barcode, + "product": deal_product.product, + "template": barcode_template, + "num_duplicates": deal_product.quantity + }) default_generator = DefaultBarcodeGenerator() filename = f'{deal.id}_deal_barcodes.pdf'