246 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			246 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from fastapi import HTTPException
 | 
						||
from sqlalchemy import select, func, Integer, update
 | 
						||
from sqlalchemy.orm import selectinload
 | 
						||
 | 
						||
import utils.barcodes
 | 
						||
from backend import config
 | 
						||
from external.s3_uploader.uploader import S3Uploader
 | 
						||
from models.product import Product, ProductBarcode, ProductImage
 | 
						||
from schemas.base import PaginationSchema
 | 
						||
from services.base import BaseService
 | 
						||
from schemas.product import *
 | 
						||
from utils.dependecies import is_valid_pagination
 | 
						||
 | 
						||
 | 
						||
class ProductService(BaseService):
 | 
						||
 | 
						||
    async def create(self, request: ProductCreateRequest) -> ProductCreateResponse:
        """Create a product together with its barcodes.

        Args:
            request: Product fields plus ``barcodes`` (the values stored in
                ``ProductBarcode.barcode``) and an optional
                ``barcode_template`` whose ``id`` is stored on the product.

        Returns:
            ``ProductCreateResponse`` with ``ok=True`` and the id of the
            newly created product.
        """
        # NOTE: a per-client unique-article check used to run here but is
        # currently disabled, so duplicate articles are not rejected.

        # These request fields are not passed to the Product constructor.
        product_dict = request.dict()
        for field in ('barcodes', 'barcode_template', 'images', 'image_url'):
            del product_dict[field]
        if request.barcode_template:
            product_dict['barcode_template_id'] = request.barcode_template.id

        product = Product(**product_dict)
        self.session.add(product)

        # Flush once so the generated primary key is available for the
        # barcode rows; read it now, before commit can expire the instance.
        await self.session.flush()
        product_id = product.id

        # Barcodes are identified by (product_id, barcode), so a value that is
        # repeated in the request would collide on insert. dict.fromkeys()
        # drops repeats while preserving the request order.
        unique_barcodes = dict.fromkeys(request.barcodes)
        self.session.add_all([
            ProductBarcode(product_id=product_id, barcode=barcode)
            for barcode in unique_barcodes
        ])

        # commit() flushes the pending barcode rows; no per-row flush needed.
        await self.session.commit()
        return ProductCreateResponse(ok=True, message='Товар успешно создан', product_id=product_id)
 | 
						||
 | 
						||
    async def delete(self, request: ProductDeleteRequest):
        """Delete the product identified by ``request.product_id``.

        Returns:
            ``ProductDeleteResponse`` with ``ok=True`` after the deletion is
            committed, or ``ok=False`` when no such product exists.
        """
        target = await self.session.get(Product, request.product_id)
        if target:
            await self.session.delete(target)
            await self.session.commit()
            return ProductDeleteResponse(ok=True, message="Товар успешно удален!")
        return ProductDeleteResponse(ok=False, message='Указанного товара не существует')
 | 
						||
 | 
						||
    async def update(self, request: ProductUpdateRequest):
        """Update a product's columns and synchronise its barcodes.

        The product is looked up by ``request.product.id``. Its columns are
        overwritten from the request, barcodes missing from the request are
        deleted and barcodes that are new in the request are inserted.

        Returns:
            ``ProductUpdateResponse`` with ``ok=True`` on success, or
            ``ok=False`` when the product does not exist.
        """
        stmt = (
            select(Product)
            .where(Product.id == request.product.id)
            .options(selectinload(Product.barcodes))
        )
        product = (await self.session.execute(stmt)).scalar()
        if not product:
            return ProductUpdateResponse(ok=False, message='Указанного товара не существует')

        # These request fields are not written by the UPDATE statement.
        product_dict = request.product.dict()
        for field in ('id', 'barcodes', 'barcode_template', 'image_url', 'images'):
            del product_dict[field]
        if request.product.barcode_template:
            product_dict['barcode_template_id'] = request.product.barcode_template.id

        await self.session.execute(
            update(Product)
            .where(Product.id == request.product.id)
            .values(**product_dict)
        )

        # Updating barcodes.
        # Diff on the barcode *value*: product.barcodes holds ProductBarcode
        # rows while the request carries the plain values, so comparing the
        # two collections directly would presumably never match and every
        # barcode would be deleted and re-inserted on each update.
        existing_by_value = {row.barcode: row for row in product.barcodes}
        requested_values = set(request.product.barcodes)

        for value, row in existing_by_value.items():
            if value not in requested_values:
                await self.session.delete(row)

        self.session.add_all([
            ProductBarcode(product_id=product.id, barcode=value)
            for value in requested_values
            if value not in existing_by_value
        ])

        # commit() flushes the pending inserts and deletes in one go.
        await self.session.commit()
        return ProductUpdateResponse(ok=True, message='Товар успешно обновлен')
 | 
						||
 | 
						||
    async def get_by_client_id(self, client_id: int, pagination: PaginationSchema) -> ProductGetResponse:
        """Return a client's products ordered by id, optionally paginated.

        Args:
            client_id: Only products with this ``client_id`` are returned.
            pagination: Applied only when ``is_valid_pagination`` accepts it;
                ``page`` is used as a zero-based page index.

        Returns:
            ``ProductGetResponse`` with the products and pagination info.
            Without valid pagination every product is returned and the info
            reports a single page.
        """
        # Product.id is already a total order, so one ascending sort key is
        # enough; a further order_by on the same column would have no effect.
        stmt = (
            select(Product)
            .options(selectinload(Product.barcodes)
                     .noload(ProductBarcode.product))
            .where(Product.client_id == client_id)
            .order_by(Product.id)
        )

        is_pagination_valid = is_valid_pagination(pagination)
        total_pages = 1
        total_items = 0
        if is_pagination_valid:
            count_result = await self.session.execute(
                select(func.count()).select_from(stmt.subquery())
            )
            total_items = count_result.scalar() or 0
            # Ceiling division is done in Python: computing it in SQL depends
            # on how the backend divides two integers and can under-count
            # pages where that division truncates.
            # NOTE(review): assumes is_valid_pagination guarantees a positive
            # items_per_page - confirm.
            per_page = pagination.items_per_page
            total_pages = (total_items + per_page - 1) // per_page
            stmt = stmt.offset(pagination.page * per_page).limit(per_page)

        product_orm = (await self.session.execute(stmt)).scalars().all()
        if not is_pagination_valid:
            total_items = len(product_orm)
        pagination_info = PaginationInfoSchema(total_pages=total_pages, total_items=total_items)

        products: list[ProductSchema] = [
            ProductSchema.model_validate(product) for product in product_orm
        ]
        return ProductGetResponse(products=products, pagination_info=pagination_info)
 | 
						||
 | 
						||
    async def get_by_id(self, product_id: int) -> ProductSchema:
        """Fetch a single product, with its barcodes, as a ``ProductSchema``.

        Raises:
            HTTPException: 404 when no product has the given id.
        """
        barcode_loading = selectinload(Product.barcodes).noload(ProductBarcode.product)
        lookup = (
            select(Product)
            .options(barcode_loading)
            .where(Product.id == product_id)
        )
        found = (await self.session.execute(lookup)).scalar()
        if found:
            return ProductSchema.model_validate(found)
        raise HTTPException(status_code=404, detail='Товар не найден')
 | 
						||
 | 
						||
    async def upload_image(self, product_id: int, file_bytes: bytes) -> ProductUploadImageResponse:
        """Replace a product's images with a newly uploaded one.

        Existing ``ProductImage`` rows of the product are deleted, the bytes
        are uploaded through ``S3Uploader`` and a new ``ProductImage`` is
        stored with the URL taken from the upload response's ``'link'`` key.

        Args:
            product_id: Id of the product that receives the image.
            file_bytes: Raw image content to upload.

        Returns:
            ``ProductUploadImageResponse`` with ``ok=True`` and the image URL,
            or ``ok=False`` with the error text when any step fails.
        """
        try:
            product: Product = await self.session.get(Product, product_id)
            if not product:
                raise Exception("Неудалось найти товар с указанным ID")

            # Removing previous images.
            # NOTE(review): this reads the ``images`` relationship on an async
            # session - presumably it is configured for eager loading; confirm.
            for image in product.images:
                await self.session.delete(image)

            s3_uploader = S3Uploader(config.S3_API_KEY)
            response = await s3_uploader.upload(file_bytes)
            response_url = response.get('link')
            if not response_url:
                raise Exception("Неудалось загрузить изображение")

            product_image = ProductImage(
                product_id=product_id,
                image_url=response_url,
            )
            self.session.add(product_image)
            await self.session.commit()
            return ProductUploadImageResponse(ok=True,
                                              message='Изображение успешно загружено',
                                              image_url=response_url)
        except Exception as e:
            # Discard the staged image deletions so a failed upload cannot
            # remove the old images if this session is committed later.
            await self.session.rollback()
            return ProductUploadImageResponse(ok=False, message=str(e))
 | 
						||
 | 
						||
    # region Barcodes
 | 
						||
    async def add_barcode(self, request: ProductAddBarcodeRequest):
        """Attach ``request.barcode`` to the product ``request.product_id``.

        Every outcome is reported through the response object. A missing
        product is returned as ``ok=False`` directly rather than raised as an
        ``HTTPException``, because the broad ``except`` below would swallow
        the exception and put its string form into the message.

        Returns:
            ``ProductAddBarcodeResponse`` with ``ok=True`` when the barcode
            was stored, or ``ok=False`` when the product is missing, the
            barcode is already attached to it, or an error occurred.
        """
        try:
            product = await self.session.get(Product, request.product_id)
            if not product:
                return ProductAddBarcodeResponse(ok=False, message='Товар не найден')

            existing_barcode_query = await self.session.execute(
                select(ProductBarcode)
                .where(ProductBarcode.product_id == request.product_id,
                       ProductBarcode.barcode == request.barcode)
            )
            if existing_barcode_query.first():
                return ProductAddBarcodeResponse(ok=False, message='Штрих-код уже существует у товара')

            self.session.add(ProductBarcode(product_id=product.id,
                                            barcode=request.barcode))
            await self.session.commit()
            return ProductAddBarcodeResponse(ok=True, message='Штрих-код успешно добавлен')
        except Exception as e:
            # Leave the session clean after an unexpected failure.
            await self.session.rollback()
            return ProductAddBarcodeResponse(ok=False, message=str(e))
 | 
						||
 | 
						||
    async def delete_barcode(self, request: ProductDeleteBarcodeRequest):
        """Remove the barcode ``request.barcode`` from ``request.product_id``.

        Returns:
            ``ProductDeleteBarcodeResponse`` with ``ok=True`` once the row is
            deleted and committed, or ``ok=False`` when the barcode is not
            found or an error occurs (the session is rolled back on error).
        """
        try:
            # The barcode row is looked up by its (product_id, barcode) identity.
            identity = (request.product_id, request.barcode)
            row = await self.session.get(ProductBarcode, identity)
            if row:
                await self.session.delete(row)
                await self.session.commit()
                return ProductDeleteBarcodeResponse(ok=True, message='Штрих-код успешно удален')
            return ProductDeleteBarcodeResponse(ok=False, message='Штрих-код не найден')
        except Exception as error:
            await self.session.rollback()
            return ProductDeleteBarcodeResponse(ok=False, message=str(error))
 | 
						||
 | 
						||
    async def exists_barcode(self, product_id: int, barcode: str) -> ProductExistsBarcodeResponse:
        """Report whether the given product already has the given barcode.

        Returns:
            ``ProductExistsBarcodeResponse`` whose ``exists`` flag is true
            when a matching ``ProductBarcode`` row is found.
        """
        criteria = (
            ProductBarcode.product_id == product_id,
            ProductBarcode.barcode == barcode,
        )
        result = await self.session.execute(select(ProductBarcode).where(*criteria))
        return ProductExistsBarcodeResponse(exists=bool(result.first()))
 | 
						||
 | 
						||
    async def generate_barcode(self, request: ProductGenerateBarcodeRequest) -> ProductGenerateBarcodeResponse:
        """Generate a barcode for a product and store it.

        The value comes from ``utils.barcodes.generate_barcode(product.id)``.
        It is rejected when any ``ProductBarcode`` row already holds the same
        value, whichever product that row belongs to.

        Expected failures are returned as ``ok=False`` responses directly
        instead of being raised, because the broad ``except`` below would
        swallow a raised exception and put its string form into the message.

        Returns:
            ``ProductGenerateBarcodeResponse`` with ``ok=True`` and the new
            barcode, or ``ok=False`` with a message on failure.
        """
        try:
            product = await self.session.get(Product, request.product_id)
            if not product:
                return ProductGenerateBarcodeResponse(ok=False, message='Товар не найден')

            barcode = utils.barcodes.generate_barcode(product.id)
            barcode_exists_query = await self.session.execute(
                select(ProductBarcode)
                .where(ProductBarcode.barcode == barcode)
            )
            if barcode_exists_query.first():
                return ProductGenerateBarcodeResponse(ok=False, message='Штрих-код уже существует')

            self.session.add(ProductBarcode(product_id=product.id,
                                            barcode=barcode))
            await self.session.commit()
            return ProductGenerateBarcodeResponse(ok=True, message='Штрих-код успешно сгенерирован', barcode=barcode)
        except Exception as e:
            # Roll back as add_barcode/delete_barcode do, so a failed commit
            # does not leave the pending row in the session.
            await self.session.rollback()
            return ProductGenerateBarcodeResponse(ok=False, message=str(e))
 | 
						||
    # endregion
 |