from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError

from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC
from backend.session import session_maker
from external.kafka.context import context
from external.kafka.services.consumer_service import ConsumerService


async def consume_messages() -> None:
    """Consume messages from ``KAFKA_CONSUMER_TOPIC`` and hand each to ``ConsumerService``.

    A consumer is created for ``KAFKA_URL`` using the SSL security protocol
    and the imported ``context`` as its SSL context. If starting the consumer
    raises ``KafkaConnectionError``, the error is printed and the coroutine
    returns without processing anything.

    Once the consumer has started, a single session obtained from
    ``session_maker()`` is shared by one ``ConsumerService`` instance, and
    every message yielded by the consumer is passed to its
    ``consume_message`` method.

    The consumer is stopped whenever the coroutine leaves the processing
    phase, including when opening the session or constructing the service
    fails. The session context is exited before the consumer is stopped.

    Raises:
        Any exception raised while opening the session, constructing the
        service, iterating the consumer, or handling a message propagates to
        the caller after the consumer has been stopped.
    """
    consumer = AIOKafkaConsumer(
        KAFKA_CONSUMER_TOPIC,
        bootstrap_servers=KAFKA_URL,
        security_protocol='SSL',
        ssl_context=context,
    )

    try:
        await consumer.start()
    except KafkaConnectionError as e:
        # Connection failure is reported and treated as "nothing to do".
        # NOTE(review): the consumer is not stopped on this path; presumably
        # a failed start() leaves nothing to release -- confirm against aiokafka.
        print(e)
        return

    # From here on the consumer is running, so every exit path must stop it.
    # The try/finally wraps session setup as well, so a failure in
    # session_maker() or ConsumerService() cannot leave the consumer running.
    try:
        async with session_maker() as session:
            consumer_service = ConsumerService(session)
            async for message in consumer:
                await consumer_service.consume_message(message)
    finally:
        await consumer.stop()