Files
Fulfillment-Backend/services/expenses.py
2024-11-26 01:36:59 +04:00

86 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from typing import Optional
import math
from fastapi import HTTPException
from sqlalchemy import delete, select, func
from fastapi import status
from models import User
from models.expense import Expense
from schemas.base import PaginationSchema, PaginationInfoSchema
from schemas.expense import UpdateExpenseResponse, UpdateExpenseRequest, DeleteExpenseResponse, GetAllExpensesResponse
from services.base import BaseService
from utils.dependecies import is_valid_pagination
class ExpensesService(BaseService):
async def get_all(self, pagination: PaginationSchema) -> GetAllExpensesResponse:
if not is_valid_pagination(pagination):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid pagination')
page = max(0, pagination.page - 1)
stmt = (
select(Expense)
.order_by(Expense.spent_date.desc())
.offset(page * pagination.items_per_page)
.limit(pagination.items_per_page)
)
total_records = await self.session.scalar(select(func.count()).select_from(Expense))
if not total_records:
return GetAllExpensesResponse(
expenses=[],
pagination_info=PaginationInfoSchema()
)
total_items = total_records
total_pages = math.ceil(total_records / pagination.items_per_page)
expenses = await self.session.execute(stmt)
expenses = expenses.scalars().all()
response = GetAllExpensesResponse(
expenses=expenses,
pagination_info=PaginationInfoSchema(
total_items=total_items,
total_pages=total_pages
)
)
return response
async def get_by_id(self, expense_id) -> Optional[Expense]:
expense = await self.session.get(Expense, expense_id)
return expense
async def update_expense(self, user: User, request: UpdateExpenseRequest) -> UpdateExpenseResponse:
expense = await self.get_by_id(request.expense.id)
if not expense:
expense = Expense(
created_at=datetime.now(),
name=request.expense.name,
comment=request.expense.comment,
amount=request.expense.amount,
spent_date=request.expense.spent_date,
created_by_user_id=user.id,
)
self.session.add(expense)
await self.session.commit()
return UpdateExpenseResponse(ok=True, message='Запись о расходах успешно создана')
expense.name = request.expense.name
expense.amount = request.expense.amount
expense.comment = request.expense.comment
expense.spent_date = request.expense.spent_date
self.session.add(expense)
await self.session.commit()
return UpdateExpenseResponse(ok=True, message='Запись о расходах успешно изменена')
async def delete_expense(self, expense_id) -> DeleteExpenseResponse:
stmt = (
delete(Expense)
.where(Expense.id == expense_id)
)
await self.session.execute(stmt)
await self.session.commit()
return DeleteExpenseResponse(ok=True, message='Запись о расходах успешно удалена')