import requests from aiohttp import ClientConnectorError from fastapi import HTTPException, UploadFile from sqlalchemy import select, or_ from sqlalchemy.orm import joinedload from starlette.responses import StreamingResponse from backend.config import CHAT_CONNECTOR_API_KEY, CHAT_TELEGRAM_BOT_TOKEN from external.chat.chat_client import ChatClient from external.chat.schemas import * from external.kafka.services.producer_service import ProducerService from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile, User from schemas.auth import UserUnion from schemas.chat import * from services.base import BaseService from utils.auth import verify_user_deal_editor class ChatService(BaseService): async def _get_chat(self, client_id: int, card_id: Optional[int]) -> Optional[Chat]: stmt = ( select(Chat) .where( Chat.card_id == card_id, TgGroup.client_id == client_id, ) .join(TgGroup) ) return (await self.session.scalars(stmt)).one_or_none() async def get_chat(self, request: GetChatRequest) -> GetChatResponse: chat = await self._get_chat(request.client_id, request.card_id) return GetChatResponse(chat=chat) async def get_client_chats_list(self, client_id: int) -> GetChatsListResponse: stmt = ( select(Chat) .join(TgGroup) .where( or_( TgGroup.client_id == client_id, Chat.client_id == client_id, ) ) ) chats = (await self.session.scalars(stmt)).all() chats_list_items = [] for chat in chats: if chat.client_id: name = f'[{chat.client_id}] {chat.client.name}' else: name = f'[{chat.card_id}] {chat.card.name}' item = ChatsListItemSchema( id=chat.id, name=name, tg_group=TgGroupSchema( tg_group_id=chat.tg_group.tg_group_id, tg_invite_link=chat.tg_group.tg_invite_link, ), client_id=chat.client_id, card_id=chat.card_id, ) chats_list_items.append(item) return GetChatsListResponse(chats=chats_list_items) async def _get_group(self, client_id: int) -> Optional[TgGroup]: stmt = ( select(TgGroup) .where(TgGroup.client_id == client_id) ) return (await self.session.scalars(stmt)).one_or_none() async def _create_group_for_client(self, client_id: int, title: str) -> Optional[TgGroup]: chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) request = ExternalCreateGroupRequest(title=title) response = await chat_client.create_group(request) group = TgGroup( id=response.group_id, client_id=client_id, tg_group_id=response.tg_group_id, tg_invite_link=response.tg_invite_link, ) chat = Chat( tg_topic_id=1, card_id=None, client_id=client_id, tg_group_id=response.group_id, ) self.session.add_all([group, chat]) await self.session.commit() return group @staticmethod def _get_title_for_client_chat(client: Client) -> str: return f'[{client.id}] {client.name}' @staticmethod def _get_title_for_card_chat(card: Card) -> str: return f'[{card.id}] {card.name}' async def _create_topic_for_card( self, group_id: UUID, card: Card, ) -> Chat: card_chat_icon_id = 5348227245599105972 # 💼 chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) request = ExternalCreateTopicRequest( group_id=str(group_id), title=self._get_title_for_card_chat(card), icon_emoji_id=card_chat_icon_id, ) response = await chat_client.create_topic(request) chat = Chat( tg_topic_id=response.tg_topic_id, card_id=card.id, client_id=None, tg_group_id=group_id, ) self.session.add(chat) await self.session.commit() return chat async def _create_chat_for_card(self, request: CreateChatRequest, card: Card, client: Client) -> CreateChatResponse: existing_chat = await self._get_chat(request.client_id, request.card_id) if existing_chat: return CreateChatResponse( ok=False, message=f'Чат для клиента ID {request.client_id} и карточки ID {request.card_id} уже существует' ) group: Optional[TgGroup] = await self._get_group(request.client_id) if not group: group: TgGroup = await self._create_group_for_client( request.client_id, self._get_title_for_client_chat(client) ) await self._create_topic_for_card(group.id, card) return CreateChatResponse(ok=True, message='Чат успешно создан') async def _create_chat_for_client(self, request: CreateChatRequest, client: Client) -> CreateChatResponse: existing_chat = await self._get_chat(client.id, request.card_id) if existing_chat: return CreateChatResponse(ok=False, message=f'Чат для клиента ID {client.id} уже существует') group = await self._get_group(client.id) if group: return CreateChatResponse(ok=False, message=f'Группа для клиента ID {client.id} уже существует') client_chat_title = self._get_title_for_client_chat(client) await self._create_group_for_client(request.client_id, client_chat_title) return CreateChatResponse(ok=True, message='Чат успешно создан') async def create_chat(self, request: CreateChatRequest, user: UserUnion) -> CreateChatResponse: verify_user_deal_editor(user, request.card_id) client: Optional[Client] = await self.session.get(Client, request.client_id) if not client: return CreateChatResponse(ok=False, message=f'Клиент с ID {request.client_id} не найден') # try: if not request.card_id: return await self._create_chat_for_client(request, client) card: Optional[Card] = await self.session.get(Card, request.card_id) if not card: return CreateChatResponse(ok=False, message=f'Карточка с ID {request.card_id} не найдена') return await self._create_chat_for_card(request, card, client) # except ClientConnectorError: # return CreateChatResponse(ok=False, message=f'Ошибка подключения к сервису') # except Exception as e: # print(e.with_traceback()) # return CreateChatResponse(ok=False, message=str(e)) async def get_messages(self, request: GetMessagesRequest) -> GetMessagesResponse: stmt = ( select(Message) .where( Message.chat_id == request.chat_id, Message.is_deleted == False, ) .order_by(Message.created_at.desc()) .offset(request.offset) .limit(request.limit) ) messages = (await self.session.scalars(stmt)).all() return GetMessagesResponse(messages=messages) async def send_message(self, request: SendTextMessageRequest, user: User) -> SendTextMessageResponse: chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) if not chat: return SendTextMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') message = Message( text=request.message.text, created_at=datetime.now(), chat_id=request.message.chat_id, status=MessageStatus.sending, crm_sender_id=user.id, ) self.session.add(message) await self.session.commit() ok, message = await ProducerService.send_message_to_connector( request.message.text, chat.tg_group_id, chat.tg_topic_id, message.id, ) return SendTextMessageResponse(ok=ok, message=message) async def repeat_sending_message( self, request: RepeatSendingTextMessageRequest, ) -> RepeatSendingTextMessageResponse: message: Optional[Message] = await self._get_message_by_id(request.message.id) if not message: return RepeatSendingTextMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') ok, msg = await ProducerService.send_message_to_connector( request.message.text, message.chat.tg_group_id, message.chat.tg_topic_id, message.id, ) if ok: message.status = MessageStatus.sending await self.session.commit() return RepeatSendingTextMessageResponse(ok=ok, message=msg) async def send_messages_with_files( self, files: list[UploadFile], chat_id: int, caption: str, user: User, ) -> LoadMessagesResponse: chat: Optional[Chat] = await self.session.get(Chat, chat_id) if not chat: return LoadMessagesResponse(ok=False, message=f'Чат с ID: {chat_id} не найден') try: chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) response = await chat_client.send_messages_with_files( str(chat.tg_group_id), chat.tg_topic_id, caption, files ) last_message = None for file_schema in response.files: file = MessageFile(**file_schema.model_dump()) self.session.add(file) message = Message( text='', created_at=datetime.now(), chat_id=chat_id, status=MessageStatus.success, file=file, crm_sender_id=user.id, ) last_message = message self.session.add(message) if last_message: last_message.text = caption await self.session.commit() return LoadMessagesResponse(ok=response.ok, message=response.message) except ClientConnectorError: return LoadMessagesResponse(ok=False, message='Ошибка подключения к сервису') except Exception as e: return LoadMessagesResponse(ok=False, message=str(e)) async def _get_message_by_id(self, message_id: int) -> Optional[Message]: stmt = ( select(Message) .where(Message.id == message_id) .options(joinedload(Message.chat)) ) return (await self.session.scalars(stmt)).one_or_none() async def delete_message(self, message_id: int) -> DeleteMessageResponse: message: Optional[Message] = await self._get_message_by_id(message_id) if not message: return DeleteMessageResponse(ok=False, message=f'Сообщение с ID: {message_id} не найдено') ok, msg = await ProducerService.send_message_deleting_to_connector( message_id, message.tg_message_id, message.chat.tg_group_id, ) return DeleteMessageResponse(ok=ok, message=msg) async def edit_message(self, request: EditMessageRequest) -> EditMessageResponse: message: Optional[Message] = await self._get_message_by_id(request.message.id) if not message: return EditMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') ok, msg = await ProducerService.send_message_editing_to_connector( message.id, message.tg_message_id, message.chat.tg_group_id, request.message.text, ) return EditMessageResponse(ok=ok, message=msg) async def get_tg_file(self, file_id: int) -> StreamingResponse: file: Optional[MessageFile] = await self.session.get(MessageFile, file_id) if not file: raise HTTPException(status_code=404, detail=f'Файл с ID {file_id} не найден') url: str = f'https://api.telegram.org/file/bot{CHAT_TELEGRAM_BOT_TOKEN}/{file.file_path}' response = requests.get(url, stream=True) if response.status_code != 200: raise HTTPException(status_code=response.status_code, detail="Error fetching file") content_type = response.headers.get("Content-Type", "application/octet-stream") return StreamingResponse(response.iter_content(chunk_size=8192), media_type=content_type)