feat: work shifts by QR codes

This commit is contained in:
2024-11-20 13:04:52 +04:00
parent 29211dd442
commit 9707faf60c
10 changed files with 349 additions and 0 deletions

114
services/work_shifts.py Normal file
View File

@@ -0,0 +1,114 @@
from datetime import datetime, date, timedelta
from sqlalchemy import select, delete
from sqlalchemy.orm import joinedload
from models import WorkShift, User
from schemas.time_tracking import UpdateTimeTrackingRecordRequest
from schemas.user import *
from schemas.work_shifts import StartShiftResponse, FinishShiftResponse, ActiveWorkShiftsResponse, DeleteShiftResponse, \
FinishShiftByIdResponse
from services.base import BaseService
from services.time_tracking import TimeTrackingService
class WorkShiftsService(BaseService):
async def _get_last_work_shift_for_today(self, user_id: int) -> Optional[WorkShift]:
stmt = (
select(WorkShift)
.where(WorkShift.user_id == user_id)
.order_by(WorkShift.started_at.desc())
.limit(1)
)
work_shift = await self.session.execute(stmt)
work_shift = work_shift.one_or_none()
work_shift = work_shift[0] if work_shift else None
if work_shift and work_shift.started_at.date() == date.today():
return work_shift
return None
async def start_shift(self, user_id: int) -> StartShiftResponse:
employee = await self.session.get(User, user_id)
if not employee:
return StartShiftResponse(ok=False, message=f"Пользователь с ID {user_id} не найден")
work_shift = await self._get_last_work_shift_for_today(user_id)
if work_shift:
if not work_shift.finished_at:
return StartShiftResponse(ok=False, message="Предыдущая смена еще не закончена")
return StartShiftResponse(ok=False, message="Смена для сотрудника на сегодня уже закончена")
work_shift = WorkShift(user_id=user_id, started_at=datetime.now())
self.session.add(work_shift)
await self.session.commit()
return StartShiftResponse(ok=True, message="Смена начата")
async def finish_shift(self, user: User, user_id: int) -> FinishShiftResponse:
employee = await self.session.get(User, user_id)
if not employee:
return FinishShiftResponse(ok=False, message=f"Пользователь с ID {user_id} не найден")
work_shift = await self._get_last_work_shift_for_today(user_id)
if not work_shift or work_shift.finished_at:
return FinishShiftResponse(ok=False, message="Смена для сотрудника еще не начата")
work_shift.finished_at = datetime.now()
await self.session.commit()
diff: timedelta = work_shift.finished_at - work_shift.started_at
hours = int(diff.total_seconds() // 3600)
data = UpdateTimeTrackingRecordRequest(
user_id=user_id,
date=work_shift.started_at.date(),
hours=hours,
)
await TimeTrackingService(self.session).update_record(user, data)
return FinishShiftResponse(ok=True, message=f"Смена закончена. Отработано {hours} ч.")
async def finish_shift_by_id(self, user: User, shift_id: int) -> FinishShiftByIdResponse:
work_shift = await self.session.get(WorkShift, shift_id)
if not work_shift or work_shift.finished_at:
return FinishShiftByIdResponse(ok=False, message="Смена для сотрудника еще не начата")
work_shift.finished_at = datetime.now()
await self.session.commit()
diff: timedelta = work_shift.finished_at - work_shift.started_at
hours = int(diff.total_seconds() // 3600)
data = UpdateTimeTrackingRecordRequest(
user_id=work_shift.user_id,
date=work_shift.started_at.date(),
hours=hours,
)
await TimeTrackingService(self.session).update_record(user, data)
return FinishShiftByIdResponse(ok=True, message=f"Смена закончена. Отработано {hours} ч.")
async def get_active_shifts(self) -> ActiveWorkShiftsResponse:
stmt = (
select(WorkShift)
.options(joinedload(WorkShift.user))
.where(WorkShift.finished_at == None)
.order_by(WorkShift.started_at.desc())
)
shifts = await self.session.execute(stmt)
shifts = shifts.scalars().all()
response = ActiveWorkShiftsResponse(shifts=shifts)
return response
async def delete_shift(self, shift_id: int) -> DeleteShiftResponse:
stmt = (
delete(WorkShift)
.where(WorkShift.id == shift_id)
)
await self.session.execute(stmt)
await self.session.commit()
return DeleteShiftResponse(ok=True, message="Запись о смене успешно удалена")