Files
Fulfillment-Backend/services/time_tracking.py
2024-08-03 05:39:05 +03:00

112 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from collections import defaultdict
from sqlalchemy import select, func
from sqlalchemy.orm import joinedload
from models import PaymentRecord, User
from schemas.time_tracking import *
from services.base import BaseService
from services.payroll import PayrollService
from services.user import UserService
class TimeTrackingService(BaseService):
    """Read and upsert per-day time-tracking entries stored as ``PaymentRecord`` rows.

    A time-tracking entry is a ``PaymentRecord`` whose ``start_date`` equals its
    ``end_date`` (a single-day record).
    """

    async def get_records(self, request: GetTimeTrackingRecordsRequest) -> GetTimeTrackingRecordsResponse:
        """Return the single-day payment records of one month, grouped by user.

        Rows are selected when both ``start_date`` and ``end_date``, truncated to
        the month, equal ``request.date`` and ``start_date == end_date``.
        NOTE(review): this presumably means ``request.date`` must be the first
        day of the month -- confirm against the caller.

        :param request: carries ``date``, the month selector compared against
            the month-truncated record dates.
        :return: one ``TimeTrackingRecord`` per user (in order of first
            appearance in the query result) with that user's daily entries and
            the sum of their amounts.
        """
        month_start_of_start = func.date(func.date_trunc('month', PaymentRecord.start_date))
        month_start_of_end = func.date(func.date_trunc('month', PaymentRecord.end_date))
        stmt = (
            select(PaymentRecord)
            # Load the related user in the same query; it is validated below.
            .options(joinedload(PaymentRecord.user))
            .where(
                month_start_of_start == request.date,
                month_start_of_end == request.date,
                PaymentRecord.start_date == PaymentRecord.end_date,
                # Filtering by user is currently disabled:
                # PaymentRecord.user_id.in_(request.user_ids)
            )
        )
        payment_records = (await self.session.scalars(stmt)).all()

        users_by_id = {}
        entries_by_user = defaultdict(list)
        # Running totals start at 0, matching sum() over the raw amounts.
        totals_by_user = defaultdict(int)
        for payment_record in payment_records:
            user = UserSchema.model_validate(payment_record.user)
            users_by_id[user.id] = user
            entries_by_user[user.id].append(
                TimeTrackingData(
                    date=payment_record.start_date,
                    hours=payment_record.work_units,
                    amount=payment_record.amount,
                )
            )
            totals_by_user[user.id] += payment_record.amount

        records = [
            TimeTrackingRecord(
                user=users_by_id[user_id],
                data=entries,
                total_amount=totals_by_user[user_id],
            )
            for user_id, entries in entries_by_user.items()
        ]
        return GetTimeTrackingRecordsResponse(records=records)

    async def update_record(self,
                            user: User,
                            request: UpdateTimeTrackingRecordRequest
                            ) -> UpdateTimeTrackingRecordResponse:
        """Create or update the single-day record of ``request.user_id`` on ``request.date``.

        The amount is computed by ``PayrollService.get_amount`` from the target
        user and ``request.hours``. If a record with matching user, start date
        and end date exists, its ``work_units`` and ``amount`` are overwritten;
        otherwise a new ``PaymentRecord`` is added. The session is committed on
        success.

        :param user: the acting user; stored as ``created_by_user_id`` on a
            newly created record.
        :param request: carries ``user_id``, ``date`` and ``hours``.
        :return: a response with ``ok=True`` on success; ``ok=False`` with a
            message when the target user is missing, has no pay rate, or any
            exception occurs (the message is then ``str(exception)``).
        """
        # Explicit local import: the file otherwise relies on the wildcard
        # schema import to provide the ``datetime`` module.
        import datetime

        try:
            record_user = await UserService(self.session).get_by_id(user_id=request.user_id)
            if not record_user:
                return UpdateTimeTrackingRecordResponse(ok=False, message="Указанный пользователь не найден!")
            if not record_user.pay_rate:
                return UpdateTimeTrackingRecordResponse(ok=False, message="У пользователя не указана схема оплаты!")

            existing_record_stmt = select(PaymentRecord).where(
                PaymentRecord.user_id == request.user_id,
                PaymentRecord.start_date == request.date,
                PaymentRecord.end_date == request.date,
            )
            amount = PayrollService(self.session).get_amount(
                user=record_user,
                work_units=request.hours,
            )

            existing_record = await self.session.scalar(existing_record_stmt)
            if existing_record:
                existing_record.work_units = request.hours
                existing_record.amount = amount
            else:
                self.session.add(
                    PaymentRecord(
                        user_id=request.user_id,
                        created_by_user_id=user.id,
                        start_date=request.date,
                        end_date=request.date,
                        created_at=datetime.datetime.now(),
                        payroll_scheme_key=record_user.pay_rate.payroll_scheme_key,
                        amount=amount,
                        work_units=request.hours,
                    )
                )
            await self.session.commit()
            return UpdateTimeTrackingRecordResponse(ok=True, message="Запись успешно обновлена")
        except Exception as e:
            # A failed flush/commit leaves the session's transaction in a failed
            # state; roll back so the session stays usable and no partial
            # changes linger as pending.
            await self.session.rollback()
            return UpdateTimeTrackingRecordResponse(ok=False, message=str(e))