Files
Fulfillment-Backend/external/kafka/services/consumer_service.py

109 lines
4.0 KiB
Python

import logging
import pickle
from datetime import datetime
from uuid import UUID

from aiokafka import ConsumerRecord
from sqlalchemy import select

from external.kafka.enums import KafkaMessageType
from external.kafka.schemas.consumer import *
from models import Message, MessageStatus, TgUser, Chat, TgGroup, MessageFile
from services.base import BaseService
class ConsumerService(BaseService):
async def consume_message(self, message: ConsumerRecord):
value = pickle.loads(message.value)
print("Consumer: received message: ", value)
try:
if 'ok' in value:
""" Received response after sending message from crm """
await self._process_connector_response(message)
else:
""" Received message from client """
request = MessageFromTelegramRequest.model_validate(value)
await self._receive_message_from_client(request)
except Exception as e:
print(e)
async def _process_connector_response(self, message: ConsumerRecord):
value = pickle.loads(message.value)
message_type = KafkaMessageType(value['message_type'])
match message_type:
case KafkaMessageType.SEND:
response = SendMessageToConnectorResponse.model_validate(value)
await self._process_connector_send_response(response)
case KafkaMessageType.EDIT:
response = EditMessageResponse.model_validate(value)
await self._process_connector_edit_response(response)
case KafkaMessageType.DELETE:
response = DeleteMessageResponse.model_validate(value)
await self._process_connector_delete_response(response)
case _:
raise Exception('Unexpected message type in crm consumer')
async def _process_connector_send_response(self, response: SendMessageToConnectorResponse):
message = await self.session.get(Message, response.message_id)
message.tg_message_id = response.tg_message_id
if response.ok:
message.status = MessageStatus.success
else:
message.status = MessageStatus.error
await self.session.commit()
async def _process_connector_delete_response(self, response: DeleteMessageResponse):
if not response.ok:
return
message = await self.session.get(Message, response.message_id)
message.is_deleted = True
await self.session.commit()
async def _process_connector_edit_response(self, response: EditMessageResponse):
if not response.ok:
return
message = await self.session.get(Message, response.message_id)
message.text = response.text
message.is_edited = True
await self.session.commit()
async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]:
stmt = (
select(Chat)
.join(TgGroup)
.where(
Chat.tg_topic_id == tg_topic_id,
TgGroup.id == UUID(group_id),
)
)
return (await self.session.scalars(stmt)).one_or_none()
async def _receive_message_from_client(self, request: MessageFromTelegramRequest):
tg_sender: Optional[TgUser] = await self.session.get(TgUser, request.message.sender.id)
if not tg_sender:
tg_sender: TgUser = TgUser(**request.message.sender.model_dump())
self.session.add(tg_sender)
chat = await self._get_chat(request.message.group_id, request.message.tg_topic_id)
if not chat:
return
file = None
if request.message.file:
file = MessageFile(**request.message.file.model_dump())
message = Message(
text=request.message.text if request.message.text else "",
created_at=datetime.now(),
tg_sender_id=tg_sender.id,
chat_id=chat.id,
status=MessageStatus.success,
file=file,
)
self.session.add(message)
await self.session.commit()