From e6674bd576361be745b274520f91c674995948a8 Mon Sep 17 00:00:00 2001 From: admin Date: Wed, 9 Apr 2025 16:25:58 +0300 Subject: [PATCH] feat: fix --- external/kafka/__init__.py | 1 - external/kafka/producer.py | 13 +++++++++---- external/kafka/services/producer_service.py | 3 ++- main.py | 6 ++++-- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/external/kafka/__init__.py b/external/kafka/__init__.py index f7e8c97..568e49a 100644 --- a/external/kafka/__init__.py +++ b/external/kafka/__init__.py @@ -1,2 +1 @@ -from .producer import producer from .consumer import consume_messages diff --git a/external/kafka/producer.py b/external/kafka/producer.py index ab4e152..f24822b 100644 --- a/external/kafka/producer.py +++ b/external/kafka/producer.py @@ -5,13 +5,18 @@ from aiokafka import AIOKafkaProducer from backend.config import KAFKA_URL from external.kafka.context import context -producer: Optional[AIOKafkaProducer] = None +_producer: Optional[AIOKafkaProducer] = None async def init_producer(): - global producer - producer = AIOKafkaProducer( + global _producer + _producer = AIOKafkaProducer( bootstrap_servers=KAFKA_URL, security_protocol='SSL', ssl_context=context, - ) \ No newline at end of file + ) + + +async def get_producer() -> Optional[AIOKafkaProducer]: + global _producer + return _producer diff --git a/external/kafka/services/producer_service.py b/external/kafka/services/producer_service.py index 0ef97d8..75e8b33 100644 --- a/external/kafka/services/producer_service.py +++ b/external/kafka/services/producer_service.py @@ -4,8 +4,8 @@ from uuid import UUID from aiohttp import ClientConnectorError from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY -from external.kafka import producer from external.kafka.enums import KafkaMessageType +from external.kafka.producer import get_producer from external.kafka.schemas.producer import * from services.base import BaseService @@ -14,6 +14,7 @@ class ProducerService(BaseService): @staticmethod async def _send_message(request: BaseConnectorRequest): try: + producer = await get_producer() await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump())) except ClientConnectorError: return False, 'Ошибка подключения к сервису' diff --git a/main.py b/main.py index 60f7b13..4cf4152 100644 --- a/main.py +++ b/main.py @@ -9,8 +9,8 @@ from starlette.staticfiles import StaticFiles import routers from constants import API_ROOT -from external.kafka import producer, consume_messages -from external.kafka.producer import init_producer +from external.kafka import consume_messages +from external.kafka.producer import init_producer, get_producer origins = [ 'http://localhost:5173' @@ -21,6 +21,7 @@ origins = [ async def lifespan(app: FastAPI): try: await init_producer() + producer = await get_producer() if producer: await producer.start() except KafkaConnectionError as e: @@ -29,6 +30,7 @@ async def lifespan(app: FastAPI): consumer_task = asyncio.create_task(consume_messages()) yield + producer = await get_producer() if producer: await producer.stop() consumer_task.cancel()