"""Kafka consumer entry point: feeds messages from the configured topic to ConsumerService."""

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError

from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC
from backend.session import session_maker
from external.kafka.services.consumer_service import ConsumerService

# Module-level consumer shared by consume_messages(); it is only constructed
# here and is started/stopped inside consume_messages().
consumer = AIOKafkaConsumer(
    KAFKA_CONSUMER_TOPIC,
    bootstrap_servers=KAFKA_URL,
)


async def consume_messages() -> None:
    """Consume messages from the Kafka topic and hand each one to ConsumerService.

    Starts the module-level ``consumer``. If starting raises
    ``KafkaConnectionError`` the error is printed and the coroutine returns
    without consuming anything. Otherwise a session is opened via
    ``session_maker()``, a ``ConsumerService`` is built on it, and every
    message yielded by the consumer is passed to
    ``ConsumerService.consume_message``.

    Once the consumer has been started it is always stopped before this
    coroutine exits, including when opening the session, building the
    service, or handling a message raises.
    """
    try:
        await consumer.start()
    except KafkaConnectionError as e:
        print(e)
        return

    # The try/finally wraps everything that happens after a successful
    # start(), so a failure while opening the session or constructing the
    # service cannot leave the started consumer running.
    try:
        async with session_maker() as session:
            consumer_service = ConsumerService(session)
            async for message in consumer:
                await consumer_service.consume_message(message)
    finally:
        await consumer.stop()