Merge remote-tracking branch 'origin/productBarcodeImages'

This commit is contained in:
2024-11-05 00:04:21 +03:00
16 changed files with 420 additions and 127 deletions

View File

@@ -1,43 +1,44 @@
from io import BytesIO
from typing import List, Dict
from typing import List
from barcodes.attributes import AttributeWriterFactory
from barcodes.generator.base import BaseBarcodeGenerator
from barcodes.pdf import PDFGenerator
from models import Product, BarcodeTemplate
from schemas.barcode import PdfBarcodeGenData
from barcodes.types import BarcodeData, PdfBarcodeGenData, PdfBarcodeImageGenData
class DefaultBarcodeGenerator(BaseBarcodeGenerator):
def generate(self, barcodes_data: List[Dict[str, str | Product | BarcodeTemplate | int]]) -> BytesIO:
def generate(self, barcodes_data: List[BarcodeData | PdfBarcodeImageGenData]) -> BytesIO:
pdf_generator = PDFGenerator()
pdf_barcodes_gen_data: List[PdfBarcodeGenData] = []
pdf_barcodes_gen_data: List[PdfBarcodeGenData | PdfBarcodeImageGenData] = []
for barcode_data in barcodes_data:
attributes = {}
for attribute in barcode_data["template"].attributes:
attribute_getter = AttributeWriterFactory.get_writer(attribute.key)
if not attribute_getter:
continue
value = attribute_getter.get_value(barcode_data["product"])
if "barcode" in barcode_data:
attributes = {}
for attribute in barcode_data["template"].attributes:
attribute_getter = AttributeWriterFactory.get_writer(attribute.key)
if not attribute_getter:
continue
value = attribute_getter.get_value(barcode_data["product"])
if not value or not value.strip():
continue
attributes[attribute.name] = value
for additional_attribute in barcode_data["template"].additional_attributes:
value = additional_attribute.value
if not value:
continue
attributes[additional_attribute.name] = value
barcode_text = '<br/>'.join([f'{key}: {value}' for key, value in attributes.items()])
if not value or not value.strip():
continue
attributes[attribute.name] = value
for additional_attribute in barcode_data["template"].additional_attributes:
value = additional_attribute.value
if not value:
continue
attributes[additional_attribute.name] = value
barcode_text = '<br/>'.join([f'{key}: {value}' for key, value in attributes.items()])
pdf_barcodes_gen_data.append(
PdfBarcodeGenData(
barcode_value=barcode_data["barcode"],
text=barcode_text,
num_duplicates=barcode_data["quantity"]
)
)
pdf_barcodes_gen_data.append({
"barcode_value": barcode_data["barcode"],
"text": barcode_text,
"num_duplicates": barcode_data["num_duplicates"]
})
else:
pdf_barcodes_gen_data.append(barcode_data)
return pdf_generator.generate(pdf_barcodes_gen_data)

View File

@@ -0,0 +1 @@
from .images_uploader import BarcodeImagesUploader

View File

@@ -0,0 +1,22 @@
from abc import abstractmethod
from fastapi import UploadFile
class BaseImagesUploader:
@abstractmethod
def get_url(self, filename: str) -> bytes:
pass
@abstractmethod
def get_abs_path(self, filename: str) -> bytes:
pass
@abstractmethod
def delete(self, filename: str):
pass
@abstractmethod
async def upload(self, upload_file: UploadFile) -> str:
pass

View File

