342 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			342 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import requests
 | 
						||
from aiohttp import ClientConnectorError
 | 
						||
from fastapi import HTTPException, UploadFile
 | 
						||
from sqlalchemy import select, or_
 | 
						||
from sqlalchemy.orm import joinedload
 | 
						||
from starlette.responses import StreamingResponse
 | 
						||
 | 
						||
from backend.config import CHAT_CONNECTOR_API_KEY, CHAT_TELEGRAM_BOT_TOKEN
 | 
						||
from external.chat.chat_client import ChatClient
 | 
						||
from external.chat.schemas import *
 | 
						||
from external.kafka.services.producer_service import ProducerService
 | 
						||
from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile, User
 | 
						||
from schemas.chat import *
 | 
						||
from services.base import BaseService
 | 
						||
 | 
						||
 | 
						||
class ChatService(BaseService):
 | 
						||
    async def _get_chat(self, client_id: int, card_id: Optional[int]) -> Optional[Chat]:
 | 
						||
        stmt = (
 | 
						||
            select(Chat)
 | 
						||
            .where(
 | 
						||
                Chat.card_id == card_id,
 | 
						||
                TgGroup.client_id == client_id,
 | 
						||
            )
 | 
						||
            .join(TgGroup)
 | 
						||
        )
 | 
						||
        return (await self.session.scalars(stmt)).one_or_none()
 | 
						||
 | 
						||
    async def get_chat(self, request: GetChatRequest) -> GetChatResponse:
 | 
						||
        chat = await self._get_chat(request.client_id, request.card_id)
 | 
						||
        return GetChatResponse(chat=chat)
 | 
						||
 | 
						||
    async def get_client_chats_list(self, client_id: int) -> GetChatsListResponse:
 | 
						||
        stmt = (
 | 
						||
            select(Chat)
 | 
						||
            .join(TgGroup)
 | 
						||
            .where(
 | 
						||
                or_(
 | 
						||
                    TgGroup.client_id == client_id,
 | 
						||
                    Chat.client_id == client_id,
 | 
						||
                )
 | 
						||
            )
 | 
						||
        )
 | 
						||
 | 
						||
        chats = (await self.session.scalars(stmt)).all()
 | 
						||
 | 
						||
        chats_list_items = []
 | 
						||
        for chat in chats:
 | 
						||
            if chat.client_id:
 | 
						||
                name = f'[{chat.client_id}] {chat.client.name}'
 | 
						||
            else:
 | 
						||
                name = f'[{chat.card_id}] {chat.card.name}'
 | 
						||
            item = ChatsListItemSchema(
 | 
						||
                id=chat.id,
 | 
						||
                name=name,
 | 
						||
                tg_group=TgGroupSchema(
 | 
						||
                    tg_group_id=chat.tg_group.tg_group_id,
 | 
						||
                    tg_invite_link=chat.tg_group.tg_invite_link,
 | 
						||
                ),
 | 
						||
                client_id=chat.client_id,
 | 
						||
                card_id=chat.card_id,
 | 
						||
            )
 | 
						||
            chats_list_items.append(item)
 | 
						||
 | 
						||
        return GetChatsListResponse(chats=chats_list_items)
 | 
						||
 | 
						||
    async def _get_group(self, client_id: int) -> Optional[TgGroup]:
 | 
						||
        stmt = (
 | 
						||
            select(TgGroup)
 | 
						||
            .where(TgGroup.client_id == client_id)
 | 
						||
        )
 | 
						||
        return (await self.session.scalars(stmt)).one_or_none()
 | 
						||
 | 
						||
    async def _create_group_for_client(self, client_id: int, title: str) -> Optional[TgGroup]:
 | 
						||
        chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY)
 | 
						||
 | 
						||
        request = ExternalCreateGroupRequest(title=title)
 | 
						||
        response = await chat_client.create_group(request)
 | 
						||
 | 
						||
        group = TgGroup(
 | 
						||
            id=response.group_id,
 | 
						||
            client_id=client_id,
 | 
						||
            tg_group_id=response.tg_group_id,
 | 
						||
            tg_invite_link=response.tg_invite_link,
 | 
						||
        )
 | 
						||
 | 
						||
        chat = Chat(
 | 
						||
            tg_topic_id=1,
 | 
						||
            card_id=None,
 | 
						||
            client_id=client_id,
 | 
						||
            tg_group_id=response.group_id,
 | 
						||
        )
 | 
						||
        self.session.add_all([group, chat])
 | 
						||
        await self.session.commit()
 | 
						||
 | 
						||
        return group
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def _get_title_for_client_chat(client: Client) -> str:
 | 
						||
        return f'[{client.id}] {client.name}'
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def _get_title_for_card_chat(card: Card) -> str:
 | 
						||
        return f'[{card.id}] {card.name}'
 | 
						||
 | 
						||
    async def _create_topic_for_card(
 | 
						||
            self,
 | 
						||
            group_id: UUID,
 | 
						||
            card: Card,
 | 
						||
    ) -> Chat:
 | 
						||
        card_chat_icon_id = 5348227245599105972  # 💼
 | 
						||
 | 
						||
        chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY)
 | 
						||
 | 
						||
        request = ExternalCreateTopicRequest(
 | 
						||
            group_id=str(group_id),
 | 
						||
            title=self._get_title_for_card_chat(card),
 | 
						||
            icon_emoji_id=card_chat_icon_id,
 | 
						||
        )
 | 
						||
        response = await chat_client.create_topic(request)
 | 
						||
 | 
						||
        chat = Chat(
 | 
						||
            tg_topic_id=response.tg_topic_id,
 | 
						||
            card_id=card.id,
 | 
						||
            client_id=None,
 | 
						||
            tg_group_id=group_id,
 | 
						||
        )
 | 
						||
        self.session.add(chat)
 | 
						||
        await self.session.commit()
 | 
						||
 | 
						||
        return chat
 | 
						||
 | 
						||
    async def _create_chat_for_card(self, request: CreateChatRequest, card: Card, client: Client) -> CreateChatResponse:
 | 
						||
        existing_chat = await self._get_chat(request.client_id, request.card_id)
 | 
						||
        if existing_chat:
 | 
						||
            return CreateChatResponse(
 | 
						||
                ok=False,
 | 
						||
                message=f'Чат для клиента ID {request.client_id} и карточки ID {request.card_id} уже существует'
 | 
						||
            )
 | 
						||
 | 
						||
        group: Optional[TgGroup] = await self._get_group(request.client_id)
 | 
						||
        if not group:
 | 
						||
            group: TgGroup = await self._create_group_for_client(
 | 
						||
                request.client_id,
 | 
						||
                self._get_title_for_client_chat(client)
 | 
						||
            )
 | 
						||
 | 
						||
        await self._create_topic_for_card(group.id, card)
 | 
						||
 | 
						||
        return CreateChatResponse(ok=True, message='Чат успешно создан')
 | 
						||
 | 
						||
    async def _create_chat_for_client(self, request: CreateChatRequest, client: Client) -> CreateChatResponse:
 | 
						||
        existing_chat = await self._get_chat(client.id, request.card_id)
 | 
						||
        if existing_chat:
 | 
						||
            return CreateChatResponse(ok=False, message=f'Чат для клиента ID {client.id} уже существует')
 | 
						||
 | 
						||
        group = await self._get_group(client.id)
 | 
						||
        if group:
 | 
						||
            return CreateChatResponse(ok=False, message=f'Группа для клиента ID {client.id} уже существует')
 | 
						||
 | 
						||
        client_chat_title = self._get_title_for_client_chat(client)
 | 
						||
        await self._create_group_for_client(request.client_id, client_chat_title)
 | 
						||
 | 
						||
        return CreateChatResponse(ok=True, message='Чат успешно создан')
 | 
						||
 | 
						||
    async def create_chat(self, request: CreateChatRequest) -> CreateChatResponse:
 | 
						||
        client: Optional[Client] = await self.session.get(Client, request.client_id)
 | 
						||
        if not client:
 | 
						||
            return CreateChatResponse(ok=False, message=f'Клиент с ID {request.client_id} не найден')
 | 
						||
 | 
						||
        # try:
 | 
						||
        if not request.card_id:
 | 
						||
            return await self._create_chat_for_client(request, client)
 | 
						||
 | 
						||
        card: Optional[Card] = await self.session.get(Card, request.card_id)
 | 
						||
        if not card:
 | 
						||
            return CreateChatResponse(ok=False, message=f'Карточка с ID {request.card_id} не найдена')
 | 
						||
 | 
						||
        return await self._create_chat_for_card(request, card, client)
 | 
						||
        # except ClientConnectorError:
 | 
						||
        #     return CreateChatResponse(ok=False, message=f'Ошибка подключения к сервису')
 | 
						||
        # except Exception as e:
 | 
						||
        #     print(e.with_traceback())
 | 
						||
        #     return CreateChatResponse(ok=False, message=str(e))
 | 
						||
 | 
						||
    async def get_messages(self, request: GetMessagesRequest) -> GetMessagesResponse:
 | 
						||
        stmt = (
 | 
						||
            select(Message)
 | 
						||
            .where(
 | 
						||
                Message.chat_id == request.chat_id,
 | 
						||
                Message.is_deleted == False,
 | 
						||
            )
 | 
						||
            .order_by(Message.created_at.desc())
 | 
						||
            .offset(request.offset)
 | 
						||
            .limit(request.limit)
 | 
						||
        )
 | 
						||
        messages = (await self.session.scalars(stmt)).all()
 | 
						||
        return GetMessagesResponse(messages=messages)
 | 
						||
 | 
						||
    async def send_message(self, request: SendTextMessageRequest, user: User) -> SendTextMessageResponse:
 | 
						||
        chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id)
 | 
						||
        if not chat:
 | 
						||
            return SendTextMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден')
 | 
						||
 | 
						||
        message = Message(
 | 
						||
            text=request.message.text,
 | 
						||
            created_at=datetime.now(),
 | 
						||
            chat_id=request.message.chat_id,
 | 
						||
            status=MessageStatus.sending,
 | 
						||
            crm_sender_id=user.id,
 | 
						||
        )
 | 
						||
        self.session.add(message)
 | 
						||
        await self.session.commit()
 | 
						||
 | 
						||
        ok, message = await ProducerService.send_message_to_connector(
 | 
						||
            request.message.text,
 | 
						||
            chat.tg_group_id,
 | 
						||
            chat.tg_topic_id,
 | 
						||
            message.id,
 | 
						||
        )
 | 
						||
 | 
						||
        return SendTextMessageResponse(ok=ok, message=message)
 | 
						||
 | 
						||
    async def repeat_sending_message(
 | 
						||
            self,
 | 
						||
            request: RepeatSendingTextMessageRequest,
 | 
						||
    ) -> RepeatSendingTextMessageResponse:
 | 
						||
        message: Optional[Message] = await self._get_message_by_id(request.message.id)
 | 
						||
        if not message:
 | 
						||
            return RepeatSendingTextMessageResponse(ok=False,
 | 
						||
                                                    message=f'Сообщение с ID: {request.message.id} не найдено')
 | 
						||
 | 
						||
        ok, msg = await ProducerService.send_message_to_connector(
 | 
						||
            request.message.text,
 | 
						||
            message.chat.tg_group_id,
 | 
						||
            message.chat.tg_topic_id,
 | 
						||
            message.id,
 | 
						||
        )
 | 
						||
 | 
						||
        if ok:
 | 
						||
            message.status = MessageStatus.sending
 | 
						||
            await self.session.commit()
 | 
						||
 | 
						||
        return RepeatSendingTextMessageResponse(ok=ok, message=msg)
 | 
						||
 | 
						||
    async def send_messages_with_files(
 | 
						||
            self,
 | 
						||
            files: list[UploadFile],
 | 
						||
            chat_id: int,
 | 
						||
            caption: str,
 | 
						||
            user: User,
 | 
						||
    ) -> LoadMessagesResponse:
 | 
						||
        chat: Optional[Chat] = await self.session.get(Chat, chat_id)
 | 
						||
        if not chat:
 | 
						||
            return LoadMessagesResponse(ok=False, message=f'Чат с ID: {chat_id} не найден')
 | 
						||
 | 
						||
        try:
 | 
						||
            chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY)
 | 
						||
            response = await chat_client.send_messages_with_files(
 | 
						||
                str(chat.tg_group_id),
 | 
						||
                chat.tg_topic_id,
 | 
						||
                caption,
 | 
						||
                files
 | 
						||
            )
 | 
						||
 | 
						||
            last_message = None
 | 
						||
            for file_schema in response.files:
 | 
						||
                file = MessageFile(**file_schema.model_dump())
 | 
						||
                self.session.add(file)
 | 
						||
 | 
						||
                message = Message(
 | 
						||
                    text='',
 | 
						||
                    created_at=datetime.now(),
 | 
						||
                    chat_id=chat_id,
 | 
						||
                    status=MessageStatus.success,
 | 
						||
                    file=file,
 | 
						||
                    crm_sender_id=user.id,
 | 
						||
                )
 | 
						||
                last_message = message
 | 
						||
                self.session.add(message)
 | 
						||
 | 
						||
            if last_message:
 | 
						||
                last_message.text = caption
 | 
						||
            await self.session.commit()
 | 
						||
 | 
						||
            return LoadMessagesResponse(ok=response.ok, message=response.message)
 | 
						||
        except ClientConnectorError:
 | 
						||
            return LoadMessagesResponse(ok=False, message='Ошибка подключения к сервису')
 | 
						||
        except Exception as e:
 | 
						||
            return LoadMessagesResponse(ok=False, message=str(e))
 | 
						||
 | 
						||
    async def _get_message_by_id(self, message_id: int) -> Optional[Message]:
 | 
						||
        stmt = (
 | 
						||
            select(Message)
 | 
						||
            .where(Message.id == message_id)
 | 
						||
            .options(joinedload(Message.chat))
 | 
						||
        )
 | 
						||
        return (await self.session.scalars(stmt)).one_or_none()
 | 
						||
 | 
						||
    async def delete_message(self, message_id: int) -> DeleteMessageResponse:
 | 
						||
        message: Optional[Message] = await self._get_message_by_id(message_id)
 | 
						||
        if not message:
 | 
						||
            return DeleteMessageResponse(ok=False, message=f'Сообщение с ID: {message_id} не найдено')
 | 
						||
 | 
						||
        ok, msg = await ProducerService.send_message_deleting_to_connector(
 | 
						||
            message_id,
 | 
						||
            message.tg_message_id,
 | 
						||
            message.chat.tg_group_id,
 | 
						||
        )
 | 
						||
 | 
						||
        return DeleteMessageResponse(ok=ok, message=msg)
 | 
						||
 | 
						||
    async def edit_message(self, request: EditMessageRequest) -> EditMessageResponse:
 | 
						||
        message: Optional[Message] = await self._get_message_by_id(request.message.id)
 | 
						||
        if not message:
 | 
						||
            return EditMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено')
 | 
						||
 | 
						||
        ok, msg = await ProducerService.send_message_editing_to_connector(
 | 
						||
            message.id,
 | 
						||
            message.tg_message_id,
 | 
						||
            message.chat.tg_group_id,
 | 
						||
            request.message.text,
 | 
						||
        )
 | 
						||
 | 
						||
        return EditMessageResponse(ok=ok, message=msg)
 | 
						||
 | 
						||
    async def get_tg_file(self, file_id: int) -> StreamingResponse:
 | 
						||
        file: Optional[MessageFile] = await self.session.get(MessageFile, file_id)
 | 
						||
        if not file:
 | 
						||
            raise HTTPException(status_code=404, detail=f'Файл с ID {file_id} не найден')
 | 
						||
 | 
						||
        url: str = f'https://api.telegram.org/file/bot{CHAT_TELEGRAM_BOT_TOKEN}/{file.file_path}'
 | 
						||
 | 
						||
        response = requests.get(url, stream=True)
 | 
						||
 | 
						||
        if response.status_code != 200:
 | 
						||
            raise HTTPException(status_code=response.status_code, detail="Error fetching file")
 | 
						||
 | 
						||
        content_type = response.headers.get("Content-Type", "application/octet-stream")
 | 
						||
 | 
						||
        return StreamingResponse(response.iter_content(chunk_size=8192), media_type=content_type)
 |