32 lines
		
	
	
		
			988 B
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			32 lines
		
	
	
		
			988 B
		
	
	
	
		
			Python
		
	
	
	
	
	
from aiokafka import AIOKafkaConsumer
 | 
						|
from aiokafka.errors import KafkaConnectionError
 | 
						|
 | 
						|
from backend.config import KAFKA_URL, KAFKA_CONSUMER_TOPIC, KAFKA_ENABLE_SSL
 | 
						|
from backend.session import session_maker
 | 
						|
from external.kafka.context import context
 | 
						|
from external.kafka.services.consumer_service import ConsumerService
 | 
						|
 | 
						|
 | 
						|
async def consume_messages():
 | 
						|
    consumer = AIOKafkaConsumer(
 | 
						|
        KAFKA_CONSUMER_TOPIC,
 | 
						|
        bootstrap_servers=KAFKA_URL,
 | 
						|
        group_id='crm',
 | 
						|
        security_protocol='SSL' if KAFKA_ENABLE_SSL else 'PLAINTEXT',
 | 
						|
        ssl_context=context if KAFKA_ENABLE_SSL else None,
 | 
						|
    )
 | 
						|
    try:
 | 
						|
        await consumer.start()
 | 
						|
    except KafkaConnectionError as e:
 | 
						|
        print(e)
 | 
						|
        return
 | 
						|
 | 
						|
    async with session_maker() as session:
 | 
						|
        consumer_service = ConsumerService(session)
 | 
						|
 | 
						|
        try:
 | 
						|
            async for message in consumer:
 | 
						|
                await consumer_service.consume_message(message)
 | 
						|
        finally:
 | 
						|
            await consumer.stop()
 |