from sqlalchemy import select, update, delete, insert, and_ from backend import config from external.s3_uploader.uploader import S3Uploader from models import User, user_position, user_pay_rate, PassportImage from services.base import BaseService from schemas.user import * class UserService(BaseService): async def get_all(self) -> GetAllUsersResponse: stmt = ( select(User) .order_by(User.id.desc()) .where(User.is_deleted == False) ) users = (await self.session.scalars(stmt)).all() users_schemas = [UserSchema.model_validate(user) for user in users] return GetAllUsersResponse(users=users_schemas) async def get_managers(self) -> GetManagersResponse: stmt = ( select(User) .join(user_position) .where(and_(User.is_deleted == False, user_position.c.position_key == "sales_manager")) .order_by(User.id.desc()) ) users = (await self.session.scalars(stmt)).all() users_schemas = [UserSchema.model_validate(user) for user in users] return GetManagersResponse(managers=users_schemas) async def get_by_id(self, user_id: int) -> Optional[User]: return await self.session.scalar(select(User).where(User.id == user_id)) async def create(self, request: CreateUserRequest) -> CreateUserResponse: try: base_fields = request.data.model_dump_parent() del base_fields['pay_rate'] del base_fields['passport_image_url'] user = User(**base_fields) self.session.add(user) await self.session.flush() if request.data.position_key: stmt = insert(user_position).values(**{ 'user_id': user.id, 'position_key': request.data.position_key }) await self.session.execute(stmt) if request.data.pay_rate: stmt = insert(user_pay_rate).values(**{ 'user_id': user.id, 'pay_rate_id': request.data.pay_rate.id }) await self.session.execute(stmt) await self.session.commit() return CreateUserResponse(ok=True, message='Пользователь успешно создан') except Exception as e: return CreateUserResponse(ok=False, message=str(e)) async def update(self, request: UpdateUserRequest) -> UpdateUserResponse: try: if not await self.get_by_id(request.data.id): return UpdateUserResponse(ok=False, message='Указанный пользователь не найден') base_fields = request.data.model_dump_parent() del base_fields['pay_rate'] del base_fields['passport_image_url'] del base_fields['passport_images'] stmt = update(User).values(**base_fields).where(User.id == request.data.id) await self.session.execute(stmt) await self.session.flush() # Deleting previous position stmt = delete(user_position).where(user_position.c.user_id == request.data.id) await self.session.execute(stmt) await self.session.flush() # Deleting previous pay rate stmt = delete(user_pay_rate).where(user_pay_rate.c.user_id == request.data.id) await self.session.execute(stmt) await self.session.flush() if request.data.position_key: stmt = insert(user_position).values(**{ 'user_id': request.data.id, 'position_key': request.data.position_key }) await self.session.execute(stmt) if request.data.pay_rate: stmt = insert(user_pay_rate).values(**{ 'user_id': request.data.id, 'pay_rate_id': request.data.pay_rate.id }) await self.session.execute(stmt) await self.session.commit() return UpdateUserResponse(ok=True, message='Пользователь успешно обновлен') except Exception as e: return UpdateUserResponse(ok=False, message=str(e)) async def upload_passport_image(self, user_id: int, file_bytes: bytes) -> UploadPassportImageResponse: try: user: Optional[User] = await self.session.get(User, user_id) if not user: raise Exception("Не удалось пользователя с указанным ID") # removing previous images for image in user.passport_images: await self.session.delete(image) s3_uploader = S3Uploader(config.S3_API_KEY) response = await s3_uploader.upload(file_bytes) response_url = response.get('link') if not response_url: raise Exception("Не удалось загрузить изображение") passport_image = PassportImage( user_id=user_id, image_url=response_url, ) self.session.add(passport_image) await self.session.commit() return UploadPassportImageResponse( ok=True, message='Изображение успешно загружено', image_url=response_url ) except Exception as e: return UploadPassportImageResponse(ok=False, message=str(e))