Files
Fulfillment-Backend/services/work_shifts.py

118 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, date, timedelta
from sqlalchemy import select, delete
from sqlalchemy.orm import joinedload
from models import WorkShift, User
from schemas.time_tracking import UpdateTimeTrackingRecordRequest
from schemas.user import *
from schemas.work_shifts import StartShiftResponse, FinishShiftResponse, ActiveWorkShiftsResponse, DeleteShiftResponse, \
FinishShiftByIdResponse
from services.base import BaseService
from services.time_tracking import TimeTrackingService
from utils.work_time import hours_to_hours_and_minutes
class WorkShiftsService(BaseService):
async def _get_last_work_shift_for_today(self, user_id: int) -> Optional[WorkShift]:
stmt = (
select(WorkShift)
.where(WorkShift.user_id == user_id)
.order_by(WorkShift.started_at.desc())
.limit(1)
)
work_shift = await self.session.execute(stmt)
work_shift = work_shift.one_or_none()
work_shift = work_shift[0] if work_shift else None
if work_shift and work_shift.started_at.date() == date.today():
return work_shift
return None
async def start_shift(self, user_id: int) -> StartShiftResponse:
employee = await self.session.get(User, user_id)
if not employee or employee.is_deleted:
return StartShiftResponse(ok=False, message=f"Пользователь с ID {user_id} не найден")
work_shift = await self._get_last_work_shift_for_today(user_id)
if work_shift:
if not work_shift.finished_at:
return StartShiftResponse(ok=False, message="Предыдущая смена еще не закончена")
return StartShiftResponse(ok=False, message="Смена для сотрудника на сегодня уже закончена")
work_shift = WorkShift(user_id=user_id, started_at=datetime.now())
self.session.add(work_shift)
await self.session.commit()
return StartShiftResponse(ok=True, message="Смена начата")
async def finish_shift(self, user: User, user_id: int) -> FinishShiftResponse:
employee = await self.session.get(User, user_id)
if not employee or employee.is_deleted:
return FinishShiftResponse(ok=False, message=f"Пользователь с ID {user_id} не найден")
work_shift = await self._get_last_work_shift_for_today(user_id)
if not work_shift or work_shift.finished_at:
return FinishShiftResponse(ok=False, message="Смена для сотрудника еще не начата")
work_shift.finished_at = datetime.now()
await self.session.commit()
diff: timedelta = work_shift.finished_at - work_shift.started_at
hours = diff.total_seconds() / 3600
data = UpdateTimeTrackingRecordRequest(
user_id=user_id,
date=work_shift.started_at.date(),
hours=hours,
)
await TimeTrackingService(self.session).update_record(user, data)
hours, minutes = hours_to_hours_and_minutes(hours)
return FinishShiftResponse(ok=True, message=f"Смена закончена. Отработано {hours} ч. {minutes} мин.")
async def finish_shift_by_id(self, user: User, shift_id: int) -> FinishShiftByIdResponse:
work_shift = await self.session.get(WorkShift, shift_id)
if not work_shift or work_shift.finished_at:
return FinishShiftByIdResponse(ok=False, message="Смена для сотрудника еще не начата")
work_shift.finished_at = datetime.now()
await self.session.commit()
diff: timedelta = work_shift.finished_at - work_shift.started_at
hours = diff.total_seconds() / 3600
data = UpdateTimeTrackingRecordRequest(
user_id=work_shift.user_id,
date=work_shift.started_at.date(),
hours=hours,
)
await TimeTrackingService(self.session).update_record(user, data)
hours, minutes = hours_to_hours_and_minutes(hours)
return FinishShiftByIdResponse(ok=True, message=f"Смена закончена. Отработано {hours} ч. {minutes} мин.")
async def get_active_shifts(self) -> ActiveWorkShiftsResponse:
stmt = (
select(WorkShift)
.options(joinedload(WorkShift.user))
.where(WorkShift.finished_at == None)
.order_by(WorkShift.started_at.desc())
)
shifts = await self.session.execute(stmt)
shifts = shifts.scalars().all()
response = ActiveWorkShiftsResponse(shifts=shifts)
return response
async def delete_shift(self, shift_id: int) -> DeleteShiftResponse:
stmt = (
delete(WorkShift)
.where(WorkShift.id == shift_id)
)
await self.session.execute(stmt)
await self.session.commit()
return DeleteShiftResponse(ok=True, message="Запись о смене успешно удалена")