from fastapi import HTTPException from sqlalchemy import select, and_, union_all, func, Subquery, literal, or_ from enums.profit_table_group_by import ProfitTableGroupBy from models import CardService, Card, CardStatusHistory, CardProductService, CardProduct, Service, Client, \ ShippingWarehouse, BaseMarketplace, User, Project, Board, CardTag, cards_card_tags, user_position, CardStatus from schemas.statistics import GetProfitChartDataResponse, GetProfitChartDataRequest, ProfitChartDataItem, \ GetProfitTableDataResponse, GetProfitTableDataRequest, ProfitTableDataItem, CommonProfitFilters from services.base import BaseService from services.statistics.common import generate_date_range from services.statistics.expenses_statistics import PaymentStatisticsService from services.statistics.transactions_statistics import TransactionsStatisticsService class ProfitStatisticsService(BaseService): @staticmethod def _get_sub_status_history(): last_statuses = ( select( CardStatusHistory.card_id, func.max(CardStatusHistory.changed_at).label('changed_at') ) .group_by(CardStatusHistory.card_id) .subquery() ) return ( select( Card.id.label('card_id'), func.date_trunc( 'day', last_statuses.c.changed_at, ).label('date'), Card.current_status_id, ) .join(last_statuses, last_statuses.c.card_id == Card.id) .join(CardStatus, CardStatus.id == Card.current_status_id) .where( or_( CardStatus.is_finishing == True, Card.is_completed == True, ) ) .subquery() ) def _get_cards_dates(self): sub_status_history = ProfitStatisticsService._get_sub_status_history() return ( select(sub_status_history) .where(sub_status_history.c.date.between(self.date_from, self.date_to)) .subquery() ) @staticmethod def _to_schema(rows, is_chart: bool = True) -> ( list[ProfitChartDataItem] | list[ProfitTableDataItem] ): data = [] for row in rows: if is_chart: data_item = ProfitChartDataItem( date=row.date.date(), cards_count=row.cards_count, profit=row.profit, revenue=row.revenue, expenses=row.expenses, ) else: data_item = ProfitTableDataItem( grouped_value=row.date.date(), cards_count=row.cards_count, profit=row.profit, revenue=row.revenue, expenses=row.expenses, ) data.append(data_item) return data def _get_stmt_card_services(self, sub_filtered_status_history: Subquery): return ( select( Card.id.label("card_id"), func.date_trunc( "day", sub_filtered_status_history.c.date, ).label("date"), func.sum(CardService.price * CardService.quantity).label("revenue"), func.sum(CardService.price * CardService.quantity).label("profit"), ) .join(CardService, Card.id == CardService.card_id) .join(Service, CardService.service_id == Service.id) .join(sub_filtered_status_history, Card.id == sub_filtered_status_history.c.card_id) .where( Card.is_deleted == False, Card.is_services_profit_accounted == True, ) .group_by(Card.id, sub_filtered_status_history.c.date) ) @staticmethod def _board_ids_for_project(project_id: int, stmt): board_ids_stmt = ( select(Board.id) .where(Board.project_id == project_id) ) return stmt.where(Card.board_id.in_(board_ids_stmt)) @staticmethod def _apply_tag_filter(tag_id: int, stmt): sub_card_ids = ( select(cards_card_tags.c.card_id) .where(cards_card_tags.c.card_tag_id == tag_id) ) return stmt.where(Card.id.in_(sub_card_ids)) @staticmethod def _apply_filters(request: CommonProfitFilters, stmt_card_services, stmt_card_product_services): if request.client_id != -1: stmt_card_services = stmt_card_services.where(Card.client_id == request.client_id) stmt_card_product_services = stmt_card_product_services.where(Card.client_id == request.client_id) if request.base_marketplace_key != "all": stmt_card_services = stmt_card_services.where(Card.base_marketplace_key == request.base_marketplace_key) stmt_card_product_services = stmt_card_product_services.where( Card.base_marketplace_key == request.base_marketplace_key) if request.project_id != -1: stmt_card_services = ProfitStatisticsService._board_ids_for_project(request.project_id, stmt_card_services) stmt_card_product_services = ProfitStatisticsService._board_ids_for_project( request.project_id, stmt_card_product_services, ) if request.board_id != -1: stmt_card_services = stmt_card_services.where(Card.board_id == request.board_id) stmt_card_product_services = stmt_card_product_services.where(Card.board_id == request.board_id) if request.card_tag_id != -1: stmt_card_services = ProfitStatisticsService._apply_tag_filter(request.card_tag_id, stmt_card_services) stmt_card_product_services = ProfitStatisticsService._apply_tag_filter( request.card_tag_id, stmt_card_product_services ) if request.manager_id != -1: stmt_card_services = stmt_card_services.where(Card.manager_id == request.manager_id) stmt_card_product_services = stmt_card_product_services.where(Card.manager_id == request.manager_id) return stmt_card_services, stmt_card_product_services def _get_stmt_product_services(self): return ( select( Card.id.label("card_id"), func.sum(CardProductService.price * CardProduct.quantity).label("revenue"), func.sum(CardProductService.price * CardProduct.quantity).label("profit"), ) .join(CardProduct, Card.id == CardProduct.card_id) .join( CardProductService, and_( CardProductService.card_id == Card.id, CardProductService.product_id == CardProduct.product_id, ) ) .join(Service, CardProductService.service_id == Service.id) .where( and_( Card.is_deleted == False, Card.is_services_profit_accounted == True, ) ) .group_by(Card.id) ) @staticmethod def _get_joined_cards_and_statuses(sub_card_product_services: Subquery, sub_cards_dates: Subquery): return ( select( sub_card_product_services.c.card_id, func.date_trunc( "day", sub_cards_dates.c.date ).label("date"), sub_card_product_services.c.revenue.label("revenue"), sub_card_product_services.c.profit.label("profit"), ) .join(sub_cards_dates, sub_card_product_services.c.card_id == sub_cards_dates.c.card_id) ) def _group_by_date(self, stmt): all_dates = generate_date_range(self.date_from, self.date_to, ["cards_count", "revenue", "profit", "expenses"]) cards = ( select( stmt.c.date, func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .group_by(stmt.c.date) .subquery() ) cards_with_filled_gaps = ( select( all_dates.c.date, (all_dates.c.cards_count + func.coalesce(cards.c.cards_count, 0)).label("cards_count"), (all_dates.c.revenue + func.coalesce(cards.c.revenue, 0)).label("revenue"), (all_dates.c.profit + func.coalesce(cards.c.profit, 0)).label("profit"), literal(0).label("expenses"), ) .join(cards, all_dates.c.date == cards.c.date, isouter=True) .order_by(all_dates.c.date.asc()) ) return cards_with_filled_gaps @staticmethod def _group_by_cards(stmt_union: Subquery): return ( select( stmt_union.c.card_id, stmt_union.c.date, func.sum(stmt_union.c.profit).label("profit"), func.sum(stmt_union.c.revenue).label("revenue"), ) .group_by(stmt_union.c.card_id, stmt_union.c.date) ) @staticmethod def _join_and_group_by_clients(stmt): return ( select( Client.id, Client.name.label("grouped_value"), func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Card, Card.id == stmt.c.card_id) .join(Client, Client.id == Card.client_id) .group_by(Client.id, Client.name) ) @staticmethod def _join_and_group_by_projects(stmt): return ( select( Project.id, Project.name.label("grouped_value"), func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Card, Card.id == stmt.c.card_id) .join(Board, Board.id == Card.board_id) .join(Project, Project.id == Board.project_id) .group_by(Project.id, Project.name) ) @staticmethod def _join_and_group_by_boards(stmt): return ( select( Board.id, Board.name.label("grouped_value"), func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Card, Card.id == stmt.c.card_id) .join(Board, Board.id == Card.board_id) .group_by(Board.id, Board.name) ) @staticmethod def _join_and_group_by_statuses(stmt): return ( select( Card.current_status_id.label("grouped_value"), func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Card, Card.id == stmt.c.card_id) .group_by(Card.current_status_id) ) @staticmethod def _join_and_group_by_warehouses(stmt): return ( select( ShippingWarehouse.id, ShippingWarehouse.name.label("grouped_value"), ShippingWarehouse.is_deleted, func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Card, Card.id == stmt.c.card_id) .join(ShippingWarehouse, Card.shipping_warehouse_id == ShippingWarehouse.id) .where(ShippingWarehouse.is_deleted == False) .group_by(ShippingWarehouse.is_deleted, ShippingWarehouse.id, ShippingWarehouse.name) ) @staticmethod def _join_and_group_by_marketplaces(stmt): return ( select( BaseMarketplace.name.label("grouped_value"), func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Card, Card.id == stmt.c.card_id) .join(BaseMarketplace, Card.base_marketplace_key == BaseMarketplace.key) .group_by(BaseMarketplace.name) ) @staticmethod def _join_and_group_by_managers(stmt): managers = ( select(User) .join(user_position) .where(and_(User.is_deleted == False, user_position.c.position_key == "sales_manager")) .subquery() ) return ( select( managers.c.id, (managers.c.first_name + " " + managers.c.second_name).label("grouped_value"), func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(Card, Card.id == stmt.c.card_id) .join(managers, managers.c.id == Card.manager_id) .group_by(managers.c.id, "grouped_value") ) @staticmethod def _join_and_group_by_tags(stmt): return ( select( CardTag.id, CardTag.name.label("grouped_value"), CardTag.is_deleted, func.count(stmt.c.card_id).label("cards_count"), func.sum(stmt.c.revenue).label("revenue"), func.sum(stmt.c.profit).label("profit"), ) .join(cards_card_tags, cards_card_tags.c.card_id == stmt.c.card_id) .join(CardTag, cards_card_tags.c.card_tag_id == CardTag.id) .where(CardTag.is_deleted == False) .group_by(CardTag.is_deleted, CardTag.id, CardTag.name) ) async def _get_data_rows_grouped_by_date( self, stmt_card_services, stmt_card_product_services, sub_cards_dates: Subquery, ): sub_card_product_services = stmt_card_product_services.subquery() stmt_join_cards_statuses = self._get_joined_cards_and_statuses(sub_card_product_services, sub_cards_dates) sub_union = union_all(stmt_card_services, stmt_join_cards_statuses).subquery() sub_grouped_by_cards = self._group_by_cards(sub_union) sub_cards_grouped_by_date = self._group_by_date(sub_grouped_by_cards).subquery() expenses_statistics_service = PaymentStatisticsService(self.session) stmt_cards_applied_expenses = expenses_statistics_service.apply_payments( self.filters, sub_cards_grouped_by_date ) transactions_statistics_service = TransactionsStatisticsService(self.session) stmt_cards_applied_transactions = transactions_statistics_service.apply_transactions( self.filters, stmt_cards_applied_expenses.subquery() ) result = await self.session.execute(stmt_cards_applied_transactions) rows = result.all() return rows async def _get_data_grouped_by_date(self, request: CommonProfitFilters, is_chart: bool = True): self.date_from, self.date_to = request.date_range self.filters = request sub_cards_dates = self._get_cards_dates() stmt_card_services = self._get_stmt_card_services(sub_cards_dates) stmt_card_product_services = self._get_stmt_product_services() stmt_card_services, stmt_card_product_services = self._apply_filters( request, stmt_card_services, stmt_card_product_services ) rows = await self._get_data_rows_grouped_by_date( stmt_card_services, stmt_card_product_services, sub_cards_dates ) return self._to_schema(rows, is_chart) async def get_profit_chart_data(self, request: GetProfitChartDataRequest) -> GetProfitChartDataResponse: data = await self._get_data_grouped_by_date(request) return GetProfitChartDataResponse(data=data) async def _get_table_grouped_by_dates(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: data = await self._get_data_grouped_by_date(request, False) return GetProfitTableDataResponse(data=data) async def _table_data_from_stmt(self, stmt) -> GetProfitTableDataResponse: result = await self.session.execute(stmt) rows = result.all() data = [] for row in rows: data.append(ProfitTableDataItem( grouped_value=row.grouped_value, revenue=row.revenue, profit=row.profit, cards_count=row.cards_count, )) return GetProfitTableDataResponse(data=data) def _get_common_table_grouped(self, request: GetProfitTableDataRequest): self.date_from, self.date_to = request.date_range self.filters = request sub_cards_dates = self._get_cards_dates() stmt_card_services = self._get_stmt_card_services(sub_cards_dates) stmt_card_product_services = self._get_stmt_product_services() stmt_card_product_services = ( select( stmt_card_product_services.c.card_id, func.date_trunc( "day", Card.created_at ).label("date"), stmt_card_product_services.c.revenue.label("revenue"), stmt_card_product_services.c.profit.label("profit"), ) .join(Card, Card.id == stmt_card_product_services.c.card_id) ) stmt_card_services, stmt_card_product_services = self._apply_filters( request, stmt_card_services, stmt_card_product_services ) sub_union = union_all(stmt_card_services, stmt_card_product_services).subquery() sub_grouped_by_cards = self._group_by_cards(sub_union) return sub_grouped_by_cards async def _get_table_grouped_by_clients(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_cards = self._get_common_table_grouped(request) stmt_grouped_by_clients = self._join_and_group_by_clients(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_clients) async def _get_table_grouped_by_projects(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_cards = self._get_common_table_grouped(request) stmt_grouped_by_projects = self._join_and_group_by_projects(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_projects) async def _get_table_grouped_by_boards(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_cards = self._get_common_table_grouped(request) stmt_grouped_by_boards = self._join_and_group_by_boards(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_boards) async def _get_table_grouped_by_statuses(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: self.date_from, self.date_to = request.date_range sub_cards_dates = self._get_sub_status_history() stmt_card_services = self._get_stmt_card_services(sub_cards_dates) stmt_card_product_services = self._get_stmt_product_services() stmt_card_services, stmt_card_product_services = self._apply_filters( request, stmt_card_services, stmt_card_product_services ) stmt_join_cards_statuses = self._get_joined_cards_and_statuses( stmt_card_product_services.subquery(), sub_cards_dates ) sub_union = union_all(stmt_card_services, stmt_join_cards_statuses).subquery() sub_grouped_by_cards = self._group_by_cards(sub_union) stmt_grouped_by_date = self._join_and_group_by_statuses(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_date) async def _get_table_grouped_by_warehouses(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_cards = self._get_common_table_grouped(request) stmt_grouped_by_warehouses = self._join_and_group_by_warehouses(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_warehouses) async def _get_table_grouped_by_marketplace(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_cards = self._get_common_table_grouped(request) stmt_grouped_by_marketplaces = self._join_and_group_by_marketplaces(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_marketplaces) async def _get_table_grouped_by_managers(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_cards = self._get_common_table_grouped(request) stmt_grouped_by_managers = self._join_and_group_by_managers(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_managers) async def _get_table_grouped_by_tags(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: sub_grouped_by_cards = self._get_common_table_grouped(request) stmt_grouped_by_tags = self._join_and_group_by_tags(sub_grouped_by_cards) return await self._table_data_from_stmt(stmt_grouped_by_tags) async def get_profit_table_data(self, request: GetProfitTableDataRequest) -> GetProfitTableDataResponse: match request.group_table_by: case ProfitTableGroupBy.BY_DATES: return await self._get_table_grouped_by_dates(request) case ProfitTableGroupBy.BY_CLIENTS: return await self._get_table_grouped_by_clients(request) case ProfitTableGroupBy.BY_PROJECTS: return await self._get_table_grouped_by_projects(request) case ProfitTableGroupBy.BY_BOARDS: return await self._get_table_grouped_by_boards(request) case ProfitTableGroupBy.BY_STATUSES: return await self._get_table_grouped_by_statuses(request) case ProfitTableGroupBy.BY_WAREHOUSES: return await self._get_table_grouped_by_warehouses(request) case ProfitTableGroupBy.BY_MARKETPLACES: return await self._get_table_grouped_by_marketplace(request) case ProfitTableGroupBy.BY_MANAGERS: return await self._get_table_grouped_by_managers(request) case ProfitTableGroupBy.BY_TAGS: return await self._get_table_grouped_by_tags(request) raise HTTPException(status_code=400, detail='Указана некорректная группировка')