From 2cdccb33ca85f4d73c3e5e39f134939b7987bf6d Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Thu, 27 Mar 2025 15:13:10 +0400 Subject: [PATCH] feat: creating chats for cards and clients, sending and deleting text messages --- backend/config.py | 7 + external/chat/__init__.py | 0 external/chat/chat_client.py | 36 ++++ external/chat/schemas.py | 32 +++ external/kafka/__init__.py | 2 + external/kafka/consumer.py | 29 +++ external/kafka/enums.py | 7 + external/kafka/producer.py | 5 + external/kafka/schemas/consumer.py | 44 +++++ external/kafka/schemas/producer.py | 36 ++++ external/kafka/services/consumer_service.py | 92 +++++++++ external/kafka/services/producer_service.py | 60 ++++++ main.py | 28 ++- models/__init__.py | 1 + models/auth.py | 8 +- models/card.py | 7 +- models/chat.py | 154 +++++++++++++++ models/client.py | 5 +- requirements.txt | 4 + routers/__init__.py | 1 + routers/chat.py | 73 +++++++ schemas/card.py | 2 + schemas/chat.py | 91 +++++++++ schemas/client.py | 2 + services/chat.py | 209 ++++++++++++++++++++ 25 files changed, 928 insertions(+), 7 deletions(-) create mode 100644 external/chat/__init__.py create mode 100644 external/chat/chat_client.py create mode 100644 external/chat/schemas.py create mode 100644 external/kafka/__init__.py create mode 100644 external/kafka/consumer.py create mode 100644 external/kafka/enums.py create mode 100644 external/kafka/producer.py create mode 100644 external/kafka/schemas/consumer.py create mode 100644 external/kafka/schemas/producer.py create mode 100644 external/kafka/services/consumer_service.py create mode 100644 external/kafka/services/producer_service.py create mode 100644 models/chat.py create mode 100644 routers/chat.py create mode 100644 schemas/chat.py create mode 100644 services/chat.py diff --git a/backend/config.py b/backend/config.py index e3fd5e2..65cb7c3 100644 --- a/backend/config.py +++ b/backend/config.py @@ -19,6 +19,13 @@ S3_API_KEY = os.environ.get('S3_API_KEY') BILLING_API_KEY = os.environ.get('BILLING_API_KEY') +CHAT_CONNECTOR_API_KEY = os.environ.get('CHAT_CONNECTOR_API_KEY') +CHATS_SYNC_URL = os.environ.get('CHATS_SYNC_URL') + +KAFKA_URL = os.environ.get('KAFKA_URL') +KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC') +KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC') + # Celery CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') diff --git a/external/chat/__init__.py b/external/chat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/external/chat/chat_client.py b/external/chat/chat_client.py new file mode 100644 index 0000000..b902ad8 --- /dev/null +++ b/external/chat/chat_client.py @@ -0,0 +1,36 @@ +import aiohttp +import jwt + +from backend.config import CHATS_SYNC_URL, CHAT_CONNECTOR_API_KEY +from external.chat.schemas import * +from services.auth import algorithm + + +class ChatClient: + def __init__(self, api_key: str): + self.api_key = api_key + self.headers = { + 'Authorization': 'Bearer ' + self.create_jwt_token() + } + self.base_url = CHATS_SYNC_URL + self.chats_sync_endpoint = '/chats-sync' + self.groups_endpoint = '/group' + + def create_jwt_token(self): + return jwt.encode({'sub': self.api_key}, CHAT_CONNECTOR_API_KEY, algorithm=algorithm) + + async def _method(self, http_method, method, **kwargs): + async with aiohttp.ClientSession(headers=self.headers) as session: + async with session.request(http_method, self.base_url + method, **kwargs) as response: + print(response) + return await response.json() + + async def create_group(self, request: ExternalCreateGroupRequest) -> ExternalCreateGroupResponse: + json_data = request.model_dump() + response = await self._method('POST', self.groups_endpoint + '/create', json=json_data) + return ExternalCreateGroupResponse.model_validate(response) + + async def create_topic(self, request: ExternalCreateTopicRequest) -> ExternalCreateTopicResponse: + json_data = request.model_dump() + response = await self._method('POST', self.groups_endpoint + '/topic/create', json=json_data) + return ExternalCreateTopicResponse.model_validate(response) diff --git a/external/chat/schemas.py b/external/chat/schemas.py new file mode 100644 index 0000000..a06ea29 --- /dev/null +++ b/external/chat/schemas.py @@ -0,0 +1,32 @@ +from typing import Optional +from uuid import UUID + +from schemas.base import BaseSchema + + +# region Requests + +class ExternalCreateGroupRequest(BaseSchema): + title: str + + +class ExternalCreateTopicRequest(BaseSchema): + group_id: str + title: str + icon_emoji_id: Optional[int] = None + + +# endregion + +# region Responses + +class ExternalCreateGroupResponse(BaseSchema): + tg_group_id: int + group_id: UUID + tg_invite_link: str + + +class ExternalCreateTopicResponse(BaseSchema): + tg_topic_id: int + +# endregion diff --git a/external/kafka/__init__.py b/external/kafka/__init__.py new file mode 100644 index 0000000..f7e8c97 --- /dev/null +++ b/external/kafka/__init__.py @@ -0,0 +1,2 @@ +from .producer import producer +from .consumer import consume_messages diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py new file mode 100644 index 0000000..dd7c207 --- /dev/null +++ b/external/kafka/consumer.py @@ -0,0 +1,29 @@ +from aiokafka import AIOKafkaConsumer +from aiokafka.errors import KafkaConnectionError + +from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC +from backend.session import session_maker +from external.kafka.services.consumer_service import ConsumerService + +consumer = AIOKafkaConsumer( + KAFKA_CONSUMER_TOPIC, + bootstrap_servers=KAFKA_URL, +) + + +async def consume_messages(): + try: + await consumer.start() + except KafkaConnectionError as e: + print(e) + return + + async with session_maker() as session: + consumer_service = ConsumerService(session) + print("started consuming messages") + + try: + async for message in consumer: + await consumer_service.consume_message(message) + finally: + await consumer.stop() diff --git a/external/kafka/enums.py b/external/kafka/enums.py new file mode 100644 index 0000000..09b348b --- /dev/null +++ b/external/kafka/enums.py @@ -0,0 +1,7 @@ +import enum + + +class KafkaMessageType(enum.Enum): + SEND = 1 + EDIT = 2 + DELETE = 3 diff --git a/external/kafka/producer.py b/external/kafka/producer.py new file mode 100644 index 0000000..c600117 --- /dev/null +++ b/external/kafka/producer.py @@ -0,0 +1,5 @@ +from aiokafka import AIOKafkaProducer + +from backend.config import KAFKA_URL + +producer = AIOKafkaProducer(bootstrap_servers=KAFKA_URL) diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py new file mode 100644 index 0000000..f92feeb --- /dev/null +++ b/external/kafka/schemas/consumer.py @@ -0,0 +1,44 @@ +from schemas.base import OkMessageSchema, BaseSchema + + +# region Entities + +class TelegramUserSchema(BaseSchema): + id: int + first_name: str + last_name: str + username: str + + +class MessageFromTelegramSchema(BaseSchema): + group_id: str + tg_topic_id: int + text: str + sender: TelegramUserSchema + + +# endregion + +# region Requests + +class MessageFromTelegramRequest(BaseSchema): + message: MessageFromTelegramSchema + + +# endregion + +# region Responses + +class BaseConnectorResponse(OkMessageSchema): + message_type: int + + +class SendMessageToConnectorResponse(BaseConnectorResponse): + message_id: int + tg_message_id: int + + +class DeleteMessageResponse(BaseConnectorResponse): + message_id: int + +# endregion diff --git a/external/kafka/schemas/producer.py b/external/kafka/schemas/producer.py new file mode 100644 index 0000000..60ea636 --- /dev/null +++ b/external/kafka/schemas/producer.py @@ -0,0 +1,36 @@ +from typing import Optional + +from schemas.base import BaseSchema + + +# region Entities + +class BaseMessageSchema(BaseSchema): + message_id: int + tg_message_id: Optional[int] = None + group_id: str + + +class MessageSchema(BaseMessageSchema): + text: str + topic_id: int + + +# endregion + +# region Requests + +class BaseConnectorRequest(BaseSchema): + message_type: int + app_auth_key: str + message: BaseMessageSchema + + +class SendMessageToConnectorRequest(BaseConnectorRequest): + message: MessageSchema + + +class SendMessageDeletingToConnectorRequest(BaseConnectorRequest): + pass + +# endregion diff --git a/external/kafka/services/consumer_service.py b/external/kafka/services/consumer_service.py new file mode 100644 index 0000000..e09be82 --- /dev/null +++ b/external/kafka/services/consumer_service.py @@ -0,0 +1,92 @@ +import pickle +from datetime import datetime +from typing import Optional +from uuid import UUID + +from aiokafka import ConsumerRecord +from sqlalchemy import select + +from external.kafka.enums import KafkaMessageType +from external.kafka.schemas.consumer import * +from models import Message, MessageStatus, TgUser, Chat, TgGroup +from services.base import BaseService + + +class ConsumerService(BaseService): + async def consume_message(self, message: ConsumerRecord): + value = pickle.loads(message.value) + + try: + if 'ok' in value: + """ Received response after sending message from crm """ + await self._process_connector_response(message) + else: + """ Received message from client """ + request = MessageFromTelegramRequest.model_validate(value) + await self._receive_message_from_client(request) + except Exception as e: + print(e) + + async def _process_connector_response(self, message: ConsumerRecord): + value = pickle.loads(message.value) + message_type = KafkaMessageType(value['message_type']) + + match message_type: + case KafkaMessageType.SEND: + response = SendMessageToConnectorResponse.model_validate(value) + await self._process_connector_send_response(response) + case KafkaMessageType.EDIT: + pass + case KafkaMessageType.DELETE: + response = DeleteMessageResponse.model_validate(value) + await self._process_connector_delete_response(response) + case _: + raise Exception('Unexpected message type in crm consumer') + + async def _process_connector_send_response(self, response: SendMessageToConnectorResponse): + message = await self.session.get(Message, response.message_id) + message.tg_message_id = response.tg_message_id + if response.ok: + message.status = MessageStatus.success + else: + message.status = MessageStatus.error + + await self.session.commit() + + async def _process_connector_delete_response(self, response: DeleteMessageResponse): + if not response.ok: + return + + message = await self.session.get(Message, response.message_id) + message.is_deleted = True + + async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]: + stmt = ( + select(Chat) + .join(TgGroup) + .where( + Chat.tg_topic_id == tg_topic_id, + TgGroup.id == UUID(group_id), + ) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def _receive_message_from_client(self, request: MessageFromTelegramRequest): + tg_sender: Optional[TgUser] = await self.session.get(TgUser, request.message.sender.id) + if not tg_sender: + tg_sender: TgUser = TgUser(**request.message.sender.model_dump()) + self.session.add(tg_sender) + + chat = await self._get_chat(request.message.group_id, request.message.tg_topic_id) + if not chat: + return + + message = Message( + text=request.message.text, + created_at=datetime.now(), + tg_sender_id=tg_sender.id, + chat_id=chat.id, + status=MessageStatus.success, + ) + self.session.add(message) + await self.session.commit() diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py new file mode 100644 index 0000000..0866979 --- /dev/null +++ b/external/kafka/services/producer_service.py @@ -0,0 +1,60 @@ +import pickle +from uuid import UUID + +from aiohttp import ClientConnectorError + +from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY +from external.kafka import producer +from external.kafka.enums import KafkaMessageType +from external.kafka.schemas.producer import MessageSchema, SendMessageToConnectorRequest, \ + SendMessageDeletingToConnectorRequest, BaseMessageSchema +from services.base import BaseService + + +class ProducerService(BaseService): + @staticmethod + async def send_message_to_connector( + message_text: str, + group_id: UUID, + topic_id: int, + message_id: int, + ) -> tuple[bool, str]: + try: + request = SendMessageToConnectorRequest( + message=MessageSchema( + message_id=message_id, + text=message_text, + group_id=str(group_id), + topic_id=topic_id, + ), + message_type=KafkaMessageType.SEND, + app_auth_key=CHAT_CONNECTOR_API_KEY, + ) + await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) + except ClientConnectorError: + return False, 'Ошибка подключения к коннектору' + except Exception as e: + return False, str(e) + + return True, 'Сообщение отправлено' + + @staticmethod + async def send_message_deleting_to_connector(message_id: int, tg_message_id: int, group_id: UUID) -> tuple[bool, str]: + try: + request = SendMessageDeletingToConnectorRequest( + message_type=KafkaMessageType.DELETE, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=BaseMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + ), + ) + await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) + except ClientConnectorError: + return False, 'Ошибка подключения к коннектору' + except Exception as e: + return False, str(e) + + return True, 'Сообщение отправлено' + \ No newline at end of file diff --git a/main.py b/main.py index a969ddd..34239fb 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,37 @@ +import asyncio +import platform +from contextlib import asynccontextmanager + +from aiokafka.errors import KafkaConnectionError from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -import platform - from starlette.staticfiles import StaticFiles import routers from constants import API_ROOT +from external.kafka import producer, consume_messages origins = [ 'http://localhost:5173' ] -app = FastAPI(separate_input_output_schemas=False) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + try: + await producer.start() + except KafkaConnectionError as e: + print(e) + + consumer_task = asyncio.create_task(consume_messages()) + + yield + + await producer.stop() + consumer_task.cancel() + + +app = FastAPI(lifespan=lifespan, separate_input_output_schemas=False) if platform.system() == 'Linux': import uvicorn.workers @@ -58,6 +79,7 @@ routers_list = [ routers.board_router, routers.status_router, routers.card_tag_router, + routers.chat_router, ] for router in routers_list: app.include_router(router) diff --git a/models/__init__.py b/models/__init__.py index 1ee7bfa..235496e 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -23,5 +23,6 @@ from .card_group import * from .transaction import * from .residues import * from .shipping import * +from .chat import * configure_mappers() diff --git a/models/auth.py b/models/auth.py index c6c9da0..ddd79f9 100644 --- a/models/auth.py +++ b/models/auth.py @@ -10,7 +10,7 @@ from models.work_shifts import WorkShift if TYPE_CHECKING: from models.payroll import PayRate, PaymentRecord - from models import Card, CardEmployees + from models import Card, CardEmployees, Message role_permissions = Table( 'role_permissions', @@ -138,6 +138,12 @@ class User(BaseModel): lazy="noload", ) + messages: Mapped[list['Message']] = relationship( + 'Message', + back_populates='crm_sender', + lazy='noload', + ) + class InviteCode(BaseModel): __tablename__ = 'invite_codes' diff --git a/models/card.py b/models/card.py index 90f29e6..5a83446 100644 --- a/models/card.py +++ b/models/card.py @@ -12,8 +12,8 @@ from .shipping_warehouse import ShippingWarehouse if TYPE_CHECKING: from . import ( CardBillRequest, User, BaseModel, Board, CardStatus, CardGroup, CardAttribute, Client, CardTag, - CardService as CardServiceModel, CardProduct, - ) + CardService as CardServiceModel, CardProduct, Chat, +) class Card(BaseModel): @@ -114,6 +114,9 @@ class Card(BaseModel): # module employees employees: Mapped[list['CardEmployees']] = relationship(back_populates='card', lazy='selectin') + # module chat + chat: Mapped[Optional['Chat']] = relationship(back_populates='card', lazy='joined') + # endregion diff --git a/models/chat.py b/models/chat.py new file mode 100644 index 0000000..0fa5fa1 --- /dev/null +++ b/models/chat.py @@ -0,0 +1,154 @@ +import enum +from datetime import datetime +from typing import Optional, TYPE_CHECKING +from uuid import UUID + +from sqlalchemy import ForeignKey, BigInteger, Enum, Uuid +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from models import BaseModel, User + +if TYPE_CHECKING: + from models import Client, Card + + +class TgUser(BaseModel): + __tablename__ = 'tg_users' + + id: Mapped[int] = mapped_column( + BigInteger(), + primary_key=True, + comment='Telegram user ID', + ) + username: Mapped[str] = mapped_column( + index=True, + nullable=False, + unique=True, + ) + first_name: Mapped[str] = mapped_column(nullable=True) + last_name: Mapped[str] = mapped_column(nullable=True) + + messages: Mapped['Message'] = relationship( + 'Message', + lazy='noload', + back_populates='tg_sender', + ) + + +class TgGroup(BaseModel): + __tablename__ = 'tg_groups' + + id: Mapped[UUID] = mapped_column(Uuid, primary_key=True) + + tg_group_id: Mapped[int] = mapped_column( + BigInteger(), + nullable=False, + unique=True, + ) + tg_invite_link: Mapped[str] = mapped_column(nullable=False) + + client_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('clients.id'), + unique=True, + ) + client: Mapped[Optional['Client']] = relationship( + 'Client', + lazy='joined', + back_populates='tg_group', + ) + + chats: Mapped[list['Chat']] = relationship( + 'Chat', + lazy='noload', + back_populates='tg_group', + ) + + +class Chat(BaseModel): + __tablename__ = 'chats' + + id: Mapped[int] = mapped_column(primary_key=True) + + tg_topic_id: Mapped[int] = mapped_column(nullable=False) + + card_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('cards.id'), + unique=True, + ) + card: Mapped[Optional['Card']] = relationship( + 'Card', + lazy='joined', + back_populates='chat', + ) + + client_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('clients.id'), + unique=True, + ) + client: Mapped[Optional['Client']] = relationship( + 'Client', + lazy='joined', + back_populates='chat', + ) + + tg_group_id: Mapped[UUID] = mapped_column( + ForeignKey('tg_groups.id'), + nullable=False, + ) + tg_group: Mapped[TgGroup] = relationship( + 'TgGroup', + lazy='joined', + back_populates='chats', + ) + + messages: Mapped[list['Message']] = relationship( + 'Message', + lazy='selectin', + back_populates='chat', + order_by='Message.created_at.desc()', + ) + + +class MessageStatus(enum.Enum): + sending = 'SENDING' + success = 'SUCCESS' + error = 'ERROR' + + +class Message(BaseModel): + __tablename__ = 'messages' + + id: Mapped[int] = mapped_column(primary_key=True) + tg_message_id: Mapped[Optional[int]] = mapped_column(nullable=True) + + text: Mapped[str] = mapped_column(nullable=False) + created_at: Mapped[datetime] = mapped_column(nullable=False) + status: Mapped[MessageStatus] = mapped_column(Enum(MessageStatus), nullable=False) + is_deleted: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) + + tg_sender_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('tg_users.id'), + nullable=True, + ) + tg_sender: Mapped[TgUser] = relationship( + 'TgUser', + lazy='selectin', + back_populates='messages', + ) + + crm_sender_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('users.id'), + nullable=True, + ) + crm_sender: Mapped[Optional['User']] = relationship( + 'User', + lazy='selectin', + back_populates='messages', + ) + + chat_id: Mapped[int] = mapped_column(ForeignKey('chats.id')) + chat: Mapped[Chat] = relationship( + 'Chat', + lazy='noload', + back_populates='messages', + ) diff --git a/models/client.py b/models/client.py index 66aff4b..74a8ad7 100644 --- a/models/client.py +++ b/models/client.py @@ -7,7 +7,7 @@ from sqlalchemy.orm import relationship, Mapped, mapped_column from models import BaseModel if TYPE_CHECKING: - from models import ResidualPallet, ResidualBox, Product, BarcodeTemplate, User + from models import ResidualPallet, ResidualBox, Product, BarcodeTemplate, User, TgGroup, Chat class Client(BaseModel): @@ -39,6 +39,9 @@ class Client(BaseModel): pallets: Mapped[list['ResidualPallet']] = relationship(back_populates='client', lazy='selectin') boxes: Mapped[list['ResidualBox']] = relationship(back_populates='client', lazy='selectin') + tg_group: Mapped[Optional['TgGroup']] = relationship('TgGroup', back_populates='client', lazy='joined') + chat: Mapped[Optional['Chat']] = relationship('Chat', back_populates='client', lazy='joined') + class ClientDetails(BaseModel): __tablename__ = 'client_details' diff --git a/requirements.txt b/requirements.txt index ba641fd..557d00a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ jinja2 # Security python-jose[cryptography] passlib[bcrypt] +pyjwt # Database sqlalchemy @@ -17,6 +18,9 @@ sqlalchemy[asyncio] asyncpg alembic +# Kafka +aiokafka + # Other python-dotenv aiohttp diff --git a/routers/__init__.py b/routers/__init__.py index b9a55a1..1829815 100644 --- a/routers/__init__.py +++ b/routers/__init__.py @@ -26,3 +26,4 @@ from .project import project_router from .board import board_router from .status import status_router from .card_tag import card_tag_router +from .chat import chat_router diff --git a/routers/chat.py b/routers/chat.py new file mode 100644 index 0000000..91ab6f5 --- /dev/null +++ b/routers/chat.py @@ -0,0 +1,73 @@ +from typing import Annotated + +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from backend.session import get_session +from schemas.chat import * +from services.chat import ChatService + +chat_router = APIRouter( + prefix='/chat', + tags=['chat'], +) + + +@chat_router.post( + '/message', + operation_id='send_message', + response_model=SendMessageResponse, +) +async def send_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: SendMessageRequest, +): + return await ChatService(session).send_message(request) + + +@chat_router.delete( + '/message/{message_id}', + operation_id='delete_message', + response_model=DeleteMessageResponse, +) +async def delete_message( + session: Annotated[AsyncSession, Depends(get_session)], + message_id: int, +): + return await ChatService(session).delete_message(message_id) + + +@chat_router.post( + '/', + operation_id='get_chat', + response_model=GetChatResponse, +) +async def get_chat( + session: Annotated[AsyncSession, Depends(get_session)], + request: GetChatRequest, +): + return await ChatService(session).get_chat(request) + + +@chat_router.post( + '/create', + operation_id='create_chat', + response_model=CreateChatResponse, +) +async def create_chat( + session: Annotated[AsyncSession, Depends(get_session)], + request: CreateChatRequest, +): + return await ChatService(session).create_chat(request) + + +@chat_router.post( + '/messages', + operation_id='get_messages', + response_model=GetMessagesResponse, +) +async def get_messages( + session: Annotated[AsyncSession, Depends(get_session)], + request: GetMessagesRequest, +): + return await ChatService(session).get_messages(request) diff --git a/schemas/card.py b/schemas/card.py index 488c68a..569288d 100644 --- a/schemas/card.py +++ b/schemas/card.py @@ -8,6 +8,7 @@ from schemas.base import BaseSchema, OkMessageSchema from schemas.billing import CardBillRequestSchema from schemas.board import BoardSchema from schemas.card_tag import CardTagSchema +from schemas.chat import ChatSchema from schemas.client import ClientSchema from schemas.group import CardGroupSchema from schemas.marketplace import BaseMarketplaceSchema @@ -108,6 +109,7 @@ class BaseCardSchema(BaseSchema): class CardSchema(BaseCardSchema): attributes: list[CardAttributeSchema] + chat: Optional[ChatSchema] class CardGeneralInfoSchema(BaseSchemaWithAttributes): diff --git a/schemas/chat.py b/schemas/chat.py new file mode 100644 index 0000000..17bebec --- /dev/null +++ b/schemas/chat.py @@ -0,0 +1,91 @@ +from datetime import datetime +from typing import Optional + +from schemas.base import BaseSchema, OkMessageSchema + + +# region Entities + +class TgUserSchema(BaseSchema): + id: int + first_name: str + last_name: str + username: str + + +class MessageFromTgSchema(BaseSchema): + chat_id: int + text: str + sender: TgUserSchema + + +class BaseMessageSchema(BaseSchema): + text: str + chat_id: int + + +class MessageSchema(BaseMessageSchema): + id: int + created_at: datetime + tg_sender: Optional[TgUserSchema] + status: str + + +class TgGroupSchema(BaseSchema): + tg_group_id: int + tg_invite_link: str + + +class ChatSchema(BaseSchema): + id: int + client_id: Optional[int] + card_id: Optional[int] + tg_group: Optional[TgGroupSchema] + +# endregion + +# region Requests + +class SendMessageRequest(BaseSchema): + message: BaseMessageSchema + + +class GetChatRequest(BaseSchema): + client_id: int + card_id: Optional[int] + + +class CreateChatRequest(BaseSchema): + client_id: int + card_id: Optional[int] + + +class GetMessagesRequest(BaseSchema): + chat_id: int + offset: int + limit: int + +# endregion + +# region Responses + +class SendMessageResponse(OkMessageSchema): + pass + + +class DeleteMessageResponse(OkMessageSchema): + pass + + +class GetChatResponse(BaseSchema): + chat: Optional[ChatSchema] + + +class CreateChatResponse(OkMessageSchema): + pass + + +class GetMessagesResponse(BaseSchema): + messages: list[MessageSchema] + +# endregion diff --git a/schemas/client.py b/schemas/client.py index 6707d75..4d3c019 100644 --- a/schemas/client.py +++ b/schemas/client.py @@ -4,6 +4,7 @@ from pydantic import field_validator from schemas.barcode import BarcodeTemplateSchema from schemas.base import BaseSchema, OkMessageSchema +from schemas.chat import ChatSchema from schemas.residues import ResidualBoxSchema, ResidualPalletSchema @@ -26,6 +27,7 @@ class ClientSchema(BaseSchema): barcode_template: BarcodeTemplateSchema | None = None comment: Optional[str] = None details: ClientDetailsSchema | None = None + chat: ChatSchema | None = None class ClientDetailedSchema(ClientSchema): diff --git a/services/chat.py b/services/chat.py new file mode 100644 index 0000000..eb1b65a --- /dev/null +++ b/services/chat.py @@ -0,0 +1,209 @@ +from uuid import UUID + +from aiohttp import ClientConnectorError +from sqlalchemy import select +from sqlalchemy.orm import joinedload + +from backend.config import CHAT_CONNECTOR_API_KEY +from external.chat.chat_client import ChatClient +from external.chat.schemas import ExternalCreateGroupRequest, ExternalCreateTopicRequest +from external.kafka.services.producer_service import ProducerService +from models import Message, Chat, MessageStatus, TgGroup, Client, Card +from schemas.chat import * +from services.base import BaseService + + +class ChatService(BaseService): + async def _get_chat(self, client_id: int, card_id: Optional[int]) -> Optional[Chat]: + stmt = ( + select(Chat) + .where( + Chat.card_id == card_id, + TgGroup.client_id == client_id, + ) + .join(TgGroup) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def get_chat(self, request: GetChatRequest) -> GetChatResponse: + chat = await self._get_chat(request.client_id, request.card_id) + return GetChatResponse(chat=chat) + + async def _get_group(self, client_id: int) -> Optional[TgGroup]: + stmt = ( + select(TgGroup) + .where(TgGroup.client_id == client_id) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def _create_group_for_client(self, client_id: int, title: str) -> Optional[TgGroup]: + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + + request = ExternalCreateGroupRequest(title=title) + response = await chat_client.create_group(request) + + group = TgGroup( + id=response.group_id, + client_id=client_id, + tg_group_id=response.tg_group_id, + tg_invite_link=response.tg_invite_link, + ) + + chat = Chat( + tg_topic_id=1, + card_id=None, + client_id=client_id, + tg_group_id=response.group_id, + ) + self.session.add_all([group, chat]) + await self.session.commit() + + return group + + @staticmethod + def _get_title_for_client_chat(client: Client) -> str: + return f'[{client.id}] {client.name}' + + @staticmethod + def _get_title_for_card_chat(card: Card) -> str: + return f'[{card.id}] {card.name}' + + async def _create_topic_for_card( + self, + group_id: UUID, + card: Card, + ) -> Chat: + card_chat_icon_id = 5348227245599105972 # 💼 + + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + + request = ExternalCreateTopicRequest( + group_id=str(group_id), + title=self._get_title_for_card_chat(card), + icon_emoji_id=card_chat_icon_id, + ) + response = await chat_client.create_topic(request) + + chat = Chat( + tg_topic_id=response.tg_topic_id, + card_id=card.id, + client_id=None, + tg_group_id=group_id, + ) + self.session.add(chat) + await self.session.commit() + + return chat + + async def _create_chat_for_card(self, request: CreateChatRequest, card: Card, client: Client) -> CreateChatResponse: + existing_chat = await self._get_chat(request.client_id, request.card_id) + if existing_chat: + return CreateChatResponse( + ok=False, + message=f'Чат для клиента ID {request.client_id} и карточки ID {request.card_id} уже существует' + ) + + group: Optional[TgGroup] = await self._get_group(request.client_id) + if not group: + group: TgGroup = await self._create_group_for_client( + request.client_id, + self._get_title_for_client_chat(client) + ) + + await self._create_topic_for_card(group.id, card) + + return CreateChatResponse(ok=True, message='Чат успешно создан') + + async def _create_chat_for_client(self, request: CreateChatRequest, client: Client) -> CreateChatResponse: + existing_chat = await self._get_chat(client.id, request.card_id) + if existing_chat: + return CreateChatResponse(ok=False, message=f'Чат для клиента ID {client.id} уже существует') + + group = await self._get_group(client.id) + if group: + return CreateChatResponse(ok=False, message=f'Группа для клиента ID {client.id} уже существует') + + client_chat_title = self._get_title_for_client_chat(client) + await self._create_group_for_client(request.client_id, client_chat_title) + + return CreateChatResponse(ok=True, message='Чат успешно создан') + + async def create_chat(self, request: CreateChatRequest) -> CreateChatResponse: + client: Optional[Client] = await self.session.get(Client, request.client_id) + if not client: + return CreateChatResponse(ok=False, message=f'Клиент с ID {request.client_id} не найден') + + try: + if not request.card_id: + return await self._create_chat_for_client(request, client) + + card: Optional[Card] = await self.session.get(Card, request.card_id) + if not card: + return CreateChatResponse(ok=False, message=f'Карточка с ID {request.card_id} не найдена') + + return await self._create_chat_for_card(request, card, client) + except ClientConnectorError: + return CreateChatResponse(ok=False, message=f'Ошибка подключения к сервису') + except Exception as e: + return CreateChatResponse(ok=False, message=str(e)) + + async def get_messages(self, request: GetMessagesRequest) -> GetMessagesResponse: + stmt = ( + select(Message) + .where( + Message.chat_id == request.chat_id, + Message.is_deleted == False, + ) + .order_by(Message.created_at.desc()) + .offset(request.offset) + .limit(request.limit) + ) + messages = (await self.session.scalars(stmt)).all() + return GetMessagesResponse(messages=messages) + + async def send_message(self, request: SendMessageRequest) -> SendMessageResponse: + chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) + if not chat: + return SendMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') + + message = Message( + text=request.message.text, + created_at=datetime.now(), + chat_id=request.message.chat_id, + status=MessageStatus.sending, + ) + self.session.add(message) + await self.session.commit() + + ok, message = await ProducerService.send_message_to_connector( + request.message.text, + chat.tg_group_id, + chat.tg_topic_id, + message.id, + ) + + return SendMessageResponse(ok=ok, message=message) + + async def _get_message_by_id(self, message_id: int) -> Optional[Message]: + stmt = ( + select(Message) + .where(Message.id == message_id) + .options(joinedload(Message.chat)) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def delete_message(self, message_id: int) -> DeleteMessageResponse: + message: Optional[Message] = await self._get_message_by_id(message_id) + if not message: + return DeleteMessageResponse(ok=False, message=f'Сообщение с ID: {message_id} не найдено') + + ok, msg = await ProducerService.send_message_deleting_to_connector( + message_id, + message.tg_message_id, + message.chat.tg_group_id, + ) + + message.is_deleted = True + await self.session.commit() + + return DeleteMessageResponse(ok=ok, message=msg)