Files
Fulfillment-Backend/services/product.py

338 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from io import BytesIO
from typing import Optional
import starlette.status
from fastapi import HTTPException, UploadFile
from sqlalchemy import select, func, Integer, update, or_
from sqlalchemy.orm import selectinload, Query
import utils.barcodes
from backend import config
from barcodes.images_uploader import BarcodeImagesUploader
from barcodes.pdf.pdf_maker import PdfMaker
from external.s3_uploader.uploader import S3Uploader
from models import User
from models.product import Product, ProductImage, ProductBarcodeImage
from schemas.auth import UserUnion
from schemas.base import PaginationSchema
from schemas.product import *
from services.base import BaseService
from utils.auth import verify_user_deal_editor
from utils.dependecies import is_valid_pagination
class ProductService(BaseService):
async def create(self, request: ProductCreateRequest, user: UserUnion) -> ProductCreateResponse:
    """Create a product and its barcodes from the request.

    Args:
        request: Product fields plus ``barcodes`` and an optional
            ``barcode_template``.
        user: The caller; passed to ``verify_user_deal_editor`` before
            anything is written.

    Returns:
        ``ProductCreateResponse`` with ``ok=True`` and the new product id.
    """
    verify_user_deal_editor(user)

    # Creating product: these request fields are not forwarded to the
    # Product constructor as-is.
    product_dict = request.dict()
    for field in ('barcodes', 'barcode_template', 'images', 'image_url'):
        del product_dict[field]
    if request.barcode_template:
        product_dict['barcode_template_id'] = request.barcode_template.id

    product = Product(**product_dict)
    self.session.add(product)

    # Flush once so the product id is available for the barcode rows, and
    # remember it now instead of reading it from the instance after commit.
    await self.session.flush()
    product_id = product.id

    # Creating barcodes: dict.fromkeys() drops repeated values while keeping
    # the request order. ProductBarcode rows are looked up elsewhere in this
    # service by the (product_id, barcode) pair, so a repeated value would
    # try to insert the same row twice.
    for barcode in dict.fromkeys(request.barcodes):
        self.session.add(ProductBarcode(product_id=product_id, barcode=barcode))

    # commit() flushes the pending barcode rows itself.
    await self.session.commit()
    return ProductCreateResponse(ok=True, message='Товар успешно создан', product_id=product_id)
async def delete(self, request: ProductDeleteRequest):
    """Delete the product identified by ``request.product_id``.

    Returns:
        ``ProductDeleteResponse`` with ``ok=True`` after the delete is
        committed, or ``ok=False`` when no such product is found.
    """
    target = await self.session.get(Product, request.product_id)
    if target:
        await self.session.delete(target)
        await self.session.commit()
        return ProductDeleteResponse(ok=True, message="Товар успешно удален!")
    return ProductDeleteResponse(ok=False, message='Указанного товара не существует')
async def update(self, request: ProductUpdateRequest):
    """Update a product's columns and synchronise its barcodes.

    Args:
        request: Carries ``product`` with the target ``id``, the new field
            values and the complete list of barcodes the product should have.

    Returns:
        ``ProductUpdateResponse`` with ``ok=True`` once the changes are
        committed, or ``ok=False`` when the product does not exist.
    """
    stmt = (
        select(Product)
        .where(Product.id == request.product.id)
        .options(selectinload(Product.barcodes))
    )
    product = (await self.session.execute(stmt)).scalar()
    if not product:
        return ProductUpdateResponse(ok=False, message='Указанного товара не существует')

    # These request fields are not forwarded to the UPDATE statement as-is.
    product_dict = request.product.dict()
    for field in ('id', 'barcodes', 'barcode_template', 'image_url', 'images'):
        del product_dict[field]
    # NOTE(review): a falsy barcode_template adds no barcode_template_id here,
    # so this branch cannot be used to clear an existing template — confirm
    # that is intended.
    if request.product.barcode_template:
        product_dict['barcode_template_id'] = request.product.barcode_template.id
    await self.session.execute(
        update(Product)
        .where(Product.id == request.product.id)
        .values(**product_dict)
    )

    # Updating barcodes: diff by barcode *value*. Comparing ProductBarcode
    # instances with the request's values never matches, which would delete
    # and re-add every barcode on each update.
    existing_by_value = {row.barcode: row for row in product.barcodes}
    requested_values = set(request.product.barcodes)

    for value, row in existing_by_value.items():
        if value not in requested_values:
            await self.session.delete(row)
    for value in requested_values.difference(existing_by_value):
        self.session.add(ProductBarcode(product_id=product.id, barcode=value))

    # commit() flushes the pending inserts and deletes.
    await self.session.commit()
    return ProductUpdateResponse(ok=True, message='Товар успешно обновлен')
