feat: context kafka

This commit is contained in:
2025-04-09 16:29:39 +04:00
parent a81e692957
commit 498ab093f3
5 changed files with 24 additions and 2 deletions

3
.gitignore vendored
View File

@@ -5,4 +5,5 @@
__pycache__ __pycache__
/venv /venv
/test /test
/test/* /test/*
certs

View File

@@ -17,6 +17,8 @@ API_ROOT = "/api"
APP_PATH = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(__file__) APP_PATH = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(__file__)
KAFKA_CERTS_PATH = os.path.join(APP_PATH, "certs")
allowed_telegram_ids = [ allowed_telegram_ids = [
7532624817, # Me 7532624817, # Me
355308397, # SerGey 355308397, # SerGey

View File

@@ -3,11 +3,14 @@ from aiokafka.errors import KafkaConnectionError
from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC
from backend.session import session_maker from backend.session import session_maker
from external.kafka.context import context
from external.kafka.services.consumer_service import ConsumerService from external.kafka.services.consumer_service import ConsumerService
consumer = AIOKafkaConsumer( consumer = AIOKafkaConsumer(
KAFKA_CONSUMER_TOPIC, KAFKA_CONSUMER_TOPIC,
bootstrap_servers=KAFKA_URL, bootstrap_servers=KAFKA_URL,
security_protocol='SSL',
ssl_context=context,
) )

11
external/kafka/context.py vendored Normal file
View File

@@ -0,0 +1,11 @@
from pathlib import Path
from aiokafka.helpers import create_ssl_context
from constants import KAFKA_CERTS_PATH
context = create_ssl_context(
cafile=KAFKA_CERTS_PATH / Path('ca-cert'),
certfile=KAFKA_CERTS_PATH / Path('cert-signed'),
keyfile=KAFKA_CERTS_PATH / Path('cert-key'),
)

View File

@@ -1,5 +1,10 @@
from aiokafka import AIOKafkaProducer from aiokafka import AIOKafkaProducer
from backend.config import KAFKA_URL from backend.config import KAFKA_URL
from external.kafka.context import context
producer = AIOKafkaProducer(bootstrap_servers=KAFKA_URL) producer = AIOKafkaProducer(
bootstrap_servers=KAFKA_URL,
security_protocol='SSL',
ssl_context=context,
)