From a81e692957ce8213119cc1429165c9ecfbb8326a Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Sun, 6 Apr 2025 11:39:56 +0400 Subject: [PATCH] feat: logging on sending file error, refactoring --- external/chat/schemas.py | 7 +-- external/kafka/consumer.py | 2 - external/kafka/schemas/consumer.py | 7 +-- external/kafka/services/consumer_service.py | 1 - external/kafka/services/producer_service.py | 4 +- routers/chat.py | 8 ++- schemas/chat.py | 4 +- services/chat.py | 63 ++++++++++++--------- 8 files changed, 50 insertions(+), 46 deletions(-) diff --git a/external/chat/schemas.py b/external/chat/schemas.py index 4d50c04..b7c67d9 100644 --- a/external/chat/schemas.py +++ b/external/chat/schemas.py @@ -6,18 +6,13 @@ from schemas.base import BaseSchema, OkMessageSchema # region Entities -class ExternalSendFileSchema(BaseSchema): - buffer: bytes - file_name: str - file_size: int - - class ExternalMessageFileSchema(BaseSchema): file_path: str type: str file_name: str file_size: int + # endregion # region Requests diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py index 96655fa..dc57a61 100644 --- a/external/kafka/consumer.py +++ b/external/kafka/consumer.py @@ -20,11 +20,9 @@ async def consume_messages(): async with session_maker() as session: consumer_service = ConsumerService(session) - print("started consuming messages") try: async for message in consumer: - print("consume") await consumer_service.consume_message(message) finally: await consumer.stop() diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py index 96d5fca..0ad7548 100644 --- a/external/kafka/schemas/consumer.py +++ b/external/kafka/schemas/consumer.py @@ -41,19 +41,18 @@ class MessageFromTelegramRequest(BaseSchema): class BaseConnectorResponse(OkMessageSchema): message_type: int + message_id: int class SendMessageToConnectorResponse(BaseConnectorResponse): - message_id: int - tg_message_id: int + tg_message_id: Optional[int] = None class DeleteMessageResponse(BaseConnectorResponse): - message_id: int + pass class EditMessageResponse(BaseConnectorResponse): - message_id: int text: str # endregion diff --git a/external/kafka/services/consumer_service.py b/external/kafka/services/consumer_service.py index 3a49f33..6bf3f0b 100644 --- a/external/kafka/services/consumer_service.py +++ b/external/kafka/services/consumer_service.py @@ -14,7 +14,6 @@ from services.base import BaseService class ConsumerService(BaseService): async def consume_message(self, message: ConsumerRecord): value = pickle.loads(message.value) - print("Consumer: received message: ", value) try: if 'ok' in value: diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py index e1e9c50..0ef97d8 100644 --- a/external/kafka/services/producer_service.py +++ b/external/kafka/services/producer_service.py @@ -16,7 +16,7 @@ class ProducerService(BaseService): try: await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) except ClientConnectorError: - return False, 'Ошибка подключения к коннектору' + return False, 'Ошибка подключения к сервису' except Exception as e: return False, str(e) return True, 'Сообщение отправлено' @@ -72,6 +72,6 @@ class ProducerService(BaseService): tg_message_id=tg_message_id, group_id=str(group_id), text=text, - ) + ), ) return await ProducerService._send_message(request) diff --git a/routers/chat.py b/routers/chat.py index cea1981..97cad1d 100644 --- a/routers/chat.py +++ b/routers/chat.py @@ -4,7 +4,9 @@ from fastapi import APIRouter, Depends, UploadFile from sqlalchemy.ext.asyncio import AsyncSession from backend.session import get_session +from models import User from schemas.chat import * +from services.auth import get_current_user from services.chat import ChatService chat_router = APIRouter( @@ -20,9 +22,10 @@ chat_router = APIRouter( ) async def send_text_message( session: Annotated[AsyncSession, Depends(get_session)], + user: Annotated[User, Depends(get_current_user)], request: SendTextMessageRequest, ): - return await ChatService(session).send_message(request) + return await ChatService(session).send_message(request, user) @chat_router.post( @@ -44,11 +47,12 @@ async def repeat_sending_text_message( ) async def send_messages_with_files( session: Annotated[AsyncSession, Depends(get_session)], + user: Annotated[User, Depends(get_current_user)], files: list[UploadFile], chat_id: int, caption: str, ): - return await ChatService(session).send_messages_with_files(files, chat_id, caption) + return await ChatService(session).send_messages_with_files(files, chat_id, caption, user) @chat_router.delete( diff --git a/schemas/chat.py b/schemas/chat.py index 51b48d7..841d6f4 100644 --- a/schemas/chat.py +++ b/schemas/chat.py @@ -1,7 +1,8 @@ from datetime import datetime -from typing import Optional, List +from typing import Optional from schemas.base import BaseSchema, OkMessageSchema +from schemas.user import UserSchema # region Entities @@ -33,6 +34,7 @@ class EditMessageSchema(BaseMessageSchema): class MessageSchema(EditMessageSchema): created_at: datetime tg_sender: Optional[TgUserSchema] + crm_sender: Optional[UserSchema] status: str is_edited: bool file: Optional[MessageFileSchema] = None diff --git a/services/chat.py b/services/chat.py index 9392745..0ab3ae8 100644 --- a/services/chat.py +++ b/services/chat.py @@ -9,7 +9,7 @@ from backend.config import CHAT_CONNECTOR_API_KEY, CHAT_TELEGRAM_BOT_TOKEN from external.chat.chat_client import ChatClient from external.chat.schemas import * from external.kafka.services.producer_service import ProducerService -from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile +from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile, User from schemas.chat import * from services.base import BaseService @@ -162,7 +162,7 @@ class ChatService(BaseService): messages = (await self.session.scalars(stmt)).all() return GetMessagesResponse(messages=messages) - async def send_message(self, request: SendTextMessageRequest) -> SendTextMessageResponse: + async def send_message(self, request: SendTextMessageRequest, user: User) -> SendTextMessageResponse: chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) if not chat: return SendTextMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') @@ -172,6 +172,7 @@ class ChatService(BaseService): created_at=datetime.now(), chat_id=request.message.chat_id, status=MessageStatus.sending, + crm_sender_id=user.id, ) self.session.add(message) await self.session.commit() @@ -211,39 +212,46 @@ class ChatService(BaseService): files: list[UploadFile], chat_id: int, caption: str, + user: User, ) -> LoadMessagesResponse: chat: Optional[Chat] = await self.session.get(Chat, chat_id) if not chat: return SendTextMessageResponse(ok=False, message=f'Чат с ID: {chat_id} не найден') - chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) - response = await chat_client.send_messages_with_files( - str(chat.tg_group_id), - chat.tg_topic_id, - caption, - files - ) - - last_message = None - for file_schema in response.files: - file = MessageFile(**file_schema.model_dump()) - self.session.add(file) - - message = Message( - text='', - created_at=datetime.now(), - chat_id=chat_id, - status=MessageStatus.success, - file=file, + try: + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + response = await chat_client.send_messages_with_files( + str(chat.tg_group_id), + chat.tg_topic_id, + caption, + files ) - last_message = message - self.session.add(message) - if last_message: - last_message.text = caption - await self.session.commit() + last_message = None + for file_schema in response.files: + file = MessageFile(**file_schema.model_dump()) + self.session.add(file) - return LoadMessagesResponse(ok=response.ok, message=response.message) + message = Message( + text='', + created_at=datetime.now(), + chat_id=chat_id, + status=MessageStatus.success, + file=file, + crm_sender_id=user.id, + ) + last_message = message + self.session.add(message) + + if last_message: + last_message.text = caption + await self.session.commit() + + return LoadMessagesResponse(ok=response.ok, message=response.message) + except ClientConnectorError: + return LoadMessagesResponse(ok=False, message='Ошибка подключения к сервису') + except Exception as e: + return LoadMessagesResponse(ok=False, message=str(e)) async def _get_message_by_id(self, message_id: int) -> Optional[Message]: stmt = ( @@ -286,7 +294,6 @@ class ChatService(BaseService): raise HTTPException(status_code=404, detail=f'Файл с ID {file_id} не найден') url: str = f'https://api.telegram.org/file/bot{CHAT_TELEGRAM_BOT_TOKEN}/{file.file_path}' - print(f'URL = {url}') response = requests.get(url, stream=True)