async def get_by_client_id(
        self,
        client_id: int,
        pagination: PaginationSchema,
        search_input: str
) -> ProductGetResponse:
    """List a client's products, optionally filtered and paginated.

    Args:
        client_id: Only products with this ``client_id`` are returned.
        pagination: Page index and page size; applied only when
            ``is_valid_pagination`` accepts it.
        search_input: Case-insensitive substring matched against the product
            name, article, factory article and any of its barcodes. Blank
            input disables the filter.

    Returns:
        ``ProductGetResponse`` with the products ordered by ascending id and
        the pagination totals. Without valid pagination everything is
        returned as a single page.
    """
    is_pagination_valid = is_valid_pagination(pagination)

    # Ordering by ascending id. A second ``order_by(Product.id.desc())`` used
    # to be appended before execution, but as a tie-breaker on the same
    # column it never affected the result, so it is omitted.
    stmt = (
        select(Product)
        .options(
            selectinload(Product.barcodes).noload(ProductBarcode.product),
            selectinload(Product.barcode_image),
        )
        .where(Product.client_id == client_id)
        .order_by(Product.id)
    )

    # Tolerate a missing search string instead of failing on ``.strip()``.
    search_input = (search_input or '').strip()
    if search_input:
        pattern = f'%{search_input}%'
        stmt = stmt.where(
            or_(
                Product.name.ilike(pattern),
                Product.barcodes.any(ProductBarcode.barcode.ilike(pattern)),
                Product.article.ilike(pattern),
                Product.factory_article.ilike(pattern),
            )
        )

    total_pages = 0
    total_items = 0
    if is_pagination_valid:
        count_query = await self.session.execute(
            select(func.count()).select_from(stmt.subquery())
        )
        total_items = count_query.scalar_one()
        # Ceiling division in Python: the result does not depend on how the
        # database divides two integers (integer division would silently
        # drop the last, partially filled page).
        # Assumes is_valid_pagination rejects a zero page size — TODO confirm.
        per_page = pagination.items_per_page
        total_pages = (total_items + per_page - 1) // per_page
        stmt = stmt.offset(pagination.page * per_page).limit(per_page)

    product_orm = (await self.session.execute(stmt)).scalars().all()
    if not is_pagination_valid:
        total_pages = 1
        total_items = len(product_orm)

    pagination_info = PaginationInfoSchema(total_pages=total_pages, total_items=total_items)
    products: list[ProductSchema] = [
        ProductSchema.model_validate(product) for product in product_orm
    ]
    return ProductGetResponse(products=products, pagination_info=pagination_info)
async def get_by_id(self, product_id: int) -> ProductSchema:
    """Return one product, with its barcodes, as a ``ProductSchema``.

    Raises:
        HTTPException: 404 when no product has the given id.
    """
    barcode_loading = selectinload(Product.barcodes).noload(ProductBarcode.product)
    lookup = (
        select(Product)
        .where(Product.id == product_id)
        .options(barcode_loading)
    )
    found = (await self.session.execute(lookup)).scalar()
    if found:
        return ProductSchema.model_validate(found)
    raise HTTPException(status_code=404, detail='Товар не найден')
async def upload_image(self, product_id: int, file_bytes: bytes) -> ProductUploadImageResponse:
    """Replace a product's images with a single newly uploaded one.

    Args:
        product_id: Id of the product that receives the image.
        file_bytes: Raw image content handed to ``S3Uploader.upload``.

    Returns:
        ``ProductUploadImageResponse`` with ``ok=True`` and the image URL on
        success; on any failure ``ok=False`` with the error text.
    """
    try:
        product: Product = await self.session.get(Product, product_id)
        if not product:
            raise Exception("Неудалось найти товар с указанным ID")

        # removing previous images (only marked here; persisted by the commit below)
        # NOTE(review): reading product.images assumes the relationship is
        # already loaded or eagerly configured on the model — confirm.
        for image in product.images:
            await self.session.delete(image)

        s3_uploader = S3Uploader(config.S3_API_KEY)
        response = await s3_uploader.upload(file_bytes)
        response_url = response.get('link')
        if not response_url:
            raise Exception("Неудалось загрузить изображение")

        self.session.add(ProductImage(product_id=product_id, image_url=response_url))
        await self.session.commit()
        return ProductUploadImageResponse(ok=True,
                                          message='Изображение успешно загружено',
                                          image_url=response_url)
    except Exception as e:
        # Discard the pending image deletions/insert so a failed upload does
        # not leave half-applied changes in the session, matching the
        # rollback done by add_barcode/delete_barcode.
        await self.session.rollback()
        return ProductUploadImageResponse(ok=False, message=str(e))
