from collections import defaultdict from datetime import datetime, timedelta from sqlalchemy import select, and_, union_all, func, Subquery from models import DealService, Deal, DealStatusHistory, DealProductService, DealProduct, Service from schemas.statistics import GetProfitDataResponse, GetProfitDataRequest, ProfitDataItem from services.base import BaseService class StatisticsService(BaseService): @staticmethod def _get_sub_status_history(): last_statuses = ( select( DealStatusHistory.deal_id, func.max(DealStatusHistory.changed_at).label('changed_at') ) .group_by(DealStatusHistory.deal_id) .subquery() ) return ( select( Deal.id.label('deal_id'), last_statuses.c.changed_at, Deal.current_status, ) .join(last_statuses, last_statuses.c.deal_id == Deal.id) .subquery() ) @staticmethod def _get_filtered_sub_status_history(date_from: datetime.date, date_to: datetime.date): sub_status_history = StatisticsService._get_sub_status_history() return ( select(sub_status_history) .where(sub_status_history.c.changed_at.between(date_from, date_to)) .subquery() ) @staticmethod def _fill_dates_gaps(rows, date_from: datetime.date, date_to: datetime.date) -> list[ProfitDataItem]: dates = defaultdict(lambda: {"deals_count": 0, "profit": 0, "revenue": 0}) for row in rows: dates[row.changed_at.date()] = { "deals_count": row.deals_count, "profit": row.profit, "revenue": row.revenue } data = [] while date_from < date_to: data_item = ProfitDataItem( date=date_from, deals_count=dates[date_from]["deals_count"], profit=dates[date_from]["profit"], revenue=dates[date_from]["revenue"], ) data.append(data_item) date_from += timedelta(days=1) return data @staticmethod def _get_stmt_deal_services(sub_filtered_status_history: Subquery): return ( select( Deal.id.label("deal_id"), func.date_trunc( "day", sub_filtered_status_history.c.changed_at, ).label("changed_at"), func.sum(DealService.price * DealService.quantity).label("revenue"), func.sum((DealService.price - Service.cost) * DealService.quantity).label("profit"), ) .join(DealService, Deal.id == DealService.deal_id) .join(Service, DealService.service_id == Service.id) .join(sub_filtered_status_history, Deal.id == sub_filtered_status_history.c.deal_id) .where(Deal.is_deleted == False) .group_by(Deal.id, "changed_at") ) @staticmethod def _apply_filters(request: GetProfitDataRequest, stmt_deal_services, stmt_deal_product_services): if request.client_id != -1: stmt_deal_services = stmt_deal_services.where(Deal.client_id == request.client_id) stmt_deal_product_services = stmt_deal_product_services.where(Deal.client_id == request.client_id) if request.base_marketplace_key != "all": stmt_deal_services = stmt_deal_services.where(Deal.base_marketplace_key == request.base_marketplace_key) stmt_deal_product_services = stmt_deal_product_services.where( Deal.base_marketplace_key == request.base_marketplace_key) if request.deal_status_id != -1: stmt_deal_services = stmt_deal_services.where(Deal.current_status == request.deal_status_id) stmt_deal_product_services = stmt_deal_product_services.where(Deal.current_status == request.deal_status_id) return stmt_deal_services, stmt_deal_product_services @staticmethod def _get_stmt_product_services(): return ( select( Deal.id.label("deal_id"), func.sum(DealProductService.price * DealProduct.quantity).label("revenue"), func.sum((DealProductService.price - Service.cost) * DealProduct.quantity).label("profit"), ) .join(DealProduct, Deal.id == DealProduct.deal_id) .join( DealProductService, and_( DealProductService.deal_id == Deal.id, DealProductService.product_id == DealProduct.product_id, ) ) .join(Service, DealProductService.service_id == Service.id) .where(Deal.is_deleted == False) .group_by(Deal.id) ) @staticmethod def _group_by_date(stmt): return ( select( stmt.c.changed_at, func.count(stmt.c.deal_id).label("deals_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .group_by(stmt.c.changed_at) .order_by(stmt.c.changed_at.asc()) ) async def get_profit_data(self, request: GetProfitDataRequest) -> GetProfitDataResponse: date_from, date_to = request.date_range date_to += timedelta(days=1) sub_filtered_status_history = self._get_filtered_sub_status_history(date_from, date_to) stmt_deal_services = self._get_stmt_deal_services(sub_filtered_status_history) stmt_deal_product_services = self._get_stmt_product_services() stmt_deal_services, stmt_deal_product_services = self._apply_filters( request, stmt_deal_services, stmt_deal_product_services ) sub_deal_product_services = stmt_deal_product_services.subquery() stmt_join_deals_statuses = ( select( sub_deal_product_services.c.deal_id, func.date_trunc( "day", sub_filtered_status_history.c.changed_at ).label("changed_at"), sub_deal_product_services.c.revenue.label("revenue"), sub_deal_product_services.c.profit.label("profit"), ) .join(sub_filtered_status_history, sub_deal_product_services.c.deal_id == sub_filtered_status_history.c.deal_id) ) stmt_union = union_all(stmt_deal_services, stmt_join_deals_statuses).subquery() sub_grouped_by_deals = ( select( stmt_union.c.deal_id, stmt_union.c.changed_at, func.sum(stmt_union.c.profit).label("profit"), func.sum(stmt_union.c.revenue).label("revenue"), ) .group_by(stmt_union.c.deal_id, stmt_union.c.changed_at) ) stmt_grouped_by_date = self._group_by_date(sub_grouped_by_deals) result = await self.session.execute(stmt_grouped_by_date) rows = result.all() data = self._fill_dates_gaps(rows, date_from, date_to) return GetProfitDataResponse(data=data)