119 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			119 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from fastapi import HTTPException
 | 
						||
from sqlalchemy import select, func, Integer, update
 | 
						||
from sqlalchemy.orm import selectinload
 | 
						||
 | 
						||
from models.product import Product, ProductBarcode
 | 
						||
from schemas.base import PaginationSchema
 | 
						||
from services.base import BaseService
 | 
						||
from schemas.product import *
 | 
						||
 | 
						||
 | 
						||
class ProductService(BaseService):
 | 
						||
    async def create(self, request: ProductCreateRequest) -> ProductCreateResponse:
 | 
						||
        # Unique article validation
 | 
						||
        existing_product_query = await self.session.execute(
 | 
						||
            select(Product)
 | 
						||
            .where(Product.client_id == request.client_id,
 | 
						||
                   Product.article == request.article)
 | 
						||
        )
 | 
						||
        existing_product = existing_product_query.first()
 | 
						||
        if existing_product:
 | 
						||
            return ProductCreateResponse(ok=False, message='Товар с таким артикулом уже существует у клиента')
 | 
						||
 | 
						||
        # Creating product
 | 
						||
        product_dict = request.dict()
 | 
						||
        del product_dict['barcodes']
 | 
						||
        product = Product(**product_dict)
 | 
						||
        self.session.add(product)
 | 
						||
 | 
						||
        # Creating barcodes
 | 
						||
        await self.session.flush()
 | 
						||
        for barcode in request.barcodes:
 | 
						||
            product_barcode = ProductBarcode(product_id=product.id,
 | 
						||
                                             barcode=barcode)
 | 
						||
            self.session.add(product_barcode)
 | 
						||
            await self.session.flush()
 | 
						||
        await self.session.commit()
 | 
						||
        return ProductCreateResponse(ok=True, message='Товар успешно создан', product_id=product.id)
 | 
						||
 | 
						||
    async def delete(self, request: ProductDeleteRequest):
 | 
						||
        product = await self.session.get(Product, request.product_id)
 | 
						||
        if not product:
 | 
						||
            return ProductDeleteResponse(ok=False, message='Указанного товара не существует')
 | 
						||
        await self.session.delete(product)
 | 
						||
        await self.session.commit()
 | 
						||
        return ProductDeleteResponse(ok=True, message="Товар успешно удален!")
 | 
						||
 | 
						||
    async def update(self, request: ProductUpdateRequest):
 | 
						||
        stmt = (
 | 
						||
            select(Product)
 | 
						||
            .where(Product.id == request.product.id)
 | 
						||
            .options(selectinload(Product.barcodes))
 | 
						||
        )
 | 
						||
        product_query = await self.session.execute(stmt)
 | 
						||
        product = product_query.scalar()
 | 
						||
        if not product:
 | 
						||
            return ProductUpdateResponse(ok=False, message='Указанного товара не существует')
 | 
						||
        product_dict = request.product.dict()
 | 
						||
        del product_dict['id']
 | 
						||
        del product_dict['barcodes']
 | 
						||
        await self.session.execute(
 | 
						||
            update(Product)
 | 
						||
            .where(Product.id == request.product.id)
 | 
						||
            .values(**product_dict)
 | 
						||
        )
 | 
						||
        # Updating barcodes
 | 
						||
        product_barcodes = set([barcode for barcode in product.barcodes])
 | 
						||
        request_barcodes = set(request.product.barcodes)
 | 
						||
        new_barcodes = request_barcodes.difference(product_barcodes)
 | 
						||
        deleted_barcodes = product_barcodes.difference(request_barcodes)
 | 
						||
        for product_barcode in product.barcodes:
 | 
						||
            if product_barcode not in deleted_barcodes:
 | 
						||
                continue
 | 
						||
            await self.session.delete(product_barcode)
 | 
						||
        for new_barcode in new_barcodes:
 | 
						||
            product_barcode = ProductBarcode(
 | 
						||
                product_id=product.id,
 | 
						||
                barcode=new_barcode
 | 
						||
            )
 | 
						||
            self.session.add(product_barcode)
 | 
						||
            await self.session.flush()
 | 
						||
 | 
						||
        await self.session.commit()
 | 
						||
        return ProductUpdateResponse(ok=True, message='Товар успешно обновлен')
 | 
						||
 | 
						||
    async def get_by_client_id(self, client_id: int, pagination: PaginationSchema) -> ProductGetResponse:
 | 
						||
        stmt = (
 | 
						||
            select(Product)
 | 
						||
            .options(selectinload(Product.barcodes))
 | 
						||
            .where(Product.client_id == client_id)
 | 
						||
            .order_by(Product.id)
 | 
						||
        )
 | 
						||
        total_products_query = await self.session.execute(
 | 
						||
            select(
 | 
						||
                func.cast(func.ceil(func.count() / pagination.items_per_page), Integer),
 | 
						||
                func.count()
 | 
						||
            )
 | 
						||
            .select_from(stmt.subquery())
 | 
						||
        )
 | 
						||
        total_pages, total_items = total_products_query.first()
 | 
						||
        pagination_info = PaginationInfoSchema(total_pages=total_pages, total_items=total_items)
 | 
						||
        query = await self.session.execute(
 | 
						||
            stmt
 | 
						||
            .offset(pagination.page * pagination.items_per_page)
 | 
						||
            .limit(pagination.items_per_page)
 | 
						||
        )
 | 
						||
        products: list[ProductSchema] = []
 | 
						||
        for product in query.scalars().all():
 | 
						||
            product: Product
 | 
						||
 | 
						||
            barcodes = []
 | 
						||
            for barcode_obj in product.barcodes:
 | 
						||
                barcode_obj: ProductBarcode
 | 
						||
                barcodes.append(barcode_obj.barcode)
 | 
						||
 | 
						||
            products.append(
 | 
						||
                ProductSchema.from_sql_model(product, {'barcodes': barcodes})
 | 
						||
            )
 | 
						||
        return ProductGetResponse(products=products, pagination_info=pagination_info)
 |