from collections import defaultdict from datetime import datetime, timedelta from fastapi import HTTPException from sqlalchemy import select, and_, union_all, func, Subquery from enums.profit_table_group_by import ProfitTableGroupBy from models import DealService, Deal, DealStatusHistory, DealProductService, DealProduct, Service, Client, \ ShippingWarehouse, BaseMarketplace from schemas.statistics import GetProfitChartDataResponse, GetProfitChartDataRequest, ProfitChartDataItem, \ GetProfitTableDataResponse, GetProfitTableDataRequest, ProfitTableDataItem from services.base import BaseService class StatisticsService(BaseService): @staticmethod def _get_sub_deals_created_at(date_from: datetime.date, date_to: datetime.date): return ( select( Deal.id.label('deal_id'), Deal.created_at.label('deal_date'), Deal.current_status, ) .where(Deal.created_at.between(date_from, date_to)) .subquery() ) @staticmethod def _get_sub_status_history(): last_statuses = ( select( DealStatusHistory.deal_id, func.max(DealStatusHistory.changed_at).label('changed_at') ) .group_by(DealStatusHistory.deal_id) .subquery() ) return ( select( Deal.id.label('deal_id'), last_statuses.c.changed_at.label('deal_date'), Deal.current_status, ) .join(last_statuses, last_statuses.c.deal_id == Deal.id) .subquery() ) @staticmethod def _get_filtered_sub_status_history(date_from: datetime.date, date_to: datetime.date): sub_status_history = StatisticsService._get_sub_status_history() return ( select(sub_status_history) .where(sub_status_history.c.deal_date.between(date_from, date_to)) .subquery() ) @staticmethod def _get_deals_dates(deal_status_id: int, date_from: datetime.date, date_to: datetime.date): if deal_status_id == -1: return StatisticsService._get_sub_deals_created_at(date_from, date_to) return StatisticsService._get_filtered_sub_status_history(date_from, date_to) @staticmethod def _fill_dates_gaps(rows, date_from: datetime.date, date_to: datetime.date, is_chart: bool = True) -> ( list[ProfitChartDataItem] | list[ProfitTableDataItem] ): dates = defaultdict(lambda: {"deals_count": 0, "profit": 0, "revenue": 0}) for row in rows: dates[row.deal_date.date()] = { "deals_count": row.deals_count, "profit": row.profit, "revenue": row.revenue } data = [] while date_from < date_to: if is_chart: data_item = ProfitChartDataItem( date=date_from, deals_count=dates[date_from]["deals_count"], profit=dates[date_from]["profit"], revenue=dates[date_from]["revenue"], ) else: data_item = ProfitTableDataItem( grouped_value=date_from, deals_count=dates[date_from]["deals_count"], profit=dates[date_from]["profit"], revenue=dates[date_from]["revenue"], ) data.append(data_item) date_from += timedelta(days=1) return data @staticmethod def _get_stmt_deal_services(sub_filtered_status_history: Subquery): return ( select( Deal.id.label("deal_id"), func.date_trunc( "day", sub_filtered_status_history.c.deal_date, ).label("deal_date"), func.sum(DealService.price * DealService.quantity).label("revenue"), func.sum((DealService.price - Service.cost) * DealService.quantity).label("profit"), ) .join(DealService, Deal.id == DealService.deal_id) .join(Service, DealService.service_id == Service.id) .join(sub_filtered_status_history, Deal.id == sub_filtered_status_history.c.deal_id) .where(Deal.is_deleted == False) .group_by(Deal.id, "deal_date") ) @staticmethod def _apply_filters(request: GetProfitChartDataRequest, stmt_deal_services, stmt_deal_product_services): if request.client_id != -1: stmt_deal_services = stmt_deal_services.where(Deal.client_id == request.client_id) stmt_deal_product_services = stmt_deal_product_services.where(Deal.client_id == request.client_id) if request.base_marketplace_key != "all": stmt_deal_services = stmt_deal_services.where(Deal.base_marketplace_key == request.base_marketplace_key) stmt_deal_product_services = stmt_deal_product_services.where( Deal.base_marketplace_key == request.base_marketplace_key) if request.deal_status_id != -1: stmt_deal_services = stmt_deal_services.where(Deal.current_status == request.deal_status_id) stmt_deal_product_services = stmt_deal_product_services.where(Deal.current_status == request.deal_status_id) return stmt_deal_services, stmt_deal_product_services @staticmethod def _get_stmt_product_services(): return ( select( Deal.id.label("deal_id"), func.sum(DealProductService.price * DealProduct.quantity).label("revenue"), func.sum((DealProductService.price - Service.cost) * DealProduct.quantity).label("profit"), ) .join(DealProduct, Deal.id == DealProduct.deal_id) .join( DealProductService, and_( DealProductService.deal_id == Deal.id, DealProductService.product_id == DealProduct.product_id, ) ) .join(Service, DealProductService.service_id == Service.id) .where(Deal.is_deleted == False) .group_by(Deal.id) ) @staticmethod def _get_joined_deals_and_statuses(sub_deal_product_services: Subquery, sub_deals_dates: Subquery): return ( select( sub_deal_product_services.c.deal_id, func.date_trunc( "day", sub_deals_dates.c.deal_date ).label("deal_date"), sub_deal_product_services.c.revenue.label("revenue"), sub_deal_product_services.c.profit.label("profit"), ) .join(sub_deals_dates, sub_deal_product_services.c.deal_id == sub_deals_dates.c.deal_id) ) @staticmethod def _group_by_deals(stmt_union: Subquery): return ( select( stmt_union.c.deal_id, stmt_union.c.deal_date, func.sum(stmt_union.c.profit).label("profit"), func.sum(stmt_union.c.revenue).label("revenue"), ) .group_by(stmt_union.c.deal_id, stmt_union.c.deal_date) ) @staticmethod def _group_by_date(stmt): return ( select( stmt.c.deal_date, func.count(stmt.c.deal_id).label("deals_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .group_by(stmt.c.deal_date) .order_by(stmt.c.deal_date.asc()) ) @staticmethod def _join_and_group_by_clients(stmt): return ( select( Client.id, Client.name.label("grouped_value"), func.count(stmt.c.deal_id).label("deals_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Deal, Deal.id == stmt.c.deal_id) .join(Client, Client.id == Deal.client_id) .group_by(Client.id, Client.name) ) @staticmethod def _join_and_group_by_statuses(stmt): return ( select( Deal.current_status.label("grouped_value"), func.count(stmt.c.deal_id).label("deals_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Deal, Deal.id == stmt.c.deal_id) .group_by(Deal.current_status) ) @staticmethod def _join_and_group_by_warehouses(stmt): return ( select( ShippingWarehouse.id, ShippingWarehouse.name.label("grouped_value"), ShippingWarehouse.is_deleted, func.count(stmt.c.deal_id).label("deals_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Deal, Deal.id == stmt.c.deal_id) .join(ShippingWarehouse, Deal.shipping_warehouse_id == ShippingWarehouse.id) .where(ShippingWarehouse.is_deleted == False) .group_by(ShippingWarehouse.is_deleted, ShippingWarehouse.id, ShippingWarehouse.name) ) @staticmethod def _join_and_group_by_marketplaces(stmt): return ( select( BaseMarketplace.name.label("grouped_value"), func.count(stmt.c.deal_id).label("deals_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Deal, Deal.id == stmt.c.deal_id) .join(BaseMarketplace, Deal.base_marketplace_key == BaseMarketplace.key) .group_by(BaseMarketplace.name) ) async def _get_data_rows_grouped_by_date( self, stmt_deal_services, stmt_deal_product_services, sub_deals_dates: Subquery, ): sub_deal_product_services = stmt_deal_product_services.subquery() stmt_join_deals_statuses = self._get_joined_deals_and_statuses(sub_deal_product_services, sub_deals_dates) sub_union = union_all(stmt_deal_services, stmt_join_deals_statuses).subquery() sub_grouped_by_deals = self._group_by_deals(sub_union) stmt_grouped_by_date = self._group_by_date(sub_grouped_by_deals) result = await self.session.execute(stmt_grouped_by_date) rows = result.all() return rows async def get_profit_chart_data(self, request: GetProfitChartDataRequest) -> GetProfitChartDataResponse: date_from, date_to = request.date_range date_to += timedelta(days=1) sub_deals_dates = self._get_deals_dates(request.deal_status_id, date_from, date_to) stmt_deal_services = self._get_stmt_deal_services(sub_deals_dates) stmt_deal_product_services = self._get_stmt_product_services() stmt_deal_services, stmt_deal_product_services = self._apply_filters( request, stmt_deal_services, stmt_deal_product_services ) rows = await self._get_data_rows_grouped_by_date( stmt_deal_services, stmt_deal_product_services, sub_deals_dates ) data = self._fill_dates_gaps(rows, date_from, date_to) return GetProfitChartDataResponse(data=data) async def _get_table_grouped_by_dates(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: date_from, date_to = request.date_range date_to += timedelta(days=1) sub_deals_dates = self._get_sub_deals_created_at(date_from, date_to) stmt_deal_services = self._get_stmt_deal_services(sub_deals_dates) stmt_deal_product_services = self._get_stmt_product_services() rows = await self._get_data_rows_grouped_by_date( stmt_deal_services, stmt_deal_product_services, sub_deals_dates ) data = self._fill_dates_gaps(rows, date_from, date_to, False) return GetProfitTableDataResponse(data=data) async def _table_data_from_stmt(self, stmt) -> GetProfitTableDataResponse: result = await self.session.execute(stmt) rows = result.all() data = [] for row in rows: data.append(ProfitTableDataItem( grouped_value=row.grouped_value, revenue=row.revenue, profit=row.profit, deals_count=row.deals_count, )) return GetProfitTableDataResponse(data=data) def _get_common_table_grouped(self, request: GetProfitTableDataRequest): date_from, date_to = request.date_range date_to += timedelta(days=1) sub_deals_dates = self._get_sub_deals_created_at(date_from, date_to) stmt_deal_services = self._get_stmt_deal_services(sub_deals_dates) stmt_deal_product_services = self._get_stmt_product_services() stmt_deal_product_services = ( select( stmt_deal_product_services.c.deal_id, func.date_trunc( "day", Deal.created_at ).label("deal_date"), stmt_deal_product_services.c.revenue.label("revenue"), stmt_deal_product_services.c.profit.label("profit"), ) .join(Deal, Deal.id == stmt_deal_product_services.c.deal_id) ) sub_union = union_all(stmt_deal_services, stmt_deal_product_services).subquery() sub_grouped_by_deals = self._group_by_deals(sub_union) return sub_grouped_by_deals async def _get_table_grouped_by_clients(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_deals = self._get_common_table_grouped(request) stmt_grouped_by_clients = self._join_and_group_by_clients(sub_grouped_by_deals) return await self._table_data_from_stmt(stmt_grouped_by_clients) async def _get_table_grouped_by_statuses(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: date_from, date_to = request.date_range date_to += timedelta(days=1) sub_deals_dates = self._get_filtered_sub_status_history(date_from, date_to) stmt_deal_services = self._get_stmt_deal_services(sub_deals_dates) sub_deal_product_services = self._get_stmt_product_services().subquery() stmt_join_deals_statuses = self._get_joined_deals_and_statuses(sub_deal_product_services, sub_deals_dates) sub_union = union_all(stmt_deal_services, stmt_join_deals_statuses).subquery() sub_grouped_by_deals = self._group_by_deals(sub_union) stmt_grouped_by_date = self._join_and_group_by_statuses(sub_grouped_by_deals) return await self._table_data_from_stmt(stmt_grouped_by_date) async def _get_table_grouped_by_warehouses(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_deals = self._get_common_table_grouped(request) stmt_grouped_by_clients = self._join_and_group_by_warehouses(sub_grouped_by_deals) return await self._table_data_from_stmt(stmt_grouped_by_clients) async def _get_table_grouped_by_marketplace(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_deals = self._get_common_table_grouped(request) stmt_grouped_by_clients = self._join_and_group_by_marketplaces(sub_grouped_by_deals) return await self._table_data_from_stmt(stmt_grouped_by_clients) async def get_profit_table_data(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: match request.group_table_by: case ProfitTableGroupBy.BY_DATES: return await self._get_table_grouped_by_dates(request) case ProfitTableGroupBy.BY_CLIENTS: return await self._get_table_grouped_by_clients(request) case ProfitTableGroupBy.BY_STATUSES: return await self._get_table_grouped_by_statuses(request) case ProfitTableGroupBy.BY_WAREHOUSES: return await self._get_table_grouped_by_warehouses(request) case ProfitTableGroupBy.BY_MARKETPLACES: return await self._get_table_grouped_by_marketplace(request) raise HTTPException(status_code=400, detail='Указана некорректная группировка')