108 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			108 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import pickle
 | 
						|
from datetime import datetime
 | 
						|
from uuid import UUID
 | 
						|
 | 
						|
from aiokafka import ConsumerRecord
 | 
						|
from sqlalchemy import select
 | 
						|
 | 
						|
from external.kafka.enums import KafkaMessageType
 | 
						|
from external.kafka.schemas.consumer import *
 | 
						|
from models import Message, MessageStatus, TgUser, Chat, TgGroup, MessageFile
 | 
						|
from services.base import BaseService
 | 
						|
 | 
						|
 | 
						|
class ConsumerService(BaseService):
 | 
						|
    async def consume_message(self, message: ConsumerRecord):
 | 
						|
        value = pickle.loads(message.value)
 | 
						|
 | 
						|
        try:
 | 
						|
            if 'ok' in value:
 | 
						|
                """ Received response after sending message from crm """
 | 
						|
                await self._process_connector_response(message)
 | 
						|
            else:
 | 
						|
                """ Received message from client """
 | 
						|
                request = MessageFromTelegramRequest.model_validate(value)
 | 
						|
                await self._receive_message_from_client(request)
 | 
						|
        except Exception as e:
 | 
						|
            print(e)
 | 
						|
 | 
						|
    async def _process_connector_response(self, message: ConsumerRecord):
 | 
						|
        value = pickle.loads(message.value)
 | 
						|
        message_type = KafkaMessageType(value['message_type'])
 | 
						|
 | 
						|
        match message_type:
 | 
						|
            case KafkaMessageType.SEND:
 | 
						|
                response = SendMessageToConnectorResponse.model_validate(value)
 | 
						|
                await self._process_connector_send_response(response)
 | 
						|
            case KafkaMessageType.EDIT:
 | 
						|
                response = EditMessageResponse.model_validate(value)
 | 
						|
                await self._process_connector_edit_response(response)
 | 
						|
            case KafkaMessageType.DELETE:
 | 
						|
                response = DeleteMessageResponse.model_validate(value)
 | 
						|
                await self._process_connector_delete_response(response)
 | 
						|
            case _:
 | 
						|
                raise Exception('Unexpected message type in crm consumer')
 | 
						|
 | 
						|
    async def _process_connector_send_response(self, response: SendMessageToConnectorResponse):
 | 
						|
        message = await self.session.get(Message, response.message_id)
 | 
						|
        message.tg_message_id = response.tg_message_id
 | 
						|
        if response.ok:
 | 
						|
            message.status = MessageStatus.success
 | 
						|
        else:
 | 
						|
            message.status = MessageStatus.error
 | 
						|
 | 
						|
        await self.session.commit()
 | 
						|
 | 
						|
    async def _process_connector_delete_response(self, response: DeleteMessageResponse):
 | 
						|
        if not response.ok:
 | 
						|
            return
 | 
						|
 | 
						|
        message = await self.session.get(Message, response.message_id)
 | 
						|
        message.is_deleted = True
 | 
						|
        await self.session.commit()
 | 
						|
 | 
						|
    async def _process_connector_edit_response(self, response: EditMessageResponse):
 | 
						|
        if not response.ok:
 | 
						|
            return
 | 
						|
 | 
						|
        message = await self.session.get(Message, response.message_id)
 | 
						|
        message.text = response.text
 | 
						|
        message.is_edited = True
 | 
						|
        await self.session.commit()
 | 
						|
 | 
						|
    async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]:
 | 
						|
        stmt = (
 | 
						|
            select(Chat)
 | 
						|
            .join(TgGroup)
 | 
						|
            .where(
 | 
						|
                Chat.tg_topic_id == tg_topic_id,
 | 
						|
                TgGroup.id == UUID(group_id),
 | 
						|
            )
 | 
						|
        )
 | 
						|
        return (await self.session.scalars(stmt)).one_or_none()
 | 
						|
 | 
						|
    async def _receive_message_from_client(self, request: MessageFromTelegramRequest):
 | 
						|
        tg_sender: Optional[TgUser] = await self.session.get(TgUser, request.message.sender.id)
 | 
						|
        if not tg_sender:
 | 
						|
            tg_sender: TgUser = TgUser(**request.message.sender.model_dump())
 | 
						|
            self.session.add(tg_sender)
 | 
						|
 | 
						|
        chat = await self._get_chat(request.message.group_id, request.message.tg_topic_id)
 | 
						|
        if not chat:
 | 
						|
            return
 | 
						|
 | 
						|
        file = None
 | 
						|
        if request.message.file:
 | 
						|
            file = MessageFile(**request.message.file.model_dump())
 | 
						|
 | 
						|
        message = Message(
 | 
						|
            text=request.message.text if request.message.text else "",
 | 
						|
            created_at=datetime.now(),
 | 
						|
            tg_sender_id=tg_sender.id,
 | 
						|
            chat_id=chat.id,
 | 
						|
            status=MessageStatus.success,
 | 
						|
            file=file,
 | 
						|
        )
 | 
						|
        self.session.add(message)
 | 
						|
        await self.session.commit()
 |