from fastapi import HTTPException from sqlalchemy import select, func, Integer, update from sqlalchemy.orm import selectinload import utils.barcodes from models.product import Product, ProductBarcode from schemas.base import PaginationSchema from services.base import BaseService from schemas.product import * from utils.dependecies import is_valid_pagination class ProductService(BaseService): async def create(self, request: ProductCreateRequest) -> ProductCreateResponse: # Unique article validation existing_product_query = await self.session.execute( select(Product) .where(Product.client_id == request.client_id, Product.article == request.article) ) existing_product = existing_product_query.first() if existing_product: return ProductCreateResponse(ok=False, message='Товар с таким артикулом уже существует у клиента') # Creating product product_dict = request.dict() del product_dict['barcodes'] del product_dict['barcode_template'] if request.barcode_template: product_dict['barcode_template_id'] = request.barcode_template.id product = Product(**product_dict) self.session.add(product) # Creating barcodes await self.session.flush() for barcode in request.barcodes: product_barcode = ProductBarcode(product_id=product.id, barcode=barcode) self.session.add(product_barcode) await self.session.flush() await self.session.commit() return ProductCreateResponse(ok=True, message='Товар успешно создан', product_id=product.id) async def delete(self, request: ProductDeleteRequest): product = await self.session.get(Product, request.product_id) if not product: return ProductDeleteResponse(ok=False, message='Указанного товара не существует') await self.session.delete(product) await self.session.commit() return ProductDeleteResponse(ok=True, message="Товар успешно удален!") async def update(self, request: ProductUpdateRequest): stmt = ( select(Product) .where(Product.id == request.product.id) .options(selectinload(Product.barcodes)) ) product_query = await self.session.execute(stmt) product = product_query.scalar() if not product: return ProductUpdateResponse(ok=False, message='Указанного товара не существует') product_dict = request.product.dict() del product_dict['id'] del product_dict['barcodes'] del product_dict['barcode_template'] if request.product.barcode_template: product_dict['barcode_template_id'] = request.product.barcode_template.id await self.session.execute( update(Product) .where(Product.id == request.product.id) .values(**product_dict) ) # Updating barcodes product_barcodes = set([barcode for barcode in product.barcodes]) request_barcodes = set(request.product.barcodes) new_barcodes = request_barcodes.difference(product_barcodes) deleted_barcodes = product_barcodes.difference(request_barcodes) for product_barcode in product.barcodes: if product_barcode not in deleted_barcodes: continue await self.session.delete(product_barcode) for new_barcode in new_barcodes: product_barcode = ProductBarcode( product_id=product.id, barcode=new_barcode ) self.session.add(product_barcode) await self.session.flush() await self.session.commit() return ProductUpdateResponse(ok=True, message='Товар успешно обновлен') async def get_by_client_id(self, client_id: int, pagination: PaginationSchema) -> ProductGetResponse: is_pagination_valid = is_valid_pagination(pagination) total_pages = 0 total_items = 0 stmt = ( select(Product) .options(selectinload(Product.barcodes) .noload(ProductBarcode.product)) .where(Product.client_id == client_id) .order_by(Product.id) ) if is_pagination_valid: total_products_query = await self.session.execute( select( func.cast(func.ceil(func.count() / pagination.items_per_page), Integer), func.count() ) .select_from(stmt.subquery()) ) total_pages, total_items = total_products_query.first() stmt = ( stmt .offset(pagination.page * pagination.items_per_page) .limit(pagination.items_per_page) ) query = await self.session.execute( stmt .order_by(Product.id) ) product_orm = query.scalars().all() if not is_pagination_valid: total_pages = 1 total_items = len(product_orm) pagination_info = PaginationInfoSchema(total_pages=total_pages, total_items=total_items) products: list[ProductSchema] = [] for product in product_orm: products.append(ProductSchema.model_validate(product)) return ProductGetResponse(products=products, pagination_info=pagination_info) async def get_by_id(self, product_id: int) -> ProductSchema: stmt = ( select(Product) .options(selectinload(Product.barcodes) .noload(ProductBarcode.product)) .where(Product.id == product_id) ) query = await self.session.execute(stmt) product = query.scalar() if not product: raise HTTPException(status_code=404, detail='Товар не найден') return ProductSchema.model_validate(product) # region Barcodes async def add_barcode(self, request: ProductAddBarcodeRequest): try: product = await self.session.get(Product, request.product_id) if not product: raise HTTPException(status_code=404, detail='Товар не найден') existing_barcode_query = await self.session.execute( select(ProductBarcode) .where(ProductBarcode.product_id == request.product_id, ProductBarcode.barcode == request.barcode) ) existing_barcode = existing_barcode_query.first() if existing_barcode: return ProductAddBarcodeResponse(ok=False, message='Штрих-код уже существует у товара') product_barcode = ProductBarcode(product_id=product.id, barcode=request.barcode) self.session.add(product_barcode) await self.session.commit() return ProductAddBarcodeResponse(ok=True, message='Штрих-код успешно добавлен') except Exception as e: await self.session.rollback() return ProductAddBarcodeResponse(ok=False, message=str(e)) async def delete_barcode(self, request: ProductDeleteBarcodeRequest): try: product_barcode = await self.session.get(ProductBarcode, (request.product_id, request.barcode)) if not product_barcode: return ProductDeleteBarcodeResponse(ok=False, message='Штрих-код не найден') await self.session.delete(product_barcode) await self.session.commit() return ProductDeleteBarcodeResponse(ok=True, message='Штрих-код успешно удален') except Exception as e: await self.session.rollback() return ProductDeleteBarcodeResponse(ok=False, message=str(e)) async def exists_barcode(self, product_id: int, barcode: str) -> ProductExistsBarcodeResponse: product_barcode_query = await self.session.execute( select(ProductBarcode) .where(ProductBarcode.product_id == product_id, ProductBarcode.barcode == barcode) ) product_barcode = product_barcode_query.first() return ProductExistsBarcodeResponse(exists=bool(product_barcode)) async def generate_barcode(self, request: ProductGenerateBarcodeRequest) -> ProductGenerateBarcodeResponse: try: product = await self.session.get(Product, request.product_id) if not product: raise HTTPException(status_code=404, detail='Товар не найден') barcode = utils.barcodes.generate_barcode(product.id) barcode_exists_query = await self.session.execute( select(ProductBarcode) .where(ProductBarcode.barcode == barcode) ) barcode_exists = barcode_exists_query.first() if barcode_exists: raise Exception('Штрих-код уже существует') product_barcode = ProductBarcode(product_id=product.id, barcode=barcode) self.session.add(product_barcode) await self.session.commit() return ProductGenerateBarcodeResponse(ok=True, message='Штрих-код успешно сгенерирован', barcode=barcode) except Exception as e: return ProductGenerateBarcodeResponse(ok=False, message=str(e)) # endregion