75 lines
3.1 KiB
Python
75 lines
3.1 KiB
Python
import datetime
|
|
from typing import Union, Annotated
|
|
|
|
from fastapi import Depends
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from models import Client, ClientDetails, User
|
|
from services.auth import get_current_user
|
|
from services.base import BaseService
|
|
from schemas.client import *
|
|
|
|
|
|
class ClientService(BaseService):
    """Query and persistence helpers for ``Client`` rows and their ``ClientDetails``.

    Every method works through ``self.session``, which is awaited throughout
    and is presumably an async SQLAlchemy session provided by ``BaseService``
    (not visible in this file — confirm there).
    """

    @staticmethod
    def _details_values(user: User, request: ClientDetailsSchema) -> dict:
        """Dump *request* to a dict and stamp the audit fields onto it."""
        values = request.model_dump()
        # NOTE(review): naive local timestamp, kept as in the original code;
        # confirm whether the column expects UTC / timezone-aware values.
        values['last_modified_at'] = datetime.datetime.now()
        values['modified_by_user_id'] = user.id
        return values

    async def get_by_name(self, name: str) -> Union[Client, None]:
        """Return the first client whose name equals *name*, or ``None``."""
        return await self.session.scalar(select(Client).where(Client.name == name))

    async def get_by_id(self, client_id: int) -> Union[Client, None]:
        """Return the client identified by *client_id*, or ``None`` if absent."""
        return await self.session.get(Client, client_id)

    async def get_details_by_client_id(self, client_id: int) -> Union[ClientDetails, None]:
        """Return the first details row linked to *client_id*, or ``None``."""
        return await self.session.scalar(
            select(ClientDetails).where(ClientDetails.client_id == client_id)
        )

    async def get_all(self) -> ClientGetAllResponse:
        """Return every client, with its details eagerly loaded, as schemas."""
        rows = await self.session.scalars(
            select(Client).options(joinedload(Client.details))
        )
        return ClientGetAllResponse(
            clients=[ClientSchema.model_validate(client) for client in rows.all()]
        )

    async def create_details(self, user: User, client: Client, request: ClientDetailsSchema) -> ClientDetails:
        """Insert a new details row for *client* and return it.

        The row is built from the fields of *request*, linked to ``client.id``
        and stamped with the modifying user's id and the current time. The
        session is flushed, not committed.
        """
        values = self._details_values(user, request)
        values['client_id'] = client.id
        details = ClientDetails(**values)
        self.session.add(details)
        await self.session.flush()
        return details

    async def update_details(self, user: User, client: Client, request: ClientDetailsSchema) -> ClientDetails:
        """Create or update the details row of *client* and return it.

        If the client has no details yet, they are created from *request*.
        Otherwise the existing row is updated with the fields of *request*
        plus fresh audit fields. The session is flushed, not committed.
        """
        details = await self.get_details_by_client_id(client_id=client.id)
        if details is None:
            # A freshly created row already carries the request's values and
            # audit stamps; issuing an UPDATE right after would be redundant.
            return await self.create_details(user, client, request)

        values = self._details_values(user, request)
        await self.session.execute(
            update(ClientDetails).where(ClientDetails.id == details.id).values(**values)
        )
        await self.session.flush()
        # NOTE(review): relies on SQLAlchemy's default session synchronization
        # for ORM-enabled UPDATE to keep this in-session object current.
        return details

    async def create_client_raw(self,
                                user: User,
                                client_name: str,
                                details_schema: ClientDetailsSchema) -> Client:
        """Create a client named *client_name* together with its details.

        The client is flushed first so that ``client.id`` is available when
        the details row is written. Nothing is committed here.
        """
        client = Client(name=client_name, created_at=datetime.datetime.now())
        self.session.add(client)
        await self.session.flush()

        await self.update_details(user, client, details_schema)

        return client

    async def search_clients(self, request: ClientSearchRequest) -> ClientSearchResponse:
        """Return clients whose name contains ``request.name``, case-insensitively.

        Details are eagerly loaded for each match.
        """
        # NOTE(review): ``%`` and ``_`` inside request.name act as LIKE
        # wildcards because the value is not escaped; confirm this is intended.
        matches = await self.session.scalars(
            select(Client)
            .where(Client.name.ilike(f'%{request.name}%'))
            .options(joinedload(Client.details))
        )
        return ClientSearchResponse(
            clients=[ClientSchema.model_validate(client) for client in matches.all()]
        )
|