Files
Fulfillment-Backend/services/time_tracking.py

123 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from collections import defaultdict
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from models import PaymentRecord, User
from schemas.time_tracking import *
from services.base import BaseService
from services.payroll import PayrollService
from services.user import UserService
class TimeTrackingService(BaseService):
    """Read and upsert single-day ``PaymentRecord`` rows used for time tracking.

    A payment record is handled here only when its ``start_date`` equals its
    ``end_date`` (one row describes one day of work for one user).
    """

    async def get_records(
        self, request: GetTimeTrackingRecordsRequest
    ) -> GetTimeTrackingRecordsResponse:
        """Return single-day payment records for the requested period, grouped by user.

        Args:
            request: Supplies ``date_from`` and ``date_to``; rows whose
                ``start_date`` lies between them (SQL ``BETWEEN``) are selected.

        Returns:
            A response with one ``TimeTrackingRecord`` per user that has at
            least one matching row. Each record carries the validated user,
            the per-day entries (ordered by date) and the sum of their amounts.
            Rows belonging to users flagged ``is_deleted`` are excluded.
        """
        stmt = (
            select(PaymentRecord)
            .options(joinedload(PaymentRecord.user))
            .join(User, PaymentRecord.user_id == User.id)
            .where(
                PaymentRecord.start_date.between(request.date_from, request.date_to),
                # Only rows that cover exactly one day are time-tracking entries.
                PaymentRecord.start_date == PaymentRecord.end_date,
                # `== False` is intentional: it builds a SQL expression.
                User.is_deleted == False,  # noqa: E712
            )
            # The secondary key makes the per-user entry order deterministic.
            .order_by(User.id, PaymentRecord.start_date)
        )
        payment_records = (await self.session.scalars(stmt)).all()

        users_by_id = {}
        entries_by_user = defaultdict(list)
        totals_by_user = defaultdict(int)
        for payment_record in payment_records:
            user_id = payment_record.user_id
            # Validate each user once instead of once per payment row.
            if user_id not in users_by_id:
                users_by_id[user_id] = UserSchema.model_validate(payment_record.user)
            entries_by_user[user_id].append(
                TimeTrackingData(
                    date=payment_record.start_date,
                    hours=payment_record.work_units,
                    amount=payment_record.amount,
                )
            )
            totals_by_user[user_id] += payment_record.amount

        records = [
            TimeTrackingRecord(
                user=users_by_id[user_id],
                data=entries,
                total_amount=totals_by_user[user_id],
            )
            for user_id, entries in entries_by_user.items()
        ]
        return GetTimeTrackingRecordsResponse(records=records)

    async def update_work_record(
        self,
        user: User,
        request: UpdateTimeTrackingRecordRequest,
        commit: bool = True,
    ) -> tuple[bool, str]:
        """Create or update the single-day payment record for ``request.user_id``.

        Args:
            user: The acting user; stored as ``created_by_user_id`` on new rows.
            request: Supplies ``user_id``, ``date`` and ``hours``.
            commit: When true (default) the session is committed before
                returning; pass ``False`` to let the caller commit.

        Returns:
            ``(ok, message)``. ``ok`` is ``False`` when the target user cannot
            be found or has no ``pay_rate``; otherwise ``True``.
        """
        # Imported locally so the name is always the stdlib module instead of
        # whatever the wildcard import of schemas.time_tracking happens to export.
        import datetime

        record_user = await UserService(self.session).get_by_id(user_id=request.user_id)
        if not record_user:
            return False, "Указанный пользователь не найден!"
        if not record_user.pay_rate:
            return False, "У пользователя не указана схема оплаты!"

        existing_record_stmt = select(PaymentRecord).where(
            PaymentRecord.user_id == request.user_id,
            PaymentRecord.start_date == request.date,
            PaymentRecord.end_date == request.date,
        )
        # NOTE(review): get_amount is called without `await`; presumably it is
        # a synchronous method - confirm against PayrollService.
        amount = PayrollService(self.session).get_amount(
            user=record_user,
            work_units=request.hours,
        )

        existing_record = await self.session.scalar(existing_record_stmt)
        if existing_record:
            # A row for this user and day already exists: overwrite it in place.
            existing_record.work_units = request.hours
            existing_record.amount = amount
        else:
            self.session.add(
                PaymentRecord(
                    user_id=request.user_id,
                    created_by_user_id=user.id,
                    start_date=request.date,
                    end_date=request.date,
                    created_at=datetime.datetime.now(),
                    payroll_scheme_key=record_user.pay_rate.payroll_scheme_key,
                    amount=amount,
                    work_units=request.hours,
                )
            )

        if commit:
            await self.session.commit()
        return True, "Запись успешно добавлена"

    async def update_record(
        self,
        user: User,
        request: UpdateTimeTrackingRecordRequest,
    ) -> UpdateTimeTrackingRecordResponse:
        """Run ``update_work_record`` and wrap its outcome in a response object.

        Any exception raised while updating is reported as ``ok=False`` with
        the exception text as the message. The session is rolled back first so
        the failed change does not stay pending on it.
        """
        try:
            ok, message = await self.update_work_record(user, request)
            return UpdateTimeTrackingRecordResponse(ok=ok, message=message)
        except Exception as e:
            # Discard the failed transaction; without this the session keeps
            # the half-applied change and cannot be reused cleanly.
            await self.session.rollback()
            return UpdateTimeTrackingRecordResponse(ok=False, message=str(e))