from io import BytesIO from typing import Optional import starlette.status from fastapi import HTTPException, UploadFile from sqlalchemy import select, func, Integer, update, or_ from sqlalchemy.orm import selectinload, Query import utils.barcodes from backend import config from barcodes.images_uploader import BarcodeImagesUploader from barcodes.pdf.pdf_maker import PdfMaker from external.s3_uploader.uploader import S3Uploader from models import User from models.product import Product, ProductImage, ProductBarcodeImage from schemas.auth import UserUnion from schemas.base import PaginationSchema from schemas.product import * from services.base import BaseService from utils.auth import verify_user_deal_editor from utils.dependecies import is_valid_pagination class ProductService(BaseService): async def create(self, request: ProductCreateRequest, user: UserUnion) -> ProductCreateResponse: verify_user_deal_editor(user) # Creating product product_dict = request.dict() del product_dict['barcodes'] del product_dict['barcode_template'] del product_dict['images'] del product_dict['image_url'] if request.barcode_template: product_dict['barcode_template_id'] = request.barcode_template.id product = Product(**product_dict) self.session.add(product) # Creating barcodes await self.session.flush() for barcode in request.barcodes: product_barcode = ProductBarcode(product_id=product.id, barcode=barcode) self.session.add(product_barcode) await self.session.flush() await self.session.commit() return ProductCreateResponse(ok=True, message='Товар успешно создан', product_id=product.id) async def delete(self, request: ProductDeleteRequest): product = await self.session.get(Product, request.product_id) if not product: return ProductDeleteResponse(ok=False, message='Указанного товара не существует') await self.session.delete(product) await self.session.commit() return ProductDeleteResponse(ok=True, message="Товар успешно удален!") async def update(self, request: ProductUpdateRequest): stmt = ( select(Product) .where(Product.id == request.product.id) .options(selectinload(Product.barcodes)) ) product_query = await self.session.execute(stmt) product = product_query.scalar() if not product: return ProductUpdateResponse(ok=False, message='Указанного товара не существует') product_dict = request.product.dict() del product_dict['id'] del product_dict['barcodes'] del product_dict['barcode_template'] del product_dict['image_url'] del product_dict['images'] if request.product.barcode_template: product_dict['barcode_template_id'] = request.product.barcode_template.id await self.session.execute( update(Product) .where(Product.id == request.product.id) .values(**product_dict) ) # Updating barcodes product_barcodes = set([barcode for barcode in product.barcodes]) request_barcodes = set(request.product.barcodes) new_barcodes = request_barcodes.difference(product_barcodes) deleted_barcodes = product_barcodes.difference(request_barcodes) for product_barcode in product.barcodes: if product_barcode not in deleted_barcodes: continue await self.session.delete(product_barcode) for new_barcode in new_barcodes: product_barcode = ProductBarcode( product_id=product.id, barcode=new_barcode ) self.session.add(product_barcode) await self.session.flush() await self.session.commit() return ProductUpdateResponse(ok=True, message='Товар успешно обновлен') async def get_by_client_id( self, client_id: int, pagination: PaginationSchema, search_input: str ) -> ProductGetResponse: is_pagination_valid = is_valid_pagination(pagination) total_pages = 0 total_items = 0 stmt = ( select( Product ) .options( selectinload(Product.barcodes) .noload(ProductBarcode.product), selectinload(Product.barcode_image), ) .where( Product.client_id == client_id ) .order_by(Product.id) ) search_input = search_input.strip() if search_input: stmt = ( stmt.where( or_( Product.name.ilike(f'%{search_input}%'), Product.barcodes.any(ProductBarcode.barcode.ilike(f'%{search_input}%')), Product.article.ilike(f'%{search_input}%'), Product.factory_article.ilike(f'%{search_input}%'), ) ) ) if is_pagination_valid: total_products_query = await self.session.execute( select( func.cast(func.ceil(func.count() / pagination.items_per_page), Integer), func.count() ) .select_from(stmt.subquery()) ) total_pages, total_items = total_products_query.first() stmt = ( stmt .offset(pagination.page * pagination.items_per_page) .limit(pagination.items_per_page) ) query = await self.session.execute( stmt .order_by(Product.id.desc()) ) product_orm = query.scalars().all() if not is_pagination_valid: total_pages = 1 total_items = len(product_orm) pagination_info = PaginationInfoSchema(total_pages=total_pages, total_items=total_items) products: list[ProductSchema] = [] for product in product_orm: products.append(ProductSchema.model_validate(product)) return ProductGetResponse(products=products, pagination_info=pagination_info) async def get_by_id(self, product_id: int) -> ProductSchema: stmt = ( select(Product) .options(selectinload(Product.barcodes) .noload(ProductBarcode.product)) .where(Product.id == product_id) ) query = await self.session.execute(stmt) product = query.scalar() if not product: raise HTTPException(status_code=404, detail='Товар не найден') return ProductSchema.model_validate(product) async def upload_image(self, product_id: int, file_bytes: bytes) -> ProductUploadImageResponse: try: product: Product = await self.session.get(Product, product_id) if not product: raise Exception("Неудалось найти товар с указанным ID") # removing previous images for image in product.images: await self.session.delete(image) s3_uploader = S3Uploader(config.S3_API_KEY) response = await s3_uploader.upload(file_bytes) response_url = response.get('link') if not response_url: raise Exception("Неудалось загрузить изображение") product_image = ProductImage( product_id=product_id, image_url=response_url, ) self.session.add(product_image) await self.session.commit() return ProductUploadImageResponse(ok=True, message='Изображение успешно загружено', image_url=response_url) except Exception as e: return ProductUploadImageResponse(ok=False, message=str(e)) # region Barcodes async def add_barcode(self, request: ProductAddBarcodeRequest): try: product = await self.session.get(Product, request.product_id) if not product: raise HTTPException(status_code=404, detail='Товар не найден') existing_barcode_query = await self.session.execute( select(ProductBarcode) .where(ProductBarcode.product_id == request.product_id, ProductBarcode.barcode == request.barcode) ) existing_barcode = existing_barcode_query.first() if existing_barcode: return ProductAddBarcodeResponse(ok=False, message='Штрих-код уже существует у товара') product_barcode = ProductBarcode(product_id=product.id, barcode=request.barcode) self.session.add(product_barcode) await self.session.commit() return ProductAddBarcodeResponse(ok=True, message='Штрих-код успешно добавлен') except Exception as e: await self.session.rollback() return ProductAddBarcodeResponse(ok=False, message=str(e)) async def delete_barcode(self, request: ProductDeleteBarcodeRequest): try: product_barcode = await self.session.get(ProductBarcode, (request.product_id, request.barcode)) if not product_barcode: return ProductDeleteBarcodeResponse(ok=False, message='Штрих-код не найден') await self.session.delete(product_barcode) await self.session.commit() return ProductDeleteBarcodeResponse(ok=True, message='Штрих-код успешно удален') except Exception as e: await self.session.rollback() return ProductDeleteBarcodeResponse(ok=False, message=str(e)) async def exists_barcode(self, product_id: int, barcode: str) -> ProductExistsBarcodeResponse: product_barcode_query = await self.session.execute( select(ProductBarcode) .where(ProductBarcode.product_id == product_id, ProductBarcode.barcode == barcode) ) product_barcode = product_barcode_query.first() return ProductExistsBarcodeResponse(exists=bool(product_barcode)) async def generate_barcode(self, request: ProductGenerateBarcodeRequest) -> ProductGenerateBarcodeResponse: try: product = await self.session.get(Product, request.product_id) if not product: raise HTTPException(status_code=404, detail='Товар не найден') barcode = utils.barcodes.generate_barcode(product.id) barcode_exists_query = await self.session.execute( select(ProductBarcode) .where(ProductBarcode.barcode == barcode) ) barcode_exists = barcode_exists_query.first() if barcode_exists: raise Exception('Штрих-код уже существует') product_barcode = ProductBarcode(product_id=product.id, barcode=barcode) self.session.add(product_barcode) await self.session.commit() return ProductGenerateBarcodeResponse(ok=True, message='Штрих-код успешно сгенерирован', barcode=barcode) except Exception as e: return ProductGenerateBarcodeResponse(ok=False, message=str(e)) async def get_model_by_id(self, product_id: int) -> Product: product: Optional[Product] = await self.session.get(Product, product_id) if not product: raise Exception('Не удалось найти товар с указанным ID') return product async def delete_model_barcode_image(self, uploader: BarcodeImagesUploader, product_id: int) -> None: barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id) if barcode_image: uploader.delete(barcode_image.filename) await self.session.delete(barcode_image) await self.session.commit() async def upload_barcode_image(self, product_id: int, upload_file: UploadFile) -> ProductUploadBarcodeImageResponse: try: await self.get_model_by_id(product_id) uploader = BarcodeImagesUploader() await self.delete_model_barcode_image(uploader, product_id) file = PdfMaker.resize_pdf_with_reportlab(BytesIO(upload_file.file.read())) filename = await uploader.upload(file, upload_file.filename) barcode_image_url = uploader.get_url(filename) product_barcode_image = ProductBarcodeImage( product_id=product_id, filename=filename, ) self.session.add(product_barcode_image) await self.session.commit() return ProductUploadBarcodeImageResponse( ok=True, message='Штрих-код для товара успешно загружен', barcode_image_url=barcode_image_url, ) except Exception as e: return ProductUploadBarcodeImageResponse(ok=False, message=str(e)) async def delete_barcode_image(self, product_id: int) -> ProductDeleteBarcodeImageResponse: try: await self.get_model_by_id(product_id) uploader = BarcodeImagesUploader() await self.delete_model_barcode_image(uploader, product_id) return ProductDeleteBarcodeImageResponse( ok=True, message='Штрих-код для товара успешно удален', ) except Exception as e: return ProductDeleteBarcodeImageResponse(ok=False, message=str(e)) async def get_barcode_image(self, product_id: int) -> ProductGetBarcodeImageResponse: product: Optional[Product] = await self.session.get(Product, product_id) if not product: raise HTTPException(404, 'Не удалось найти товар с указанным ID') barcode_image: Optional[ProductBarcodeImage] = await self.session.get(ProductBarcodeImage, product_id) if not barcode_image: return ProductGetBarcodeImageResponse(barcode_image_url="") uploader = BarcodeImagesUploader() url = uploader.get_url(barcode_image.filename) return ProductGetBarcodeImageResponse(barcode_image_url=url) # endregion