@@ -0,0 +1,50 @@
from pathlib import Path
from uuid import uuid4
from aioshutil import copyfileobj
from fastapi import UploadFile
from barcodes.images_uploader.base import BaseImagesUploader
from barcodes.pdf import PDFGenerator
from constants import APP_PATH, API_ROOT
class BarcodeImagesUploader(BaseImagesUploader):
def __init__(self):
self.relative_path = Path("static/images/product_barcodes")
self.storage_path = APP_PATH / self.relative_path
if not Path.exists(self.storage_path):
Path.mkdir(self.storage_path)
def get_url(self, filename: str) -> str:
file_location = self.relative_path / filename
return f"{API_ROOT}/{file_location}"
def get_abs_path(self, filename: str) -> str:
file_location = self.storage_path / filename
return file_location
def delete(self, filename: str):
file_location = self.storage_path / filename
if file_location.exists():
file_location.unlink()
async def upload(self, upload_file: UploadFile) -> str:
# Create temp file in filesystem
temp_filename = str(uuid4()) + '.' + upload_file.filename.split('.')[-1]
temp_file_location = self.storage_path / temp_filename
with open(temp_file_location, 'wb') as buffer:
await copyfileobj(upload_file.file, buffer)
# Generate PDF file and save it
res_filename = str(uuid4()) + '.pdf'
res_file_location = f"{self.storage_path}/{res_filename}"
temp_file_url = f"{self.relative_path}/{temp_filename}"
pdf_gen = PDFGenerator()
pdf_gen.generate_barcode_image(temp_file_url, res_file_location)
# Remove temp file
self.delete(temp_filename)
return res_filename

View File

