fix: optional ssl for kafka and optional tg user fields

This commit is contained in:
2025-04-10 11:19:46 +04:00
parent f083c19cdc
commit b5c8e35910
6 changed files with 16 additions and 20 deletions

View File

@@ -27,6 +27,7 @@ CHATS_SYNC_URL = os.environ.get('CHATS_SYNC_URL')
KAFKA_URL = os.environ.get('KAFKA_URL') KAFKA_URL = os.environ.get('KAFKA_URL')
KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC') KAFKA_CONSUMER_TOPIC = os.environ.get('KAFKA_CONSUMER_TOPIC')
KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC') KAFKA_PRODUCER_TOPIC = os.environ.get('KAFKA_PRODUCER_TOPIC')
KAFKA_ENABLE_SSL = os.environ.get('KAFKA_ENABLE_SSL', 'true').lower() in ('true', '1', 't')
# Celery # Celery
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL')

View File

@@ -1,20 +1,19 @@
from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError from aiokafka.errors import KafkaConnectionError
from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC, KAFKA_ENABLE_SSL
from backend.session import session_maker from backend.session import session_maker
from external.kafka.context import context from external.kafka.context import context
from external.kafka.services.consumer_service import ConsumerService from external.kafka.services.consumer_service import ConsumerService
async def consume_messages(): async def consume_messages():
consumer = AIOKafkaConsumer( consumer = AIOKafkaConsumer(
KAFKA_CONSUMER_TOPIC, KAFKA_CONSUMER_TOPIC,
bootstrap_servers=KAFKA_URL, bootstrap_servers=KAFKA_URL,
group_id='crm', group_id='crm',
security_protocol='SSL', security_protocol='SSL' if KAFKA_ENABLE_SSL else 'PLAINTEXT',
ssl_context=context, ssl_context=context if KAFKA_ENABLE_SSL else None,
) )
try: try:
await consumer.start() await consumer.start()

View File

@@ -2,7 +2,7 @@ from typing import Optional
from aiokafka import AIOKafkaProducer from aiokafka import AIOKafkaProducer
from backend.config import KAFKA_URL from backend.config import KAFKA_URL, KAFKA_ENABLE_SSL
from external.kafka.context import context from external.kafka.context import context
_producer: Optional[AIOKafkaProducer] = None _producer: Optional[AIOKafkaProducer] = None
@@ -12,8 +12,8 @@ async def init_producer():
global _producer global _producer
_producer = AIOKafkaProducer( _producer = AIOKafkaProducer(
bootstrap_servers=KAFKA_URL, bootstrap_servers=KAFKA_URL,
security_protocol='SSL', security_protocol='SSL' if KAFKA_ENABLE_SSL else 'PLAINTEXT',
ssl_context=context, ssl_context=context if KAFKA_ENABLE_SSL else None,
) )

View File

@@ -7,9 +7,9 @@ from schemas.base import OkMessageSchema, BaseSchema
class TelegramUserSchema(BaseSchema): class TelegramUserSchema(BaseSchema):
id: int id: int
first_name: str first_name: Optional[str] = None
last_name: str last_name: Optional[str] = None
username: str username: Optional[str] = None
class MessageFileSchema(BaseSchema): class MessageFileSchema(BaseSchema):

View File

@@ -20,13 +20,9 @@ class TgUser(BaseModel):
primary_key=True, primary_key=True,
comment='Telegram user ID', comment='Telegram user ID',
) )
username: Mapped[str] = mapped_column( username: Mapped[Optional[str]] = mapped_column(nullable=True)
index=True, first_name: Mapped[Optional[str]] = mapped_column(nullable=True)
nullable=False, last_name: Mapped[Optional[str]] = mapped_column(nullable=True)
unique=True,
)
first_name: Mapped[str] = mapped_column(nullable=True)
last_name: Mapped[str] = mapped_column(nullable=True)
messages: Mapped['Message'] = relationship( messages: Mapped['Message'] = relationship(
'Message', 'Message',

View File

@@ -9,9 +9,9 @@ from schemas.user import UserSchema
class TgUserSchema(BaseSchema): class TgUserSchema(BaseSchema):
id: int id: int
first_name: str first_name: Optional[str]
last_name: str last_name: Optional[str]
username: str username: Optional[str]
class MessageFileSchema(BaseSchema): class MessageFileSchema(BaseSchema):