Files
Fulfillment-Backend/services/user.py
2024-11-30 18:52:24 +04:00

96 lines
4.0 KiB
Python

from sqlalchemy import select, update, delete, insert, and_
from models import User, user_position, user_pay_rate
from services.base import BaseService
from schemas.user import *
class UserService(BaseService):
    """User queries and create/update operations executed on ``self.session``.

    The link tables ``user_position`` and ``user_pay_rate`` are written with
    Core ``insert``/``delete`` statements rather than through ORM
    relationships.
    """

    async def get_all(self) -> GetAllUsersResponse:
        """Return all users not flagged as deleted, ordered by descending id."""
        stmt = (
            select(User)
            .where(User.is_deleted == False)  # noqa: E712 - builds a SQL expression
            .order_by(User.id.desc())
        )
        users = (await self.session.scalars(stmt)).all()
        return GetAllUsersResponse(
            users=[UserSchema.model_validate(user) for user in users]
        )

    async def get_managers(self) -> GetManagersResponse:
        """Return non-deleted users whose position key is ``sales_manager``.

        Users are joined to ``user_position`` and ordered by descending id.
        """
        stmt = (
            select(User)
            .join(user_position)
            .where(
                and_(
                    User.is_deleted == False,  # noqa: E712 - builds a SQL expression
                    user_position.c.position_key == "sales_manager",
                )
            )
            .order_by(User.id.desc())
        )
        managers = (await self.session.scalars(stmt)).all()
        return GetManagersResponse(
            managers=[UserSchema.model_validate(user) for user in managers]
        )

    async def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with the given id, or ``None`` when no row matches.

        NOTE(review): this lookup does not filter on ``is_deleted``.
        """
        return await self.session.scalar(select(User).where(User.id == user_id))

    async def _assign_position_and_pay_rate(self, user_id: int, data) -> None:
        """Insert link rows for the position and pay rate carried by ``data``.

        ``data`` is the ``request.data`` payload of a create/update request;
        only its ``position_key`` and ``pay_rate`` attributes are read. A row
        is inserted only for the attributes that are truthy.
        """
        if data.position_key:
            await self.session.execute(
                insert(user_position).values(
                    user_id=user_id,
                    position_key=data.position_key,
                )
            )
        if data.pay_rate:
            await self.session.execute(
                insert(user_pay_rate).values(
                    user_id=user_id,
                    pay_rate_id=data.pay_rate.id,
                )
            )

    async def create(self, request: CreateUserRequest) -> CreateUserResponse:
        """Create a user plus optional position / pay-rate links and commit.

        Returns a response with ``ok=True`` on success. On any exception the
        session is rolled back and a response with ``ok=False`` carrying the
        exception text is returned instead of raising.
        """
        try:
            base_fields = request.data.model_dump_parent()
            # pay_rate is stored in a link table, not as a User constructor field.
            base_fields.pop('pay_rate', None)
            user = User(**base_fields)
            self.session.add(user)
            # Flush so that user.id is populated before the link rows are inserted.
            await self.session.flush()
            await self._assign_position_and_pay_rate(user.id, request.data)
            await self.session.commit()
            return CreateUserResponse(ok=True, message='Пользователь успешно создан')
        except Exception as e:
            # Discard partial writes and leave the session usable for later work.
            await self.session.rollback()
            return CreateUserResponse(ok=False, message=str(e))

    async def update(self, request: UpdateUserRequest) -> UpdateUserResponse:
        """Update a user's fields and replace its position / pay-rate links.

        Returns ``ok=False`` when no user with ``request.data.id`` exists. On
        any exception the session is rolled back and a response with
        ``ok=False`` carrying the exception text is returned instead of
        raising.
        """
        try:
            user_id = request.data.id
            if not await self.get_by_id(user_id):
                return UpdateUserResponse(ok=False, message='Указанный пользователь не найден')

            base_fields = request.data.model_dump_parent()
            # pay_rate is stored in a link table, not in the User UPDATE values.
            base_fields.pop('pay_rate', None)
            await self.session.execute(
                update(User).values(**base_fields).where(User.id == user_id)
            )
            await self.session.flush()

            # Drop the previous position and pay-rate links before re-inserting.
            await self.session.execute(
                delete(user_position).where(user_position.c.user_id == user_id)
            )
            await self.session.flush()
            await self.session.execute(
                delete(user_pay_rate).where(user_pay_rate.c.user_id == user_id)
            )
            await self.session.flush()

            await self._assign_position_and_pay_rate(user_id, request.data)
            await self.session.commit()
            return UpdateUserResponse(ok=True, message='Пользователь успешно обновлен')
        except Exception as e:
            # Discard partial writes and leave the session usable for later work.
            await self.session.rollback()
            return UpdateUserResponse(ok=False, message=str(e))