@@ -7,10 +7,12 @@ from reportlab.lib.pagesizes import mm
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfgen import canvas
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
from barcodes.pdf.pdf_maker import PdfMaker
from barcodes.types import PdfBarcodeImageGenData, PdfBarcodeGenData
from constants import APP_PATH
from schemas.barcode import PdfBarcodeGenData
class PDFGenerator:
@@ -101,83 +103,94 @@ class PDFGenerator:
bottomMargin=1 * mm
)
def generate(self, barcodes_data: List[PdfBarcodeGenData]) -> BytesIO:
def _generate_for_one_product(self, barcode_data: PdfBarcodeGenData) -> BytesIO:
buffer = BytesIO()
# Создаем документ с указанным размером страницы
doc = self._create_doc(buffer)
# Список элементов для добавления в документ
# Создаем абзац с новым стилем
paragraph = Paragraph(barcode_data['text'], self.small_style)
# Получаем ширину и высоту абзаца
paragraph_width, paragraph_height = paragraph.wrap(self.page_width - 2 * mm, self.page_height)
# Рассчитываем доступное пространство для штрихкода
human_readable_height = 6 * mm # Высота human-readable текста
space_between_text_and_barcode = 4 * mm # Отступ между текстом и штрихкодом
barcode_height = self.page_height - paragraph_height - human_readable_height - space_between_text_and_barcode - 4 * mm # Учитываем поля и отступы
# Создаем штрихкод
available_width = self.page_width - 2 * mm # Учитываем поля
# Приблизительное количество элементов в штрихкоде Code 128 для средней длины
num_elements = 11 * len(barcode_data['barcode_value']) # Примерная оценка: 11 элементов на символ
# Рассчитываем ширину штриха
bar_width = available_width / num_elements
barcode = code128.Code128(
barcode_data['barcode_value'],
barWidth=bar_width,
barHeight=barcode_height,
humanReadable=True
)
# Добавление штрихкодов в список элементов документа
elements = []
product_barcode_canvases = []
for barcode_data in barcodes_data:
# Создаем абзац с новым стилем
paragraph = Paragraph(barcode_data.text, self.small_style)
# Получаем ширину и высоту абзаца
paragraph_width, paragraph_height = paragraph.wrap(self.page_width - 2 * mm, self.page_height)
# Рассчитываем доступное пространство для штрихкода
human_readable_height = 6 * mm # Высота human-readable текста
space_between_text_and_barcode = 4 * mm # Отступ между текстом и штрихкодом
barcode_height = self.page_height - paragraph_height - human_readable_height - space_between_text_and_barcode - 4 * mm # Учитываем поля и отступы
# Создаем штрихкод
available_width = self.page_width - 2 * mm # Учитываем поля
# Приблизительное количество элементов в штрихкоде Code 128 для средней длины
num_elements = 11 * len(barcode_data.barcode_value) # Примерная оценка: 11 элементов на символ
# Рассчитываем ширину штриха
bar_width = available_width / num_elements
barcode = code128.Code128(
barcode_data.barcode_value,
barWidth=bar_width,
barHeight=barcode_height,
humanReadable=True
)
product_barcode_canvases.append(barcode)
# Добавление штрихкодов в список элементов документа
for _ in range(barcode_data.num_duplicates):
elements.append(paragraph)
elements.append(Spacer(1, space_between_text_and_barcode)) # Отступ между текстом и штрихкодом
elements.append(PageBreak())
# Добавление спейсеров
for _ in range(self.number_of_spacing_pages):
elements.append(PageBreak())
# Удалить последние спейсеры
elements = elements[:-2]
product_counter, product_barcodes_counter = 0, 0
for _ in range(barcode_data['num_duplicates']):
elements.append(paragraph)
elements.append(Spacer(1, space_between_text_and_barcode)) # Отступ между текстом и штрихкодом
elements.append(PageBreak())
# Функция для отрисовки штрихкода на canvas
def add_barcode(canvas, doc):
nonlocal product_barcodes_counter, product_counter
barcode_canvas = product_barcode_canvases[product_counter]
product_barcodes_counter += 1
# Если данная страница это спейсер, то оставить пустой
num_duplicates = barcodes_data[product_counter].num_duplicates
if product_barcodes_counter > num_duplicates:
if product_barcodes_counter >= num_duplicates + self.number_of_spacing_pages:
product_barcodes_counter = 0
product_counter += 1
return
# Отрисовка штрихкода
barcode_width = barcode_canvas.width
barcode_width = barcode.width
barcode_x = (self.page_width - barcode_width) / 2 # Центрируем штрихкод
barcode_y = human_readable_height + 2 * mm # Размещаем штрихкод снизу с учетом отступа
barcode_canvas.drawOn(canvas, barcode_x, barcode_y)
barcode.drawOn(canvas, barcode_x, barcode_y)
# Создаем документ
doc.build(elements[:-1], onFirstPage=add_barcode, onLaterPages=add_barcode) # Убираем последний PageBreak
doc.build(elements, onFirstPage=add_barcode, onLaterPages=add_barcode) # Убираем последний PageBreak
buffer.seek(0)
return buffer
def _generate_for_one_product_using_img(self, barcode_data: PdfBarcodeImageGenData) -> BytesIO:
with open(barcode_data["barcode_image_url"], 'rb') as pdf_file:
pdf_bytes = pdf_file.read()
pdf_maker = PdfMaker((self.page_width, self.page_height))
for _ in range(barcode_data['num_duplicates']):
pdf_maker.add_pdfs(BytesIO(pdf_bytes))
return pdf_maker.get_bytes()
def _generate_spacers(self) -> BytesIO:
buffer = BytesIO()
doc = self._create_doc(buffer)
elements = []
for _ in range(self.number_of_spacing_pages):
elements.append(PageBreak())
doc.build(elements)
buffer.seek(0)
return buffer
def generate(self, barcodes_data: List[PdfBarcodeGenData | PdfBarcodeImageGenData]) -> BytesIO:
pdf_maker = PdfMaker((self.page_width, self.page_height))
pdf_files: list[BytesIO] = []
for barcode_data in barcodes_data:
if "barcode_value" in barcode_data:
pdf_files.append(self._generate_for_one_product(barcode_data))
else:
pdf_files.append(self._generate_for_one_product_using_img(barcode_data))
pdf_files.append(self._generate_spacers())
for file in pdf_files[:-1]:
pdf_maker.add_pdfs(file)
return pdf_maker.get_bytes()
def generate_barcode_image(self, barcode_image_url: str, path_to_save_pdf: str):
c = canvas.Canvas(path_to_save_pdf, pagesize=(self.page_width, self.page_height))
c.drawImage(barcode_image_url, 0, 0, width=self.page_width, height=self.page_height)
c.save()

38
barcodes/pdf/pdf_maker.py Normal file
View File

