diff --git a/.gitignore b/.gitignore index df5f857..9309ed2 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ __pycache__ /venv /test -/test/* \ No newline at end of file +/test/* +certs diff --git a/backend/config.py b/backend/config.py index e3fd5e2..0a472d8 100644 --- a/backend/config.py +++ b/backend/config.py @@ -19,6 +19,15 @@ S3_API_KEY = os.environ.get('S3_API_KEY') BILLING_API_KEY = os.environ.get('BILLING_API_KEY') +CHAT_TELEGRAM_BOT_TOKEN = os.environ.get('CHAT_TELEGRAM_BOT_TOKEN') +CHAT_CONNECTOR_API_KEY = os.environ.get('CHAT_CONNECTOR_API_KEY') +CHATS_SYNC_URL = os.environ.get('CHATS_SYNC_URL') + +# Kafka +KAFKA_URL = os.environ.get('KAFKA_URL') +KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC') +KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC') + # Celery CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') diff --git a/constants.py b/constants.py index f1f3da7..f32ae77 100644 --- a/constants.py +++ b/constants.py @@ -17,6 +17,8 @@ API_ROOT = "/api" APP_PATH = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(__file__) +KAFKA_CERTS_PATH = os.path.join(APP_PATH, "certs") + allowed_telegram_ids = [ 7532624817, # Me 355308397, # SerGey diff --git a/external/chat/__init__.py b/external/chat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/external/chat/chat_client.py b/external/chat/chat_client.py new file mode 100644 index 0000000..53d8d99 --- /dev/null +++ b/external/chat/chat_client.py @@ -0,0 +1,55 @@ +import aiohttp +import jwt +from fastapi import UploadFile + +from backend.config import CHATS_SYNC_URL, CHAT_CONNECTOR_API_KEY +from external.chat.schemas import * +from services.auth import algorithm + + +class ChatClient: + def __init__(self, api_key: str): + self.api_key = api_key + self.headers = { + 'Authorization': 'Bearer ' + self.create_jwt_token() + } + self.base_url = CHATS_SYNC_URL + self.chats_sync_endpoint = '/chats-sync' + self.groups_endpoint = '/group' + + def create_jwt_token(self): + return jwt.encode({'sub': self.api_key}, CHAT_CONNECTOR_API_KEY, algorithm=algorithm) + + async def _method(self, http_method, method, **kwargs): + async with aiohttp.ClientSession(headers=self.headers) as session: + async with session.request(http_method, self.base_url + method, **kwargs) as response: + return await response.json() + + async def create_group(self, request: ExternalCreateGroupRequest) -> ExternalCreateGroupResponse: + json_data = request.model_dump() + response = await self._method('POST', self.groups_endpoint + '/create', json=json_data) + return ExternalCreateGroupResponse.model_validate(response) + + async def create_topic(self, request: ExternalCreateTopicRequest) -> ExternalCreateTopicResponse: + json_data = request.model_dump() + response = await self._method('POST', self.groups_endpoint + '/topic/create', json=json_data) + return ExternalCreateTopicResponse.model_validate(response) + + async def send_messages_with_files( + self, + tg_group_id: str, + tg_topic_id: int, + caption: str, + files: list[UploadFile], + ) -> ExternalSendMessagesWithFilesResponse: + query_params = f'?tg_group_id={tg_group_id}&tg_topic_id={tg_topic_id}&caption={caption}' + + data = aiohttp.FormData(default_to_multipart=True) + + for file in files: + content = await file.read() + data.add_field('files', content, filename=file.filename, content_type=file.content_type) + + response = await self._method('POST', self.chats_sync_endpoint + '/send' + query_params, data=data) + + return ExternalSendMessagesWithFilesResponse.model_validate(response) diff --git a/external/chat/schemas.py b/external/chat/schemas.py new file mode 100644 index 0000000..b7c67d9 --- /dev/null +++ b/external/chat/schemas.py @@ -0,0 +1,47 @@ +from typing import Optional +from uuid import UUID + +from schemas.base import BaseSchema, OkMessageSchema + + +# region Entities + +class ExternalMessageFileSchema(BaseSchema): + file_path: str + type: str + file_name: str + file_size: int + + +# endregion + +# region Requests + +class ExternalCreateGroupRequest(BaseSchema): + title: str + + +class ExternalCreateTopicRequest(BaseSchema): + group_id: str + title: str + icon_emoji_id: Optional[int] = None + + +# endregion + +# region Responses + +class ExternalCreateGroupResponse(BaseSchema): + tg_group_id: int + group_id: UUID + tg_invite_link: str + + +class ExternalCreateTopicResponse(BaseSchema): + tg_topic_id: int + + +class ExternalSendMessagesWithFilesResponse(OkMessageSchema): + files: list[ExternalMessageFileSchema] + +# endregion diff --git a/external/kafka/__init__.py b/external/kafka/__init__.py new file mode 100644 index 0000000..f7e8c97 --- /dev/null +++ b/external/kafka/__init__.py @@ -0,0 +1,2 @@ +from .producer import producer +from .consumer import consume_messages diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py new file mode 100644 index 0000000..9b0f705 --- /dev/null +++ b/external/kafka/consumer.py @@ -0,0 +1,31 @@ +from aiokafka import AIOKafkaConsumer +from aiokafka.errors import KafkaConnectionError + +from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC +from backend.session import session_maker +from external.kafka.context import context +from external.kafka.services.consumer_service import ConsumerService + +consumer = AIOKafkaConsumer( + KAFKA_CONSUMER_TOPIC, + bootstrap_servers=KAFKA_URL, + security_protocol='SSL', + ssl_context=context, +) + + +async def consume_messages(): + try: + await consumer.start() + except KafkaConnectionError as e: + print(e) + return + + async with session_maker() as session: + consumer_service = ConsumerService(session) + + try: + async for message in consumer: + await consumer_service.consume_message(message) + finally: + await consumer.stop() diff --git a/external/kafka/context.py b/external/kafka/context.py new file mode 100644 index 0000000..4be481b --- /dev/null +++ b/external/kafka/context.py @@ -0,0 +1,11 @@ +from pathlib import Path + +from aiokafka.helpers import create_ssl_context + +from constants import KAFKA_CERTS_PATH + +context = create_ssl_context( + cafile=KAFKA_CERTS_PATH / Path('ca-cert'), + certfile=KAFKA_CERTS_PATH / Path('cert-signed'), + keyfile=KAFKA_CERTS_PATH / Path('cert-key'), +) diff --git a/external/kafka/enums.py b/external/kafka/enums.py new file mode 100644 index 0000000..09b348b --- /dev/null +++ b/external/kafka/enums.py @@ -0,0 +1,7 @@ +import enum + + +class KafkaMessageType(enum.Enum): + SEND = 1 + EDIT = 2 + DELETE = 3 diff --git a/external/kafka/producer.py b/external/kafka/producer.py new file mode 100644 index 0000000..6f56cdb --- /dev/null +++ b/external/kafka/producer.py @@ -0,0 +1,10 @@ +from aiokafka import AIOKafkaProducer + +from backend.config import KAFKA_URL +from external.kafka.context import context + +producer = AIOKafkaProducer( + bootstrap_servers=KAFKA_URL, + security_protocol='SSL', + ssl_context=context, +) diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py new file mode 100644 index 0000000..0ad7548 --- /dev/null +++ b/external/kafka/schemas/consumer.py @@ -0,0 +1,58 @@ +from typing import Optional + +from schemas.base import OkMessageSchema, BaseSchema + + +# region Entities + +class TelegramUserSchema(BaseSchema): + id: int + first_name: str + last_name: str + username: str + + +class MessageFileSchema(BaseSchema): + file_path: str + type: str + file_name: str + file_size: int + + +class MessageFromTelegramSchema(BaseSchema): + group_id: str + tg_topic_id: int + text: Optional[str] + sender: TelegramUserSchema + file: Optional[MessageFileSchema] + + +# endregion + +# region Requests + +class MessageFromTelegramRequest(BaseSchema): + message: MessageFromTelegramSchema + + +# endregion + +# region Responses + +class BaseConnectorResponse(OkMessageSchema): + message_type: int + message_id: int + + +class SendMessageToConnectorResponse(BaseConnectorResponse): + tg_message_id: Optional[int] = None + + +class DeleteMessageResponse(BaseConnectorResponse): + pass + + +class EditMessageResponse(BaseConnectorResponse): + text: str + +# endregion diff --git a/external/kafka/schemas/producer.py b/external/kafka/schemas/producer.py new file mode 100644 index 0000000..f3017a0 --- /dev/null +++ b/external/kafka/schemas/producer.py @@ -0,0 +1,43 @@ +from typing import Optional + +from schemas.base import BaseSchema + + +# region Entities + +class BaseMessageSchema(BaseSchema): + message_id: int + tg_message_id: Optional[int] = None + group_id: str + + +class EditMessageSchema(BaseMessageSchema): + text: str + + +class MessageSchema(EditMessageSchema): + topic_id: int + + +# endregion + +# region Requests + +class BaseConnectorRequest(BaseSchema): + message_type: int + app_auth_key: str + message: BaseMessageSchema + + +class SendMessageToConnectorRequest(BaseConnectorRequest): + message: MessageSchema + + +class SendMessageDeletingToConnectorRequest(BaseConnectorRequest): + pass + + +class SendMessageEditingToConnectorRequest(BaseConnectorRequest): + message: EditMessageSchema + +# endregion diff --git a/external/kafka/services/consumer_service.py b/external/kafka/services/consumer_service.py new file mode 100644 index 0000000..6bf3f0b --- /dev/null +++ b/external/kafka/services/consumer_service.py @@ -0,0 +1,107 @@ +import pickle +from datetime import datetime +from uuid import UUID + +from aiokafka import ConsumerRecord +from sqlalchemy import select + +from external.kafka.enums import KafkaMessageType +from external.kafka.schemas.consumer import * +from models import Message, MessageStatus, TgUser, Chat, TgGroup, MessageFile +from services.base import BaseService + + +class ConsumerService(BaseService): + async def consume_message(self, message: ConsumerRecord): + value = pickle.loads(message.value) + + try: + if 'ok' in value: + """ Received response after sending message from crm """ + await self._process_connector_response(message) + else: + """ Received message from client """ + request = MessageFromTelegramRequest.model_validate(value) + await self._receive_message_from_client(request) + except Exception as e: + print(e) + + async def _process_connector_response(self, message: ConsumerRecord): + value = pickle.loads(message.value) + message_type = KafkaMessageType(value['message_type']) + + match message_type: + case KafkaMessageType.SEND: + response = SendMessageToConnectorResponse.model_validate(value) + await self._process_connector_send_response(response) + case KafkaMessageType.EDIT: + response = EditMessageResponse.model_validate(value) + await self._process_connector_edit_response(response) + case KafkaMessageType.DELETE: + response = DeleteMessageResponse.model_validate(value) + await self._process_connector_delete_response(response) + case _: + raise Exception('Unexpected message type in crm consumer') + + async def _process_connector_send_response(self, response: SendMessageToConnectorResponse): + message = await self.session.get(Message, response.message_id) + message.tg_message_id = response.tg_message_id + if response.ok: + message.status = MessageStatus.success + else: + message.status = MessageStatus.error + + await self.session.commit() + + async def _process_connector_delete_response(self, response: DeleteMessageResponse): + if not response.ok: + return + + message = await self.session.get(Message, response.message_id) + message.is_deleted = True + await self.session.commit() + + async def _process_connector_edit_response(self, response: EditMessageResponse): + if not response.ok: + return + + message = await self.session.get(Message, response.message_id) + message.text = response.text + message.is_edited = True + await self.session.commit() + + async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]: + stmt = ( + select(Chat) + .join(TgGroup) + .where( + Chat.tg_topic_id == tg_topic_id, + TgGroup.id == UUID(group_id), + ) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def _receive_message_from_client(self, request: MessageFromTelegramRequest): + tg_sender: Optional[TgUser] = await self.session.get(TgUser, request.message.sender.id) + if not tg_sender: + tg_sender: TgUser = TgUser(**request.message.sender.model_dump()) + self.session.add(tg_sender) + + chat = await self._get_chat(request.message.group_id, request.message.tg_topic_id) + if not chat: + return + + file = None + if request.message.file: + file = MessageFile(**request.message.file.model_dump()) + + message = Message( + text=request.message.text if request.message.text else "", + created_at=datetime.now(), + tg_sender_id=tg_sender.id, + chat_id=chat.id, + status=MessageStatus.success, + file=file, + ) + self.session.add(message) + await self.session.commit() diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py new file mode 100644 index 0000000..0ef97d8 --- /dev/null +++ b/external/kafka/services/producer_service.py @@ -0,0 +1,77 @@ +import pickle +from uuid import UUID + +from aiohttp import ClientConnectorError + +from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY +from external.kafka import producer +from external.kafka.enums import KafkaMessageType +from external.kafka.schemas.producer import * +from services.base import BaseService + + +class ProducerService(BaseService): + @staticmethod + async def _send_message(request: BaseConnectorRequest): + try: + await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) + except ClientConnectorError: + return False, 'Ошибка подключения к сервису' + except Exception as e: + return False, str(e) + return True, 'Сообщение отправлено' + + @staticmethod + async def send_message_to_connector( + message_text: str, + group_id: UUID, + topic_id: int, + message_id: int, + ) -> tuple[bool, str]: + request = SendMessageToConnectorRequest( + message_type=KafkaMessageType.SEND, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=MessageSchema( + message_id=message_id, + text=message_text, + group_id=str(group_id), + topic_id=topic_id, + ), + ) + return await ProducerService._send_message(request) + + @staticmethod + async def send_message_deleting_to_connector( + message_id: int, + tg_message_id: int, + group_id: UUID, + ) -> tuple[bool, str]: + request = SendMessageDeletingToConnectorRequest( + message_type=KafkaMessageType.DELETE, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=BaseMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + ), + ) + return await ProducerService._send_message(request) + + @staticmethod + async def send_message_editing_to_connector( + message_id: int, + tg_message_id: int, + group_id: UUID, + text: str, + ) -> tuple[bool, str]: + request = SendMessageEditingToConnectorRequest( + message_type=KafkaMessageType.EDIT, + app_auth_key=CHAT_CONNECTOR_API_KEY, + message=EditMessageSchema( + message_id=message_id, + tg_message_id=tg_message_id, + group_id=str(group_id), + text=text, + ), + ) + return await ProducerService._send_message(request) diff --git a/main.py b/main.py index a969ddd..34239fb 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,37 @@ +import asyncio +import platform +from contextlib import asynccontextmanager + +from aiokafka.errors import KafkaConnectionError from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -import platform - from starlette.staticfiles import StaticFiles import routers from constants import API_ROOT +from external.kafka import producer, consume_messages origins = [ 'http://localhost:5173' ] -app = FastAPI(separate_input_output_schemas=False) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + try: + await producer.start() + except KafkaConnectionError as e: + print(e) + + consumer_task = asyncio.create_task(consume_messages()) + + yield + + await producer.stop() + consumer_task.cancel() + + +app = FastAPI(lifespan=lifespan, separate_input_output_schemas=False) if platform.system() == 'Linux': import uvicorn.workers @@ -58,6 +79,7 @@ routers_list = [ routers.board_router, routers.status_router, routers.card_tag_router, + routers.chat_router, ] for router in routers_list: app.include_router(router) diff --git a/models/__init__.py b/models/__init__.py index 1ee7bfa..235496e 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -23,5 +23,6 @@ from .card_group import * from .transaction import * from .residues import * from .shipping import * +from .chat import * configure_mappers() diff --git a/models/auth.py b/models/auth.py index c6c9da0..ddd79f9 100644 --- a/models/auth.py +++ b/models/auth.py @@ -10,7 +10,7 @@ from models.work_shifts import WorkShift if TYPE_CHECKING: from models.payroll import PayRate, PaymentRecord - from models import Card, CardEmployees + from models import Card, CardEmployees, Message role_permissions = Table( 'role_permissions', @@ -138,6 +138,12 @@ class User(BaseModel): lazy="noload", ) + messages: Mapped[list['Message']] = relationship( + 'Message', + back_populates='crm_sender', + lazy='noload', + ) + class InviteCode(BaseModel): __tablename__ = 'invite_codes' diff --git a/models/card.py b/models/card.py index 90f29e6..5a83446 100644 --- a/models/card.py +++ b/models/card.py @@ -12,8 +12,8 @@ from .shipping_warehouse import ShippingWarehouse if TYPE_CHECKING: from . import ( CardBillRequest, User, BaseModel, Board, CardStatus, CardGroup, CardAttribute, Client, CardTag, - CardService as CardServiceModel, CardProduct, - ) + CardService as CardServiceModel, CardProduct, Chat, +) class Card(BaseModel): @@ -114,6 +114,9 @@ class Card(BaseModel): # module employees employees: Mapped[list['CardEmployees']] = relationship(back_populates='card', lazy='selectin') + # module chat + chat: Mapped[Optional['Chat']] = relationship(back_populates='card', lazy='joined') + # endregion diff --git a/models/chat.py b/models/chat.py new file mode 100644 index 0000000..826d36c --- /dev/null +++ b/models/chat.py @@ -0,0 +1,178 @@ +import enum +from datetime import datetime +from typing import Optional, TYPE_CHECKING +from uuid import UUID + +from sqlalchemy import ForeignKey, BigInteger, Enum, Uuid +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from models import BaseModel, User + +if TYPE_CHECKING: + from models import Client, Card + + +class TgUser(BaseModel): + __tablename__ = 'tg_users' + + id: Mapped[int] = mapped_column( + BigInteger(), + primary_key=True, + comment='Telegram user ID', + ) + username: Mapped[str] = mapped_column( + index=True, + nullable=False, + unique=True, + ) + first_name: Mapped[str] = mapped_column(nullable=True) + last_name: Mapped[str] = mapped_column(nullable=True) + + messages: Mapped['Message'] = relationship( + 'Message', + lazy='noload', + back_populates='tg_sender', + ) + + +class TgGroup(BaseModel): + __tablename__ = 'tg_groups' + + id: Mapped[UUID] = mapped_column(Uuid, primary_key=True) + + tg_group_id: Mapped[int] = mapped_column( + BigInteger(), + nullable=False, + unique=True, + ) + tg_invite_link: Mapped[str] = mapped_column(nullable=False) + + client_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('clients.id'), + unique=True, + ) + client: Mapped[Optional['Client']] = relationship( + 'Client', + lazy='joined', + back_populates='tg_group', + ) + + chats: Mapped[list['Chat']] = relationship( + 'Chat', + lazy='noload', + back_populates='tg_group', + ) + + +class Chat(BaseModel): + __tablename__ = 'chats' + + id: Mapped[int] = mapped_column(primary_key=True) + + tg_topic_id: Mapped[int] = mapped_column(nullable=False) + + card_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('cards.id'), + unique=True, + ) + card: Mapped[Optional['Card']] = relationship( + 'Card', + lazy='joined', + back_populates='chat', + ) + + client_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('clients.id'), + unique=True, + ) + client: Mapped[Optional['Client']] = relationship( + 'Client', + lazy='joined', + back_populates='chat', + ) + + tg_group_id: Mapped[UUID] = mapped_column( + ForeignKey('tg_groups.id'), + nullable=False, + ) + tg_group: Mapped[TgGroup] = relationship( + 'TgGroup', + lazy='joined', + back_populates='chats', + ) + + messages: Mapped[list['Message']] = relationship( + 'Message', + lazy='selectin', + back_populates='chat', + order_by='Message.created_at.desc()', + ) + + +class MessageFile(BaseModel): + __tablename__ = 'message_files' + + id: Mapped[int] = mapped_column(primary_key=True) + file_path: Mapped[str] = mapped_column(nullable=False) + type: Mapped[Optional[str]] = mapped_column(nullable=True) + file_name: Mapped[str] = mapped_column(nullable=False) + file_size: Mapped[int] = mapped_column(BigInteger(), nullable=True, comment='Размер файла в байтах') + + message_id: Mapped[int] = mapped_column(ForeignKey('messages.id')) + message: Mapped['Message'] = relationship( + 'Message', + lazy='noload', + back_populates='file', + ) + + +class MessageStatus(enum.Enum): + sending = 'SENDING' + success = 'SUCCESS' + error = 'ERROR' + + +class Message(BaseModel): + __tablename__ = 'messages' + + id: Mapped[int] = mapped_column(primary_key=True) + tg_message_id: Mapped[Optional[int]] = mapped_column(nullable=True) + + text: Mapped[str] = mapped_column(nullable=False) + created_at: Mapped[datetime] = mapped_column(nullable=False) + status: Mapped[MessageStatus] = mapped_column(Enum(MessageStatus), nullable=False) + is_deleted: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) + is_edited: Mapped[bool] = mapped_column(default=False, server_default='0', nullable=False) + + tg_sender_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('tg_users.id'), + nullable=True, + ) + tg_sender: Mapped[TgUser] = relationship( + 'TgUser', + lazy='selectin', + back_populates='messages', + ) + + crm_sender_id: Mapped[Optional[int]] = mapped_column( + ForeignKey('users.id'), + nullable=True, + ) + crm_sender: Mapped[Optional['User']] = relationship( + 'User', + lazy='selectin', + back_populates='messages', + ) + + chat_id: Mapped[int] = mapped_column(ForeignKey('chats.id')) + chat: Mapped[Chat] = relationship( + 'Chat', + lazy='noload', + back_populates='messages', + ) + + file: Mapped[Optional[MessageFile]] = relationship( + 'MessageFile', + back_populates='message', + lazy='selectin', + ) diff --git a/models/client.py b/models/client.py index 66aff4b..74a8ad7 100644 --- a/models/client.py +++ b/models/client.py @@ -7,7 +7,7 @@ from sqlalchemy.orm import relationship, Mapped, mapped_column from models import BaseModel if TYPE_CHECKING: - from models import ResidualPallet, ResidualBox, Product, BarcodeTemplate, User + from models import ResidualPallet, ResidualBox, Product, BarcodeTemplate, User, TgGroup, Chat class Client(BaseModel): @@ -39,6 +39,9 @@ class Client(BaseModel): pallets: Mapped[list['ResidualPallet']] = relationship(back_populates='client', lazy='selectin') boxes: Mapped[list['ResidualBox']] = relationship(back_populates='client', lazy='selectin') + tg_group: Mapped[Optional['TgGroup']] = relationship('TgGroup', back_populates='client', lazy='joined') + chat: Mapped[Optional['Chat']] = relationship('Chat', back_populates='client', lazy='joined') + class ClientDetails(BaseModel): __tablename__ = 'client_details' diff --git a/requirements.txt b/requirements.txt index ba641fd..bbd8c3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ jinja2 # Security python-jose[cryptography] passlib[bcrypt] +pyjwt # Database sqlalchemy @@ -17,10 +18,14 @@ sqlalchemy[asyncio] asyncpg alembic +# Kafka +aiokafka + # Other python-dotenv aiohttp aiohttp[speedups] +requests openpyxl==3.0.10 lexorank-py celery[redis] diff --git a/routers/__init__.py b/routers/__init__.py index b9a55a1..1829815 100644 --- a/routers/__init__.py +++ b/routers/__init__.py @@ -26,3 +26,4 @@ from .project import project_router from .board import board_router from .status import status_router from .card_tag import card_tag_router +from .chat import chat_router diff --git a/routers/chat.py b/routers/chat.py new file mode 100644 index 0000000..97cad1d --- /dev/null +++ b/routers/chat.py @@ -0,0 +1,125 @@ +from typing import Annotated + +from fastapi import APIRouter, Depends, UploadFile +from sqlalchemy.ext.asyncio import AsyncSession + +from backend.session import get_session +from models import User +from schemas.chat import * +from services.auth import get_current_user +from services.chat import ChatService + +chat_router = APIRouter( + prefix='/chat', + tags=['chat'], +) + + +@chat_router.post( + '/message', + operation_id='send_text_message', + response_model=SendTextMessageResponse, +) +async def send_text_message( + session: Annotated[AsyncSession, Depends(get_session)], + user: Annotated[User, Depends(get_current_user)], + request: SendTextMessageRequest, +): + return await ChatService(session).send_message(request, user) + + +@chat_router.post( + '/message/repeat', + operation_id='repeat_sending_text_message', + response_model=RepeatSendingTextMessageResponse, +) +async def repeat_sending_text_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: RepeatSendingTextMessageRequest, +): + return await ChatService(session).repeat_sending_message(request) + + +@chat_router.post( + '/message/files', + operation_id='send_messages_with_files', + response_model=LoadMessagesResponse, +) +async def send_messages_with_files( + session: Annotated[AsyncSession, Depends(get_session)], + user: Annotated[User, Depends(get_current_user)], + files: list[UploadFile], + chat_id: int, + caption: str, +): + return await ChatService(session).send_messages_with_files(files, chat_id, caption, user) + + +@chat_router.delete( + '/message/{message_id}', + operation_id='delete_message', + response_model=DeleteMessageResponse, +) +async def delete_message( + session: Annotated[AsyncSession, Depends(get_session)], + message_id: int, +): + return await ChatService(session).delete_message(message_id) + + +@chat_router.patch( + '/message', + operation_id='edit_message', + response_model=EditMessageResponse, +) +async def edit_message( + session: Annotated[AsyncSession, Depends(get_session)], + request: EditMessageRequest, +): + return await ChatService(session).edit_message(request) + + +@chat_router.post( + '/', + operation_id='get_chat', + response_model=GetChatResponse, +) +async def get_chat( + session: Annotated[AsyncSession, Depends(get_session)], + request: GetChatRequest, +): + return await ChatService(session).get_chat(request) + + +@chat_router.post( + '/create', + operation_id='create_chat', + response_model=CreateChatResponse, +) +async def create_chat( + session: Annotated[AsyncSession, Depends(get_session)], + request: CreateChatRequest, +): + return await ChatService(session).create_chat(request) + + +@chat_router.post( + '/messages', + operation_id='get_messages', + response_model=GetMessagesResponse, +) +async def get_messages( + session: Annotated[AsyncSession, Depends(get_session)], + request: GetMessagesRequest, +): + return await ChatService(session).get_messages(request) + + +@chat_router.get( + '/tg-file/{file_id}', +) +async def get_tg_file( + session: Annotated[AsyncSession, Depends(get_session)], + file_id: int, +): + return await ChatService(session).get_tg_file(file_id) diff --git a/schemas/card.py b/schemas/card.py index 488c68a..569288d 100644 --- a/schemas/card.py +++ b/schemas/card.py @@ -8,6 +8,7 @@ from schemas.base import BaseSchema, OkMessageSchema from schemas.billing import CardBillRequestSchema from schemas.board import BoardSchema from schemas.card_tag import CardTagSchema +from schemas.chat import ChatSchema from schemas.client import ClientSchema from schemas.group import CardGroupSchema from schemas.marketplace import BaseMarketplaceSchema @@ -108,6 +109,7 @@ class BaseCardSchema(BaseSchema): class CardSchema(BaseCardSchema): attributes: list[CardAttributeSchema] + chat: Optional[ChatSchema] class CardGeneralInfoSchema(BaseSchemaWithAttributes): diff --git a/schemas/chat.py b/schemas/chat.py new file mode 100644 index 0000000..841d6f4 --- /dev/null +++ b/schemas/chat.py @@ -0,0 +1,125 @@ +from datetime import datetime +from typing import Optional + +from schemas.base import BaseSchema, OkMessageSchema +from schemas.user import UserSchema + + +# region Entities + +class TgUserSchema(BaseSchema): + id: int + first_name: str + last_name: str + username: str + + +class MessageFileSchema(BaseSchema): + id: int + file_path: str + type: str + file_name: str + file_size: int + + +class BaseMessageSchema(BaseSchema): + text: str + chat_id: int + + +class EditMessageSchema(BaseMessageSchema): + id: int + + +class MessageSchema(EditMessageSchema): + created_at: datetime + tg_sender: Optional[TgUserSchema] + crm_sender: Optional[UserSchema] + status: str + is_edited: bool + file: Optional[MessageFileSchema] = None + + +class RepeatSendingMessageSchema(BaseMessageSchema): + text: str + id: int + + +class TgGroupSchema(BaseSchema): + tg_group_id: int + tg_invite_link: str + + +class ChatSchema(BaseSchema): + id: int + client_id: Optional[int] + card_id: Optional[int] + tg_group: Optional[TgGroupSchema] + +# endregion + +# region Requests + +class SendTextMessageRequest(BaseSchema): + message: BaseMessageSchema + + +class RepeatSendingTextMessageRequest(BaseSchema): + message: RepeatSendingMessageSchema + + +class EditMessageRequest(BaseSchema): + message: EditMessageSchema + + +class GetChatRequest(BaseSchema): + client_id: int + card_id: Optional[int] + + +class CreateChatRequest(BaseSchema): + client_id: int + card_id: Optional[int] + + +class GetMessagesRequest(BaseSchema): + chat_id: int + offset: int + limit: int + +# endregion + +# region Responses + +class SendTextMessageResponse(OkMessageSchema): + pass + + +class RepeatSendingTextMessageResponse(OkMessageSchema): + pass + + +class LoadMessagesResponse(OkMessageSchema): + pass + + +class DeleteMessageResponse(OkMessageSchema): + pass + + +class EditMessageResponse(OkMessageSchema): + pass + + +class GetChatResponse(BaseSchema): + chat: Optional[ChatSchema] + + +class CreateChatResponse(OkMessageSchema): + pass + + +class GetMessagesResponse(BaseSchema): + messages: list[MessageSchema] + +# endregion diff --git a/schemas/client.py b/schemas/client.py index 6707d75..4d3c019 100644 --- a/schemas/client.py +++ b/schemas/client.py @@ -4,6 +4,7 @@ from pydantic import field_validator from schemas.barcode import BarcodeTemplateSchema from schemas.base import BaseSchema, OkMessageSchema +from schemas.chat import ChatSchema from schemas.residues import ResidualBoxSchema, ResidualPalletSchema @@ -26,6 +27,7 @@ class ClientSchema(BaseSchema): barcode_template: BarcodeTemplateSchema | None = None comment: Optional[str] = None details: ClientDetailsSchema | None = None + chat: ChatSchema | None = None class ClientDetailedSchema(ClientSchema): diff --git a/services/chat.py b/services/chat.py new file mode 100644 index 0000000..0ab3ae8 --- /dev/null +++ b/services/chat.py @@ -0,0 +1,305 @@ +import requests +from aiohttp import ClientConnectorError +from fastapi import HTTPException, UploadFile +from sqlalchemy import select +from sqlalchemy.orm import joinedload +from starlette.responses import StreamingResponse + +from backend.config import CHAT_CONNECTOR_API_KEY, CHAT_TELEGRAM_BOT_TOKEN +from external.chat.chat_client import ChatClient +from external.chat.schemas import * +from external.kafka.services.producer_service import ProducerService +from models import Message, Chat, MessageStatus, TgGroup, Client, Card, MessageFile, User +from schemas.chat import * +from services.base import BaseService + + +class ChatService(BaseService): + async def _get_chat(self, client_id: int, card_id: Optional[int]) -> Optional[Chat]: + stmt = ( + select(Chat) + .where( + Chat.card_id == card_id, + TgGroup.client_id == client_id, + ) + .join(TgGroup) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def get_chat(self, request: GetChatRequest) -> GetChatResponse: + chat = await self._get_chat(request.client_id, request.card_id) + return GetChatResponse(chat=chat) + + async def _get_group(self, client_id: int) -> Optional[TgGroup]: + stmt = ( + select(TgGroup) + .where(TgGroup.client_id == client_id) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def _create_group_for_client(self, client_id: int, title: str) -> Optional[TgGroup]: + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + + request = ExternalCreateGroupRequest(title=title) + response = await chat_client.create_group(request) + + group = TgGroup( + id=response.group_id, + client_id=client_id, + tg_group_id=response.tg_group_id, + tg_invite_link=response.tg_invite_link, + ) + + chat = Chat( + tg_topic_id=1, + card_id=None, + client_id=client_id, + tg_group_id=response.group_id, + ) + self.session.add_all([group, chat]) + await self.session.commit() + + return group + + @staticmethod + def _get_title_for_client_chat(client: Client) -> str: + return f'[{client.id}] {client.name}' + + @staticmethod + def _get_title_for_card_chat(card: Card) -> str: + return f'[{card.id}] {card.name}' + + async def _create_topic_for_card( + self, + group_id: UUID, + card: Card, + ) -> Chat: + card_chat_icon_id = 5348227245599105972 # 💼 + + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + + request = ExternalCreateTopicRequest( + group_id=str(group_id), + title=self._get_title_for_card_chat(card), + icon_emoji_id=card_chat_icon_id, + ) + response = await chat_client.create_topic(request) + + chat = Chat( + tg_topic_id=response.tg_topic_id, + card_id=card.id, + client_id=None, + tg_group_id=group_id, + ) + self.session.add(chat) + await self.session.commit() + + return chat + + async def _create_chat_for_card(self, request: CreateChatRequest, card: Card, client: Client) -> CreateChatResponse: + existing_chat = await self._get_chat(request.client_id, request.card_id) + if existing_chat: + return CreateChatResponse( + ok=False, + message=f'Чат для клиента ID {request.client_id} и карточки ID {request.card_id} уже существует' + ) + + group: Optional[TgGroup] = await self._get_group(request.client_id) + if not group: + group: TgGroup = await self._create_group_for_client( + request.client_id, + self._get_title_for_client_chat(client) + ) + + await self._create_topic_for_card(group.id, card) + + return CreateChatResponse(ok=True, message='Чат успешно создан') + + async def _create_chat_for_client(self, request: CreateChatRequest, client: Client) -> CreateChatResponse: + existing_chat = await self._get_chat(client.id, request.card_id) + if existing_chat: + return CreateChatResponse(ok=False, message=f'Чат для клиента ID {client.id} уже существует') + + group = await self._get_group(client.id) + if group: + return CreateChatResponse(ok=False, message=f'Группа для клиента ID {client.id} уже существует') + + client_chat_title = self._get_title_for_client_chat(client) + await self._create_group_for_client(request.client_id, client_chat_title) + + return CreateChatResponse(ok=True, message='Чат успешно создан') + + async def create_chat(self, request: CreateChatRequest) -> CreateChatResponse: + client: Optional[Client] = await self.session.get(Client, request.client_id) + if not client: + return CreateChatResponse(ok=False, message=f'Клиент с ID {request.client_id} не найден') + + try: + if not request.card_id: + return await self._create_chat_for_client(request, client) + + card: Optional[Card] = await self.session.get(Card, request.card_id) + if not card: + return CreateChatResponse(ok=False, message=f'Карточка с ID {request.card_id} не найдена') + + return await self._create_chat_for_card(request, card, client) + except ClientConnectorError: + return CreateChatResponse(ok=False, message=f'Ошибка подключения к сервису') + except Exception as e: + return CreateChatResponse(ok=False, message=str(e)) + + async def get_messages(self, request: GetMessagesRequest) -> GetMessagesResponse: + stmt = ( + select(Message) + .where( + Message.chat_id == request.chat_id, + Message.is_deleted == False, + ) + .order_by(Message.created_at.desc()) + .offset(request.offset) + .limit(request.limit) + ) + messages = (await self.session.scalars(stmt)).all() + return GetMessagesResponse(messages=messages) + + async def send_message(self, request: SendTextMessageRequest, user: User) -> SendTextMessageResponse: + chat: Optional[Chat] = await self.session.get(Chat, request.message.chat_id) + if not chat: + return SendTextMessageResponse(ok=False, message=f'Чат с ID: {request.message.chat_id} не найден') + + message = Message( + text=request.message.text, + created_at=datetime.now(), + chat_id=request.message.chat_id, + status=MessageStatus.sending, + crm_sender_id=user.id, + ) + self.session.add(message) + await self.session.commit() + + ok, message = await ProducerService.send_message_to_connector( + request.message.text, + chat.tg_group_id, + chat.tg_topic_id, + message.id, + ) + + return SendTextMessageResponse(ok=ok, message=message) + + async def repeat_sending_message( + self, + request: RepeatSendingTextMessageRequest, + ) -> RepeatSendingTextMessageResponse: + message: Optional[Message] = await self._get_message_by_id(request.message.id) + if not message: + return RepeatSendingTextMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') + + ok, msg = await ProducerService.send_message_to_connector( + request.message.text, + message.chat.tg_group_id, + message.chat.tg_topic_id, + message.id, + ) + + if ok: + message.status = MessageStatus.sending + await self.session.commit() + + return RepeatSendingTextMessageResponse(ok=ok, message=msg) + + async def send_messages_with_files( + self, + files: list[UploadFile], + chat_id: int, + caption: str, + user: User, + ) -> LoadMessagesResponse: + chat: Optional[Chat] = await self.session.get(Chat, chat_id) + if not chat: + return SendTextMessageResponse(ok=False, message=f'Чат с ID: {chat_id} не найден') + + try: + chat_client = ChatClient(api_key=CHAT_CONNECTOR_API_KEY) + response = await chat_client.send_messages_with_files( + str(chat.tg_group_id), + chat.tg_topic_id, + caption, + files + ) + + last_message = None + for file_schema in response.files: + file = MessageFile(**file_schema.model_dump()) + self.session.add(file) + + message = Message( + text='', + created_at=datetime.now(), + chat_id=chat_id, + status=MessageStatus.success, + file=file, + crm_sender_id=user.id, + ) + last_message = message + self.session.add(message) + + if last_message: + last_message.text = caption + await self.session.commit() + + return LoadMessagesResponse(ok=response.ok, message=response.message) + except ClientConnectorError: + return LoadMessagesResponse(ok=False, message='Ошибка подключения к сервису') + except Exception as e: + return LoadMessagesResponse(ok=False, message=str(e)) + + async def _get_message_by_id(self, message_id: int) -> Optional[Message]: + stmt = ( + select(Message) + .where(Message.id == message_id) + .options(joinedload(Message.chat)) + ) + return (await self.session.scalars(stmt)).one_or_none() + + async def delete_message(self, message_id: int) -> DeleteMessageResponse: + message: Optional[Message] = await self._get_message_by_id(message_id) + if not message: + return DeleteMessageResponse(ok=False, message=f'Сообщение с ID: {message_id} не найдено') + + ok, msg = await ProducerService.send_message_deleting_to_connector( + message_id, + message.tg_message_id, + message.chat.tg_group_id, + ) + + return DeleteMessageResponse(ok=ok, message=msg) + + async def edit_message(self, request: EditMessageRequest) -> EditMessageResponse: + message: Optional[Message] = await self._get_message_by_id(request.message.id) + if not message: + return EditMessageResponse(ok=False, message=f'Сообщение с ID: {request.message.id} не найдено') + + ok, msg = await ProducerService.send_message_editing_to_connector( + message.id, + message.tg_message_id, + message.chat.tg_group_id, + request.message.text, + ) + + return EditMessageResponse(ok=ok, message=msg) + + async def get_tg_file(self, file_id: int) -> StreamingResponse: + file: Optional[MessageFile] = await self.session.get(MessageFile, file_id) + if not file: + raise HTTPException(status_code=404, detail=f'Файл с ID {file_id} не найден') + + url: str = f'https://api.telegram.org/file/bot{CHAT_TELEGRAM_BOT_TOKEN}/{file.file_path}' + + response = requests.get(url, stream=True) + + if response.status_code != 200: + raise HTTPException(status_code=response.status_code, detail="Error fetching file") + + content_type = response.headers.get("Content-Type", "application/octet-stream") + + return StreamingResponse(response.iter_content(chunk_size=8192), media_type=content_type)