feat: fix
This commit is contained in:
1
external/kafka/__init__.py
vendored
1
external/kafka/__init__.py
vendored
@@ -1,2 +1 @@
|
|||||||
from .producer import producer
|
|
||||||
from .consumer import consume_messages
|
from .consumer import consume_messages
|
||||||
|
|||||||
13
external/kafka/producer.py
vendored
13
external/kafka/producer.py
vendored
@@ -5,13 +5,18 @@ from aiokafka import AIOKafkaProducer
|
|||||||
from backend.config import KAFKA_URL
|
from backend.config import KAFKA_URL
|
||||||
from external.kafka.context import context
|
from external.kafka.context import context
|
||||||
|
|
||||||
producer: Optional[AIOKafkaProducer] = None
|
_producer: Optional[AIOKafkaProducer] = None
|
||||||
|
|
||||||
|
|
||||||
async def init_producer():
|
async def init_producer():
|
||||||
global producer
|
global _producer
|
||||||
producer = AIOKafkaProducer(
|
_producer = AIOKafkaProducer(
|
||||||
bootstrap_servers=KAFKA_URL,
|
bootstrap_servers=KAFKA_URL,
|
||||||
security_protocol='SSL',
|
security_protocol='SSL',
|
||||||
ssl_context=context,
|
ssl_context=context,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_producer() -> Optional[AIOKafkaProducer]:
|
||||||
|
global _producer
|
||||||
|
return _producer
|
||||||
|
|||||||
3
external/kafka/services/producer_service.py
vendored
3
external/kafka/services/producer_service.py
vendored
@@ -4,8 +4,8 @@ from uuid import UUID
|
|||||||
from aiohttp import ClientConnectorError
|
from aiohttp import ClientConnectorError
|
||||||
|
|
||||||
from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY
|
from backend.config import KAFKA_PRODUCER_TOPIC, CHAT_CONNECTOR_API_KEY
|
||||||
from external.kafka import producer
|
|
||||||
from external.kafka.enums import KafkaMessageType
|
from external.kafka.enums import KafkaMessageType
|
||||||
|
from external.kafka.producer import get_producer
|
||||||
from external.kafka.schemas.producer import *
|
from external.kafka.schemas.producer import *
|
||||||
from services.base import BaseService
|
from services.base import BaseService
|
||||||
|
|
||||||
@@ -14,6 +14,7 @@ class ProducerService(BaseService):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
async def _send_message(request: BaseConnectorRequest):
|
async def _send_message(request: BaseConnectorRequest):
|
||||||
try:
|
try:
|
||||||
|
producer = await get_producer()
|
||||||
await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump()))
|
await producer.send(KAFKA_PRODUCER_TOPIC, value=pickle.dumps(request.model_dump()))
|
||||||
except ClientConnectorError:
|
except ClientConnectorError:
|
||||||
return False, 'Ошибка подключения к сервису'
|
return False, 'Ошибка подключения к сервису'
|
||||||
|
|||||||
6
main.py
6
main.py
@@ -9,8 +9,8 @@ from starlette.staticfiles import StaticFiles
|
|||||||
|
|
||||||
import routers
|
import routers
|
||||||
from constants import API_ROOT
|
from constants import API_ROOT
|
||||||
from external.kafka import producer, consume_messages
|
from external.kafka import consume_messages
|
||||||
from external.kafka.producer import init_producer
|
from external.kafka.producer import init_producer, get_producer
|
||||||
|
|
||||||
origins = [
|
origins = [
|
||||||
'http://localhost:5173'
|
'http://localhost:5173'
|
||||||
@@ -21,6 +21,7 @@ origins = [
|
|||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
try:
|
try:
|
||||||
await init_producer()
|
await init_producer()
|
||||||
|
producer = await get_producer()
|
||||||
if producer:
|
if producer:
|
||||||
await producer.start()
|
await producer.start()
|
||||||
except KafkaConnectionError as e:
|
except KafkaConnectionError as e:
|
||||||
@@ -29,6 +30,7 @@ async def lifespan(app: FastAPI):
|
|||||||
consumer_task = asyncio.create_task(consume_messages())
|
consumer_task = asyncio.create_task(consume_messages())
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
producer = await get_producer()
|
||||||
if producer:
|
if producer:
|
||||||
await producer.stop()
|
await producer.stop()
|
||||||
consumer_task.cancel()
|
consumer_task.cancel()
|
||||||
|
|||||||
Reference in New Issue
Block a user