From 00522da68f229eba7ae80e7ed82378e90bc30d32 Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Wed, 2 Apr 2025 15:28:22 +0400 Subject: [PATCH] feat: sending and receiving messages with files, editing text messages --- backend/config.py | 2 + external/chat/chat_client.py | 21 +++- external/chat/schemas.py | 22 +++- external/kafka/consumer.py | 1 + external/kafka/schemas/consumer.py | 17 ++- external/kafka/schemas/producer.py | 9 +- external/kafka/services/consumer_service.py | 24 ++++- external/kafka/services/producer_service.py | 93 +++++++++------- models/chat.py | 24 +++++ requirements.txt | 1 + routers/chat.py | 58 +++++++++- schemas/chat.py | 48 +++++++-- services/chat.py | 111 ++++++++++++++++++-- 13 files changed, 361 insertions(+), 70 deletions(-) diff --git a/backend/config.py b/backend/config.py index 65cb7c3..0a472d8 100644 --- a/backend/config.py +++ b/backend/config.py @@ -19,9 +19,11 @@ S3_API_KEY = os.environ.get('S3_API_KEY') BILLING_API_KEY = os.environ.get('BILLING_API_KEY') +CHAT_TELEGRAM_BOT_TOKEN = os.environ.get('CHAT_TELEGRAM_BOT_TOKEN') CHAT_CONNECTOR_API_KEY = os.environ.get('CHAT_CONNECTOR_API_KEY') CHATS_SYNC_URL = os.environ.get('CHATS_SYNC_URL') +# Kafka KAFKA_URL = os.environ.get('KAFKA_URL') KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC') KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC') diff --git a/external/chat/chat_client.py b/external/chat/chat_client.py index b902ad8..53d8d99 100644 --- a/external/chat/chat_client.py +++ b/external/chat/chat_client.py @@ -1,5 +1,6 @@ import aiohttp import jwt +from fastapi import UploadFile from backend.config import CHATS_SYNC_URL, CHAT_CONNECTOR_API_KEY from external.chat.schemas import * @@ -22,7 +23,6 @@ class ChatClient: async def _method(self, http_method, method, **kwargs): async with aiohttp.ClientSession(headers=self.headers) as session: async with session.request(http_method, self.base_url + method, **kwargs) as response: - print(response) return await response.json() async def create_group(self, request: ExternalCreateGroupRequest) -> ExternalCreateGroupResponse: @@ -34,3 +34,22 @@ class ChatClient: json_data = request.model_dump() response = await self._method('POST', self.groups_endpoint + '/topic/create', json=json_data) return ExternalCreateTopicResponse.model_validate(response) + + async def send_messages_with_files( + self, + tg_group_id: str, + tg_topic_id: int, + caption: str, + files: list[UploadFile], + ) -> ExternalSendMessagesWithFilesResponse: + query_params = f'?tg_group_id={tg_group_id}&tg_topic_id={tg_topic_id}&caption={caption}' + + data = aiohttp.FormData(default_to_multipart=True) + + for file in files: + content = await file.read() + data.add_field('files', content, filename=file.filename, content_type=file.content_type) + + response = await self._method('POST', self.chats_sync_endpoint + '/send' + query_params, data=data) + + return ExternalSendMessagesWithFilesResponse.model_validate(response) diff --git a/external/chat/schemas.py b/external/chat/schemas.py index a06ea29..4d50c04 100644 --- a/external/chat/schemas.py +++ b/external/chat/schemas.py @@ -1,9 +1,25 @@ from typing import Optional from uuid import UUID -from schemas.base import BaseSchema +from schemas.base import BaseSchema, OkMessageSchema +# region Entities + +class ExternalSendFileSchema(BaseSchema): + buffer: bytes + file_name: str + file_size: int + + +class ExternalMessageFileSchema(BaseSchema): + file_path: str + type: str + file_name: str + file_size: int + +# endregion + # region Requests class ExternalCreateGroupRequest(BaseSchema): @@ -29,4 +45,8 @@ class ExternalCreateGroupResponse(BaseSchema): class ExternalCreateTopicResponse(BaseSchema): tg_topic_id: int + +class ExternalSendMessagesWithFilesResponse(OkMessageSchema): + files: list[ExternalMessageFileSchema] + # endregion diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py index dd7c207..96655fa 100644 --- a/external/kafka/consumer.py +++ b/external/kafka/consumer.py @@ -24,6 +24,7 @@ async def consume_messages(): try: async for message in consumer: + print("consume") await consumer_service.consume_message(message) finally: await consumer.stop() diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py index f92feeb..96d5fca 100644 --- a/external/kafka/schemas/consumer.py +++ b/external/kafka/schemas/consumer.py @@ -1,3 +1,5 @@ +from typing import Optional + from schemas.base import OkMessageSchema, BaseSchema @@ -10,11 +12,19 @@ class TelegramUserSchema(BaseSchema): username: str +class MessageFileSchema(BaseSchema): + file_path: str + type: str + file_name: str + file_size: int + + class MessageFromTelegramSchema(BaseSchema): group_id: str tg_topic_id: int - text: str + text: Optional[str] sender: TelegramUserSchema + file: Optional[MessageFileSchema] # endregion @@ -41,4 +51,9 @@ class SendMessageToConnectorResponse(BaseConnectorResponse): class DeleteMessageResponse(BaseConnectorResponse): message_id: int + +class EditMessageResponse(BaseConnectorResponse): + message_id: int + text: str + # endregion diff --git a/external/kafka/schemas/producer.py b/external/kafka/schemas/producer.py index 60ea636..f3017a0 100644 --- a/external/kafka/schemas/producer.py +++ b/external/kafka/schemas/producer.py @@ -11,8 +11,11 @@ class BaseMessageSchema(BaseSchema): group_id: str -class MessageSchema(BaseMessageSchema): +class EditMessageSchema(BaseMessageSchema): text: str + + +class MessageSchema(EditMessageSchema): topic_id: int @@ -33,4 +36,8 @@ class SendMessageToConnectorRequest(BaseConnectorRequest): class SendMessageDeletingToConnectorRequest(BaseConnectorRequest): pass + +class SendMessageEditingToConnectorRequest(BaseConnectorRequest): + message: EditMessageSchema + # endregion diff --git a/external/kafka/services/consumer_service.py b/external/kafka/services/consumer_service.py index e09be82..3a49f33 100644 --- a/external/kafka/services/consumer_service.py +++ b/external/kafka/services/consumer_service.py @@ -1,6 +1,5 @@ import pickle from datetime import datetime -from typing import Optional from uuid import UUID from aiokafka import ConsumerRecord @@ -8,13 +7,14 @@ from sqlalchemy import select from external.kafka.enums import KafkaMessageType from external.kafka.schemas.consumer import * -from models import Message, MessageStatus, TgUser, Chat, TgGroup +from models import Message, MessageStatus, TgUser, Chat, TgGroup, MessageFile from services.base import BaseService class ConsumerService(BaseService): async def consume_message(self, message: ConsumerRecord): value = pickle.loads(message.value) + print("Consumer: received message: ", value) try: if 'ok' in value: @@ -36,7 +36,8 @@ class ConsumerService(BaseService): response = SendMessageToConnectorResponse.model_validate(value) await self._process_connector_send_response(response) case KafkaMessageType.EDIT: - pass + response = EditMessageResponse.model_validate(value) + await self._process_connector_edit_response(response) case KafkaMessageType.DELETE: response = DeleteMessageResponse.model_validate(value) await self._process_connector_delete_response(response) @@ -59,6 +60,16 @@ class ConsumerService(BaseService): message = await self.session.get(Message, response.message_id) message.is_deleted = True + await self.session.commit() + + async def _process_connector_edit_response(self, response: EditMessageResponse): + if not response.ok: + return + + message = await self.session.get(Message, response.message_id) + message.text = response.text + message.is_edited = True + await self.session.commit() async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]: stmt = ( @@ -81,12 +92,17 @@ class ConsumerService(BaseService): if not chat: return + file = None + if request.message.file: + file = MessageFile(**request.message.file.model_dump()) + message = Message( - text=request.message.text, + text=request.message.text if request.message.text else "", created_at=datetime.now(), tg_sender_id=tg_sender.id, chat_id=chat.id, status=MessageStatus.success, + file=file, ) self.session.add(message) await self.session.commit() diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py index 0866979..e1e9c50 100644 --- a/external/kafka/services/producer_service.py +++ b/external/kafka/services/producer_service.py @@ -6,12 +6,21 @@ from aiohttp import ClientConnectorError from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY from external.kafka import producer from external.kafka.enums import KafkaMessageType -from external.kafka.schemas.producer import MessageSchema, SendMessageToConnectorRequest, \ - SendMessageDeletingToConnectorRequest, BaseMessageSchema +from external.kafka.schemas.producer import * from services.base import BaseService class ProducerService(BaseService): + @staticmethod + async def _send_message(request: BaseConnectorRequest): + try: + await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) + except ClientConnectorError: + return False, 'Ошибка подключения к коннектору' + except Exception as e: + return False, str(e) + return True, 'Сообщение отправлено' + @staticmethod async def send_message_to_connector( message_text: str, @@ -19,42 +28,50 @@ class ProducerService(BaseService): topic_id: int, message_id: int, ) -> tuple[bool, str]: - try: - request = SendMessageToConnectorRequest( - message=MessageSchema( - message_id=message_id, - text=message_text, - group_id=str(group_id), - topic_id=topic_id, - ), - message_type=KafkaMessageType.SEND, - app_auth_key=CHAT_CONNECTOR_API_KEY, - ) - await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) - except ClientConnectorError: - return False, 'Ошибка подключения к коннектору' - except Exception as e: - return False, str(e) - - return True, 'Сообщение отправлено' + request = SendMessageToConnectorRequest( + message_type=KafkaMessageType.SEND, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=MessageSchema( + message_id=message_id, + text=message_text, + group_id=str(group_id), + topic_id=topic_id, + ), + ) + return await ProducerService._send_message(request) @staticmethod - async def send_message_deleting_to_connector(message_id: int, tg_message_id: int, group_id: UUID) -> tuple[bool, str]: - try: - request = SendMessageDeletingToConnectorRequest( - message_type=KafkaMessageType.DELETE, - app_auth_key=CHAT_CONNECTOR_API_KEY, - message=BaseMessageSchema( - message_id=message_id, - tg_message_id=tg_message_id, - group_id=str(group_id), - ), + async def send_message_deleting_to_connector( + message_id: int, + tg_message_id: int, + group_id: UUID, + ) -> tuple[bool, str]: + request = SendMessageDeletingToConnectorRequest( + message_type=KafkaMessageType.DELETE, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=BaseMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + ), + ) + return await ProducerService._send_message(request) + + @staticmethod + async def send_message_editing_to_connector( + message_id: int, + tg_message_id: int, + group_id: UUID, + text: str, + ) -> tuple[bool, str]: + request = SendMessageEditingToConnectorRequest( + message_type=KafkaMessageType.EDIT, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=EditMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + text=text, ) - await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) - except ClientConnectorError: - return False, 'Ошибка подключения к коннектору' - except Exception as e: - return False, str(e) - - return True, 'Сообщение отправлено' - \ No newline at end of file + ) + return await ProducerService._send_message(request) diff --git a/models/chat.py b/models/chat.py index 0fa5fa1..826d36c 100644 --- a/models/chat.py +++ b/models/chat.py @@ -109,6 +109,23 @@ class Chat(BaseModel): ) +class MessageFile(BaseModel): + __tablename__ = 'message_files' + + id: Mapped[int] = mapped_column(primary_key=True) + file_path: Mapped[str] = mapped_column(nullable=False) + type: Mapped[Optional[str]] = mapped_column(nullable=True) + file_name: Mapped[str] = mapped_column(nullable=False) + file_size: Mapped[int] = mapped_column(BigInteger(), nullable=True, comment='Размер файла в байтах') + + message_id: Mapped[int] = mapped_column(ForeignKey('messages.id')) + message: Mapped['Message'] = relationship( + 'Message', + lazy='noload', + back_populates='file', + ) + + class MessageStatus(enum.Enum): sending = 'SENDING' success = 'SUCCESS' @@ -125,6 +142,7 @@ class Message(BaseModel): created_at: Mapped[datetime] = mapped_column(nullable=False) status: Mapped[MessageStatus] = mapped_column(Enum(MessageStatus), nullable=False) is_deleted: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) + is_edited: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) tg_sender_id: Mapped[Optional[int]] = mapped_column( ForeignKey('tg_users.id'), @@ -152,3 +170,9 @@ class Message(BaseModel): lazy='noload', back_populates='messages', ) + + file: Mapped[Optional[MessageFile]] = relationship( + 'MessageFile', + back_populates='message', + lazy='selectin', + ) diff --git a/requirements.txt b/requirements.txt index 557d00a..bbd8c3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,6 +25,7 @@ aiokafka python-dotenv aiohttp aiohttp[speedups] +requests openpyxl==3.0.10 lexorank-py celery[redis] diff --git a/routers/chat.py b/routers/chat.py index 91ab6f5..cea1981 100644 --- a/routers/chat.py +++ b/routers/chat.py @@ -1,6 +1,6 @@ from typing import Annotated -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, UploadFile from sqlalchemy.ext.asyncio import AsyncSession from backend.session import get_session @@ -15,16 +15,42 @@ chat_router = APIRouter( @chat_router.post( '/message', - operation_id='send_message', - response_model=SendMessageResponse, + operation_id='send_text_message', + response_model=SendTextMessageResponse, ) -async def send_message( +async def send_text_message( session: Annotated[AsyncSession, Depends(get_session)], - request: SendMessageRequest, + request: SendTextMessageRequest, ): return await ChatService(session).send_message(request) +@chat_router.post( + '/message/repeat', + operation_id='repeat_sending_text_message', + response_model=RepeatSendingTextMessageResponse, +) +async def repeat_sending_text_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: RepeatSendingTextMessageRequest, +): + return await ChatService(session).repeat_sending_message(request) + + +@chat_router.post( + '/message/files', + operation_id='send_messages_with_files', + response_model=LoadMessagesResponse, +) +async def send_messages_with_files( + session: Annotated[AsyncSession, Depends(get_session)], + files: list[UploadFile], + chat_id: int, + caption: str, +): + return await ChatService(session).send_messages_with_files(files, chat_id, caption) + + @chat_router.delete( '/message/{message_id}', operation_id='delete_message', @@ -37,6 +63,18 @@ async def delete_message( return await ChatService(session).delete_message(message_id) +@chat_router.patch( + '/message', + operation_id='edit_message', + response_model=EditMessageResponse, +) +async def edit_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: EditMessageRequest, +): + return await ChatService(session).edit_message(request) + + @chat_router.post( '/', operation_id='get_chat', @@ -71,3 +109,13 @@ async def get_messages( request: GetMessagesRequest, ): return await ChatService(session).get_messages(request) + + +@chat_router.get( + '/tg-file/{file_id}', +) +async def get_tg_file( + session: Annotated[AsyncSession, Depends(get_session)], + file_id: int, +): + return await ChatService(session).get_tg_file(file_id) diff --git a/schemas/chat.py b/schemas/chat.py index 17bebec..51b48d7 100644 --- a/schemas/chat.py +++ b/schemas/chat.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Optional +from typing import Optional, List from schemas.base import BaseSchema, OkMessageSchema @@ -13,10 +13,12 @@ class TgUserSchema(BaseSchema): username: str -class MessageFromTgSchema(BaseSchema): - chat_id: int - text: str - sender: TgUserSchema +class MessageFileSchema(BaseSchema): + id: int + file_path: str + type: str + file_name: str + file_size: int class BaseMessageSchema(BaseSchema): @@ -24,11 +26,21 @@ class BaseMessageSchema(BaseSchema): chat_id: int -class MessageSchema(BaseMessageSchema): +class EditMessageSchema(BaseMessageSchema): id: int + + +class MessageSchema(EditMessageSchema): created_at: datetime tg_sender: Optional[TgUserSchema] status: str + is_edited: bool + file: Optional[MessageFileSchema] = None + + +class RepeatSendingMessageSchema(BaseMessageSchema): + text: str + id: int class TgGroupSchema(BaseSchema): @@ -46,10 +58,18 @@ class ChatSchema(BaseSchema): # region Requests -class SendMessageRequest(BaseSchema): +class SendTextMessageRequest(BaseSchema): message: BaseMessageSchema +class RepeatSendingTextMessageRequest(BaseSchema): + message: RepeatSendingMessageSchema + + +class EditMessageRequest(BaseSchema): + message: EditMessageSchema + + class GetChatRequest(BaseSchema): client_id: int card_id: Optional[int] @@ -69,7 +89,15 @@ class GetMessagesRequest(BaseSchema): # region Responses -class SendMessageResponse(OkMessageSchema): +class SendTextMessageResponse(OkMessageSchema): + pass + + +class RepeatSendingTextMessageResponse(OkMessageSchema): + pass + + +class LoadMessagesResponse(OkMessageSchema): pass @@ -77,6 +105,10 @@ class DeleteMessageResponse(OkMessageSchema): pass +class EditMessageResponse(OkMessageSchema): + pass + + class GetChatResponse(BaseSchema): chat: Optional[ChatSchema] diff --git a/services/chat.py b/services/chat.py index eb1b65a..9392745 100644 --- a/services/chat.py +++ b/services/chat.py @@ -1,14 +1,15 @@ -from uuid import UUID - +import requests from aiohttp import ClientConnectorError +from fastapi import HTTPException, UploadFile from sqlalchemy import select from sqlalchemy.orm import joinedload +from starlette.responses import StreamingResponse -from backend.config import CHAT_CONNECTOR_API_KEY +from backend.config import CHAT_CONNECTOR_API_KEY, CHAT_TELEGRAM_BOT_TOKEN from external.chat.chat_client import ChatClient -from external.chat.schemas import ExternalCreateGroupRequest, ExternalCreateTopicRequest +from external.chat.schemas import * from external.kafka.services.producer_service import ProducerService -from models import Message, Chat, MessageStatus, TgGroup, Client, Card +from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile from schemas.chat import * from services.base import BaseService @@ -161,10 +162,10 @@ class ChatService(BaseService): messages = (await self.session.scalars(stmt)).all() return GetMessagesResponse(messages=messages) - async def send_message(self, request: SendMessageRequest) -> SendMessageResponse: + async def send_message(self, request: SendTextMessageRequest) -> SendTextMessageResponse: chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) if not chat: - return SendMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') + return SendTextMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') message = Message( text=request.message.text, @@ -182,7 +183,67 @@ class ChatService(BaseService): message.id, ) - return SendMessageResponse(ok=ok, message=message) + return SendTextMessageResponse(ok=ok, message=message) + + async def repeat_sending_message( + self, + request: RepeatSendingTextMessageRequest, + ) -> RepeatSendingTextMessageResponse: + message: Optional[Message] = await self._get_message_by_id(request.message.id) + if not message: + return RepeatSendingTextMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') + + ok, msg = await ProducerService.send_message_to_connector( + request.message.text, + message.chat.tg_group_id, + message.chat.tg_topic_id, + message.id, + ) + + if ok: + message.status = MessageStatus.sending + await self.session.commit() + + return RepeatSendingTextMessageResponse(ok=ok, message=msg) + + async def send_messages_with_files( + self, + files: list[UploadFile], + chat_id: int, + caption: str, + ) -> LoadMessagesResponse: + chat: Optional[Chat] = await self.session.get(Chat, chat_id) + if not chat: + return SendTextMessageResponse(ok=False, message=f'Чат с ID: {chat_id} не найден') + + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + response = await chat_client.send_messages_with_files( + str(chat.tg_group_id), + chat.tg_topic_id, + caption, + files + ) + + last_message = None + for file_schema in response.files: + file = MessageFile(**file_schema.model_dump()) + self.session.add(file) + + message = Message( + text='', + created_at=datetime.now(), + chat_id=chat_id, + status=MessageStatus.success, + file=file, + ) + last_message = message + self.session.add(message) + + if last_message: + last_message.text = caption + await self.session.commit() + + return LoadMessagesResponse(ok=response.ok, message=response.message) async def _get_message_by_id(self, message_id: int) -> Optional[Message]: stmt = ( @@ -203,7 +264,35 @@ class ChatService(BaseService): message.chat.tg_group_id, ) - message.is_deleted = True - await self.session.commit() - return DeleteMessageResponse(ok=ok, message=msg) + + async def edit_message(self, request: EditMessageRequest) -> EditMessageResponse: + message: Optional[Message] = await self._get_message_by_id(request.message.id) + if not message: + return EditMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') + + ok, msg = await ProducerService.send_message_editing_to_connector( + message.id, + message.tg_message_id, + message.chat.tg_group_id, + request.message.text, + ) + + return EditMessageResponse(ok=ok, message=msg) + + async def get_tg_file(self, file_id: int) -> StreamingResponse: + file: Optional[MessageFile] = await self.session.get(MessageFile, file_id) + if not file: + raise HTTPException(status_code=404, detail=f'Файл с ID {file_id} не найден') + + url: str = f'https://api.telegram.org/file/bot{CHAT_TELEGRAM_BOT_TOKEN}/{file.file_path}' + print(f'URL = {url}') + + response = requests.get(url, stream=True) + + if response.status_code != 200: + raise HTTPException(status_code=response.status_code, detail="Error fetching file") + + content_type = response.headers.get("Content-Type", "application/octet-stream") + + return StreamingResponse(response.iter_content(chunk_size=8192), media_type=content_type)