330 lines
14 KiB
Python
330 lines
14 KiB
Python
from typing import Optional
|
||
|
||
from fastapi import HTTPException, UploadFile
|
||
from sqlalchemy import select, func, Integer, update, or_
|
||
from sqlalchemy.orm import selectinload, Query
|
||
|
||
import utils.barcodes
|
||
from backend import config
|
||
from barcodes.images_uploader import BarcodeImagesUploader
|
||
from barcodes.pdf.pdf_maker import PdfMaker
|
||
from external.s3_uploader.uploader import S3Uploader
|
||
from models.product import Product, ProductImage, ProductBarcodeImage
|
||
from schemas.base import PaginationSchema
|
||
from schemas.product import *
|
||
from services.base import BaseService
|
||
from utils.dependecies import is_valid_pagination
|
||
|
||
|
||
class ProductService(BaseService):
|
||
|
||
async def create(self, request: ProductCreateRequest) -> ProductCreateResponse:
|
||
|
||
# Creating product
|
||
product_dict = request.dict()
|
||
del product_dict['barcodes']
|
||
del product_dict['barcode_template']
|
||
del product_dict['images']
|
||
del product_dict['image_url']
|
||
if request.barcode_template:
|
||
product_dict['barcode_template_id'] = request.barcode_template.id
|
||
|
||
product = Product(**product_dict)
|
||
self.session.add(product)
|
||
|
||
# Creating barcodes
|
||
await self.session.flush()
|
||
for barcode in request.barcodes:
|
||
product_barcode = ProductBarcode(product_id=product.id,
|
||
barcode=barcode)
|
||
self.session.add(product_barcode)
|
||
await self.session.flush()
|
||
|
||
await self.session.commit()
|
||
return ProductCreateResponse(ok=True, message='Товар успешно создан', product_id=product.id)
|
||
|
||
async def delete(self, request: ProductDeleteRequest):
|
||
product = await self.session.get(Product, request.product_id)
|
||
if not product:
|
||
return ProductDeleteResponse(ok=False, message='Указанного товара не существует')
|
||
await self.session.delete(product)
|
||
await self.session.commit()
|
||
return ProductDeleteResponse(ok=True, message="Товар успешно удален!")
|
||
|
||
async def update(self, request: ProductUpdateRequest):
|
||
stmt = (
|
||
select(Product)
|
||
.where(Product.id == request.product.id)
|
||
.options(selectinload(Product.barcodes))
|
||
)
|
||
product_query = await self.session.execute(stmt)
|
||
product = product_query.scalar()
|
||
if not product:
|
||
return ProductUpdateResponse(ok=False, message='Указанного товара не существует')
|
||
product_dict = request.product.dict()
|
||
del product_dict['id']
|
||
del product_dict['barcodes']
|
||
del product_dict['barcode_template']
|
||
del product_dict['image_url']
|
||
del product_dict['images']
|
||
if request.product.barcode_template:
|
||
product_dict['barcode_template_id'] = request.product.barcode_template.id
|
||
|
||
await self.session.execute(
|
||
update(Product)
|
||
.where(Product.id == request.product.id)
|
||
.values(**product_dict)
|
||
)
|
||
# Updating barcodes
|
||
product_barcodes = set([barcode for barcode in product.barcodes])
|
||
request_barcodes = set(request.product.barcodes)
|
||
new_barcodes = request_barcodes.difference(product_barcodes)
|
||
deleted_barcodes = product_barcodes.difference(request_barcodes)
|
||
for product_barcode in product.barcodes:
|
||
if product_barcode not in deleted_barcodes:
|
||
continue
|
||
await self.session.delete(product_barcode)
|
||
for new_barcode in new_barcodes:
|
||
product_barcode = ProductBarcode(
|
||
product_id=product.id,
|
||
barcode=new_barcode
|
||
)
|
||
self.session.add(product_barcode)
|
||
await self.session.flush()
|
||
|
||
await self.session.commit()
|
||
return ProductUpdateResponse(ok=True, message='Товар успешно обновлен')
|
||
|
||
async def get_by_client_id(
|
||
self,
|
||
client_id: int,
|
||
pagination: PaginationSchema,
|
||
search_input: str
|
||
) -> ProductGetResponse:
|
||
|
||
is_pagination_valid = is_valid_pagination(pagination)
|
||
total_pages = 0
|
||
total_items = 0
|
||
stmt = (
|
||
select(
|
||
Product
|
||
)
|
||
.options(
|
||
selectinload(Product.barcodes)
|
||
.noload(ProductBarcode.product),
|
||
selectinload(Product.barcode_image),
|
||
)
|
||
.where(
|
||
Product.client_id == client_id
|
||
)
|
||
.order_by(Product.id)
|
||
)
|
||
search_input = search_input.strip()
|
||
if search_input:
|
||
stmt = (
|
||
stmt.where(
|
||
or_(
|
||
Product.name.ilike(f'%{search_input}%'),
|
||
Product.barcodes.any(ProductBarcode.barcode.ilike(f'%{search_input}%')),
|
||
Product.article.ilike(f'%{search_input}%')
|
||
)
|
||
)
|
||
)
|
||
|
||
if is_pagination_valid:
|
||
total_products_query = await self.session.execute(
|
||
select(
|
||
func.cast(func.ceil(func.count() / pagination.items_per_page), Integer),
|
||
func.count()
|
||
)
|
||
.select_from(stmt.subquery())
|
||
)
|
||
total_pages, total_items = total_products_query.first()
|
||
stmt = (
|
||
stmt
|
||
.offset(pagination.page * pagination.items_per_page)
|
||
.limit(pagination.items_per_page)
|
||
)
|
||
query = await self.session.execute(
|
||
stmt
|
||
.order_by(Product.id.desc())
|
||
)
|
||
product_orm = query.scalars().all()
|
||
if not is_pagination_valid:
|
||
total_pages = 1
|
||
total_items = len(product_orm)
|
||
pagination_info = PaginationInfoSchema(total_pages=total_pages, total_items=total_items)
|
||
|
||
products: list[ProductSchema] = []
|
||
for product in product_orm:
|
||
products.append(ProductSchema.model_validate(product))
|
||
return ProductGetResponse(products=products, pagination_info=pagination_info)
|
||
|
||
async def get_by_id(self, product_id: int) -> ProductSchema:
|
||
stmt = (
|
||
select(Product)
|
||
.options(selectinload(Product.barcodes)
|
||
.noload(ProductBarcode.product))
|
||
.where(Product.id == product_id)
|
||
)
|
||
query = await self.session.execute(stmt)
|
||
product = query.scalar()
|
||
if not product:
|
||
raise HTTPException(status_code=404, detail='Товар не найден')
|
||
return ProductSchema.model_validate(product)
|
||
|
||
async def upload_image(self, product_id: int, file_bytes: bytes) -> ProductUploadImageResponse:
|
||
try:
|
||
product: Product = await self.session.get(Product, product_id)
|
||
if not product:
|
||
raise Exception("Неудалось найти товар с указанным ID")
|
||
# removing previous images
|
||
for image in product.images:
|
||
await self.session.delete(image)
|
||
s3_uploader = S3Uploader(config.S3_API_KEY)
|
||
response = await s3_uploader.upload(file_bytes)
|
||
response_url = response.get('link')
|
||
if not response_url:
|
||
raise Exception("Неудалось загрузить изображение")
|
||
product_image = ProductImage(
|
||
product_id=product_id,
|
||
image_url=response_url,
|
||
)
|
||
self.session.add(product_image)
|
||
await self.session.commit()
|
||
return ProductUploadImageResponse(ok=True,
|
||
message='Изображение успешно загружено',
|
||
image_url=response_url)
|
||
except Exception as e:
|
||
return ProductUploadImageResponse(ok=False, message=str(e))
|
||
|
||
# region Barcodes
|
||
async def add_barcode(self, request: ProductAddBarcodeRequest):
|
||
try:
|
||
product = await self.session.get(Product, request.product_id)
|
||
if not product:
|
||
raise HTTPException(status_code=404, detail='Товар не найден')
|
||
existing_barcode_query = await self.session.execute(
|
||
select(ProductBarcode)
|
||
.where(ProductBarcode.product_id == request.product_id,
|
||
ProductBarcode.barcode == request.barcode)
|
||
)
|
||
existing_barcode = existing_barcode_query.first()
|
||
if existing_barcode:
|
||
return ProductAddBarcodeResponse(ok=False, message='Штрих-код уже существует у товара')
|
||
product_barcode = ProductBarcode(product_id=product.id,
|
||
barcode=request.barcode)
|
||
self.session.add(product_barcode)
|
||
await self.session.commit()
|
||
return ProductAddBarcodeResponse(ok=True, message='Штрих-код успешно добавлен')
|
||
except Exception as e:
|
||
await self.session.rollback()
|
||
return ProductAddBarcodeResponse(ok=False, message=str(e))
|
||
|
||
async def delete_barcode(self, request: ProductDeleteBarcodeRequest):
|
||
try:
|
||
product_barcode = await self.session.get(ProductBarcode, (request.product_id, request.barcode))
|
||
if not product_barcode:
|
||
return ProductDeleteBarcodeResponse(ok=False, message='Штрих-код не найден')
|
||
await self.session.delete(product_barcode)
|
||
await self.session.commit()
|
||
return ProductDeleteBarcodeResponse(ok=True, message='Штрих-код успешно удален')
|
||
except Exception as e:
|
||
await self.session.rollback()
|
||
return ProductDeleteBarcodeResponse(ok=False, message=str(e))
|
||
|
||
async def exists_barcode(self, product_id: int, barcode: str) -> ProductExistsBarcodeResponse:
|
||
product_barcode_query = await self.session.execute(
|
||
select(ProductBarcode)
|
||
.where(ProductBarcode.product_id == product_id,
|
||
ProductBarcode.barcode == barcode)
|
||
)
|
||
product_barcode = product_barcode_query.first()
|
||
return ProductExistsBarcodeResponse(exists=bool(product_barcode))
|
||
|
||
async def generate_barcode(self, request: ProductGenerateBarcodeRequest) -> ProductGenerateBarcodeResponse:
|
||
try:
|
||
product = await self.session.get(Product, request.product_id)
|
||
if not product:
|
||
raise HTTPException(status_code=404, detail='Товар не найден')
|
||
barcode = utils.barcodes.generate_barcode(product.id)
|
||
barcode_exists_query = await self.session.execute(
|
||
select(ProductBarcode)
|
||
.where(ProductBarcode.barcode == barcode)
|
||
)
|
||
barcode_exists = barcode_exists_query.first()
|
||
if barcode_exists:
|
||
raise Exception('Штрих-код уже существует')
|
||
product_barcode = ProductBarcode(product_id=product.id,
|
||
barcode=barcode)
|
||
self.session.add(product_barcode)
|
||
await self.session.commit()
|
||
return ProductGenerateBarcodeResponse(ok=True, message='Штрих-код успешно сгенерирован', barcode=barcode)
|
||
except Exception as e:
|
||
return ProductGenerateBarcodeResponse(ok=False, message=str(e))
|
||
|
||
async def get_model_by_id(self, product_id: int) -> Product:
|
||
product: Optional[Product] = await self.session.get(Product, product_id)
|
||
if not product:
|
||
raise Exception('Не удалось найти товар с указанным ID')
|
||
return product
|
||
|
||
async def delete_model_barcode_image(self, uploader: BarcodeImagesUploader, product_id: int) -> None:
|
||
barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id)
|
||
if barcode_image:
|
||
uploader.delete(barcode_image.filename)
|
||
await self.session.delete(barcode_image)
|
||
await self.session.commit()
|
||
|
||
async def upload_barcode_image(self, product_id: int, upload_file: UploadFile) -> ProductUploadBarcodeImageResponse:
|
||
try:
|
||
PdfMaker.check_is_correct_aspects_ratio(upload_file.file)
|
||
await self.get_model_by_id(product_id)
|
||
|
||
uploader = BarcodeImagesUploader()
|
||
await self.delete_model_barcode_image(uploader, product_id)
|
||
filename = await uploader.upload(upload_file)
|
||
barcode_image_url = uploader.get_url(filename)
|
||
|
||
product_barcode_image = ProductBarcodeImage(
|
||
product_id=product_id,
|
||
filename=filename,
|
||
)
|
||
self.session.add(product_barcode_image)
|
||
await self.session.commit()
|
||
return ProductUploadBarcodeImageResponse(
|
||
ok=True,
|
||
message='Штрих-код для товара успешно загружен',
|
||
barcode_image_url=barcode_image_url,
|
||
)
|
||
except Exception as e:
|
||
return ProductUploadBarcodeImageResponse(ok=False, message=str(e))
|
||
|
||
async def delete_barcode_image(self, product_id: int) -> ProductDeleteBarcodeImageResponse:
|
||
try:
|
||
await self.get_model_by_id(product_id)
|
||
|
||
uploader = BarcodeImagesUploader()
|
||
await self.delete_model_barcode_image(uploader, product_id)
|
||
|
||
return ProductDeleteBarcodeImageResponse(
|
||
ok=True,
|
||
message='Штрих-код для товара успешно удален',
|
||
)
|
||
except Exception as e:
|
||
return ProductDeleteBarcodeImageResponse(ok=False, message=str(e))
|
||
|
||
async def get_barcode_image(self, product_id: int) -> ProductGetBarcodeImageResponse:
|
||
product: Optional[Product] = await self.session.get(Product, product_id)
|
||
if not product:
|
||
raise HTTPException(404, 'Не удалось найти товар с указанным ID')
|
||
barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id)
|
||
if not barcode_image:
|
||
return ProductGetBarcodeImageResponse(barcode_image_url="")
|
||
|
||
uploader = BarcodeImagesUploader()
|
||
url = uploader.get_url(barcode_image.filename)
|
||
return ProductGetBarcodeImageResponse(barcode_image_url=url)
|
||
|
||
# endregion
|