Files
Fulfillment-Backend/services/user.py

128 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sqlalchemy import select, update, delete, insert, and_
from backend import config
from external.s3_uploader.uploader import S3Uploader
from models import User, user_position, user_pay_rate, PassportImage
from services.base import BaseService
from schemas.user import *
class UserService(BaseService):
    """Service-layer operations on users: listing, lookup, create, update, passport upload.

    All database access goes through ``self.session``, which is awaited like a
    SQLAlchemy async session (presumably provided by ``BaseService`` - confirm
    there). Mutating methods never raise on ordinary failures: they roll the
    session back and return a response object with ``ok=False``.
    """

    async def get_all(self) -> GetAllUsersResponse:
        """Return all users not flagged as deleted, ordered by descending id."""
        stmt = (
            select(User)
            .where(User.is_deleted == False)  # noqa: E712 - builds a SQL expression
            .order_by(User.id.desc())
        )
        users = (await self.session.scalars(stmt)).all()
        return GetAllUsersResponse(
            users=[UserSchema.model_validate(user) for user in users]
        )

    async def get_managers(self) -> GetManagersResponse:
        """Return non-deleted users whose position key is ``"sales_manager"``.

        Results are ordered by descending id.
        """
        stmt = (
            select(User)
            .join(user_position)
            .where(
                and_(
                    User.is_deleted == False,  # noqa: E712 - builds a SQL expression
                    user_position.c.position_key == "sales_manager",
                )
            )
            .order_by(User.id.desc())
        )
        users = (await self.session.scalars(stmt)).all()
        return GetManagersResponse(
            managers=[UserSchema.model_validate(user) for user in users]
        )

    async def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with the given id, or ``None`` if there is none.

        Note: this lookup does not filter on ``is_deleted``.
        """
        return await self.session.scalar(select(User).where(User.id == user_id))

    async def _link_position_and_pay_rate(self, user_id: int, data) -> None:
        """Insert the position / pay-rate association rows carried by ``data``.

        ``data`` is the ``request.data`` payload of a create or update request;
        a row is inserted only for the attributes that are truthy.
        """
        if data.position_key:
            await self.session.execute(
                insert(user_position).values(
                    user_id=user_id,
                    position_key=data.position_key,
                )
            )
        if data.pay_rate:
            await self.session.execute(
                insert(user_pay_rate).values(
                    user_id=user_id,
                    pay_rate_id=data.pay_rate.id,
                )
            )

    async def create(self, request: CreateUserRequest) -> CreateUserResponse:
        """Create a user plus its optional position and pay-rate links.

        Returns a response with ``ok=True`` on success. On any exception the
        session is rolled back and a response with ``ok=False`` carrying the
        exception text is returned.
        """
        try:
            base_fields = request.data.model_dump_parent()
            # These keys are not passed to the User constructor; the pay rate
            # is stored through the user_pay_rate association table below.
            del base_fields['pay_rate']
            del base_fields['passport_image_url']

            user = User(**base_fields)
            self.session.add(user)
            # Emit the INSERT now so user.id is available for the link rows.
            await self.session.flush()

            await self._link_position_and_pay_rate(user.id, request.data)
            await self.session.commit()
            return CreateUserResponse(ok=True, message='Пользователь успешно создан')
        except Exception as e:
            # Leave the session usable and discard the partially written user.
            await self.session.rollback()
            return CreateUserResponse(ok=False, message=str(e))

    async def update(self, request: UpdateUserRequest) -> UpdateUserResponse:
        """Update a user's columns and replace its position / pay-rate links.

        Returns ``ok=False`` if the user does not exist. On any exception the
        session is rolled back and a response with ``ok=False`` carrying the
        exception text is returned.
        """
        try:
            user_id = request.data.id
            if not await self.get_by_id(user_id):
                return UpdateUserResponse(ok=False, message='Указанный пользователь не найден')

            base_fields = request.data.model_dump_parent()
            # These keys are not written through the UPDATE on User.
            del base_fields['pay_rate']
            del base_fields['passport_image_url']
            del base_fields['passport_images']

            # Core statements are emitted by execute() itself, so no explicit
            # flush is needed between them.
            await self.session.execute(
                update(User).values(**base_fields).where(User.id == user_id)
            )
            # Drop the previous position and pay rate before inserting new ones.
            await self.session.execute(
                delete(user_position).where(user_position.c.user_id == user_id)
            )
            await self.session.execute(
                delete(user_pay_rate).where(user_pay_rate.c.user_id == user_id)
            )

            await self._link_position_and_pay_rate(user_id, request.data)
            await self.session.commit()
            return UpdateUserResponse(ok=True, message='Пользователь успешно обновлен')
        except Exception as e:
            # Undo the partial update/delete statements.
            await self.session.rollback()
            return UpdateUserResponse(ok=False, message=str(e))

    async def upload_passport_image(self, user_id: int, file_bytes: bytes) -> UploadPassportImageResponse:
        """Upload a passport image and make it the user's only passport image.

        Existing ``PassportImage`` rows of the user are deleted and replaced by
        one row pointing at the uploaded file. If the user is missing, the
        upload yields no link, or any exception occurs, nothing is committed
        and a response with ``ok=False`` is returned.
        """
        try:
            user: Optional[User] = await self.session.get(User, user_id)
            if not user:
                return UploadPassportImageResponse(
                    ok=False,
                    message="Не удалось найти пользователя с указанным ID",
                )

            # Mark previous images for deletion; this only takes effect on commit.
            # NOTE(review): iterating the relationship assumes it is loadable in
            # an async context (e.g. eager/selectin loading) - confirm on the model.
            for image in user.passport_images:
                await self.session.delete(image)

            s3_uploader = S3Uploader(config.S3_API_KEY)
            response = await s3_uploader.upload(file_bytes)
            response_url = response.get('link')
            if not response_url:
                # Discard the pending deletions so a later commit on this
                # session cannot remove the old images without a replacement.
                await self.session.rollback()
                return UploadPassportImageResponse(
                    ok=False,
                    message="Не удалось загрузить изображение",
                )

            self.session.add(
                PassportImage(
                    user_id=user_id,
                    image_url=response_url,
                )
            )
            await self.session.commit()
            return UploadPassportImageResponse(
                ok=True,
                message='Изображение успешно загружено',
                image_url=response_url,
            )
        except Exception as e:
            await self.session.rollback()
            return UploadPassportImageResponse(ok=False, message=str(e))