@@ -0,0 +1,38 @@
from io import BytesIO
from fpdf import FPDF
import pdfrw
class PdfMaker:
def __init__(self, size: tuple):
self.size = size
self.writer = pdfrw.PdfWriter()
def clear(self):
del self.writer
self.writer = pdfrw.PdfWriter()
def add_image(self, image_data):
size = self.size
fpdf = FPDF(format=size, unit="pt")
width, height = self.size
fpdf.add_page()
fpdf.image(image_data, 0, 0, width, height)
fpdf_reader: pdfrw.PdfReader = pdfrw.PdfReader(fdata=bytes(fpdf.output()))
self.writer.addpage(fpdf_reader.getPage(0))
def add_pdf(self, pdf_data: BytesIO):
pdf_reader = pdfrw.PdfReader(fdata=bytes(pdf_data.read()))
self.writer.addpage(pdf_reader.getPage(0))
def add_pdfs(self, pdf_data: BytesIO):
pdf_reader = pdfrw.PdfReader(fdata=bytes(pdf_data.read()))
self.writer.addpages(pdf_reader.readpages(pdf_reader.Root))
def get_bytes(self):
result_io = BytesIO()
self.writer.write(result_io)
result_io.seek(0)
return result_io

21
barcodes/types.py Normal file
View File

@@ -0,0 +1,21 @@
from typing import TypedDict
from models import BarcodeTemplate, Product
class BarcodeData(TypedDict):
barcode: str
template: BarcodeTemplate
product: Product
num_duplicates: int
class PdfBarcodeGenData(TypedDict):
barcode_value: str
text: str
num_duplicates: int
class PdfBarcodeImageGenData(TypedDict):
num_duplicates: int
barcode_image_url: str

View File