# region Barcodes
async def add_barcode(self, request: ProductAddBarcodeRequest):
    """Attach a barcode to an existing product.

    Args:
        request: Carries ``product_id`` and the ``barcode`` to add.

    Returns:
        ``ProductAddBarcodeResponse`` with ``ok=True`` once committed, or
        ``ok=False`` with a message when the product is missing, the product
        already has this barcode, or the write fails.
    """
    try:
        product = await self.session.get(Product, request.product_id)
        if not product:
            # Returned directly: raising HTTPException here was caught by the
            # broad handler below and surfaced as str(HTTPException), which
            # is not the plain detail text.
            return ProductAddBarcodeResponse(ok=False, message='Товар не найден')

        existing_barcode_query = await self.session.execute(
            select(ProductBarcode)
            .where(ProductBarcode.product_id == request.product_id,
                   ProductBarcode.barcode == request.barcode)
        )
        if existing_barcode_query.first():
            return ProductAddBarcodeResponse(ok=False, message='Штрих-код уже существует у товара')

        self.session.add(ProductBarcode(product_id=product.id, barcode=request.barcode))
        await self.session.commit()
        return ProductAddBarcodeResponse(ok=True, message='Штрих-код успешно добавлен')
    except Exception as e:
        await self.session.rollback()
        return ProductAddBarcodeResponse(ok=False, message=str(e))
async def delete_barcode(self, request: ProductDeleteBarcodeRequest):
    """Remove one barcode from a product.

    The row is fetched by the ``(product_id, barcode)`` pair taken from the
    request.

    Returns:
        ``ProductDeleteBarcodeResponse`` with ``ok=True`` after the delete is
        committed; ``ok=False`` when the barcode is not found or when an
        error occurs (the session is rolled back in that case).
    """
    try:
        identity = (request.product_id, request.barcode)
        found = await self.session.get(ProductBarcode, identity)
        if found:
            await self.session.delete(found)
            await self.session.commit()
            return ProductDeleteBarcodeResponse(ok=True, message='Штрих-код успешно удален')
        return ProductDeleteBarcodeResponse(ok=False, message='Штрих-код не найден')
    except Exception as error:
        await self.session.rollback()
        return ProductDeleteBarcodeResponse(ok=False, message=str(error))
async def exists_barcode(self, product_id: int, barcode: str) -> ProductExistsBarcodeResponse:
    """Report whether the given product already has the given barcode."""
    criteria = (
        ProductBarcode.product_id == product_id,
        ProductBarcode.barcode == barcode,
    )
    result = await self.session.execute(select(ProductBarcode).where(*criteria))
    # first() yields a row or None; only its presence matters.
    return ProductExistsBarcodeResponse(exists=bool(result.first()))
async def generate_barcode(self, request: ProductGenerateBarcodeRequest) -> ProductGenerateBarcodeResponse:
    """Generate a barcode for a product and store it.

    The value comes from ``utils.barcodes.generate_barcode(product.id)`` and
    is rejected when any ProductBarcode row already uses it.

    Returns:
        ``ProductGenerateBarcodeResponse`` with ``ok=True`` and the new
        barcode, or ``ok=False`` with a message when the product is missing,
        the barcode is already taken, or the write fails.
    """
    try:
        product = await self.session.get(Product, request.product_id)
        if not product:
            # Returned directly: raising HTTPException here was caught by the
            # broad handler below and surfaced as str(HTTPException), which
            # is not the plain detail text.
            return ProductGenerateBarcodeResponse(ok=False, message='Товар не найден')

        barcode = utils.barcodes.generate_barcode(product.id)
        barcode_exists_query = await self.session.execute(
            select(ProductBarcode)
            .where(ProductBarcode.barcode == barcode)
        )
        if barcode_exists_query.first():
            return ProductGenerateBarcodeResponse(ok=False, message='Штрих-код уже существует')

        self.session.add(ProductBarcode(product_id=product.id, barcode=barcode))
        await self.session.commit()
        return ProductGenerateBarcodeResponse(ok=True, message='Штрих-код успешно сгенерирован', barcode=barcode)
    except Exception as e:
        # Same cleanup as add_barcode: drop the pending insert on failure.
        await self.session.rollback()
        return ProductGenerateBarcodeResponse(ok=False, message=str(e))
