Files
Fulfillment-Backend/services/user.py
2025-03-05 18:39:54 +03:00

206 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import random
import string
import time
from sqlalchemy import select, update, delete, insert, and_
from sqlalchemy.orm import selectinload
from backend import config
from external.s3_uploader.uploader import S3Uploader
from models import User, user_position, user_pay_rate, PassportImage, DepartmentSection, UserDepartmentSection, \
InviteCode
from services.base import BaseService
from schemas.user import *
class UserService(BaseService):
async def get_all(self) -> GetAllUsersResponse:
stmt = (
select(User)
.options(
selectinload(User.department_sections),
)
.order_by(User.id.desc())
.where(User.is_deleted == False)
)
users = (await self.session.scalars(stmt)).all()
users_schemas = [UserSchema.model_validate(user) for user in users]
return GetAllUsersResponse(users=users_schemas)
async def get_managers(self) -> GetManagersResponse:
stmt = (
select(User)
.join(user_position)
.where(and_(User.is_deleted == False, user_position.c.position_key == "sales_manager"))
.order_by(User.id.desc())
)
users = (await self.session.scalars(stmt)).all()
users_schemas = [UserSchema.model_validate(user) for user in users]
return GetManagersResponse(managers=users_schemas)
async def get_by_id(self, user_id: int) -> Optional[User]:
return await self.session.scalar(select(User).where(User.id == user_id))
async def create(self, request: CreateUserRequest) -> CreateUserResponse:
try:
base_fields = request.data.model_dump_parent()
del base_fields['pay_rate']
del base_fields['passport_image_url']
user = User(**base_fields)
self.session.add(user)
await self.session.flush()
if request.data.position_key:
stmt = insert(user_position).values(**{
'user_id': user.id,
'position_key': request.data.position_key
})
await self.session.execute(stmt)
if request.data.pay_rate:
stmt = insert(user_pay_rate).values(**{
'user_id': user.id,
'pay_rate_id': request.data.pay_rate.id
})
await self.session.execute(stmt)
await self.session.commit()
return CreateUserResponse(ok=True, message='Пользователь успешно создан')
except Exception as e:
return CreateUserResponse(ok=False, message=str(e))
async def update(self, request: UpdateUserRequest) -> UpdateUserResponse:
try:
if not await self.get_by_id(request.data.id):
return UpdateUserResponse(ok=False, message='Указанный пользователь не найден')
base_fields = request.data.model_dump_parent()
del base_fields['pay_rate']
del base_fields['passport_image_url']
del base_fields['passport_images']
stmt = update(User).values(**base_fields).where(User.id == request.data.id)
await self.session.execute(stmt)
await self.session.flush()
# Deleting previous position
stmt = delete(user_position).where(user_position.c.user_id == request.data.id)
await self.session.execute(stmt)
await self.session.flush()
# Deleting previous pay rate
stmt = delete(user_pay_rate).where(user_pay_rate.c.user_id == request.data.id)
await self.session.execute(stmt)
await self.session.flush()
if request.data.position_key:
stmt = insert(user_position).values(**{
'user_id': request.data.id,
'position_key': request.data.position_key
})
await self.session.execute(stmt)
if request.data.pay_rate:
stmt = insert(user_pay_rate).values(**{
'user_id': request.data.id,
'pay_rate_id': request.data.pay_rate.id
})
await self.session.execute(stmt)
await self.session.commit()
return UpdateUserResponse(ok=True, message='Пользователь успешно обновлен')
except Exception as e:
return UpdateUserResponse(ok=False, message=str(e))
async def update_department_sections(
self,
user_id: int,
request: UpdateUserDepartmentSectionsRequest
) -> UpdateUserDepartmentSectionsResponse:
user = await self.get_by_id(user_id)
if not user:
return UpdateUserDepartmentSectionsResponse(ok=False, message=f"Пользователь с ID: {user_id} не найден")
stmt_delete = delete(UserDepartmentSection).where(UserDepartmentSection.user_id == user_id)
await self.session.execute(stmt_delete)
for section_schema in request.department_sections:
section = await self.session.get(DepartmentSection, section_schema.section_id)
if not section:
await self.session.rollback()
return UpdateUserDepartmentSectionsResponse(
ok=False, message=f"Отдел с ID: {section_schema.section_id} не найден"
)
user_section = UserDepartmentSection(
user_id=user_id,
is_chief=section_schema.is_chief,
section_id=section_schema.section_id
)
user.department_sections.append(user_section)
await self.session.commit()
return UpdateUserDepartmentSectionsResponse(ok=True, message="Отделы пользователя успешно обновлены")
async def upload_passport_image(self, user_id: int, file_bytes: bytes) -> UploadPassportImageResponse:
try:
user: Optional[User] = await self.session.get(User, user_id)
if not user:
raise Exception("Не удалось пользователя с указанным ID")
# removing previous images
for image in user.passport_images:
await self.session.delete(image)
s3_uploader = S3Uploader(config.S3_API_KEY)
response = await s3_uploader.upload(file_bytes)
response_url = response.get('link')
if not response_url:
raise Exception("Не удалось загрузить изображение")
passport_image = PassportImage(
user_id=user_id,
image_url=response_url,
)
self.session.add(passport_image)
await self.session.commit()
return UploadPassportImageResponse(
ok=True,
message='Изображение успешно загружено',
image_url=response_url
)
except Exception as e:
return UploadPassportImageResponse(ok=False, message=str(e))
@staticmethod
def _generate_invite_code(length=10):
timestamp = str(int(time.time() * 1000))[-6:]
chars = string.ascii_letters + string.digits
random_part = ''.join(random.choice(chars) for _ in range(length - len(timestamp))).upper()
code = list(timestamp + random_part)
random.shuffle(code)
return ''.join(code)
async def generate_invite_code(self, user: User) -> GenerateInviteCodeResponse:
MAX_ATTEMPTS = 5
try:
if not user.is_admin:
return GenerateInviteCodeResponse(ok=False,
message="Сгенерировать код приглашения может только администратор")
code_in_database = True
attempt = 0
invite_code = ""
while code_in_database and attempt < MAX_ATTEMPTS:
invite_code = self._generate_invite_code()
stmt = select(InviteCode).where(InviteCode.code == invite_code, InviteCode.is_activated == False)
user_with_code = await self.session.scalar(stmt)
code_in_database = bool(user_with_code)
attempt += 1
if code_in_database or not invite_code:
return GenerateInviteCodeResponse(ok=False, message="Не удалось сгенерировать уникальный код")
new_invite_code = InviteCode(
code=invite_code,
created_at=datetime.datetime.now(),
created_by_id=user.id
)
self.session.add(new_invite_code)
await self.session.commit()
return GenerateInviteCodeResponse(ok=True, message="Код приглашения успешно создан",
invite_code=invite_code)
except Exception as e:
return GenerateInviteCodeResponse(ok=False, message=str(e))