85 lines
3.1 KiB
Python
85 lines
3.1 KiB
Python
from collections import defaultdict
|
||
from typing import Optional
|
||
|
||
from sqlalchemy import select, and_
|
||
from sqlalchemy.orm import joinedload
|
||
|
||
from models import Position
|
||
from models.work_shifts import PlannedWorkShift
|
||
from schemas.work_shifts_planning import *
|
||
from services.base import BaseService
|
||
|
||
|
||
class WorkShiftsPlanningService(BaseService):
|
||
async def get_work_shifts(self, request: GetWorkShiftsPlanningDataRequest) -> GetPlannedWorkShiftsResponse:
|
||
users_shifts_ids = (
|
||
select(PlannedWorkShift)
|
||
.options(
|
||
joinedload(PlannedWorkShift.user),
|
||
)
|
||
.where(PlannedWorkShift.shift_date.between(request.date_from, request.date_to))
|
||
.order_by(PlannedWorkShift.shift_date)
|
||
)
|
||
shifts = (await self.session.scalars(users_shifts_ids)).all()
|
||
|
||
user_shifts_dict = defaultdict(list)
|
||
for shift in shifts:
|
||
user_shifts_dict[shift.user].append(shift)
|
||
|
||
result_shifts = []
|
||
for user, shifts in user_shifts_dict.items():
|
||
user_schema = UserSchema.model_validate(user)
|
||
shifts_schema = [PlannedWorkShiftSchema.model_validate(shift) for shift in shifts]
|
||
result_shifts.append(
|
||
PlanningTableRow(
|
||
user=user_schema,
|
||
shifts=shifts_schema,
|
||
)
|
||
)
|
||
|
||
return GetPlannedWorkShiftsResponse(shifts=result_shifts)
|
||
|
||
async def _get_work_shift(self, user_id: int, shift_date: date) -> Optional[PlannedWorkShift]:
|
||
stmt = (
|
||
select(PlannedWorkShift)
|
||
.where(
|
||
and_(
|
||
PlannedWorkShift.user_id == user_id,
|
||
PlannedWorkShift.shift_date == shift_date,
|
||
)
|
||
)
|
||
)
|
||
|
||
work_shift = await self.session.execute(stmt)
|
||
work_shift = work_shift.one_or_none()
|
||
return work_shift[0] if work_shift else None
|
||
|
||
async def update_work_shift(self, request: UpdatePlanningWorkShiftRequest) -> UpdatePlanningWorkShiftResponse:
|
||
work_shift = await self._get_work_shift(request.user_id, request.shift_date)
|
||
|
||
positions: list[Position] = []
|
||
for position_key in request.position_keys:
|
||
position: Optional[Position] = await self.session.get(Position, position_key)
|
||
if position:
|
||
positions.append(position)
|
||
else:
|
||
return UpdatePlanningWorkShiftResponse(ok=False, message=f"Должность с ID: {position_key} не найдена")
|
||
|
||
if not work_shift:
|
||
work_shift = PlannedWorkShift(
|
||
user_id=request.user_id,
|
||
shift_date=request.shift_date,
|
||
positions=positions,
|
||
created_at=datetime.now(),
|
||
)
|
||
self.session.add(work_shift)
|
||
else:
|
||
if len(positions) == 0:
|
||
await self.session.delete(work_shift)
|
||
else:
|
||
work_shift.positions = positions
|
||
|
||
await self.session.commit()
|
||
|
||
return UpdatePlanningWorkShiftResponse(ok=True, message="Данные о смене сохранены")
|