85 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			85 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from collections import defaultdict
 | 
						||
from typing import Optional
 | 
						||
 | 
						||
from sqlalchemy import select, and_
 | 
						||
from sqlalchemy.orm import joinedload
 | 
						||
 | 
						||
from models import Position
 | 
						||
from models.work_shifts import PlannedWorkShift
 | 
						||
from schemas.work_shifts_planning import *
 | 
						||
from services.base import BaseService
 | 
						||
 | 
						||
 | 
						||
class WorkShiftsPlanningService(BaseService):
 | 
						||
    async def get_work_shifts(self, request: GetWorkShiftsPlanningDataRequest) -> GetPlannedWorkShiftsResponse:
 | 
						||
        users_shifts_ids = (
 | 
						||
            select(PlannedWorkShift)
 | 
						||
            .options(
 | 
						||
                joinedload(PlannedWorkShift.user),
 | 
						||
            )
 | 
						||
            .where(PlannedWorkShift.shift_date.between(request.date_from, request.date_to))
 | 
						||
            .order_by(PlannedWorkShift.shift_date)
 | 
						||
        )
 | 
						||
        shifts = (await self.session.scalars(users_shifts_ids)).all()
 | 
						||
 | 
						||
        user_shifts_dict = defaultdict(list)
 | 
						||
        for shift in shifts:
 | 
						||
            user_shifts_dict[shift.user].append(shift)
 | 
						||
 | 
						||
        result_shifts = []
 | 
						||
        for user, shifts in user_shifts_dict.items():
 | 
						||
            user_schema = UserSchema.model_validate(user)
 | 
						||
            shifts_schema = [PlannedWorkShiftSchema.model_validate(shift) for shift in shifts]
 | 
						||
            result_shifts.append(
 | 
						||
                PlanningTableRow(
 | 
						||
                    user=user_schema,
 | 
						||
                    shifts=shifts_schema,
 | 
						||
                )
 | 
						||
            )
 | 
						||
 | 
						||
        return GetPlannedWorkShiftsResponse(shifts=result_shifts)
 | 
						||
 | 
						||
    async def _get_work_shift(self, user_id: int, shift_date: date) -> Optional[PlannedWorkShift]:
 | 
						||
        stmt = (
 | 
						||
            select(PlannedWorkShift)
 | 
						||
            .where(
 | 
						||
                and_(
 | 
						||
                    PlannedWorkShift.user_id == user_id,
 | 
						||
                    PlannedWorkShift.shift_date == shift_date,
 | 
						||
                )
 | 
						||
            )
 | 
						||
        )
 | 
						||
 | 
						||
        work_shift = await self.session.execute(stmt)
 | 
						||
        work_shift = work_shift.one_or_none()
 | 
						||
        return work_shift[0] if work_shift else None
 | 
						||
 | 
						||
    async def update_work_shift(self, request: UpdatePlanningWorkShiftRequest) -> UpdatePlanningWorkShiftResponse:
 | 
						||
        work_shift = await self._get_work_shift(request.user_id, request.shift_date)
 | 
						||
 | 
						||
        positions: list[Position] = []
 | 
						||
        for position_key in request.position_keys:
 | 
						||
            position: Optional[Position] = await self.session.get(Position, position_key)
 | 
						||
            if position:
 | 
						||
                positions.append(position)
 | 
						||
            else:
 | 
						||
                return UpdatePlanningWorkShiftResponse(ok=False, message=f"Должность с ID: {position_key} не найдена")
 | 
						||
 | 
						||
        if not work_shift:
 | 
						||
            work_shift = PlannedWorkShift(
 | 
						||
                user_id=request.user_id,
 | 
						||
                shift_date=request.shift_date,
 | 
						||
                positions=positions,
 | 
						||
                created_at=datetime.now(),
 | 
						||
            )
 | 
						||
            self.session.add(work_shift)
 | 
						||
        else:
 | 
						||
            if len(positions) == 0:
 | 
						||
                await self.session.delete(work_shift)
 | 
						||
            else:
 | 
						||
                work_shift.positions = positions
 | 
						||
 | 
						||
        await self.session.commit()
 | 
						||
 | 
						||
        return UpdatePlanningWorkShiftResponse(ok=True, message="Данные о смене сохранены")
 |