import lexorank import models.secondary from typing import Union import models.deal from fastapi import HTTPException from sqlalchemy import select, func, update, delete, insert from sqlalchemy.orm import joinedload, selectinload from models import User, Service, Client from models.deal import * from schemas.client import ClientDetailsSchema from schemas.deal import * from services.base import BaseService from services.client import ClientService from services.shipping_warehouse import ShippingWarehouseService class DealService(BaseService): # region Deal async def _get_deal_by_id(self, deal_id) -> Union[Deal, None]: return await self.session.get(Deal, deal_id) async def _get_rank_for_deal(self, deal_status: DealStatus) -> str: deal_query = await self.session.execute( select(Deal).where(Deal.current_status == deal_status).order_by(Deal.lexorank.desc()).limit(1)) deal = deal_query.scalar_one_or_none() if not deal: prev = lexorank.middle(lexorank.Bucket.BUCEKT_0) return str(prev.next()) return str(lexorank.parse(deal.lexorank).next()) async def change_status(self, deal: Deal, status: DealStatus, user: User, deadline: datetime.datetime = None, rank=None, comment: str = ''): if not deal.current_status == status: deadline = deadline status_change = DealStatusHistory( deal_id=deal.id, user_id=user.id, changed_at=datetime.datetime.now(), from_status=deal.current_status, to_status=status, next_status_deadline=deadline, comment=comment ) self.session.add(status_change) deal.current_status = status if not rank: rank = await self._get_rank_for_deal(status) if rank: deal.lexorank = rank await self.session.flush() async def create(self, request: DealCreateRequest, user: User) -> DealCreateResponse: rank = self._get_rank_for_deal(DealStatus.CREATED) deal = Deal( name=request.name, created_at=datetime.datetime.now(), current_status=DealStatus.CREATED, lexorank=rank ) self.session.add(deal) await self.session.flush() # Append status history await self.change_status(deal, DealStatus.AWAITING_ACCEPTANCE, user) await self.session.commit() return DealCreateResponse(ok=True) async def delete(self, request: DealDeleteRequest) -> DealDeleteResponse: deal = await self._get_deal_by_id(request.deal_id) if not deal: return DealDeleteResponse(ok=False, message="Сделка не найдена") deal.is_deleted = True await self.session.commit() return DealDeleteResponse(ok=True, message="Сделка успешно удалена") async def quick_create(self, request: DealQuickCreateRequest, user: User) -> DealQuickCreateResponse: client_service = ClientService(self.session) client = await client_service.get_by_name(request.client_name) if not client: client = await client_service.create_client_raw( user, request.client_name, ClientDetailsSchema() ) shipping_warehouse_service = ShippingWarehouseService(self.session) shipping_warehouse = await shipping_warehouse_service.get_by_name(name=request.shipping_warehouse) if not shipping_warehouse: shipping_warehouse = await shipping_warehouse_service.create_by_name(name=request.shipping_warehouse) rank = await self._get_rank_for_deal(DealStatus.CREATED) deal = Deal( name=request.name, created_at=datetime.datetime.now(), client_id=client.id, current_status=DealStatus.CREATED, lexorank=rank, shipping_warehouse_id=shipping_warehouse.id, base_marketplace_key=request.base_marketplace.key ) self.session.add(deal) await self.session.flush() await self.change_status(deal, DealStatus.AWAITING_ACCEPTANCE, user, deadline=request.acceptance_date, comment=request.comment) await self.session.commit() return DealQuickCreateResponse(deal_id=deal.id) async def change_status_manual(self, request: DealChangeStatusRequest, user: User) -> DealChangeStatusResponse: # Changing current status deal = await self._get_deal_by_id(request.deal_id) if not deal: return DealChangeStatusResponse(ok=False) await self.change_status(deal, DealStatus(request.new_status), user) await self.session.commit() return DealChangeStatusResponse(ok=True) def _get_price_subquery(self): deal_services_subquery = ( select( models.secondary.DealService.deal_id, func.sum(models.secondary.DealService.quantity * models.secondary.DealService.price).label( 'total_price') ) .join(Service) .group_by(models.secondary.DealService.deal_id) ) product_services_subquery = select( select( models.secondary.DealProductService.deal_id, func.sum(models.DealProduct.quantity * models.secondary.DealProductService.price).label('total_price') ) .join(models.secondary.DealProduct) .group_by(models.secondary.DealProductService.deal_id) .subquery() ) union_subqueries = deal_services_subquery.union(product_services_subquery).subquery() final_subquery = ( select( union_subqueries.c.deal_id, func.sum(union_subqueries.c.total_price).label('total_price') ) .group_by(union_subqueries.c.deal_id) .subquery() ) return final_subquery async def get_summary(self) -> DealSummaryResponse: price_subquery = self._get_price_subquery() q = ( select( Deal, func.coalesce(price_subquery.c.total_price, 0), func.row_number().over( partition_by=Deal.current_status, order_by=Deal.lexorank ).label('rank') ) .options( selectinload(Deal.status_history), joinedload(Deal.client) ) .outerjoin( price_subquery, Deal.id == price_subquery.c.deal_id) .where( Deal.is_deleted == False, Deal.is_completed == False, Deal.current_status != DealStatus.COMPLETED ) ) deals_query = await self.session.execute(q) summaries = [] for deal, total_price, rank in deals_query.all(): deal: Deal last_status: DealStatusHistory = max(deal.status_history, key=lambda status: status.changed_at) deadline = last_status.next_status_deadline base_marketplace = None if deal.base_marketplace: base_marketplace = BaseMarketplaceSchema.model_validate(deal.base_marketplace) summaries.append( DealSummary( id=deal.id, client_name=deal.client.name, name=deal.name, changed_at=last_status.changed_at, deadline=deadline, status=last_status.to_status, total_price=total_price, rank=rank, base_marketplace=base_marketplace ) ) return DealSummaryResponse(summaries=summaries) async def get_all(self) -> DealGetAllResponse: deals_query = await self.session.scalars(select(Deal).options(joinedload(Deal.client))) deals = deals_query.all() result = [] for deal in deals: result.append(DealSchema.model_validate(deal)) return DealGetAllResponse(deals=result) async def get_by_id(self, deal_id: int) -> DealSchema: deal = await self.session.scalar( select(Deal) .options( joinedload(Deal.shipping_warehouse), joinedload(Deal.client) .joinedload(Client.details), selectinload(Deal.services) .options( joinedload(models.secondary.DealService.service).joinedload(Service.category), selectinload(models.secondary.DealService.employees) ), selectinload(Deal.products) .joinedload(models.secondary.DealProduct.product) .joinedload(models.Product.client), selectinload(Deal.products) .joinedload(models.secondary.DealProduct.product) .joinedload(models.Product.barcodes), selectinload(Deal.products) .joinedload(models.secondary.DealProduct.services) .options( joinedload(models.secondary.DealProductService.service), selectinload(models.secondary.DealProductService.employees) ), selectinload(Deal.status_history) .joinedload(DealStatusHistory.user), selectinload(Deal.status_history) .noload(DealStatusHistory.deal), ) .where(Deal.id == deal_id) ) if not deal: raise HTTPException(status_code=404, detail="Сделка не найдена") return DealSchema.model_validate(deal) async def update_general_info(self, request: DealUpdateGeneralInfoRequest) -> DealUpdateGeneralInfoResponse: try: deal: Deal = await self.session.scalar(select(Deal).where(Deal.id == request.deal_id)) if not deal: raise HTTPException(status_code=404, detail="Сделка не найдена") deal.name = request.data.name deal.comment = request.data.comment deal.is_deleted = request.data.is_deleted deal.is_completed = request.data.is_completed # Updating shipping warehouse shipping_warehouse_service = ShippingWarehouseService(self.session) shipping_warehouse = await shipping_warehouse_service.get_by_name(request.data.shipping_warehouse) if not shipping_warehouse and request.data.shipping_warehouse: shipping_warehouse = await shipping_warehouse_service.create_by_name(request.data.shipping_warehouse) deal.shipping_warehouse = shipping_warehouse await self.session.commit() return DealUpdateGeneralInfoResponse(ok=True, message='Данные о сделке успешно обновлены') except Exception as e: await self.session.rollback() return DealUpdateGeneralInfoResponse(ok=False, message=str(e)) async def reorder(self, request: DealSummaryReorderRequest, user: User) -> DealSummaryResponse: deal: Deal = await self.session.scalar(select(Deal).where(Deal.id == request.deal_id)) if request.index == 1: request.index = 0 is_first = request.index == 0 stmt = ( select(Deal) .where( Deal.current_status == request.status, Deal.id != request.deal_id, Deal.is_deleted == False, Deal.is_completed == False ) .order_by(Deal.lexorank) .offset(max([request.index - 2, 0])) .limit(2 if not is_first else 1) ) query = await self.session.execute(stmt) boundaries = query.scalars().all() top_boundary: Union[Deal, None] = boundaries[0] if not is_first else None bottom_boundary: Union[Deal, None] = boundaries[1] if len(boundaries) == 2 else None # working when between two elements if top_boundary and bottom_boundary: top_lexorank = lexorank.parse(top_boundary.lexorank) bottom_lexorank = lexorank.parse(bottom_boundary.lexorank) new_rank = lexorank.between(top_lexorank, bottom_lexorank) # working when at the bottom elif top_boundary and not bottom_boundary: new_rank = lexorank.parse(top_boundary.lexorank).next() # working when at the top elif bottom_boundary and not top_boundary: new_rank = lexorank.parse(bottom_boundary.lexorank).prev() elif not top_boundary and not bottom_boundary and len(boundaries) > 0: new_rank = lexorank.parse(boundaries[0].lexorank).prev() else: new_rank = lexorank.middle(lexorank.Bucket.BUCEKT_0) await self.change_status(deal, request.status, user, deadline=request.deadline, comment=request.comment, rank=str(new_rank)) await self.session.commit() return await self.get_summary() # endregion # region Deal services async def add_services(self, request: DealAddServicesRequest): # TODO refactor deal: Deal = await self.session.scalar( select(Deal) .options(selectinload(Deal.services)) .where(Deal.id == request.deal_id) ) if not deal: raise HTTPException(status_code=404, detail="Deal is not found") services_ids = [service.id for service in request.services] existing_service_ids = {service.service_id for service in deal.services} request_services_dict = {service.id: service.quantity for service in request.services} services_query = await self.session.scalars(select(Service).where(Service.id.in_(services_ids))) services = services_query.all() if len(services) != len(services_ids): raise HTTPException(status_code=404, detail="Some of services is not found") # Adding quantity for deal_service in deal.services: deal_service: models.secondary.DealService if deal_service.service_id not in services_ids: continue deal_service.quantity += request_services_dict[deal_service.service_id] # Adding new services for service in services: if service.id in existing_service_ids: continue quantity = request_services_dict[service.id] deal.services.append( models.secondary.DealService( service_id=service.id, deal_id=deal.id, quantity=quantity ) ) await self.session.commit() return DealAddServicesResponse(ok=True, message='Услуги успешно добавлены') async def update_service_quantity(self, request: DealUpdateServiceQuantityRequest) -> DealUpdateServiceQuantityResponse: try: deal_service = await self.session.scalar( select(models.secondary.DealService) .where(models.secondary.DealService.deal_id == request.deal_id, models.secondary.DealService.service_id == request.service_id) ) if not deal_service: raise HTTPException(status_code=404, detail="Сделка не найдена") deal_service.quantity = request.quantity await self.session.commit() return DealUpdateServiceQuantityResponse(ok=True, message='Количество успешно обновлено') except Exception as e: await self.session.rollback() return DealUpdateServiceQuantityResponse(ok=False, message=str(e)) async def add_service(self, request: DealAddServiceRequest) -> DealAddServiceResponse: try: deal = await self.session.scalar(select(Deal).where(Deal.id == request.deal_id)) if not deal: raise HTTPException(status_code=404, detail="Сделка не найдена") service: models.Service = await self.session.scalar(select(Service).where(Service.id == request.service_id)) if not service: raise HTTPException(status_code=404, detail="Услуга не найдена") # Preventing duplicates deal_service = await self.session.scalar( select(models.secondary.DealService) .where(models.secondary.DealService.deal_id == request.deal_id, models.secondary.DealService.service_id == request.service_id) ) if deal_service: raise HTTPException(status_code=400, detail="Услуга уже добавлена") deal_service = models.secondary.DealService( deal_id=request.deal_id, service_id=request.service_id, quantity=request.quantity, price=request.price ) self.session.add(deal_service) await self.session.commit() return DealAddServiceResponse(ok=True, message='Услуга успешно добавлена') except Exception as e: await self.session.rollback() return DealAddServiceResponse(ok=False, message=str(e)) async def delete_service(self, request: DealDeleteServiceRequest) -> DealDeleteServiceResponse: try: deal_service = await self.session.scalar( select(models.secondary.DealService) .where(models.secondary.DealService.deal_id == request.deal_id, models.secondary.DealService.service_id == request.service_id) ) if not deal_service: raise HTTPException(status_code=404, detail="Сделка не найдена") await self.session.delete(deal_service) await self.session.commit() return DealDeleteServiceResponse(ok=True, message='Услуга успешно удалена') except Exception as e: await self.session.rollback() return DealDeleteServiceResponse(ok=False, message=str(e)) async def delete_services(self, request: DealDeleteServicesRequest) -> DealDeleteServicesResponse: try: deal_services = await self.session.scalars( select(models.secondary.DealService) .where(models.secondary.DealService.deal_id == request.deal_id, models.secondary.DealService.service_id.in_(request.service_ids)) ) for deal_service in deal_services: await self.session.delete(deal_service) await self.session.commit() return DealDeleteServicesResponse(ok=True, message='Услуги успешно удалены') except Exception as e: await self.session.rollback() return DealDeleteServicesResponse(ok=False, message=str(e)) async def update_service(self, request: DealUpdateServiceRequest) -> DealUpdateServiceResponse: try: deal_service = await self.session.scalar( select(models.secondary.DealService) .where(models.secondary.DealService.deal_id == request.deal_id, models.secondary.DealService.service_id == request.service.service.id) ) if not deal_service: raise HTTPException(status_code=404, detail="Сделка не найдена") service_dict = request.service.dict() del service_dict['service'] del service_dict['employees'] service_dict['service_id'] = request.service.service.id await self.session.execute( update(models.secondary.DealService) .where(models.secondary.DealService.deal_id == request.deal_id, models.secondary.DealService.service_id == request.service.service.id) .values(**service_dict) ) # Updating deleting previous employees delete_stmt = ( delete( models.deal_service_employees ) .where( models.deal_service_employees.c.deal_id == request.deal_id, models.deal_service_employees.c.service_id == request.service.service.id, ) ) await self.session.execute(delete_stmt) await self.session.flush() insert_data = [] for employee in request.service.employees: insert_data.append({ 'deal_id': request.deal_id, 'service_id': request.service.service.id, 'user_id': employee.id }) if insert_data: await self.session.execute( insert(models.deal_service_employees), insert_data ) await self.session.flush() await self.session.commit() return DealUpdateServiceQuantityResponse(ok=True, message='Услуга успешно обновлена') except Exception as e: await self.session.rollback() return DealUpdateServiceQuantityResponse(ok=False, message=str(e)) # endregion # region Deal products async def update_product_quantity(self, request: DealUpdateProductQuantityRequest) -> DealUpdateProductQuantityResponse: try: # check if there is no deal or no product with different exceptions deal_product = await self.session.scalar( select(models.secondary.DealProduct) .where(models.secondary.DealProduct.deal_id == request.deal_id, models.secondary.DealProduct.product_id == request.product_id) ) if not deal_product: raise HTTPException(status_code=404, detail="Сделка или товар не найдена") deal_product.quantity = request.quantity await self.session.commit() return DealUpdateProductQuantityResponse(ok=True, message='Количество успешно обновлено') except Exception as e: await self.session.rollback() return DealUpdateProductQuantityResponse(ok=False, message=str(e)) async def add_product(self, request: DealAddProductRequest) -> DealAddProductResponse: try: deal = await self.session.scalar(select(Deal).where(Deal.id == request.deal_id)) if not deal: raise HTTPException(status_code=404, detail="Сделка не найдена") product = await self.session.scalar( select(models.Product).where(models.Product.id == request.product.product.id)) if not product: raise HTTPException(status_code=404, detail="Товар не найден") # Preventing duplicates deal_product = await self.session.scalar( select(models.secondary.DealProduct) .where(models.secondary.DealProduct.deal_id == request.deal_id, models.secondary.DealProduct.product_id == request.product.product.id) ) if deal_product: raise HTTPException(status_code=400, detail="Товар уже добавлен") deal_product = models.secondary.DealProduct( deal_id=request.deal_id, product_id=request.product.product.id, quantity=request.product.quantity ) self.session.add(deal_product) await self.session.flush() for service in request.product.services: deal_product_service = models.secondary.DealProductService( deal_id=request.deal_id, product_id=request.product.product.id, service_id=service.service.id, price=service.price ) self.session.add(deal_product_service) await self.session.commit() return DealAddProductResponse(ok=True, message='Товар успешно добавлен') except Exception as e: await self.session.rollback() return DealAddProductResponse(ok=False, message=str(e)) async def delete_product(self, request: DealDeleteProductRequest) -> DealDeleteProductResponse: try: deal_product = await self.session.scalar( select(models.secondary.DealProduct) .where(models.secondary.DealProduct.deal_id == request.deal_id, models.secondary.DealProduct.product_id == request.product_id) ) if not deal_product: raise HTTPException(status_code=404, detail="Сделка не найдена") await self.session.delete(deal_product) await self.session.commit() return DealDeleteProductResponse(ok=True, message='Товар успешно удален') except Exception as e: await self.session.rollback() return DealDeleteProductResponse(ok=False, message=str(e)) async def delete_products(self, request: DealDeleteProductsRequest) -> DealDeleteProductsResponse: try: deal_products = await self.session.scalars( select(models.secondary.DealProduct) .where(models.secondary.DealProduct.deal_id == request.deal_id, models.secondary.DealProduct.product_id.in_(request.product_ids)) ) for deal_product in deal_products: await self.session.delete(deal_product) await self.session.commit() return DealDeleteProductsResponse(ok=True, message='Товары успешно удалены') except Exception as e: await self.session.rollback() return DealDeleteProductsResponse(ok=False, message=str(e)) async def update_product(self, request: DealUpdateProductRequest): try: deal_product: models.DealProduct = await self.session.scalar( select(models.secondary.DealProduct) .where(models.secondary.DealProduct.deal_id == request.deal_id, models.secondary.DealProduct.product_id == request.product.product.id) ) if not deal_product: raise HTTPException(status_code=404, detail="Указанный товар не найден") # getting new services and deleted database_services = set([service.service_id for service in deal_product.services]) request_services = set([service.service.id for service in request.product.services]) new_services = request_services.difference(database_services) deleted_services = database_services.difference(request_services) services_dict = {service.service.id: service for service in request.product.services} # Deleting and updating existing services for service in deal_product.services: service: models.DealProductService if service.service_id in deleted_services: await self.session.delete(service) await self.session.flush() continue request_service = services_dict[service.service_id] service.price = request_service.price await self.session.flush() # Creating services for service in request.product.services: if service.service.id not in new_services: continue deal_product_service = models.DealProductService( deal_id=request.deal_id, product_id=request.product.product.id, service_id=service.service.id, price=service.price ) self.session.add(deal_product_service) await self.session.flush() # Updating product deal_product.quantity = request.product.quantity # Updating deleting old employees delete_stmt = ( delete( models.deal_product_service_employees ) .where( models.deal_product_service_employees.c.deal_id == request.deal_id, models.deal_product_service_employees.c.service_id.in_(request_services.union(database_services)), models.deal_product_service_employees.c.product_id == request.product.product.id ) ) await self.session.execute(delete_stmt) await self.session.flush() insert_data = [] for service in request.product.services: service: DealProductServiceSchema for employee in service.employees: insert_data.append({ 'deal_id': request.deal_id, 'service_id': service.service.id, 'product_id': request.product.product.id, 'user_id': employee.id }) if insert_data: await self.session.execute(insert(models.deal_product_service_employees), insert_data) await self.session.flush() await self.session.commit() return DealUpdateProductResponse(ok=True, message='Товар успешно обновлен') except Exception as e: await self.session.rollback() return DealUpdateProductResponse(ok=False, message=str(e)) # endregion