Merge remote-tracking branch 'origin/chats'
This commit is contained in:
0
external/chat/__init__.py
vendored
Normal file
0
external/chat/__init__.py
vendored
Normal file
55
external/chat/chat_client.py
vendored
Normal file
55
external/chat/chat_client.py
vendored
Normal file
@@ -0,0 +1,55 @@
|
||||
import aiohttp
|
||||
import jwt
|
||||
from fastapi import UploadFile
|
||||
|
||||
from backend.config import CHATS_SYNC_URL, CHAT_CONNECTOR_API_KEY
|
||||
from external.chat.schemas import *
|
||||
from services.auth import algorithm
|
||||
|
||||
|
||||
class ChatClient:
|
||||
def __init__(self, api_key: str):
|
||||
self.api_key = api_key
|
||||
self.headers = {
|
||||
'Authorization': 'Bearer ' + self.create_jwt_token()
|
||||
}
|
||||
self.base_url = CHATS_SYNC_URL
|
||||
self.chats_sync_endpoint = '/chats-sync'
|
||||
self.groups_endpoint = '/group'
|
||||
|
||||
def create_jwt_token(self):
|
||||
return jwt.encode({'sub': self.api_key}, CHAT_CONNECTOR_API_KEY, algorithm=algorithm)
|
||||
|
||||
async def _method(self, http_method, method, **kwargs):
|
||||
async with aiohttp.ClientSession(headers=self.headers) as session:
|
||||
async with session.request(http_method, self.base_url + method, **kwargs) as response:
|
||||
return await response.json()
|
||||
|
||||
async def create_group(self, request: ExternalCreateGroupRequest) -> ExternalCreateGroupResponse:
|
||||
json_data = request.model_dump()
|
||||
response = await self._method('POST', self.groups_endpoint + '/create', json=json_data)
|
||||
return ExternalCreateGroupResponse.model_validate(response)
|
||||
|
||||
async def create_topic(self, request: ExternalCreateTopicRequest) -> ExternalCreateTopicResponse:
|
||||
json_data = request.model_dump()
|
||||
response = await self._method('POST', self.groups_endpoint + '/topic/create', json=json_data)
|
||||
return ExternalCreateTopicResponse.model_validate(response)
|
||||
|
||||
async def send_messages_with_files(
|
||||
self,
|
||||
tg_group_id: str,
|
||||
tg_topic_id: int,
|
||||
caption: str,
|
||||
files: list[UploadFile],
|
||||
) -> ExternalSendMessagesWithFilesResponse:
|
||||
query_params = f'?tg_group_id={tg_group_id}&tg_topic_id={tg_topic_id}&caption={caption}'
|
||||
|
||||
data = aiohttp.FormData(default_to_multipart=True)
|
||||
|
||||
for file in files:
|
||||
content = await file.read()
|
||||
data.add_field('files', content, filename=file.filename, content_type=file.content_type)
|
||||
|
||||
response = await self._method('POST', self.chats_sync_endpoint + '/send' + query_params, data=data)
|
||||
|
||||
return ExternalSendMessagesWithFilesResponse.model_validate(response)
|
||||
47
external/chat/schemas.py
vendored
Normal file
47
external/chat/schemas.py
vendored
Normal file
@@ -0,0 +1,47 @@
|
||||
from typing import Optional
|
||||
from uuid import UUID
|
||||
|
||||
from schemas.base import BaseSchema, OkMessageSchema
|
||||
|
||||
|
||||
# region Entities
|
||||
|
||||
class ExternalMessageFileSchema(BaseSchema):
|
||||
file_path: str
|
||||
type: str
|
||||
file_name: str
|
||||
file_size: int
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region Requests
|
||||
|
||||
class ExternalCreateGroupRequest(BaseSchema):
|
||||
title: str
|
||||
|
||||
|
||||
class ExternalCreateTopicRequest(BaseSchema):
|
||||
group_id: str
|
||||
title: str
|
||||
icon_emoji_id: Optional[int] = None
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region Responses
|
||||
|
||||
class ExternalCreateGroupResponse(BaseSchema):
|
||||
tg_group_id: int
|
||||
group_id: UUID
|
||||
tg_invite_link: str
|
||||
|
||||
|
||||
class ExternalCreateTopicResponse(BaseSchema):
|
||||
tg_topic_id: int
|
||||
|
||||
|
||||
class ExternalSendMessagesWithFilesResponse(OkMessageSchema):
|
||||
files: list[ExternalMessageFileSchema]
|
||||
|
||||
# endregion
|
||||
2
external/kafka/__init__.py
vendored
Normal file
2
external/kafka/__init__.py
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
from .producer import producer
|
||||
from .consumer import consume_messages
|
||||
31
external/kafka/consumer.py
vendored
Normal file
31
external/kafka/consumer.py
vendored
Normal file
@@ -0,0 +1,31 @@
|
||||
from aiokafka import AIOKafkaConsumer
|
||||
from aiokafka.errors import KafkaConnectionError
|
||||
|
||||
from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC
|
||||
from backend.session import session_maker
|
||||
from external.kafka.context import context
|
||||
from external.kafka.services.consumer_service import ConsumerService
|
||||
|
||||
consumer = AIOKafkaConsumer(
|
||||
KAFKA_CONSUMER_TOPIC,
|
||||
bootstrap_servers=KAFKA_URL,
|
||||
security_protocol='SSL',
|
||||
ssl_context=context,
|
||||
)
|
||||
|
||||
|
||||
async def consume_messages():
|
||||
try:
|
||||
await consumer.start()
|
||||
except KafkaConnectionError as e:
|
||||
print(e)
|
||||
return
|
||||
|
||||
async with session_maker() as session:
|
||||
consumer_service = ConsumerService(session)
|
||||
|
||||
try:
|
||||
async for message in consumer:
|
||||
await consumer_service.consume_message(message)
|
||||
finally:
|
||||
await consumer.stop()
|
||||
11
external/kafka/context.py
vendored
Normal file
11
external/kafka/context.py
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
from pathlib import Path
|
||||
|
||||
from aiokafka.helpers import create_ssl_context
|
||||
|
||||
from constants import KAFKA_CERTS_PATH
|
||||
|
||||
context = create_ssl_context(
|
||||
cafile=KAFKA_CERTS_PATH / Path('ca-cert'),
|
||||
certfile=KAFKA_CERTS_PATH / Path('cert-signed'),
|
||||
keyfile=KAFKA_CERTS_PATH / Path('cert-key'),
|
||||
)
|
||||
7
external/kafka/enums.py
vendored
Normal file
7
external/kafka/enums.py
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
import enum
|
||||
|
||||
|
||||
class KafkaMessageType(enum.Enum):
|
||||
SEND = 1
|
||||
EDIT = 2
|
||||
DELETE = 3
|
||||
10
external/kafka/producer.py
vendored
Normal file
10
external/kafka/producer.py
vendored
Normal file
@@ -0,0 +1,10 @@
|
||||
from aiokafka import AIOKafkaProducer
|
||||
|
||||
from backend.config import KAFKA_URL
|
||||
from external.kafka.context import context
|
||||
|
||||
producer = AIOKafkaProducer(
|
||||
bootstrap_servers=KAFKA_URL,
|
||||
security_protocol='SSL',
|
||||
ssl_context=context,
|
||||
)
|
||||
58
external/kafka/schemas/consumer.py
vendored
Normal file
58
external/kafka/schemas/consumer.py
vendored
Normal file
@@ -0,0 +1,58 @@
|
||||
from typing import Optional
|
||||
|
||||
from schemas.base import OkMessageSchema, BaseSchema
|
||||
|
||||
|
||||
# region Entities
|
||||
|
||||
class TelegramUserSchema(BaseSchema):
|
||||
id: int
|
||||
first_name: str
|
||||
last_name: str
|
||||
username: str
|
||||
|
||||
|
||||
class MessageFileSchema(BaseSchema):
|
||||
file_path: str
|
||||
type: str
|
||||
file_name: str
|
||||
file_size: int
|
||||
|
||||
|
||||
class MessageFromTelegramSchema(BaseSchema):
|
||||
group_id: str
|
||||
tg_topic_id: int
|
||||
text: Optional[str]
|
||||
sender: TelegramUserSchema
|
||||
file: Optional[MessageFileSchema]
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region Requests
|
||||
|
||||
class MessageFromTelegramRequest(BaseSchema):
|
||||
message: MessageFromTelegramSchema
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region Responses
|
||||
|
||||
class BaseConnectorResponse(OkMessageSchema):
|
||||
message_type: int
|
||||
message_id: int
|
||||
|
||||
|
||||
class SendMessageToConnectorResponse(BaseConnectorResponse):
|
||||
tg_message_id: Optional[int] = None
|
||||
|
||||
|
||||
class DeleteMessageResponse(BaseConnectorResponse):
|
||||
pass
|
||||
|
||||
|
||||
class EditMessageResponse(BaseConnectorResponse):
|
||||
text: str
|
||||
|
||||
# endregion
|
||||
43
external/kafka/schemas/producer.py
vendored
Normal file
43
external/kafka/schemas/producer.py
vendored
Normal file
@@ -0,0 +1,43 @@
|
||||
from typing import Optional
|
||||
|
||||
from schemas.base import BaseSchema
|
||||
|
||||
|
||||
# region Entities
|
||||
|
||||
class BaseMessageSchema(BaseSchema):
|
||||
message_id: int
|
||||
tg_message_id: Optional[int] = None
|
||||
group_id: str
|
||||
|
||||
|
||||
class EditMessageSchema(BaseMessageSchema):
|
||||
text: str
|
||||
|
||||
|
||||
class MessageSchema(EditMessageSchema):
|
||||
topic_id: int
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region Requests
|
||||
|
||||
class BaseConnectorRequest(BaseSchema):
|
||||
message_type: int
|
||||
app_auth_key: str
|
||||
message: BaseMessageSchema
|
||||
|
||||
|
||||
class SendMessageToConnectorRequest(BaseConnectorRequest):
|
||||
message: MessageSchema
|
||||
|
||||
|
||||
class SendMessageDeletingToConnectorRequest(BaseConnectorRequest):
|
||||
pass
|
||||
|
||||
|
||||
class SendMessageEditingToConnectorRequest(BaseConnectorRequest):
|
||||
message: EditMessageSchema
|
||||
|
||||
# endregion
|
||||
107
external/kafka/services/consumer_service.py
vendored
Normal file
107
external/kafka/services/consumer_service.py
vendored
Normal file
@@ -0,0 +1,107 @@
|
||||
import pickle
|
||||
from datetime import datetime
|
||||
from uuid import UUID
|
||||
|
||||
from aiokafka import ConsumerRecord
|
||||
from sqlalchemy import select
|
||||
|
||||
from external.kafka.enums import KafkaMessageType
|
||||
from external.kafka.schemas.consumer import *
|
||||
from models import Message, MessageStatus, TgUser, Chat, TgGroup, MessageFile
|
||||
from services.base import BaseService
|
||||
|
||||
|
||||
class ConsumerService(BaseService):
|
||||
async def consume_message(self, message: ConsumerRecord):
|
||||
value = pickle.loads(message.value)
|
||||
|
||||
try:
|
||||
if 'ok' in value:
|
||||
""" Received response after sending message from crm """
|
||||
await self._process_connector_response(message)
|
||||
else:
|
||||
""" Received message from client """
|
||||
request = MessageFromTelegramRequest.model_validate(value)
|
||||
await self._receive_message_from_client(request)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
async def _process_connector_response(self, message: ConsumerRecord):
|
||||
value = pickle.loads(message.value)
|
||||
message_type = KafkaMessageType(value['message_type'])
|
||||
|
||||
match message_type:
|
||||
case KafkaMessageType.SEND:
|
||||
response = SendMessageToConnectorResponse.model_validate(value)
|
||||
await self._process_connector_send_response(response)
|
||||
case KafkaMessageType.EDIT:
|
||||
response = EditMessageResponse.model_validate(value)
|
||||
await self._process_connector_edit_response(response)
|
||||
case KafkaMessageType.DELETE:
|
||||
response = DeleteMessageResponse.model_validate(value)
|
||||
await self._process_connector_delete_response(response)
|
||||
case _:
|
||||
raise Exception('Unexpected message type in crm consumer')
|
||||
|
||||
async def _process_connector_send_response(self, response: SendMessageToConnectorResponse):
|
||||
message = await self.session.get(Message, response.message_id)
|
||||
message.tg_message_id = response.tg_message_id
|
||||
if response.ok:
|
||||
message.status = MessageStatus.success
|
||||
else:
|
||||
message.status = MessageStatus.error
|
||||
|
||||
await self.session.commit()
|
||||
|
||||
async def _process_connector_delete_response(self, response: DeleteMessageResponse):
|
||||
if not response.ok:
|
||||
return
|
||||
|
||||
message = await self.session.get(Message, response.message_id)
|
||||
message.is_deleted = True
|
||||
await self.session.commit()
|
||||
|
||||
async def _process_connector_edit_response(self, response: EditMessageResponse):
|
||||
if not response.ok:
|
||||
return
|
||||
|
||||
message = await self.session.get(Message, response.message_id)
|
||||
message.text = response.text
|
||||
message.is_edited = True
|
||||
await self.session.commit()
|
||||
|
||||
async def _get_chat(self, group_id: str, tg_topic_id: int) -> Optional[Chat]:
|
||||
stmt = (
|
||||
select(Chat)
|
||||
.join(TgGroup)
|
||||
.where(
|
||||
Chat.tg_topic_id == tg_topic_id,
|
||||
TgGroup.id == UUID(group_id),
|
||||
)
|
||||
)
|
||||
return (await self.session.scalars(stmt)).one_or_none()
|
||||
|
||||
async def _receive_message_from_client(self, request: MessageFromTelegramRequest):
|
||||
tg_sender: Optional[TgUser] = await self.session.get(TgUser, request.message.sender.id)
|
||||
if not tg_sender:
|
||||
tg_sender: TgUser = TgUser(**request.message.sender.model_dump())
|
||||
self.session.add(tg_sender)
|
||||
|
||||
chat = await self._get_chat(request.message.group_id, request.message.tg_topic_id)
|
||||
if not chat:
|
||||
return
|
||||
|
||||
file = None
|
||||
if request.message.file:
|
||||
file = MessageFile(**request.message.file.model_dump())
|
||||
|
||||
message = Message(
|
||||
text=request.message.text if request.message.text else "",
|
||||
created_at=datetime.now(),
|
||||
tg_sender_id=tg_sender.id,
|
||||
chat_id=chat.id,
|
||||
status=MessageStatus.success,
|
||||
file=file,
|
||||
)
|
||||
self.session.add(message)
|
||||
await self.session.commit()
|
||||
77
external/kafka/services/producer_service.py
vendored
Normal file
77
external/kafka/services/producer_service.py
vendored
Normal file
@@ -0,0 +1,77 @@
|
||||
import pickle
|
||||
from uuid import UUID
|
||||
|
||||
from aiohttp import ClientConnectorError
|
||||
|
||||
from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY
|
||||
from external.kafka import producer
|
||||
from external.kafka.enums import KafkaMessageType
|
||||
from external.kafka.schemas.producer import *
|
||||
from services.base import BaseService
|
||||
|
||||
|
||||
class ProducerService(BaseService):
|
||||
@staticmethod
|
||||
async def _send_message(request: BaseConnectorRequest):
|
||||
try:
|
||||
await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump()))
|
||||
except ClientConnectorError:
|
||||
return False, 'Ошибка подключения к сервису'
|
||||
except Exception as e:
|
||||
return False, str(e)
|
||||
return True, 'Сообщение отправлено'
|
||||
|
||||
@staticmethod
|
||||
async def send_message_to_connector(
|
||||
message_text: str,
|
||||
group_id: UUID,
|
||||
topic_id: int,
|
||||
message_id: int,
|
||||
) -> tuple[bool, str]:
|
||||
request = SendMessageToConnectorRequest(
|
||||
message_type=KafkaMessageType.SEND,
|
||||
app_auth_key=CHAT_CONNECTOR_API_KEY,
|
||||
message=MessageSchema(
|
||||
message_id=message_id,
|
||||
text=message_text,
|
||||
group_id=str(group_id),
|
||||
topic_id=topic_id,
|
||||
),
|
||||
)
|
||||
return await ProducerService._send_message(request)
|
||||
|
||||
@staticmethod
|
||||
async def send_message_deleting_to_connector(
|
||||
message_id: int,
|
||||
tg_message_id: int,
|
||||
group_id: UUID,
|
||||
) -> tuple[bool, str]:
|
||||
request = SendMessageDeletingToConnectorRequest(
|
||||
message_type=KafkaMessageType.DELETE,
|
||||
app_auth_key=CHAT_CONNECTOR_API_KEY,
|
||||
message=BaseMessageSchema(
|
||||
message_id=message_id,
|
||||
tg_message_id=tg_message_id,
|
||||
group_id=str(group_id),
|
||||
),
|
||||
)
|
||||
return await ProducerService._send_message(request)
|
||||
|
||||
@staticmethod
|
||||
async def send_message_editing_to_connector(
|
||||
message_id: int,
|
||||
tg_message_id: int,
|
||||
group_id: UUID,
|
||||
text: str,
|
||||
) -> tuple[bool, str]:
|
||||
request = SendMessageEditingToConnectorRequest(
|
||||
message_type=KafkaMessageType.EDIT,
|
||||
app_auth_key=CHAT_CONNECTOR_API_KEY,
|
||||
message=EditMessageSchema(
|
||||
message_id=message_id,
|
||||
tg_message_id=tg_message_id,
|
||||
group_id=str(group_id),
|
||||
text=text,
|
||||
),
|
||||
)
|
||||
return await ProducerService._send_message(request)
|
||||
Reference in New Issue
Block a user