@@ -12,6 +12,8 @@ ENV.globals['now'] = datetime.now
ENV.globals['encode128'] = encode128
ENV.globals['format_number'] = lambda x: '{:,}'.format(x).replace(',', ' ')
API_ROOT = "/api"
APP_PATH = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(__file__)
allowed_telegram_ids = [

View File

@@ -5,6 +5,7 @@ import platform
from starlette.staticfiles import StaticFiles
import routers
from constants import API_ROOT
origins = [
'http://localhost:5173'
@@ -17,7 +18,7 @@ if platform.system() == 'Linux':
class Worker(uvicorn.workers.UvicornWorker):
CONFIG_KWARGS = {
'root_path': '/api'
'root_path': API_ROOT
}
app.add_middleware(

View File

@@ -23,6 +23,8 @@ class Product(BaseModel):
barcode_template_id = Column(Integer, ForeignKey('barcode_templates.id'), nullable=True)
barcode_template = relationship('BarcodeTemplate', lazy='joined')
barcode_image = relationship('ProductBarcodeImage', back_populates='product', lazy='joined', uselist=False)
# Attributes
# TODO move to another table
brand = Column(String, nullable=True, comment='Бренд')
@@ -61,3 +63,11 @@ class ProductBarcode(BaseModel):
product: Mapped["Product"] = relationship(back_populates='barcodes')
barcode = Column(String, nullable=False, index=True, comment='ШК товара', primary_key=True)
class ProductBarcodeImage(BaseModel):
__tablename__ = 'product_barcode_images'
product_id = Column(Integer, ForeignKey('products.id'), primary_key=True, comment='ID товара')
product: Mapped["Product"] = relationship(back_populates='barcode_image')
filename = Column(String, nullable=False)

View File

@@ -25,7 +25,10 @@ openpyxl
lexorank-py
celery[redis]
celery
aioshutil
# PDF
reportlab
weasyprint
number_to_string
pdfrw
fpdf

View File

@@ -164,3 +164,40 @@ async def upload_product_image(
):
file_bytes = upload_file.file.read()
return await ProductService(session).upload_image(product_id, file_bytes)
@product_router.post(
'/barcode/upload-image/{product_id}',
response_model=ProductUploadBarcodeImageResponse,
operation_id='upload_product_barcode_image'
)
async def upload_product_barcode_image(
product_id: int,
upload_file: UploadFile,
session: Annotated[AsyncSession, Depends(get_session)]
):
return await ProductService(session).upload_barcode_image(product_id, upload_file)
@product_router.post(
'/barcode/delete-image/{product_id}',
response_model=ProductDeleteBarcodeImageResponse,
operation_id='delete_product_barcode_image'
)
async def delete_product_barcode_image(
product_id: int,
session: Annotated[AsyncSession, Depends(get_session)]
):
return await ProductService(session).delete_barcode_image(product_id)
@product_router.post(
'/barcode/image/{product_id}',
response_model=ProductGetBarcodeImageResponse,
operation_id='get_product_barcode_image'
)
async def get_product_barcode_image(
product_id: int,
session: Annotated[AsyncSession, Depends(get_session)]
):
return await ProductService(session).get_barcode_image(product_id)

View File

@@ -47,12 +47,6 @@ class BarcodeSchema(BaseSchema):
additional_field: str | None = None
class PdfBarcodeGenData(BaseSchema):
barcode_value: str
text: str
num_duplicates: int = 1
# endregion
# region Requests

View File

@@ -115,4 +115,18 @@ class ProductExistsBarcodeResponse(BaseSchema):
class ProductUploadImageResponse(OkMessageSchema):
image_url: str | None = None
class ProductUploadBarcodeImageResponse(OkMessageSchema):
barcode_image_url: str | None = None
class ProductDeleteBarcodeImageResponse(OkMessageSchema):
pass
class ProductGetBarcodeImageResponse(BaseSchema):
barcode_image_url: str | None = None
# endregion

View File

@@ -6,6 +6,7 @@ from sqlalchemy.orm import selectinload, joinedload
from barcodes.attributes import AttributeWriterFactory
from barcodes.generator.default_generator import DefaultBarcodeGenerator
from barcodes.images_uploader import BarcodeImagesUploader
from models import BarcodeTemplate, BarcodeTemplateAttribute, barcode_template_attribute_link, Product, \
BarcodeTemplateAdditionalField, BarcodeTemplateSize, Deal, DealProduct
from schemas.barcode import *
@@ -89,17 +90,27 @@ class BarcodeService(BaseService):
product: Product = query.scalar()
if not product:
raise ValueError('Товар не найден')
barcode_template = await self._get_barcode_template(request, product)
default_generator = DefaultBarcodeGenerator()
filename = f'{product.id}_barcode.pdf'
pdf_buffer = default_generator.generate(
[{
"barcode": request.barcode,
"product": product,
"template": barcode_template,
"quantity": request.quantity
}]
)
default_generator = DefaultBarcodeGenerator()
if product.barcode_image:
uploader = BarcodeImagesUploader()
pdf_buffer = default_generator.generate(
[{
"barcode_image_url": uploader.get_abs_path(product.barcode_image.filename),
"num_duplicates": request.quantity
}]
)
else:
barcode_template = await self._get_barcode_template(request, product)
pdf_buffer = default_generator.generate(
[{
"barcode": request.barcode,
"product": product,
"template": barcode_template,
"num_duplicates": request.quantity
}]
)
return filename, pdf_buffer
async def get_deal_barcodes_pdf(self, request: GetDealProductsBarcodesPdfRequest) -> Tuple[str, BytesIO]:
@@ -107,7 +118,7 @@ class BarcodeService(BaseService):
select(Deal)
.options(
selectinload(Deal.products).joinedload(DealProduct.product).selectinload(Product.client),
selectinload(Deal.products).joinedload(DealProduct.product).joinedload(Product.barcodes)
selectinload(Deal.products).joinedload(DealProduct.product).joinedload(Product.barcodes),
)
.filter(Deal.id == request.deal_id)
)
@@ -116,20 +127,27 @@ class BarcodeService(BaseService):
if not deal:
raise ValueError('Сделка не найдена')
uploader = BarcodeImagesUploader()
barcodes_data: List[Dict[str, str | Product | BarcodeTemplate | int]] = []
for deal_product in deal.products:
product_request = GetProductBarcodeRequest(
product_id=deal_product.product_id,
barcode="",
barcode_template_id=deal_product.product.barcode_template_id,
)
barcode_template = await self._get_barcode_template(product_request, deal_product.product)
barcodes_data.append({
"barcode": deal_product.product.barcodes[0].barcode,
"product": deal_product.product,
"template": barcode_template,
"quantity": deal_product.quantity
})
if deal_product.product.barcode_image:
barcodes_data.append({
"barcode_image_url": uploader.get_abs_path(deal_product.product.barcode_image.filename),
"num_duplicates": deal_product.quantity
})
else:
product_request = GetProductBarcodeRequest(
product_id=deal_product.product_id,
barcode="",
barcode_template_id=deal_product.product.barcode_template_id,
)
barcode_template = await self._get_barcode_template(product_request, deal_product.product)
barcodes_data.append({
"barcode": deal_product.product.barcodes[0].barcode,
"product": deal_product.product,
"template": barcode_template,
"num_duplicates": deal_product.quantity
})
default_generator = DefaultBarcodeGenerator()
filename = f'{deal.id}_deal_barcodes.pdf'

View File

@@ -1,11 +1,14 @@
from fastapi import HTTPException
from typing import Optional
from fastapi import HTTPException, UploadFile
from sqlalchemy import select, func, Integer, update, or_
from sqlalchemy.orm import selectinload, Query
import utils.barcodes
from backend import config
from barcodes.images_uploader import BarcodeImagesUploader
from external.s3_uploader.uploader import S3Uploader
from models.product import Product, ProductImage
from models.product import Product, ProductImage, ProductBarcodeImage
from schemas.base import PaginationSchema
from schemas.product import *
from services.base import BaseService
@@ -107,7 +110,8 @@ class ProductService(BaseService):
)
.options(
selectinload(Product.barcodes)
.noload(ProductBarcode.product)
.noload(ProductBarcode.product),
selectinload(Product.barcode_image),
)
.where(
Product.client_id == client_id
@@ -257,4 +261,68 @@ class ProductService(BaseService):
return ProductGenerateBarcodeResponse(ok=True, message='Штрих-код успешно сгенерирован', barcode=barcode)
except Exception as e:
return ProductGenerateBarcodeResponse(ok=False, message=str(e))
async def get_model_by_id(self, product_id: int) -> Product:
product: Optional[Product] = await self.session.get(Product, product_id)
if not product:
raise Exception('Не удалось найти товар с указанным ID')
return product
async def delete_model_barcode_image(self, uploader: BarcodeImagesUploader, product_id: int) -> None:
barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id)
if barcode_image:
uploader.delete(barcode_image.filename)
await self.session.delete(barcode_image)
await self.session.commit()
async def upload_barcode_image(self, product_id: int, upload_file: UploadFile) -> ProductUploadBarcodeImageResponse:
try:
await self.get_model_by_id(product_id)
uploader = BarcodeImagesUploader()
await self.delete_model_barcode_image(uploader, product_id)
filename = await uploader.upload(upload_file)
barcode_image_url = uploader.get_url(filename)
product_barcode_image = ProductBarcodeImage(
product_id=product_id,
filename=filename,
)
self.session.add(product_barcode_image)
await self.session.commit()
return ProductUploadBarcodeImageResponse(
ok=True,
message='Штрих-код для товара успешно загружен',
barcode_image_url=barcode_image_url,
)
except Exception as e:
print(e)
return ProductUploadBarcodeImageResponse(ok=False, message=str(e))
async def delete_barcode_image(self, product_id: int) -> ProductDeleteBarcodeImageResponse:
try:
await self.get_model_by_id(product_id)
uploader = BarcodeImagesUploader()
await self.delete_model_barcode_image(uploader, product_id)
return ProductDeleteBarcodeImageResponse(
ok=True,
message='Штрих-код для товара успешно удален',
)
except Exception as e:
return ProductDeleteBarcodeImageResponse(ok=False, message=str(e))
async def get_barcode_image(self, product_id: int) -> ProductGetBarcodeImageResponse:
product: Optional[Product] = await self.session.get(Product, product_id)
if not product:
raise HTTPException(404, 'Не удалось найти товар с указанным ID')
barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id)
if not barcode_image:
return ProductGetBarcodeImageResponse(barcode_image_url="")
uploader = BarcodeImagesUploader()
url = uploader.get_url(barcode_image.filename)
return ProductGetBarcodeImageResponse(barcode_image_url=url)
# endregion