from datetime import datetime
import math
from typing import Optional

from fastapi import HTTPException
from fastapi import status
from sqlalchemy import delete, select, func, update, insert

from models import User
from models.expense import Expense, ExpenseTag
from schemas.base import PaginationSchema
from schemas.expense import *
from services.base import BaseService
from utils.dependecies import is_valid_pagination


class ExpensesService(BaseService):
    """Service layer for expenses and expense tags.

    All database access goes through ``self.session``, which is expected to
    be provided by ``BaseService``.
    """

    async def get_all(self, pagination: PaginationSchema) -> GetAllExpensesResponse:
        """Return one page of expenses, newest ``spent_date`` first.

        Args:
            pagination: Requested page (1-based) and page size.

        Returns:
            The expenses on the requested page together with the total item
            and page counts. When the table is empty, an empty list and a
            default ``PaginationInfoSchema`` are returned.

        Raises:
            HTTPException: 400 if ``is_valid_pagination`` rejects the input.
        """
        if not is_valid_pagination(pagination):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail='Invalid pagination',
            )

        total_records = await self.session.scalar(
            select(func.count()).select_from(Expense)
        )
        if not total_records:
            return GetAllExpensesResponse(
                expenses=[],
                pagination_info=PaginationInfoSchema(),
            )

        # Pages are 1-based in the API; clamp so page 0 maps to the first page.
        page_index = max(0, pagination.page - 1)
        stmt = (
            select(Expense)
            .order_by(Expense.spent_date.desc())
            .offset(page_index * pagination.items_per_page)
            .limit(pagination.items_per_page)
        )
        result = await self.session.execute(stmt)

        # NOTE(review): assumes is_valid_pagination guarantees
        # items_per_page > 0, otherwise this division fails - confirm.
        total_pages = math.ceil(total_records / pagination.items_per_page)
        return GetAllExpensesResponse(
            expenses=result.scalars().all(),
            pagination_info=PaginationInfoSchema(
                total_items=total_records,
                total_pages=total_pages,
            ),
        )

    async def get_by_id(self, expense_id: int) -> Optional[Expense]:
        """Return the expense with the given primary key, or ``None``."""
        return await self.session.get(Expense, expense_id)

    async def add_tags(self, expense: Expense, tag_names: list[str]):
        """Replace ``expense.tags`` with the tags named in ``tag_names``.

        Existing tags are reused; unknown names are created and added to the
        session. Nothing is committed here - the caller commits.

        Args:
            expense: The expense whose tag collection is replaced.
            tag_names: Tag names to attach. Duplicates are ignored so the
                same tag is never attached (or created) twice.
        """
        tags = []
        # dict.fromkeys de-duplicates while keeping the caller's order.
        for tag_name in dict.fromkeys(tag_names):
            tag = await self.get_tag_by_name(tag_name)
            if tag is None:
                tag = ExpenseTag(name=tag_name)
                self.session.add(tag)
            tags.append(tag)
        expense.tags = tags

    async def update_expense(self, user: User, request: UpdateExpenseRequest) -> UpdateExpenseResponse:
        """Create or update an expense (upsert by ``request.expense.id``).

        If no expense with the given id exists (or no id was supplied), a new
        expense owned by ``user`` is created; otherwise the existing one is
        updated in place. Tags are replaced in both cases.

        Args:
            user: The user recorded as creator of a new expense.
            request: Payload carrying the expense fields and tag names.

        Returns:
            A success response whose message tells whether the record was
            created or changed.
        """
        payload = request.expense
        # A missing/zero id is mapped to -1 so the lookup simply misses.
        expense = await self.get_by_id(payload.id or -1)

        if not expense:
            new_expense = Expense(
                # `datetime` is the class (from datetime import datetime),
                # so the constructor is datetime.now(), not
                # datetime.datetime.now().
                created_at=datetime.now(),
                name=payload.name,
                comment=payload.comment,
                amount=payload.amount,
                spent_date=payload.spent_date,
                created_by_user_id=user.id,
            )
            self.session.add(new_expense)
            await self.add_tags(new_expense, payload.tags)
            await self.session.commit()
            return UpdateExpenseResponse(ok=True, message='Запись о расходах успешно создана')

        expense.name = payload.name
        expense.amount = payload.amount
        expense.comment = payload.comment
        expense.spent_date = payload.spent_date
        self.session.add(expense)
        await self.add_tags(expense, payload.tags)
        await self.session.commit()
        return UpdateExpenseResponse(ok=True, message='Запись о расходах успешно изменена')

    async def delete_expense(self, expense_id) -> DeleteExpenseResponse:
        """Delete the expense with the given id and commit.

        The response is successful whether or not a matching row existed.
        """
        stmt = delete(Expense).where(Expense.id == expense_id)
        await self.session.execute(stmt)
        await self.session.commit()
        return DeleteExpenseResponse(ok=True, message='Запись о расходах успешно удалена')

    async def get_all_tags(self) -> GetAllExpenseTagsResponse:
        """Return every expense tag ordered by id."""
        stmt = select(ExpenseTag).order_by(ExpenseTag.id)
        result = await self.session.execute(stmt)
        return GetAllExpenseTagsResponse(tags=result.scalars().all())

    async def get_tag_by_id(self, expense_tag_id: int) -> Optional[ExpenseTag]:
        """Return the tag with the given primary key, or ``None``."""
        return await self.session.get(ExpenseTag, expense_tag_id)

    async def get_tag_by_name(self, expense_tag_name: str) -> Optional[ExpenseTag]:
        """Return the first tag whose name equals ``expense_tag_name``, or ``None``."""
        stmt = select(ExpenseTag).where(ExpenseTag.name == expense_tag_name)
        return await self.session.scalar(stmt)

    async def create_tag(self, request: CreateExpenseTagRequest) -> CreateExpenseTagResponse:
        """Create a new tag unless one with the same name already exists.

        Returns:
            ``ok=False`` if the name is taken, ``ok=True`` after the insert
            has been committed.
        """
        # NOTE(review): the original returned UpdateExpenseResponse here
        # despite the annotation; this assumes CreateExpenseTagResponse takes
        # the same ok/message fields as the sibling responses - confirm.
        if await self.get_tag_by_name(request.tag.name):
            return CreateExpenseTagResponse(ok=False, message='Ошибка. Такой тег уже есть.')

        stmt = insert(ExpenseTag).values(**request.tag.model_dump())
        await self.session.execute(stmt)
        await self.session.commit()
        return CreateExpenseTagResponse(ok=True, message='Тег успешно создан.')

    async def update_tag(self, request: UpdateExpenseTagRequest) -> UpdateExpenseTagResponse:
        """Update the tag identified by ``request.tag.id``.

        Returns:
            ``ok=False`` if another tag already uses the requested name or if
            the tag does not exist; ``ok=True`` after the update is committed.
        """
        same_name_tag = await self.get_tag_by_name(request.tag.name)
        # Only a *different* tag with this name is a conflict; the tag being
        # updated is allowed to keep its own name.
        if same_name_tag and same_name_tag.id != request.tag.id:
            return UpdateExpenseTagResponse(ok=False, message='Ошибка. Тег с таким названием уже есть.')

        tag = await self.get_tag_by_id(request.tag.id)
        if not tag:
            return UpdateExpenseTagResponse(ok=False, message='Ошибка. Тег не найден.')

        # The primary key selects the row and must not be part of the SET.
        values = request.tag.model_dump()
        del values['id']
        stmt = (
            update(ExpenseTag)
            .where(ExpenseTag.id == request.tag.id)
            .values(**values)
        )
        await self.session.execute(stmt)
        await self.session.commit()
        return UpdateExpenseTagResponse(ok=True, message='Тег успешно изменен.')

    async def delete_tag(self, tag_id: int) -> DeleteExpenseTagResponse:
        """Delete a tag that is not attached to any expense.

        Returns:
            ``ok=False`` if the tag does not exist or is still attached to at
            least one expense; ``ok=True`` after the delete is committed.
        """
        tag = await self.get_tag_by_id(tag_id)
        if not tag:
            return DeleteExpenseTagResponse(ok=False, message='Ошибка. Тег не найден.')

        # NOTE(review): with an async session this attribute access only
        # works if the relationship is eagerly loaded - confirm on the model.
        if len(tag.expenses) > 0:
            return DeleteExpenseTagResponse(ok=False, message='Ошибка. Тег прикреплен к записи о расходах.')

        stmt = delete(ExpenseTag).where(ExpenseTag.id == tag_id)
        await self.session.execute(stmt)
        await self.session.commit()
        return DeleteExpenseTagResponse(ok=True, message='Тег удален')