119 lines
4.9 KiB
Python
119 lines
4.9 KiB
Python
from fastapi import HTTPException
|
||
from sqlalchemy import select, func, Integer, update
|
||
from sqlalchemy.orm import selectinload
|
||
|
||
from models.product import Product, ProductBarcode
|
||
from schemas.base import PaginationSchema
|
||
from services.base import BaseService
|
||
from schemas.product import *
|
||
|
||
|
||
class ProductService(BaseService):
|
||
async def create(self, request: ProductCreateRequest) -> ProductCreateResponse:
|
||
# Unique article validation
|
||
existing_product_query = await self.session.execute(
|
||
select(Product)
|
||
.where(Product.client_id == request.client_id,
|
||
Product.article == request.article)
|
||
)
|
||
existing_product = existing_product_query.first()
|
||
if existing_product:
|
||
return ProductCreateResponse(ok=False, message='Товар с таким артикулом уже существует у клиента')
|
||
|
||
# Creating product
|
||
product_dict = request.dict()
|
||
del product_dict['barcodes']
|
||
product = Product(**product_dict)
|
||
self.session.add(product)
|
||
|
||
# Creating barcodes
|
||
await self.session.flush()
|
||
for barcode in request.barcodes:
|
||
product_barcode = ProductBarcode(product_id=product.id,
|
||
barcode=barcode)
|
||
self.session.add(product_barcode)
|
||
await self.session.flush()
|
||
await self.session.commit()
|
||
return ProductCreateResponse(ok=True, message='Товар успешно создан', product_id=product.id)
|
||
|
||
async def delete(self, request: ProductDeleteRequest):
|
||
product = await self.session.get(Product, request.product_id)
|
||
if not product:
|
||
return ProductDeleteResponse(ok=False, message='Указанного товара не существует')
|
||
await self.session.delete(product)
|
||
await self.session.commit()
|
||
return ProductDeleteResponse(ok=True, message="Товар успешно удален!")
|
||
|
||
async def update(self, request: ProductUpdateRequest):
|
||
stmt = (
|
||
select(Product)
|
||
.where(Product.id == request.product.id)
|
||
.options(selectinload(Product.barcodes))
|
||
)
|
||
product_query = await self.session.execute(stmt)
|
||
product = product_query.scalar()
|
||
if not product:
|
||
return ProductUpdateResponse(ok=False, message='Указанного товара не существует')
|
||
product_dict = request.product.dict()
|
||
del product_dict['id']
|
||
del product_dict['barcodes']
|
||
await self.session.execute(
|
||
update(Product)
|
||
.where(Product.id == request.product.id)
|
||
.values(**product_dict)
|
||
)
|
||
# Updating barcodes
|
||
product_barcodes = set([barcode for barcode in product.barcodes])
|
||
request_barcodes = set(request.product.barcodes)
|
||
new_barcodes = request_barcodes.difference(product_barcodes)
|
||
deleted_barcodes = product_barcodes.difference(request_barcodes)
|
||
for product_barcode in product.barcodes:
|
||
if product_barcode not in deleted_barcodes:
|
||
continue
|
||
await self.session.delete(product_barcode)
|
||
for new_barcode in new_barcodes:
|
||
product_barcode = ProductBarcode(
|
||
product_id=product.id,
|
||
barcode=new_barcode
|
||
)
|
||
self.session.add(product_barcode)
|
||
await self.session.flush()
|
||
|
||
await self.session.commit()
|
||
return ProductUpdateResponse(ok=True, message='Товар успешно обновлен')
|
||
|
||
async def get_by_client_id(self, client_id: int, pagination: PaginationSchema) -> ProductGetResponse:
|
||
stmt = (
|
||
select(Product)
|
||
.options(selectinload(Product.barcodes))
|
||
.where(Product.client_id == client_id)
|
||
.order_by(Product.id)
|
||
)
|
||
total_products_query = await self.session.execute(
|
||
select(
|
||
func.cast(func.ceil(func.count() / pagination.items_per_page), Integer),
|
||
func.count()
|
||
)
|
||
.select_from(stmt.subquery())
|
||
)
|
||
total_pages, total_items = total_products_query.first()
|
||
pagination_info = PaginationInfoSchema(total_pages=total_pages, total_items=total_items)
|
||
query = await self.session.execute(
|
||
stmt
|
||
.offset(pagination.page * pagination.items_per_page)
|
||
.limit(pagination.items_per_page)
|
||
)
|
||
products: list[ProductSchema] = []
|
||
for product in query.scalars().all():
|
||
product: Product
|
||
|
||
barcodes = []
|
||
for barcode_obj in product.barcodes:
|
||
barcode_obj: ProductBarcode
|
||
barcodes.append(barcode_obj.barcode)
|
||
|
||
products.append(
|
||
ProductSchema.from_sql_model(product, {'barcodes': barcodes})
|
||
)
|
||
return ProductGetResponse(products=products, pagination_info=pagination_info)
|