From b5c8e35910d17918fb6255a9279f9c47d550d044 Mon Sep 17 00:00:00 2001 From: AlexSserb Date: Thu, 10 Apr 2025 11:19:46 +0400 Subject: [PATCH] fix: optional ssl for kafka and optional tg user fields --- backend/config.py | 1 + external/kafka/consumer.py | 7 +++---- external/kafka/producer.py | 6 +++--- external/kafka/schemas/consumer.py | 6 +++--- models/chat.py | 10 +++------- schemas/chat.py | 6 +++--- 6 files changed, 16 insertions(+), 20 deletions(-) diff --git a/backend/config.py b/backend/config.py index 0a472d8..0519c3d 100644 --- a/backend/config.py +++ b/backend/config.py @@ -27,6 +27,7 @@ CHATS_SYNC_URL = os.environ.get('CHATS_SYNC_URL') KAFKA_URL = os.environ.get('KAFKA_URL') KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC') KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC') +KAFKA_ENABLE_SSL = os.environ.get('KAFKA_ENABLE_SSL', 'true').lower() in ('true', '1', 't') # Celery CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') diff --git a/external/kafka/consumer.py b/external/kafka/consumer.py index 84f3ff0..df6f113 100644 --- a/external/kafka/consumer.py +++ b/external/kafka/consumer.py @@ -1,20 +1,19 @@ from aiokafka import AIOKafkaConsumer from aiokafka.errors import KafkaConnectionError -from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC +from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC, KAFKA_ENABLE_SSL from backend.session import session_maker from external.kafka.context import context from external.kafka.services.consumer_service import ConsumerService - async def consume_messages(): consumer = AIOKafkaConsumer( KAFKA_CONSUMER_TOPIC, bootstrap_servers=KAFKA_URL, group_id='crm', - security_protocol='SSL', - ssl_context=context, + security_protocol='SSL' if KAFKA_ENABLE_SSL else 'PLAINTEXT', + ssl_context=context if KAFKA_ENABLE_SSL else None, ) try: await consumer.start() diff --git a/external/kafka/producer.py b/external/kafka/producer.py index f24822b..f422a0a 100644 --- a/external/kafka/producer.py +++ b/external/kafka/producer.py @@ -2,7 +2,7 @@ from typing import Optional from aiokafka import AIOKafkaProducer -from backend.config import KAFKA_URL +from backend.config import KAFKA_URL, KAFKA_ENABLE_SSL from external.kafka.context import context _producer: Optional[AIOKafkaProducer] = None @@ -12,8 +12,8 @@ async def init_producer(): global _producer _producer = AIOKafkaProducer( bootstrap_servers=KAFKA_URL, - security_protocol='SSL', - ssl_context=context, + security_protocol='SSL' if KAFKA_ENABLE_SSL else 'PLAINTEXT', + ssl_context=context if KAFKA_ENABLE_SSL else None, ) diff --git a/external/kafka/schemas/consumer.py b/external/kafka/schemas/consumer.py index 0ad7548..dca3143 100644 --- a/external/kafka/schemas/consumer.py +++ b/external/kafka/schemas/consumer.py @@ -7,9 +7,9 @@ from schemas.base import OkMessageSchema, BaseSchema class TelegramUserSchema(BaseSchema): id: int - first_name: str - last_name: str - username: str + first_name: Optional[str] = None + last_name: Optional[str] = None + username: Optional[str] = None class MessageFileSchema(BaseSchema): diff --git a/models/chat.py b/models/chat.py index 826d36c..ffcc7bc 100644 --- a/models/chat.py +++ b/models/chat.py @@ -20,13 +20,9 @@ class TgUser(BaseModel): primary_key=True, comment='Telegram user ID', ) - username: Mapped[str] = mapped_column( - index=True, - nullable=False, - unique=True, - ) - first_name: Mapped[str] = mapped_column(nullable=True) - last_name: Mapped[str] = mapped_column(nullable=True) + username: Mapped[Optional[str]] = mapped_column(nullable=True) + first_name: Mapped[Optional[str]] = mapped_column(nullable=True) + last_name: Mapped[Optional[str]] = mapped_column(nullable=True) messages: Mapped['Message'] = relationship( 'Message', diff --git a/schemas/chat.py b/schemas/chat.py index 841d6f4..4e43fb3 100644 --- a/schemas/chat.py +++ b/schemas/chat.py @@ -9,9 +9,9 @@ from schemas.user import UserSchema class TgUserSchema(BaseSchema): id: int - first_name: str - last_name: str - username: str + first_name: Optional[str] + last_name: Optional[str] + username: Optional[str] class MessageFileSchema(BaseSchema):