import pickle from datetime import datetime from typing import Optional from uuid import UUID from aiokafka import ConsumerRecord from sqlalchemy import select from external.kafka.enums import KafkaMessageType from external.kafka.schemas.consumer import * from models import Message, MessageStatus, TgUser, Chat, TgGroup from services.base import BaseService class ConsumerService(BaseService): async def consume_message(self, message: ConsumerRecord): value = pickle.loads(message.value) try: if 'ok' in value: """ Received response after sending message from crm """ await self._process_connector_response(message) else: """ Received message from client """ request = MessageFromTelegramRequest.model_validate(value) await self._receive_message_from_client(request) except Exception as e: print(e) async def _process_connector_response(self, message: ConsumerRecord): value = pickle.loads(message.value) message_type = KafkaMessageType(value['message_type']) match message_type: case KafkaMessageType.SEND: response = SendMessageToConnectorResponse.model_validate(value) await self._process_connector_send_response(response) case KafkaMessageType.EDIT: pass case KafkaMessageType.DELETE: response = DeleteMessageResponse.model_validate(value) await self._process_connector_delete_response(response) case _: raise Exception('Unexpected message type in crm consumer') async def _process_connector_send_response(self, response: SendMessageToConnectorResponse): message = await self.session.get(Message, response.message_id) message.tg_message_id = response.tg_message_id if response.ok: message.status = MessageStatus.success else: message.status = MessageStatus.error await self.session.commit() async def _process_connector_delete_response(self, response: DeleteMessageResponse): if not response.ok: return message = await self.session.get(Message, response.message_id) message.is_deleted = True async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]: stmt = ( select(Chat) .join(TgGroup) .where( Chat.tg_topic_id == tg_topic_id, TgGroup.id == UUID(group_id), ) ) return (await self.session.scalars(stmt)).one_or_none() async def _receive_message_from_client(self, request: MessageFromTelegramRequest): tg_sender: Optional[TgUser] = await self.session.get(TgUser, request.message.sender.id) if not tg_sender: tg_sender: TgUser = TgUser(**request.message.sender.model_dump()) self.session.add(tg_sender) chat = await self._get_chat(request.message.group_id, request.message.tg_topic_id) if not chat: return message = Message( text=request.message.text, created_at=datetime.now(), tg_sender_id=tg_sender.id, chat_id=chat.id, status=MessageStatus.success, ) self.session.add(message) await self.session.commit()