from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError

from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC, KAFKA_ENABLE_SSL
from backend.session import session_maker
from external.kafka.context import context
from external.kafka.services.consumer_service import ConsumerService


async def consume_messages():
    """Consume messages from ``KAFKA_CONSUMER_TOPIC`` until the stream ends.

    A consumer is created for the configured topic and started. Every
    message it yields is handed to ``ConsumerService.consume_message``,
    using one session obtained from ``session_maker`` for the whole run.

    If the consumer cannot be started because of a ``KafkaConnectionError``,
    the error is printed and the coroutine returns without consuming.

    Once the consumer has started, it is always stopped on the way out,
    including when opening the session, building the service, or handling
    a message raises.
    """
    # SSL settings are applied only when KAFKA_ENABLE_SSL is truthy;
    # otherwise the connection is plaintext with no SSL context.
    consumer = AIOKafkaConsumer(
        KAFKA_CONSUMER_TOPIC,
        bootstrap_servers=KAFKA_URL,
        group_id='crm',
        security_protocol='SSL' if KAFKA_ENABLE_SSL else 'PLAINTEXT',
        ssl_context=context if KAFKA_ENABLE_SSL else None,
    )

    try:
        await consumer.start()
    except KafkaConnectionError as e:
        # NOTE(review): consider routing this through `logging` instead of
        # stdout so connection failures show up in the service logs.
        print(e)
        return

    # The try/finally wraps everything after a successful start so that a
    # failure while opening the session or constructing the service cannot
    # leave the started consumer running.
    try:
        async with session_maker() as session:
            consumer_service = ConsumerService(session)
            async for message in consumer:
                await consumer_service.consume_message(message)
    finally:
        await consumer.stop()