from datetime import date

from sqlalchemy import select, func, Subquery, cast, CTE, case, or_, and_
from sqlalchemy.dialects.postgresql import TIMESTAMP

from models import Transaction, transactions_transaction_tags, TransactionTag
from schemas.statistics import CommonProfitFilters
from services.base import BaseService
from services.statistics.common import generate_date_range


class TransactionsStatisticsService(BaseService):
    """Fold tagged transactions into per-date deal statistics.

    ``date_from`` / ``date_to`` are assigned by :meth:`apply_transactions`
    from the filters it receives and are read by the private helpers.
    """

    date_from: date
    date_to: date

    @staticmethod
    def _fill_date_gaps(expenses: Subquery, all_dates: CTE) -> Subquery:
        """Left-join per-date totals onto the full date range.

        Args:
            expenses: Subquery exposing ``date``, ``expenses`` and ``revenue``
                columns (aggregated transaction totals).
            all_dates: CTE exposing ``date``, ``expenses`` and ``revenue``
                columns, one row per date in the range.

        Returns:
            A subquery with ``date``, ``expenses`` and ``revenue`` columns in
            which dates missing from ``expenses`` contribute 0.
        """
        return (
            select(
                all_dates.c.date,
                # The outer join yields NULL for dates without transactions;
                # coalesce so the sum stays numeric.
                (all_dates.c.expenses + func.coalesce(expenses.c.expenses, 0)).label("expenses"),
                (all_dates.c.revenue + func.coalesce(expenses.c.revenue, 0)).label("revenue"),
            )
            .join(expenses, all_dates.c.date == expenses.c.date, isouter=True)
            .order_by(all_dates.c.date)
            .subquery()
        )

    def _get_additional_transactions_sub(self, income_tag_id: int, expense_tag_id: int) -> Subquery:
        """Build per-date revenue/expense totals for transactions with matching tags.

        Reads ``self.date_from`` and ``self.date_to``.

        Args:
            income_tag_id: Id of the income tag to keep; ``-1`` keeps every
                income tag.
            expense_tag_id: Id of the expense tag to keep; ``-1`` keeps every
                expense tag.

        Returns:
            A subquery with ``date``, ``expenses`` and ``revenue`` columns,
            gap-filled over the date range by :meth:`_fill_date_gaps`.
        """
        # NOTE(review): presumably one row per date with the listed columns
        # initialised to zero - confirm against generate_date_range.
        all_dates = generate_date_range(self.date_from, self.date_to, ["expenses", "revenue"])

        # The ``-1`` sentinel is evaluated in Python; SQLAlchemy coerces the
        # resulting bool into a constant true/false SQL expression.
        income_tags = and_(
            TransactionTag.is_income.is_(True),
            or_(income_tag_id == -1, TransactionTag.id == income_tag_id),
        )
        expense_tags = and_(
            TransactionTag.is_income.is_(False),
            or_(expense_tag_id == -1, TransactionTag.id == expense_tag_id),
        )
        filtered_tags_sub = (
            select(TransactionTag)
            .where(or_(income_tags, expense_tags))
            .subquery()
        )

        # A transaction may carry several matching tags; grouping by id keeps
        # each transaction once so its amount is not summed repeatedly.
        transaction_ids = (
            select(transactions_transaction_tags.c.transaction_id)
            .join(filtered_tags_sub, filtered_tags_sub.c.id == transactions_transaction_tags.c.transaction_tag_id)
            .group_by(transactions_transaction_tags.c.transaction_id)
            .subquery()
        )

        tagged_transactions = (
            select(Transaction)
            .join(transaction_ids, transaction_ids.c.transaction_id == Transaction.id)
            .subquery()
        )

        totals_by_date = (
            select(
                func.sum(
                    case((tagged_transactions.c.is_income, tagged_transactions.c.amount), else_=0)
                ).label("revenue"),
                func.sum(
                    case((~tagged_transactions.c.is_income, tagged_transactions.c.amount), else_=0)
                ).label("expenses"),
                cast(tagged_transactions.c.spent_date, TIMESTAMP(timezone=False)).label("date"),
            )
            .where(tagged_transactions.c.spent_date.between(self.date_from, self.date_to))
            .group_by("date")
            .subquery()
        )

        return self._fill_date_gaps(totals_by_date, all_dates)

    @staticmethod
    def _apply_transactions(deals_by_dates: Subquery, transactions: Subquery):
        """Combine per-date deal figures with per-date transaction totals.

        Every numeric operand is wrapped in ``coalesce(..., 0)`` so that a NULL
        on either side does not turn the whole sum into NULL; ``revenue`` and
        ``expenses`` now follow the same rule ``profit`` already used.

        Args:
            deals_by_dates: Subquery exposing ``date``, ``deals_count``,
                ``revenue``, ``profit`` and ``expenses`` columns.
            transactions: Subquery exposing ``date``, ``revenue`` and
                ``expenses`` columns.

        Returns:
            A select statement yielding ``date``, ``deals_count``, ``revenue``,
            ``profit`` and ``expenses`` for dates present in both inputs
            (inner join on ``date``).
        """
        deals_revenue = func.coalesce(deals_by_dates.c.revenue, 0)
        deals_profit = func.coalesce(deals_by_dates.c.profit, 0)
        deals_expenses = func.coalesce(deals_by_dates.c.expenses, 0)
        extra_revenue = func.coalesce(transactions.c.revenue, 0)
        extra_expenses = func.coalesce(transactions.c.expenses, 0)

        return (
            select(
                deals_by_dates.c.date,
                deals_by_dates.c.deals_count,
                (deals_revenue + extra_revenue).label("revenue"),
                (deals_profit - extra_expenses + extra_revenue).label("profit"),
                (deals_expenses + extra_expenses).label("expenses"),
            )
            .join(transactions, transactions.c.date == deals_by_dates.c.date)
        )

    def apply_transactions(self, filters: CommonProfitFilters, deals_by_dates: Subquery):
        """Return ``deals_by_dates`` adjusted by the transactions selected by ``filters``.

        Stores ``filters.date_range`` on the instance as ``date_from`` and
        ``date_to`` before building the transaction subquery.

        Args:
            filters: Supplies ``date_range``, ``income_tag_id`` and
                ``expense_tag_id``.
            deals_by_dates: Per-date deal statistics subquery.

        Returns:
            The select statement built by :meth:`_apply_transactions`.
        """
        self.date_from, self.date_to = filters.date_range
        additional_transactions = self._get_additional_transactions_sub(
            filters.income_tag_id,
            filters.expense_tag_id,
        )
        return self._apply_transactions(deals_by_dates, additional_transactions)