From 2cdccb33ca85f4d73c3e5e39f134939b7987bf6d Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Thu, 27 Mar 2025 15:13:10 +0400 Subject: [PATCH 1/4] feat: creating chats for cards and clients, sending and deleting text messages --- backend/config.py | 7 + external/chat/__init__.py | 0 external/chat/chat_client.py | 36 ++++ external/chat/schemas.py | 32 +++ external/kafka/__init__.py | 2 + external/kafka/consumer.py | 29 +++ external/kafka/enums.py | 7 + external/kafka/producer.py | 5 + external/kafka/schemas/consumer.py | 44 +++++ external/kafka/schemas/producer.py | 36 ++++ external/kafka/services/consumer_service.py | 92 +++++++++ external/kafka/services/producer_service.py | 60 ++++++ main.py | 28 ++- models/__init__.py | 1 + models/auth.py | 8 +- models/card.py | 7 +- models/chat.py | 154 +++++++++++++++ models/client.py | 5 +- requirements.txt | 4 + routers/__init__.py | 1 + routers/chat.py | 73 +++++++ schemas/card.py | 2 + schemas/chat.py | 91 +++++++++ schemas/client.py | 2 + services/chat.py | 209 ++++++++++++++++++++ 25 files changed, 928 insertions(+), 7 deletions(-) create mode 100644 external/chat/__init__.py create mode 100644 external/chat/chat_client.py create mode 100644 external/chat/schemas.py create mode 100644 external/kafka/__init__.py create mode 100644 external/kafka/consumer.py create mode 100644 external/kafka/enums.py create mode 100644 external/kafka/producer.py create mode 100644 external/kafka/schemas/consumer.py create mode 100644 external/kafka/schemas/producer.py create mode 100644 external/kafka/services/consumer_service.py create mode 100644 external/kafka/services/producer_service.py create mode 100644 models/chat.py create mode 100644 routers/chat.py create mode 100644 schemas/chat.py create mode 100644 services/chat.py diff --git a/backend/config.py b/backend/config.py index e3fd5e2..65cb7c3 100644 --- a/backend/config.py +++ b/backend/config.py @@ -19,6 +19,13 @@ S3_API_KEY = os.environ.get('S3_API_KEY') BILLING_API_KEY = os.environ.get('BILLING_API_KEY') +CHAT_CONNECTOR_API_KEY = os.environ.get('CHAT_CONNECTOR_API_KEY') +CHATS_SYNC_URL = os.environ.get('CHATS_SYNC_URL') + +KAFKA_URL = os.environ.get('KAFKA_URL') +KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC') +KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC') + # Celery CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') diff --git a/external/chat/__init__.py b/external/chat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/external/chat/chat_client.py b/external/chat/chat_client.py new file mode 100644 index 0000000..b902ad8 --- /dev/null +++ b/external/chat/chat_client.py @@ -0,0 +1,36 @@ +import aiohttp +import jwt + +from backend.config import CHATS_SYNC_URL, CHAT_CONNECTOR_API_KEY +from external.chat.schemas import * +from services.auth import algorithm + + +class ChatClient: + def __init__(self, api_key: str): + self.api_key = api_key + self.headers = { + 'Authorization': 'Bearer ' + self.create_jwt_token() + } + self.base_url = CHATS_SYNC_URL + self.chats_sync_endpoint = '/chats-sync' + self.groups_endpoint = '/group' + + def create_jwt_token(self): + return jwt.encode({'sub': self.api_key}, CHAT_CONNECTOR_API_KEY, algorithm=algorithm) + + async def _method(self, http_method, method, **kwargs): + async with aiohttp.ClientSession(headers=self.headers) as session: + async with session.request(http_method, self.base_url + method, **kwargs) as response: + print(response) + return await response.json() + + async def create_group(self, request: ExternalCreateGroupRequest) -> ExternalCreateGroupResponse: + json_data = request.model_dump() + response = await self._method('POST', self.groups_endpoint + '/create', json=json_data) + return ExternalCreateGroupResponse.model_validate(response) + + async def create_topic(self, request: ExternalCreateTopicRequest) -> ExternalCreateTopicResponse: + json_data = request.model_dump() + response = await self._method('POST', self.groups_endpoint + '/topic/create', json=json_data) + return ExternalCreateTopicResponse.model_validate(response) diff --git a/external/chat/schemas.py b/external/chat/schemas.py new file mode 100644 index 0000000..a06ea29 --- /dev/null +++ b/external/chat/schemas.py @@ -0,0 +1,32 @@ +from typing import Optional +from uuid import UUID + +from schemas.base import BaseSchema + + +# region Requests + +class ExternalCreateGroupRequest(BaseSchema): + title: str + + +class ExternalCreateTopicRequest(BaseSchema): + group_id: str + title: str + icon_emoji_id: Optional[int] = None + + +# endregion + +# region Responses + +class ExternalCreateGroupResponse(BaseSchema): + tg_group_id: int + group_id: UUID + tg_invite_link: str + + +class ExternalCreateTopicResponse(BaseSchema): + tg_topic_id: int + +# endregion diff --git a/external/kafka/__init__.py b/external/kafka/__init__.py new file mode 100644 index 0000000..f7e8c97 --- /dev/null +++ b/external/kafka/__init__.py @@ -0,0 +1,2 @@ +from .producer import producer +from .consumer import consume_messages diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py new file mode 100644 index 0000000..dd7c207 --- /dev/null +++ b/external/kafka/consumer.py @@ -0,0 +1,29 @@ +from aiokafka import AIOKafkaConsumer +from aiokafka.errors import KafkaConnectionError + +from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC +from backend.session import session_maker +from external.kafka.services.consumer_service import ConsumerService + +consumer = AIOKafkaConsumer( + KAFKA_CONSUMER_TOPIC, + bootstrap_servers=KAFKA_URL, +) + + +async def consume_messages(): + try: + await consumer.start() + except KafkaConnectionError as e: + print(e) + return + + async with session_maker() as session: + consumer_service = ConsumerService(session) + print("started consuming messages") + + try: + async for message in consumer: + await consumer_service.consume_message(message) + finally: + await consumer.stop() diff --git a/external/kafka/enums.py b/external/kafka/enums.py new file mode 100644 index 0000000..09b348b --- /dev/null +++ b/external/kafka/enums.py @@ -0,0 +1,7 @@ +import enum + + +class KafkaMessageType(enum.Enum): + SEND = 1 + EDIT = 2 + DELETE = 3 diff --git a/external/kafka/producer.py b/external/kafka/producer.py new file mode 100644 index 0000000..c600117 --- /dev/null +++ b/external/kafka/producer.py @@ -0,0 +1,5 @@ +from aiokafka import AIOKafkaProducer + +from backend.config import KAFKA_URL + +producer = AIOKafkaProducer(bootstrap_servers=KAFKA_URL) diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py new file mode 100644 index 0000000..f92feeb --- /dev/null +++ b/external/kafka/schemas/consumer.py @@ -0,0 +1,44 @@ +from schemas.base import OkMessageSchema, BaseSchema + + +# region Entities + +class TelegramUserSchema(BaseSchema): + id: int + first_name: str + last_name: str + username: str + + +class MessageFromTelegramSchema(BaseSchema): + group_id: str + tg_topic_id: int + text: str + sender: TelegramUserSchema + + +# endregion + +# region Requests + +class MessageFromTelegramRequest(BaseSchema): + message: MessageFromTelegramSchema + + +# endregion + +# region Responses + +class BaseConnectorResponse(OkMessageSchema): + message_type: int + + +class SendMessageToConnectorResponse(BaseConnectorResponse): + message_id: int + tg_message_id: int + + +class DeleteMessageResponse(BaseConnectorResponse): + message_id: int + +# endregion diff --git a/external/kafka/schemas/producer.py b/external/kafka/schemas/producer.py new file mode 100644 index 0000000..60ea636 --- /dev/null +++ b/external/kafka/schemas/producer.py @@ -0,0 +1,36 @@ +from typing import Optional + +from schemas.base import BaseSchema + + +# region Entities + +class BaseMessageSchema(BaseSchema): + message_id: int + tg_message_id: Optional[int] = None + group_id: str + + +class MessageSchema(BaseMessageSchema): + text: str + topic_id: int + + +# endregion + +# region Requests + +class BaseConnectorRequest(BaseSchema): + message_type: int + app_auth_key: str + message: BaseMessageSchema + + +class SendMessageToConnectorRequest(BaseConnectorRequest): + message: MessageSchema + + +class SendMessageDeletingToConnectorRequest(BaseConnectorRequest): + pass + +# endregion diff --git a/external/kafka/services/consumer_service.py b/external/kafka/services/consumer_service.py new file mode 100644 index 0000000..e09be82 --- /dev/null +++ b/external/kafka/services/consumer_service.py @@ -0,0 +1,92 @@ +import pickle +from datetime import datetime +from typing import Optional +from uuid import UUID + +from aiokafka import ConsumerRecord +from sqlalchemy import select + +from external.kafka.enums import KafkaMessageType +from external.kafka.schemas.consumer import * +from models import Message, MessageStatus, TgUser, Chat, TgGroup +from services.base import BaseService + + +class ConsumerService(BaseService): + async def consume_message(self, message: ConsumerRecord): + value = pickle.loads(message.value) + + try: + if 'ok' in value: + """ Received response after sending message from crm """ + await self._process_connector_response(message) + else: + """ Received message from client """ + request = MessageFromTelegramRequest.model_validate(value) + await self._receive_message_from_client(request) + except Exception as e: + print(e) + + async def _process_connector_response(self, message: ConsumerRecord): + value = pickle.loads(message.value) + message_type = KafkaMessageType(value['message_type']) + + match message_type: + case KafkaMessageType.SEND: + response = SendMessageToConnectorResponse.model_validate(value) + await self._process_connector_send_response(response) + case KafkaMessageType.EDIT: + pass + case KafkaMessageType.DELETE: + response = DeleteMessageResponse.model_validate(value) + await self._process_connector_delete_response(response) + case _: + raise Exception('Unexpected message type in crm consumer') + + async def _process_connector_send_response(self, response: SendMessageToConnectorResponse): + message = await self.session.get(Message, response.message_id) + message.tg_message_id = response.tg_message_id + if response.ok: + message.status = MessageStatus.success + else: + message.status = MessageStatus.error + + await self.session.commit() + + async def _process_connector_delete_response(self, response: DeleteMessageResponse): + if not response.ok: + return + + message = await self.session.get(Message, response.message_id) + message.is_deleted = True + + async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]: + stmt = ( + select(Chat) + .join(TgGroup) + .where( + Chat.tg_topic_id == tg_topic_id, + TgGroup.id == UUID(group_id), + ) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def _receive_message_from_client(self, request: MessageFromTelegramRequest): + tg_sender: Optional[TgUser] = await self.session.get(TgUser, request.message.sender.id) + if not tg_sender: + tg_sender: TgUser = TgUser(**request.message.sender.model_dump()) + self.session.add(tg_sender) + + chat = await self._get_chat(request.message.group_id, request.message.tg_topic_id) + if not chat: + return + + message = Message( + text=request.message.text, + created_at=datetime.now(), + tg_sender_id=tg_sender.id, + chat_id=chat.id, + status=MessageStatus.success, + ) + self.session.add(message) + await self.session.commit() diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py new file mode 100644 index 0000000..0866979 --- /dev/null +++ b/external/kafka/services/producer_service.py @@ -0,0 +1,60 @@ +import pickle +from uuid import UUID + +from aiohttp import ClientConnectorError + +from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY +from external.kafka import producer +from external.kafka.enums import KafkaMessageType +from external.kafka.schemas.producer import MessageSchema, SendMessageToConnectorRequest, \ + SendMessageDeletingToConnectorRequest, BaseMessageSchema +from services.base import BaseService + + +class ProducerService(BaseService): + @staticmethod + async def send_message_to_connector( + message_text: str, + group_id: UUID, + topic_id: int, + message_id: int, + ) -> tuple[bool, str]: + try: + request = SendMessageToConnectorRequest( + message=MessageSchema( + message_id=message_id, + text=message_text, + group_id=str(group_id), + topic_id=topic_id, + ), + message_type=KafkaMessageType.SEND, + app_auth_key=CHAT_CONNECTOR_API_KEY, + ) + await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) + except ClientConnectorError: + return False, 'Ошибка подключения к коннектору' + except Exception as e: + return False, str(e) + + return True, 'Сообщение отправлено' + + @staticmethod + async def send_message_deleting_to_connector(message_id: int, tg_message_id: int, group_id: UUID) -> tuple[bool, str]: + try: + request = SendMessageDeletingToConnectorRequest( + message_type=KafkaMessageType.DELETE, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=BaseMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + ), + ) + await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) + except ClientConnectorError: + return False, 'Ошибка подключения к коннектору' + except Exception as e: + return False, str(e) + + return True, 'Сообщение отправлено' + \ No newline at end of file diff --git a/main.py b/main.py index a969ddd..34239fb 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,37 @@ +import asyncio +import platform +from contextlib import asynccontextmanager + +from aiokafka.errors import KafkaConnectionError from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -import platform - from starlette.staticfiles import StaticFiles import routers from constants import API_ROOT +from external.kafka import producer, consume_messages origins = [ 'http://localhost:5173' ] -app = FastAPI(separate_input_output_schemas=False) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + try: + await producer.start() + except KafkaConnectionError as e: + print(e) + + consumer_task = asyncio.create_task(consume_messages()) + + yield + + await producer.stop() + consumer_task.cancel() + + +app = FastAPI(lifespan=lifespan, separate_input_output_schemas=False) if platform.system() == 'Linux': import uvicorn.workers @@ -58,6 +79,7 @@ routers_list = [ routers.board_router, routers.status_router, routers.card_tag_router, + routers.chat_router, ] for router in routers_list: app.include_router(router) diff --git a/models/__init__.py b/models/__init__.py index 1ee7bfa..235496e 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -23,5 +23,6 @@ from .card_group import * from .transaction import * from .residues import * from .shipping import * +from .chat import * configure_mappers() diff --git a/models/auth.py b/models/auth.py index c6c9da0..ddd79f9 100644 --- a/models/auth.py +++ b/models/auth.py @@ -10,7 +10,7 @@ from models.work_shifts import WorkShift if TYPE_CHECKING: from models.payroll import PayRate, PaymentRecord - from models import Card, CardEmployees + from models import Card, CardEmployees, Message role_permissions = Table( 'role_permissions', @@ -138,6 +138,12 @@ class User(BaseModel): lazy="noload", ) + messages: Mapped[list['Message']] = relationship( + 'Message', + back_populates='crm_sender', + lazy='noload', + ) + class InviteCode(BaseModel): __tablename__ = 'invite_codes' diff --git a/models/card.py b/models/card.py index 90f29e6..5a83446 100644 --- a/models/card.py +++ b/models/card.py @@ -12,8 +12,8 @@ from .shipping_warehouse import ShippingWarehouse if TYPE_CHECKING: from . import ( CardBillRequest, User, BaseModel, Board, CardStatus, CardGroup, CardAttribute, Client, CardTag, - CardService as CardServiceModel, CardProduct, - ) + CardService as CardServiceModel, CardProduct, Chat, +) class Card(BaseModel): @@ -114,6 +114,9 @@ class Card(BaseModel): # module employees employees: Mapped[list['CardEmployees']] = relationship(back_populates='card', lazy='selectin') + # module chat + chat: Mapped[Optional['Chat']] = relationship(back_populates='card', lazy='joined') + # endregion diff --git a/models/chat.py b/models/chat.py new file mode 100644 index 0000000..0fa5fa1 --- /dev/null +++ b/models/chat.py @@ -0,0 +1,154 @@ +import enum +from datetime import datetime +from typing import Optional, TYPE_CHECKING +from uuid import UUID + +from sqlalchemy import ForeignKey, BigInteger, Enum, Uuid +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from models import BaseModel, User + +if TYPE_CHECKING: + from models import Client, Card + + +class TgUser(BaseModel): + __tablename__ = 'tg_users' + + id: Mapped[int] = mapped_column( + BigInteger(), + primary_key=True, + comment='Telegram user ID', + ) + username: Mapped[str] = mapped_column( + index=True, + nullable=False, + unique=True, + ) + first_name: Mapped[str] = mapped_column(nullable=True) + last_name: Mapped[str] = mapped_column(nullable=True) + + messages: Mapped['Message'] = relationship( + 'Message', + lazy='noload', + back_populates='tg_sender', + ) + + +class TgGroup(BaseModel): + __tablename__ = 'tg_groups' + + id: Mapped[UUID] = mapped_column(Uuid, primary_key=True) + + tg_group_id: Mapped[int] = mapped_column( + BigInteger(), + nullable=False, + unique=True, + ) + tg_invite_link: Mapped[str] = mapped_column(nullable=False) + + client_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('clients.id'), + unique=True, + ) + client: Mapped[Optional['Client']] = relationship( + 'Client', + lazy='joined', + back_populates='tg_group', + ) + + chats: Mapped[list['Chat']] = relationship( + 'Chat', + lazy='noload', + back_populates='tg_group', + ) + + +class Chat(BaseModel): + __tablename__ = 'chats' + + id: Mapped[int] = mapped_column(primary_key=True) + + tg_topic_id: Mapped[int] = mapped_column(nullable=False) + + card_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('cards.id'), + unique=True, + ) + card: Mapped[Optional['Card']] = relationship( + 'Card', + lazy='joined', + back_populates='chat', + ) + + client_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('clients.id'), + unique=True, + ) + client: Mapped[Optional['Client']] = relationship( + 'Client', + lazy='joined', + back_populates='chat', + ) + + tg_group_id: Mapped[UUID] = mapped_column( + ForeignKey('tg_groups.id'), + nullable=False, + ) + tg_group: Mapped[TgGroup] = relationship( + 'TgGroup', + lazy='joined', + back_populates='chats', + ) + + messages: Mapped[list['Message']] = relationship( + 'Message', + lazy='selectin', + back_populates='chat', + order_by='Message.created_at.desc()', + ) + + +class MessageStatus(enum.Enum): + sending = 'SENDING' + success = 'SUCCESS' + error = 'ERROR' + + +class Message(BaseModel): + __tablename__ = 'messages' + + id: Mapped[int] = mapped_column(primary_key=True) + tg_message_id: Mapped[Optional[int]] = mapped_column(nullable=True) + + text: Mapped[str] = mapped_column(nullable=False) + created_at: Mapped[datetime] = mapped_column(nullable=False) + status: Mapped[MessageStatus] = mapped_column(Enum(MessageStatus), nullable=False) + is_deleted: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) + + tg_sender_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('tg_users.id'), + nullable=True, + ) + tg_sender: Mapped[TgUser] = relationship( + 'TgUser', + lazy='selectin', + back_populates='messages', + ) + + crm_sender_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('users.id'), + nullable=True, + ) + crm_sender: Mapped[Optional['User']] = relationship( + 'User', + lazy='selectin', + back_populates='messages', + ) + + chat_id: Mapped[int] = mapped_column(ForeignKey('chats.id')) + chat: Mapped[Chat] = relationship( + 'Chat', + lazy='noload', + back_populates='messages', + ) diff --git a/models/client.py b/models/client.py index 66aff4b..74a8ad7 100644 --- a/models/client.py +++ b/models/client.py @@ -7,7 +7,7 @@ from sqlalchemy.orm import relationship, Mapped, mapped_column from models import BaseModel if TYPE_CHECKING: - from models import ResidualPallet, ResidualBox, Product, BarcodeTemplate, User + from models import ResidualPallet, ResidualBox, Product, BarcodeTemplate, User, TgGroup, Chat class Client(BaseModel): @@ -39,6 +39,9 @@ class Client(BaseModel): pallets: Mapped[list['ResidualPallet']] = relationship(back_populates='client', lazy='selectin') boxes: Mapped[list['ResidualBox']] = relationship(back_populates='client', lazy='selectin') + tg_group: Mapped[Optional['TgGroup']] = relationship('TgGroup', back_populates='client', lazy='joined') + chat: Mapped[Optional['Chat']] = relationship('Chat', back_populates='client', lazy='joined') + class ClientDetails(BaseModel): __tablename__ = 'client_details' diff --git a/requirements.txt b/requirements.txt index ba641fd..557d00a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ jinja2 # Security python-jose[cryptography] passlib[bcrypt] +pyjwt # Database sqlalchemy @@ -17,6 +18,9 @@ sqlalchemy[asyncio] asyncpg alembic +# Kafka +aiokafka + # Other python-dotenv aiohttp diff --git a/routers/__init__.py b/routers/__init__.py index b9a55a1..1829815 100644 --- a/routers/__init__.py +++ b/routers/__init__.py @@ -26,3 +26,4 @@ from .project import project_router from .board import board_router from .status import status_router from .card_tag import card_tag_router +from .chat import chat_router diff --git a/routers/chat.py b/routers/chat.py new file mode 100644 index 0000000..91ab6f5 --- /dev/null +++ b/routers/chat.py @@ -0,0 +1,73 @@ +from typing import Annotated + +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from backend.session import get_session +from schemas.chat import * +from services.chat import ChatService + +chat_router = APIRouter( + prefix='/chat', + tags=['chat'], +) + + +@chat_router.post( + '/message', + operation_id='send_message', + response_model=SendMessageResponse, +) +async def send_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: SendMessageRequest, +): + return await ChatService(session).send_message(request) + + +@chat_router.delete( + '/message/{message_id}', + operation_id='delete_message', + response_model=DeleteMessageResponse, +) +async def delete_message( + session: Annotated[AsyncSession, Depends(get_session)], + message_id: int, +): + return await ChatService(session).delete_message(message_id) + + +@chat_router.post( + '/', + operation_id='get_chat', + response_model=GetChatResponse, +) +async def get_chat( + session: Annotated[AsyncSession, Depends(get_session)], + request: GetChatRequest, +): + return await ChatService(session).get_chat(request) + + +@chat_router.post( + '/create', + operation_id='create_chat', + response_model=CreateChatResponse, +) +async def create_chat( + session: Annotated[AsyncSession, Depends(get_session)], + request: CreateChatRequest, +): + return await ChatService(session).create_chat(request) + + +@chat_router.post( + '/messages', + operation_id='get_messages', + response_model=GetMessagesResponse, +) +async def get_messages( + session: Annotated[AsyncSession, Depends(get_session)], + request: GetMessagesRequest, +): + return await ChatService(session).get_messages(request) diff --git a/schemas/card.py b/schemas/card.py index 488c68a..569288d 100644 --- a/schemas/card.py +++ b/schemas/card.py @@ -8,6 +8,7 @@ from schemas.base import BaseSchema, OkMessageSchema from schemas.billing import CardBillRequestSchema from schemas.board import BoardSchema from schemas.card_tag import CardTagSchema +from schemas.chat import ChatSchema from schemas.client import ClientSchema from schemas.group import CardGroupSchema from schemas.marketplace import BaseMarketplaceSchema @@ -108,6 +109,7 @@ class BaseCardSchema(BaseSchema): class CardSchema(BaseCardSchema): attributes: list[CardAttributeSchema] + chat: Optional[ChatSchema] class CardGeneralInfoSchema(BaseSchemaWithAttributes): diff --git a/schemas/chat.py b/schemas/chat.py new file mode 100644 index 0000000..17bebec --- /dev/null +++ b/schemas/chat.py @@ -0,0 +1,91 @@ +from datetime import datetime +from typing import Optional + +from schemas.base import BaseSchema, OkMessageSchema + + +# region Entities + +class TgUserSchema(BaseSchema): + id: int + first_name: str + last_name: str + username: str + + +class MessageFromTgSchema(BaseSchema): + chat_id: int + text: str + sender: TgUserSchema + + +class BaseMessageSchema(BaseSchema): + text: str + chat_id: int + + +class MessageSchema(BaseMessageSchema): + id: int + created_at: datetime + tg_sender: Optional[TgUserSchema] + status: str + + +class TgGroupSchema(BaseSchema): + tg_group_id: int + tg_invite_link: str + + +class ChatSchema(BaseSchema): + id: int + client_id: Optional[int] + card_id: Optional[int] + tg_group: Optional[TgGroupSchema] + +# endregion + +# region Requests + +class SendMessageRequest(BaseSchema): + message: BaseMessageSchema + + +class GetChatRequest(BaseSchema): + client_id: int + card_id: Optional[int] + + +class CreateChatRequest(BaseSchema): + client_id: int + card_id: Optional[int] + + +class GetMessagesRequest(BaseSchema): + chat_id: int + offset: int + limit: int + +# endregion + +# region Responses + +class SendMessageResponse(OkMessageSchema): + pass + + +class DeleteMessageResponse(OkMessageSchema): + pass + + +class GetChatResponse(BaseSchema): + chat: Optional[ChatSchema] + + +class CreateChatResponse(OkMessageSchema): + pass + + +class GetMessagesResponse(BaseSchema): + messages: list[MessageSchema] + +# endregion diff --git a/schemas/client.py b/schemas/client.py index 6707d75..4d3c019 100644 --- a/schemas/client.py +++ b/schemas/client.py @@ -4,6 +4,7 @@ from pydantic import field_validator from schemas.barcode import BarcodeTemplateSchema from schemas.base import BaseSchema, OkMessageSchema +from schemas.chat import ChatSchema from schemas.residues import ResidualBoxSchema, ResidualPalletSchema @@ -26,6 +27,7 @@ class ClientSchema(BaseSchema): barcode_template: BarcodeTemplateSchema | None = None comment: Optional[str] = None details: ClientDetailsSchema | None = None + chat: ChatSchema | None = None class ClientDetailedSchema(ClientSchema): diff --git a/services/chat.py b/services/chat.py new file mode 100644 index 0000000..eb1b65a --- /dev/null +++ b/services/chat.py @@ -0,0 +1,209 @@ +from uuid import UUID + +from aiohttp import ClientConnectorError +from sqlalchemy import select +from sqlalchemy.orm import joinedload + +from backend.config import CHAT_CONNECTOR_API_KEY +from external.chat.chat_client import ChatClient +from external.chat.schemas import ExternalCreateGroupRequest, ExternalCreateTopicRequest +from external.kafka.services.producer_service import ProducerService +from models import Message, Chat, MessageStatus, TgGroup, Client, Card +from schemas.chat import * +from services.base import BaseService + + +class ChatService(BaseService): + async def _get_chat(self, client_id: int, card_id: Optional[int]) -> Optional[Chat]: + stmt = ( + select(Chat) + .where( + Chat.card_id == card_id, + TgGroup.client_id == client_id, + ) + .join(TgGroup) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def get_chat(self, request: GetChatRequest) -> GetChatResponse: + chat = await self._get_chat(request.client_id, request.card_id) + return GetChatResponse(chat=chat) + + async def _get_group(self, client_id: int) -> Optional[TgGroup]: + stmt = ( + select(TgGroup) + .where(TgGroup.client_id == client_id) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def _create_group_for_client(self, client_id: int, title: str) -> Optional[TgGroup]: + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + + request = ExternalCreateGroupRequest(title=title) + response = await chat_client.create_group(request) + + group = TgGroup( + id=response.group_id, + client_id=client_id, + tg_group_id=response.tg_group_id, + tg_invite_link=response.tg_invite_link, + ) + + chat = Chat( + tg_topic_id=1, + card_id=None, + client_id=client_id, + tg_group_id=response.group_id, + ) + self.session.add_all([group, chat]) + await self.session.commit() + + return group + + @staticmethod + def _get_title_for_client_chat(client: Client) -> str: + return f'[{client.id}] {client.name}' + + @staticmethod + def _get_title_for_card_chat(card: Card) -> str: + return f'[{card.id}] {card.name}' + + async def _create_topic_for_card( + self, + group_id: UUID, + card: Card, + ) -> Chat: + card_chat_icon_id = 5348227245599105972 # 💼 + + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + + request = ExternalCreateTopicRequest( + group_id=str(group_id), + title=self._get_title_for_card_chat(card), + icon_emoji_id=card_chat_icon_id, + ) + response = await chat_client.create_topic(request) + + chat = Chat( + tg_topic_id=response.tg_topic_id, + card_id=card.id, + client_id=None, + tg_group_id=group_id, + ) + self.session.add(chat) + await self.session.commit() + + return chat + + async def _create_chat_for_card(self, request: CreateChatRequest, card: Card, client: Client) -> CreateChatResponse: + existing_chat = await self._get_chat(request.client_id, request.card_id) + if existing_chat: + return CreateChatResponse( + ok=False, + message=f'Чат для клиента ID {request.client_id} и карточки ID {request.card_id} уже существует' + ) + + group: Optional[TgGroup] = await self._get_group(request.client_id) + if not group: + group: TgGroup = await self._create_group_for_client( + request.client_id, + self._get_title_for_client_chat(client) + ) + + await self._create_topic_for_card(group.id, card) + + return CreateChatResponse(ok=True, message='Чат успешно создан') + + async def _create_chat_for_client(self, request: CreateChatRequest, client: Client) -> CreateChatResponse: + existing_chat = await self._get_chat(client.id, request.card_id) + if existing_chat: + return CreateChatResponse(ok=False, message=f'Чат для клиента ID {client.id} уже существует') + + group = await self._get_group(client.id) + if group: + return CreateChatResponse(ok=False, message=f'Группа для клиента ID {client.id} уже существует') + + client_chat_title = self._get_title_for_client_chat(client) + await self._create_group_for_client(request.client_id, client_chat_title) + + return CreateChatResponse(ok=True, message='Чат успешно создан') + + async def create_chat(self, request: CreateChatRequest) -> CreateChatResponse: + client: Optional[Client] = await self.session.get(Client, request.client_id) + if not client: + return CreateChatResponse(ok=False, message=f'Клиент с ID {request.client_id} не найден') + + try: + if not request.card_id: + return await self._create_chat_for_client(request, client) + + card: Optional[Card] = await self.session.get(Card, request.card_id) + if not card: + return CreateChatResponse(ok=False, message=f'Карточка с ID {request.card_id} не найдена') + + return await self._create_chat_for_card(request, card, client) + except ClientConnectorError: + return CreateChatResponse(ok=False, message=f'Ошибка подключения к сервису') + except Exception as e: + return CreateChatResponse(ok=False, message=str(e)) + + async def get_messages(self, request: GetMessagesRequest) -> GetMessagesResponse: + stmt = ( + select(Message) + .where( + Message.chat_id == request.chat_id, + Message.is_deleted == False, + ) + .order_by(Message.created_at.desc()) + .offset(request.offset) + .limit(request.limit) + ) + messages = (await self.session.scalars(stmt)).all() + return GetMessagesResponse(messages=messages) + + async def send_message(self, request: SendMessageRequest) -> SendMessageResponse: + chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) + if not chat: + return SendMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') + + message = Message( + text=request.message.text, + created_at=datetime.now(), + chat_id=request.message.chat_id, + status=MessageStatus.sending, + ) + self.session.add(message) + await self.session.commit() + + ok, message = await ProducerService.send_message_to_connector( + request.message.text, + chat.tg_group_id, + chat.tg_topic_id, + message.id, + ) + + return SendMessageResponse(ok=ok, message=message) + + async def _get_message_by_id(self, message_id: int) -> Optional[Message]: + stmt = ( + select(Message) + .where(Message.id == message_id) + .options(joinedload(Message.chat)) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def delete_message(self, message_id: int) -> DeleteMessageResponse: + message: Optional[Message] = await self._get_message_by_id(message_id) + if not message: + return DeleteMessageResponse(ok=False, message=f'Сообщение с ID: {message_id} не найдено') + + ok, msg = await ProducerService.send_message_deleting_to_connector( + message_id, + message.tg_message_id, + message.chat.tg_group_id, + ) + + message.is_deleted = True + await self.session.commit() + + return DeleteMessageResponse(ok=ok, message=msg) From 00522da68f229eba7ae80e7ed82378e90bc30d32 Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Wed, 2 Apr 2025 15:28:22 +0400 Subject: [PATCH 2/4] feat: sending and receiving messages with files, editing text messages --- backend/config.py | 2 + external/chat/chat_client.py | 21 +++- external/chat/schemas.py | 22 +++- external/kafka/consumer.py | 1 + external/kafka/schemas/consumer.py | 17 ++- external/kafka/schemas/producer.py | 9 +- external/kafka/services/consumer_service.py | 24 ++++- external/kafka/services/producer_service.py | 93 +++++++++------- models/chat.py | 24 +++++ requirements.txt | 1 + routers/chat.py | 58 +++++++++- schemas/chat.py | 48 +++++++-- services/chat.py | 111 ++++++++++++++++++-- 13 files changed, 361 insertions(+), 70 deletions(-) diff --git a/backend/config.py b/backend/config.py index 65cb7c3..0a472d8 100644 --- a/backend/config.py +++ b/backend/config.py @@ -19,9 +19,11 @@ S3_API_KEY = os.environ.get('S3_API_KEY') BILLING_API_KEY = os.environ.get('BILLING_API_KEY') +CHAT_TELEGRAM_BOT_TOKEN = os.environ.get('CHAT_TELEGRAM_BOT_TOKEN') CHAT_CONNECTOR_API_KEY = os.environ.get('CHAT_CONNECTOR_API_KEY') CHATS_SYNC_URL = os.environ.get('CHATS_SYNC_URL') +# Kafka KAFKA_URL = os.environ.get('KAFKA_URL') KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC') KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC') diff --git a/external/chat/chat_client.py b/external/chat/chat_client.py index b902ad8..53d8d99 100644 --- a/external/chat/chat_client.py +++ b/external/chat/chat_client.py @@ -1,5 +1,6 @@ import aiohttp import jwt +from fastapi import UploadFile from backend.config import CHATS_SYNC_URL, CHAT_CONNECTOR_API_KEY from external.chat.schemas import * @@ -22,7 +23,6 @@ class ChatClient: async def _method(self, http_method, method, **kwargs): async with aiohttp.ClientSession(headers=self.headers) as session: async with session.request(http_method, self.base_url + method, **kwargs) as response: - print(response) return await response.json() async def create_group(self, request: ExternalCreateGroupRequest) -> ExternalCreateGroupResponse: @@ -34,3 +34,22 @@ class ChatClient: json_data = request.model_dump() response = await self._method('POST', self.groups_endpoint + '/topic/create', json=json_data) return ExternalCreateTopicResponse.model_validate(response) + + async def send_messages_with_files( + self, + tg_group_id: str, + tg_topic_id: int, + caption: str, + files: list[UploadFile], + ) -> ExternalSendMessagesWithFilesResponse: + query_params = f'?tg_group_id={tg_group_id}&tg_topic_id={tg_topic_id}&caption={caption}' + + data = aiohttp.FormData(default_to_multipart=True) + + for file in files: + content = await file.read() + data.add_field('files', content, filename=file.filename, content_type=file.content_type) + + response = await self._method('POST', self.chats_sync_endpoint + '/send' + query_params, data=data) + + return ExternalSendMessagesWithFilesResponse.model_validate(response) diff --git a/external/chat/schemas.py b/external/chat/schemas.py index a06ea29..4d50c04 100644 --- a/external/chat/schemas.py +++ b/external/chat/schemas.py @@ -1,9 +1,25 @@ from typing import Optional from uuid import UUID -from schemas.base import BaseSchema +from schemas.base import BaseSchema, OkMessageSchema +# region Entities + +class ExternalSendFileSchema(BaseSchema): + buffer: bytes + file_name: str + file_size: int + + +class ExternalMessageFileSchema(BaseSchema): + file_path: str + type: str + file_name: str + file_size: int + +# endregion + # region Requests class ExternalCreateGroupRequest(BaseSchema): @@ -29,4 +45,8 @@ class ExternalCreateGroupResponse(BaseSchema): class ExternalCreateTopicResponse(BaseSchema): tg_topic_id: int + +class ExternalSendMessagesWithFilesResponse(OkMessageSchema): + files: list[ExternalMessageFileSchema] + # endregion diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py index dd7c207..96655fa 100644 --- a/external/kafka/consumer.py +++ b/external/kafka/consumer.py @@ -24,6 +24,7 @@ async def consume_messages(): try: async for message in consumer: + print("consume") await consumer_service.consume_message(message) finally: await consumer.stop() diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py index f92feeb..96d5fca 100644 --- a/external/kafka/schemas/consumer.py +++ b/external/kafka/schemas/consumer.py @@ -1,3 +1,5 @@ +from typing import Optional + from schemas.base import OkMessageSchema, BaseSchema @@ -10,11 +12,19 @@ class TelegramUserSchema(BaseSchema): username: str +class MessageFileSchema(BaseSchema): + file_path: str + type: str + file_name: str + file_size: int + + class MessageFromTelegramSchema(BaseSchema): group_id: str tg_topic_id: int - text: str + text: Optional[str] sender: TelegramUserSchema + file: Optional[MessageFileSchema] # endregion @@ -41,4 +51,9 @@ class SendMessageToConnectorResponse(BaseConnectorResponse): class DeleteMessageResponse(BaseConnectorResponse): message_id: int + +class EditMessageResponse(BaseConnectorResponse): + message_id: int + text: str + # endregion diff --git a/external/kafka/schemas/producer.py b/external/kafka/schemas/producer.py index 60ea636..f3017a0 100644 --- a/external/kafka/schemas/producer.py +++ b/external/kafka/schemas/producer.py @@ -11,8 +11,11 @@ class BaseMessageSchema(BaseSchema): group_id: str -class MessageSchema(BaseMessageSchema): +class EditMessageSchema(BaseMessageSchema): text: str + + +class MessageSchema(EditMessageSchema): topic_id: int @@ -33,4 +36,8 @@ class SendMessageToConnectorRequest(BaseConnectorRequest): class SendMessageDeletingToConnectorRequest(BaseConnectorRequest): pass + +class SendMessageEditingToConnectorRequest(BaseConnectorRequest): + message: EditMessageSchema + # endregion diff --git a/external/kafka/services/consumer_service.py b/external/kafka/services/consumer_service.py index e09be82..3a49f33 100644 --- a/external/kafka/services/consumer_service.py +++ b/external/kafka/services/consumer_service.py @@ -1,6 +1,5 @@ import pickle from datetime import datetime -from typing import Optional from uuid import UUID from aiokafka import ConsumerRecord @@ -8,13 +7,14 @@ from sqlalchemy import select from external.kafka.enums import KafkaMessageType from external.kafka.schemas.consumer import * -from models import Message, MessageStatus, TgUser, Chat, TgGroup +from models import Message, MessageStatus, TgUser, Chat, TgGroup, MessageFile from services.base import BaseService class ConsumerService(BaseService): async def consume_message(self, message: ConsumerRecord): value = pickle.loads(message.value) + print("Consumer: received message: ", value) try: if 'ok' in value: @@ -36,7 +36,8 @@ class ConsumerService(BaseService): response = SendMessageToConnectorResponse.model_validate(value) await self._process_connector_send_response(response) case KafkaMessageType.EDIT: - pass + response = EditMessageResponse.model_validate(value) + await self._process_connector_edit_response(response) case KafkaMessageType.DELETE: response = DeleteMessageResponse.model_validate(value) await self._process_connector_delete_response(response) @@ -59,6 +60,16 @@ class ConsumerService(BaseService): message = await self.session.get(Message, response.message_id) message.is_deleted = True + await self.session.commit() + + async def _process_connector_edit_response(self, response: EditMessageResponse): + if not response.ok: + return + + message = await self.session.get(Message, response.message_id) + message.text = response.text + message.is_edited = True + await self.session.commit() async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]: stmt = ( @@ -81,12 +92,17 @@ class ConsumerService(BaseService): if not chat: return + file = None + if request.message.file: + file = MessageFile(**request.message.file.model_dump()) + message = Message( - text=request.message.text, + text=request.message.text if request.message.text else "", created_at=datetime.now(), tg_sender_id=tg_sender.id, chat_id=chat.id, status=MessageStatus.success, + file=file, ) self.session.add(message) await self.session.commit() diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py index 0866979..e1e9c50 100644 --- a/external/kafka/services/producer_service.py +++ b/external/kafka/services/producer_service.py @@ -6,12 +6,21 @@ from aiohttp import ClientConnectorError from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY from external.kafka import producer from external.kafka.enums import KafkaMessageType -from external.kafka.schemas.producer import MessageSchema, SendMessageToConnectorRequest, \ - SendMessageDeletingToConnectorRequest, BaseMessageSchema +from external.kafka.schemas.producer import * from services.base import BaseService class ProducerService(BaseService): + @staticmethod + async def _send_message(request: BaseConnectorRequest): + try: + await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) + except ClientConnectorError: + return False, 'Ошибка подключения к коннектору' + except Exception as e: + return False, str(e) + return True, 'Сообщение отправлено' + @staticmethod async def send_message_to_connector( message_text: str, @@ -19,42 +28,50 @@ class ProducerService(BaseService): topic_id: int, message_id: int, ) -> tuple[bool, str]: - try: - request = SendMessageToConnectorRequest( - message=MessageSchema( - message_id=message_id, - text=message_text, - group_id=str(group_id), - topic_id=topic_id, - ), - message_type=KafkaMessageType.SEND, - app_auth_key=CHAT_CONNECTOR_API_KEY, - ) - await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) - except ClientConnectorError: - return False, 'Ошибка подключения к коннектору' - except Exception as e: - return False, str(e) - - return True, 'Сообщение отправлено' + request = SendMessageToConnectorRequest( + message_type=KafkaMessageType.SEND, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=MessageSchema( + message_id=message_id, + text=message_text, + group_id=str(group_id), + topic_id=topic_id, + ), + ) + return await ProducerService._send_message(request) @staticmethod - async def send_message_deleting_to_connector(message_id: int, tg_message_id: int, group_id: UUID) -> tuple[bool, str]: - try: - request = SendMessageDeletingToConnectorRequest( - message_type=KafkaMessageType.DELETE, - app_auth_key=CHAT_CONNECTOR_API_KEY, - message=BaseMessageSchema( - message_id=message_id, - tg_message_id=tg_message_id, - group_id=str(group_id), - ), + async def send_message_deleting_to_connector( + message_id: int, + tg_message_id: int, + group_id: UUID, + ) -> tuple[bool, str]: + request = SendMessageDeletingToConnectorRequest( + message_type=KafkaMessageType.DELETE, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=BaseMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + ), + ) + return await ProducerService._send_message(request) + + @staticmethod + async def send_message_editing_to_connector( + message_id: int, + tg_message_id: int, + group_id: UUID, + text: str, + ) -> tuple[bool, str]: + request = SendMessageEditingToConnectorRequest( + message_type=KafkaMessageType.EDIT, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=EditMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + text=text, ) - await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) - except ClientConnectorError: - return False, 'Ошибка подключения к коннектору' - except Exception as e: - return False, str(e) - - return True, 'Сообщение отправлено' - \ No newline at end of file + ) + return await ProducerService._send_message(request) diff --git a/models/chat.py b/models/chat.py index 0fa5fa1..826d36c 100644 --- a/models/chat.py +++ b/models/chat.py @@ -109,6 +109,23 @@ class Chat(BaseModel): ) +class MessageFile(BaseModel): + __tablename__ = 'message_files' + + id: Mapped[int] = mapped_column(primary_key=True) + file_path: Mapped[str] = mapped_column(nullable=False) + type: Mapped[Optional[str]] = mapped_column(nullable=True) + file_name: Mapped[str] = mapped_column(nullable=False) + file_size: Mapped[int] = mapped_column(BigInteger(), nullable=True, comment='Размер файла в байтах') + + message_id: Mapped[int] = mapped_column(ForeignKey('messages.id')) + message: Mapped['Message'] = relationship( + 'Message', + lazy='noload', + back_populates='file', + ) + + class MessageStatus(enum.Enum): sending = 'SENDING' success = 'SUCCESS' @@ -125,6 +142,7 @@ class Message(BaseModel): created_at: Mapped[datetime] = mapped_column(nullable=False) status: Mapped[MessageStatus] = mapped_column(Enum(MessageStatus), nullable=False) is_deleted: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) + is_edited: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) tg_sender_id: Mapped[Optional[int]] = mapped_column( ForeignKey('tg_users.id'), @@ -152,3 +170,9 @@ class Message(BaseModel): lazy='noload', back_populates='messages', ) + + file: Mapped[Optional[MessageFile]] = relationship( + 'MessageFile', + back_populates='message', + lazy='selectin', + ) diff --git a/requirements.txt b/requirements.txt index 557d00a..bbd8c3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,6 +25,7 @@ aiokafka python-dotenv aiohttp aiohttp[speedups] +requests openpyxl==3.0.10 lexorank-py celery[redis] diff --git a/routers/chat.py b/routers/chat.py index 91ab6f5..cea1981 100644 --- a/routers/chat.py +++ b/routers/chat.py @@ -1,6 +1,6 @@ from typing import Annotated -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, UploadFile from sqlalchemy.ext.asyncio import AsyncSession from backend.session import get_session @@ -15,16 +15,42 @@ chat_router = APIRouter( @chat_router.post( '/message', - operation_id='send_message', - response_model=SendMessageResponse, + operation_id='send_text_message', + response_model=SendTextMessageResponse, ) -async def send_message( +async def send_text_message( session: Annotated[AsyncSession, Depends(get_session)], - request: SendMessageRequest, + request: SendTextMessageRequest, ): return await ChatService(session).send_message(request) +@chat_router.post( + '/message/repeat', + operation_id='repeat_sending_text_message', + response_model=RepeatSendingTextMessageResponse, +) +async def repeat_sending_text_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: RepeatSendingTextMessageRequest, +): + return await ChatService(session).repeat_sending_message(request) + + +@chat_router.post( + '/message/files', + operation_id='send_messages_with_files', + response_model=LoadMessagesResponse, +) +async def send_messages_with_files( + session: Annotated[AsyncSession, Depends(get_session)], + files: list[UploadFile], + chat_id: int, + caption: str, +): + return await ChatService(session).send_messages_with_files(files, chat_id, caption) + + @chat_router.delete( '/message/{message_id}', operation_id='delete_message', @@ -37,6 +63,18 @@ async def delete_message( return await ChatService(session).delete_message(message_id) +@chat_router.patch( + '/message', + operation_id='edit_message', + response_model=EditMessageResponse, +) +async def edit_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: EditMessageRequest, +): + return await ChatService(session).edit_message(request) + + @chat_router.post( '/', operation_id='get_chat', @@ -71,3 +109,13 @@ async def get_messages( request: GetMessagesRequest, ): return await ChatService(session).get_messages(request) + + +@chat_router.get( + '/tg-file/{file_id}', +) +async def get_tg_file( + session: Annotated[AsyncSession, Depends(get_session)], + file_id: int, +): + return await ChatService(session).get_tg_file(file_id) diff --git a/schemas/chat.py b/schemas/chat.py index 17bebec..51b48d7 100644 --- a/schemas/chat.py +++ b/schemas/chat.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Optional +from typing import Optional, List from schemas.base import BaseSchema, OkMessageSchema @@ -13,10 +13,12 @@ class TgUserSchema(BaseSchema): username: str -class MessageFromTgSchema(BaseSchema): - chat_id: int - text: str - sender: TgUserSchema +class MessageFileSchema(BaseSchema): + id: int + file_path: str + type: str + file_name: str + file_size: int class BaseMessageSchema(BaseSchema): @@ -24,11 +26,21 @@ class BaseMessageSchema(BaseSchema): chat_id: int -class MessageSchema(BaseMessageSchema): +class EditMessageSchema(BaseMessageSchema): id: int + + +class MessageSchema(EditMessageSchema): created_at: datetime tg_sender: Optional[TgUserSchema] status: str + is_edited: bool + file: Optional[MessageFileSchema] = None + + +class RepeatSendingMessageSchema(BaseMessageSchema): + text: str + id: int class TgGroupSchema(BaseSchema): @@ -46,10 +58,18 @@ class ChatSchema(BaseSchema): # region Requests -class SendMessageRequest(BaseSchema): +class SendTextMessageRequest(BaseSchema): message: BaseMessageSchema +class RepeatSendingTextMessageRequest(BaseSchema): + message: RepeatSendingMessageSchema + + +class EditMessageRequest(BaseSchema): + message: EditMessageSchema + + class GetChatRequest(BaseSchema): client_id: int card_id: Optional[int] @@ -69,7 +89,15 @@ class GetMessagesRequest(BaseSchema): # region Responses -class SendMessageResponse(OkMessageSchema): +class SendTextMessageResponse(OkMessageSchema): + pass + + +class RepeatSendingTextMessageResponse(OkMessageSchema): + pass + + +class LoadMessagesResponse(OkMessageSchema): pass @@ -77,6 +105,10 @@ class DeleteMessageResponse(OkMessageSchema): pass +class EditMessageResponse(OkMessageSchema): + pass + + class GetChatResponse(BaseSchema): chat: Optional[ChatSchema] diff --git a/services/chat.py b/services/chat.py index eb1b65a..9392745 100644 --- a/services/chat.py +++ b/services/chat.py @@ -1,14 +1,15 @@ -from uuid import UUID - +import requests from aiohttp import ClientConnectorError +from fastapi import HTTPException, UploadFile from sqlalchemy import select from sqlalchemy.orm import joinedload +from starlette.responses import StreamingResponse -from backend.config import CHAT_CONNECTOR_API_KEY +from backend.config import CHAT_CONNECTOR_API_KEY, CHAT_TELEGRAM_BOT_TOKEN from external.chat.chat_client import ChatClient -from external.chat.schemas import ExternalCreateGroupRequest, ExternalCreateTopicRequest +from external.chat.schemas import * from external.kafka.services.producer_service import ProducerService -from models import Message, Chat, MessageStatus, TgGroup, Client, Card +from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile from schemas.chat import * from services.base import BaseService @@ -161,10 +162,10 @@ class ChatService(BaseService): messages = (await self.session.scalars(stmt)).all() return GetMessagesResponse(messages=messages) - async def send_message(self, request: SendMessageRequest) -> SendMessageResponse: + async def send_message(self, request: SendTextMessageRequest) -> SendTextMessageResponse: chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) if not chat: - return SendMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') + return SendTextMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') message = Message( text=request.message.text, @@ -182,7 +183,67 @@ class ChatService(BaseService): message.id, ) - return SendMessageResponse(ok=ok, message=message) + return SendTextMessageResponse(ok=ok, message=message) + + async def repeat_sending_message( + self, + request: RepeatSendingTextMessageRequest, + ) -> RepeatSendingTextMessageResponse: + message: Optional[Message] = await self._get_message_by_id(request.message.id) + if not message: + return RepeatSendingTextMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') + + ok, msg = await ProducerService.send_message_to_connector( + request.message.text, + message.chat.tg_group_id, + message.chat.tg_topic_id, + message.id, + ) + + if ok: + message.status = MessageStatus.sending + await self.session.commit() + + return RepeatSendingTextMessageResponse(ok=ok, message=msg) + + async def send_messages_with_files( + self, + files: list[UploadFile], + chat_id: int, + caption: str, + ) -> LoadMessagesResponse: + chat: Optional[Chat] = await self.session.get(Chat, chat_id) + if not chat: + return SendTextMessageResponse(ok=False, message=f'Чат с ID: {chat_id} не найден') + + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + response = await chat_client.send_messages_with_files( + str(chat.tg_group_id), + chat.tg_topic_id, + caption, + files + ) + + last_message = None + for file_schema in response.files: + file = MessageFile(**file_schema.model_dump()) + self.session.add(file) + + message = Message( + text='', + created_at=datetime.now(), + chat_id=chat_id, + status=MessageStatus.success, + file=file, + ) + last_message = message + self.session.add(message) + + if last_message: + last_message.text = caption + await self.session.commit() + + return LoadMessagesResponse(ok=response.ok, message=response.message) async def _get_message_by_id(self, message_id: int) -> Optional[Message]: stmt = ( @@ -203,7 +264,35 @@ class ChatService(BaseService): message.chat.tg_group_id, ) - message.is_deleted = True - await self.session.commit() - return DeleteMessageResponse(ok=ok, message=msg) + + async def edit_message(self, request: EditMessageRequest) -> EditMessageResponse: + message: Optional[Message] = await self._get_message_by_id(request.message.id) + if not message: + return EditMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') + + ok, msg = await ProducerService.send_message_editing_to_connector( + message.id, + message.tg_message_id, + message.chat.tg_group_id, + request.message.text, + ) + + return EditMessageResponse(ok=ok, message=msg) + + async def get_tg_file(self, file_id: int) -> StreamingResponse: + file: Optional[MessageFile] = await self.session.get(MessageFile, file_id) + if not file: + raise HTTPException(status_code=404, detail=f'Файл с ID {file_id} не найден') + + url: str = f'https://api.telegram.org/file/bot{CHAT_TELEGRAM_BOT_TOKEN}/{file.file_path}' + print(f'URL = {url}') + + response = requests.get(url, stream=True) + + if response.status_code != 200: + raise HTTPException(status_code=response.status_code, detail="Error fetching file") + + content_type = response.headers.get("Content-Type", "application/octet-stream") + + return StreamingResponse(response.iter_content(chunk_size=8192), media_type=content_type) From a81e692957ce8213119cc1429165c9ecfbb8326a Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Sun, 6 Apr 2025 11:39:56 +0400 Subject: [PATCH 3/4] feat: logging on sending file error, refactoring --- external/chat/schemas.py | 7 +-- external/kafka/consumer.py | 2 - external/kafka/schemas/consumer.py | 7 +-- external/kafka/services/consumer_service.py | 1 - external/kafka/services/producer_service.py | 4 +- routers/chat.py | 8 ++- schemas/chat.py | 4 +- services/chat.py | 63 ++++++++++++--------- 8 files changed, 50 insertions(+), 46 deletions(-) diff --git a/external/chat/schemas.py b/external/chat/schemas.py index 4d50c04..b7c67d9 100644 --- a/external/chat/schemas.py +++ b/external/chat/schemas.py @@ -6,18 +6,13 @@ from schemas.base import BaseSchema, OkMessageSchema # region Entities -class ExternalSendFileSchema(BaseSchema): - buffer: bytes - file_name: str - file_size: int - - class ExternalMessageFileSchema(BaseSchema): file_path: str type: str file_name: str file_size: int + # endregion # region Requests diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py index 96655fa..dc57a61 100644 --- a/external/kafka/consumer.py +++ b/external/kafka/consumer.py @@ -20,11 +20,9 @@ async def consume_messages(): async with session_maker() as session: consumer_service = ConsumerService(session) - print("started consuming messages") try: async for message in consumer: - print("consume") await consumer_service.consume_message(message) finally: await consumer.stop() diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py index 96d5fca..0ad7548 100644 --- a/external/kafka/schemas/consumer.py +++ b/external/kafka/schemas/consumer.py @@ -41,19 +41,18 @@ class MessageFromTelegramRequest(BaseSchema): class BaseConnectorResponse(OkMessageSchema): message_type: int + message_id: int class SendMessageToConnectorResponse(BaseConnectorResponse): - message_id: int - tg_message_id: int + tg_message_id: Optional[int] = None class DeleteMessageResponse(BaseConnectorResponse): - message_id: int + pass class EditMessageResponse(BaseConnectorResponse): - message_id: int text: str # endregion diff --git a/external/kafka/services/consumer_service.py b/external/kafka/services/consumer_service.py index 3a49f33..6bf3f0b 100644 --- a/external/kafka/services/consumer_service.py +++ b/external/kafka/services/consumer_service.py @@ -14,7 +14,6 @@ from services.base import BaseService class ConsumerService(BaseService): async def consume_message(self, message: ConsumerRecord): value = pickle.loads(message.value) - print("Consumer: received message: ", value) try: if 'ok' in value: diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py index e1e9c50..0ef97d8 100644 --- a/external/kafka/services/producer_service.py +++ b/external/kafka/services/producer_service.py @@ -16,7 +16,7 @@ class ProducerService(BaseService): try: await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) except ClientConnectorError: - return False, 'Ошибка подключения к коннектору' + return False, 'Ошибка подключения к сервису' except Exception as e: return False, str(e) return True, 'Сообщение отправлено' @@ -72,6 +72,6 @@ class ProducerService(BaseService): tg_message_id=tg_message_id, group_id=str(group_id), text=text, - ) + ), ) return await ProducerService._send_message(request) diff --git a/routers/chat.py b/routers/chat.py index cea1981..97cad1d 100644 --- a/routers/chat.py +++ b/routers/chat.py @@ -4,7 +4,9 @@ from fastapi import APIRouter, Depends, UploadFile from sqlalchemy.ext.asyncio import AsyncSession from backend.session import get_session +from models import User from schemas.chat import * +from services.auth import get_current_user from services.chat import ChatService chat_router = APIRouter( @@ -20,9 +22,10 @@ chat_router = APIRouter( ) async def send_text_message( session: Annotated[AsyncSession, Depends(get_session)], + user: Annotated[User, Depends(get_current_user)], request: SendTextMessageRequest, ): - return await ChatService(session).send_message(request) + return await ChatService(session).send_message(request, user) @chat_router.post( @@ -44,11 +47,12 @@ async def repeat_sending_text_message( ) async def send_messages_with_files( session: Annotated[AsyncSession, Depends(get_session)], + user: Annotated[User, Depends(get_current_user)], files: list[UploadFile], chat_id: int, caption: str, ): - return await ChatService(session).send_messages_with_files(files, chat_id, caption) + return await ChatService(session).send_messages_with_files(files, chat_id, caption, user) @chat_router.delete( diff --git a/schemas/chat.py b/schemas/chat.py index 51b48d7..841d6f4 100644 --- a/schemas/chat.py +++ b/schemas/chat.py @@ -1,7 +1,8 @@ from datetime import datetime -from typing import Optional, List +from typing import Optional from schemas.base import BaseSchema, OkMessageSchema +from schemas.user import UserSchema # region Entities @@ -33,6 +34,7 @@ class EditMessageSchema(BaseMessageSchema): class MessageSchema(EditMessageSchema): created_at: datetime tg_sender: Optional[TgUserSchema] + crm_sender: Optional[UserSchema] status: str is_edited: bool file: Optional[MessageFileSchema] = None diff --git a/services/chat.py b/services/chat.py index 9392745..0ab3ae8 100644 --- a/services/chat.py +++ b/services/chat.py @@ -9,7 +9,7 @@ from backend.config import CHAT_CONNECTOR_API_KEY, CHAT_TELEGRAM_BOT_TOKEN from external.chat.chat_client import ChatClient from external.chat.schemas import * from external.kafka.services.producer_service import ProducerService -from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile +from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile, User from schemas.chat import * from services.base import BaseService @@ -162,7 +162,7 @@ class ChatService(BaseService): messages = (await self.session.scalars(stmt)).all() return GetMessagesResponse(messages=messages) - async def send_message(self, request: SendTextMessageRequest) -> SendTextMessageResponse: + async def send_message(self, request: SendTextMessageRequest, user: User) -> SendTextMessageResponse: chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) if not chat: return SendTextMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') @@ -172,6 +172,7 @@ class ChatService(BaseService): created_at=datetime.now(), chat_id=request.message.chat_id, status=MessageStatus.sending, + crm_sender_id=user.id, ) self.session.add(message) await self.session.commit() @@ -211,39 +212,46 @@ class ChatService(BaseService): files: list[UploadFile], chat_id: int, caption: str, + user: User, ) -> LoadMessagesResponse: chat: Optional[Chat] = await self.session.get(Chat, chat_id) if not chat: return SendTextMessageResponse(ok=False, message=f'Чат с ID: {chat_id} не найден') - chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) - response = await chat_client.send_messages_with_files( - str(chat.tg_group_id), - chat.tg_topic_id, - caption, - files - ) - - last_message = None - for file_schema in response.files: - file = MessageFile(**file_schema.model_dump()) - self.session.add(file) - - message = Message( - text='', - created_at=datetime.now(), - chat_id=chat_id, - status=MessageStatus.success, - file=file, + try: + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + response = await chat_client.send_messages_with_files( + str(chat.tg_group_id), + chat.tg_topic_id, + caption, + files ) - last_message = message - self.session.add(message) - if last_message: - last_message.text = caption - await self.session.commit() + last_message = None + for file_schema in response.files: + file = MessageFile(**file_schema.model_dump()) + self.session.add(file) - return LoadMessagesResponse(ok=response.ok, message=response.message) + message = Message( + text='', + created_at=datetime.now(), + chat_id=chat_id, + status=MessageStatus.success, + file=file, + crm_sender_id=user.id, + ) + last_message = message + self.session.add(message) + + if last_message: + last_message.text = caption + await self.session.commit() + + return LoadMessagesResponse(ok=response.ok, message=response.message) + except ClientConnectorError: + return LoadMessagesResponse(ok=False, message='Ошибка подключения к сервису') + except Exception as e: + return LoadMessagesResponse(ok=False, message=str(e)) async def _get_message_by_id(self, message_id: int) -> Optional[Message]: stmt = ( @@ -286,7 +294,6 @@ class ChatService(BaseService): raise HTTPException(status_code=404, detail=f'Файл с ID {file_id} не найден') url: str = f'https://api.telegram.org/file/bot{CHAT_TELEGRAM_BOT_TOKEN}/{file.file_path}' - print(f'URL = {url}') response = requests.get(url, stream=True) From 498ab093f31091a9e771c7796295bd5a083bb12e Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Wed, 9 Apr 2025 16:29:39 +0400 Subject: [PATCH 4/4] feat: context kafka --- .gitignore | 3 ++- constants.py | 2 ++ external/kafka/consumer.py | 3 +++ external/kafka/context.py | 11 +++++++++++ external/kafka/producer.py | 7 ++++++- 5 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 external/kafka/context.py diff --git a/.gitignore b/.gitignore index df5f857..9309ed2 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ __pycache__ /venv /test -/test/* \ No newline at end of file +/test/* +certs diff --git a/constants.py b/constants.py index f1f3da7..f32ae77 100644 --- a/constants.py +++ b/constants.py @@ -17,6 +17,8 @@ API_ROOT = "/api" APP_PATH = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(__file__) +KAFKA_CERTS_PATH = os.path.join(APP_PATH, "certs") + allowed_telegram_ids = [ 7532624817, # Me 355308397, # SerGey diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py index dc57a61..9b0f705 100644 --- a/external/kafka/consumer.py +++ b/external/kafka/consumer.py @@ -3,11 +3,14 @@ from aiokafka.errors import KafkaConnectionError from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC from backend.session import session_maker +from external.kafka.context import context from external.kafka.services.consumer_service import ConsumerService consumer = AIOKafkaConsumer( KAFKA_CONSUMER_TOPIC, bootstrap_servers=KAFKA_URL, + security_protocol='SSL', + ssl_context=context, ) diff --git a/external/kafka/context.py b/external/kafka/context.py new file mode 100644 index 0000000..4be481b --- /dev/null +++ b/external/kafka/context.py @@ -0,0 +1,11 @@ +from pathlib import Path + +from aiokafka.helpers import create_ssl_context + +from constants import KAFKA_CERTS_PATH + +context = create_ssl_context( + cafile=KAFKA_CERTS_PATH / Path('ca-cert'), + certfile=KAFKA_CERTS_PATH / Path('cert-signed'), + keyfile=KAFKA_CERTS_PATH / Path('cert-key'), +) diff --git a/external/kafka/producer.py b/external/kafka/producer.py index c600117..6f56cdb 100644 --- a/external/kafka/producer.py +++ b/external/kafka/producer.py @@ -1,5 +1,10 @@ from aiokafka import AIOKafkaProducer from backend.config import KAFKA_URL +from external.kafka.context import context -producer = AIOKafkaProducer(bootstrap_servers=KAFKA_URL) +producer = AIOKafkaProducer( + bootstrap_servers=KAFKA_URL, + security_protocol='SSL', + ssl_context=context, +)