async def get_model_by_id(self, product_id: int) -> Product:
    """Return the ``Product`` ORM instance with the given id.

    Raises:
        Exception: when no product has that id.
    """
    found: Optional[Product] = await self.session.get(Product, product_id)
    if found:
        return found
    raise Exception('Не удалось найти товар с указанным ID')
async def delete_model_barcode_image(self, uploader: BarcodeImagesUploader, product_id: int) -> None:
    """Delete a product's barcode image, if one is stored.

    The file is removed through ``uploader`` first, then the
    ``ProductBarcodeImage`` row is deleted and the change committed. Nothing
    happens when the product has no barcode image row.
    """
    stored: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id)
    if not stored:
        return
    uploader.delete(stored.filename)
    await self.session.delete(stored)
    await self.session.commit()
async def upload_barcode_image(self, product_id: int, upload_file: UploadFile) -> ProductUploadBarcodeImageResponse:
    """Replace a product's barcode image with an uploaded PDF.

    Args:
        product_id: Id of the product that receives the barcode image.
        upload_file: The uploaded file; its content is passed through
            ``PdfMaker.resize_pdf_with_reportlab`` before being stored.

    Returns:
        ``ProductUploadBarcodeImageResponse`` with ``ok=True`` and the image
        URL on success; on any failure ``ok=False`` with the error text.
    """
    try:
        await self.get_model_by_id(product_id)

        # Read through the async API so the coroutine does not block on file
        # I/O, and resize before touching the stored image: a file that
        # cannot be processed must not cost the product its current image.
        content = await upload_file.read()
        file = PdfMaker.resize_pdf_with_reportlab(BytesIO(content))

        uploader = BarcodeImagesUploader()
        await self.delete_model_barcode_image(uploader, product_id)
        filename = await uploader.upload(file, upload_file.filename)
        barcode_image_url = uploader.get_url(filename)

        self.session.add(ProductBarcodeImage(product_id=product_id, filename=filename))
        await self.session.commit()
        return ProductUploadBarcodeImageResponse(
            ok=True,
            message='Штрих-код для товара успешно загружен',
            barcode_image_url=barcode_image_url,
        )
    except Exception as e:
        # Drop whatever is still pending in the session. The removal of the
        # previous image is committed by delete_model_barcode_image itself
        # and is not undone by this rollback.
        await self.session.rollback()
        return ProductUploadBarcodeImageResponse(ok=False, message=str(e))
async def delete_barcode_image(self, product_id: int) -> ProductDeleteBarcodeImageResponse:
    """Delete the barcode image of an existing product.

    Returns:
        ``ProductDeleteBarcodeImageResponse`` with ``ok=True`` when the
        product exists and the deletion step completed; ``ok=False`` with the
        error text when the product is missing or anything else fails.
    """
    try:
        # Raises when the product does not exist; handled below.
        await self.get_model_by_id(product_id)
        await self.delete_model_barcode_image(BarcodeImagesUploader(), product_id)
        return ProductDeleteBarcodeImageResponse(
            ok=True,
            message='Штрих-код для товара успешно удален',
        )
    except Exception as error:
        return ProductDeleteBarcodeImageResponse(ok=False, message=str(error))
async def get_barcode_image(self, product_id: int) -> ProductGetBarcodeImageResponse:
    """Return the URL of a product's barcode image.

    Returns:
        ``ProductGetBarcodeImageResponse`` whose ``barcode_image_url`` is the
        uploader-provided URL, or an empty string when the product has no
        barcode image.

    Raises:
        HTTPException: 404 when no product has the given id.
    """
    if not await self.session.get(Product, product_id):
        raise HTTPException(404, 'Не удалось найти товар с указанным ID')

    stored: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id)
    # The uploader is only needed when there is a file to resolve.
    url = BarcodeImagesUploader().get_url(stored.filename) if stored else ""
    return ProductGetBarcodeImageResponse(barcode_image_url=url)
# endregion