53 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			53 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
 | 
						|
 | 
						|
import background.update
 | 
						|
from background.broker import taskiq_broker
 | 
						|
from buffer.core import TasksBuffer
 | 
						|
 | 
						|
 | 
						|
@taskiq_broker.task(task_name='process_update')
async def process_update(product_ids: list[int]):
    """Broker task ``process_update``: delegate to ``background.update``.

    Args:
        product_ids: Passed through unchanged to
            ``background.update.process_update``.

    Returns:
        Whatever ``background.update.process_update`` returns.
    """
    outcome = await background.update.process_update(product_ids)
    return outcome
 | 
						|
 | 
						|
 | 
						|
@taskiq_broker.task(task_name='update_marketplace')
async def update_marketplace(marketplace_id: int):
    """Broker task ``update_marketplace``: delegate to ``background.update``.

    Args:
        marketplace_id: Passed through unchanged to
            ``background.update.update_marketplace``.

    Returns:
        Whatever ``background.update.update_marketplace`` returns.
    """
    outcome = await background.update.update_marketplace(marketplace_id)
    return outcome
 | 
						|
 | 
						|
 | 
						|
@taskiq_broker.task(task_name='update_marketplace_products')
async def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
    """Broker task ``update_marketplace_products``: delegate to ``background.update``.

    Args:
        marketplace_id: Forwarded as the first positional argument.
        product_ids: Forwarded as the second positional argument.

    Returns:
        Whatever ``background.update.update_marketplace_products`` returns.
    """
    outcome = await background.update.update_marketplace_products(
        marketplace_id,
        product_ids,
    )
    return outcome
 | 
						|
 | 
						|
 | 
						|
@taskiq_broker.task(task_name='update_marketplaces')
async def update_marketplaces(marketplace_ids: list[int]):
    """Broker task ``update_marketplaces``: delegate to ``background.update``.

    Args:
        marketplace_ids: Passed through unchanged to
            ``background.update.update_marketplaces``.

    Returns:
        Whatever ``background.update.update_marketplaces`` returns.
    """
    outcome = await background.update.update_marketplaces(marketplace_ids)
    return outcome
 | 
						|
 | 
						|
 | 
						|
@taskiq_broker.task(task_name='reset_marketplace')
async def reset_marketplace(marketplace_id: int):
    """Broker task ``reset_marketplace``: delegate to ``background.update``.

    Args:
        marketplace_id: Passed through unchanged to
            ``background.update.reset_marketplace``.

    Returns:
        Whatever ``background.update.reset_marketplace`` returns.
    """
    outcome = await background.update.reset_marketplace(marketplace_id)
    return outcome
 | 
						|
 | 
						|
 | 
						|
@taskiq_broker.task(schedule=[{"cron": "* * * * *"}])
async def flush_buffer():
    """Drain the tasks buffer and run ``process_update`` on its contents.

    Registered on the broker with the cron schedule ``* * * * *``.
    A fresh ``TasksBuffer`` is created on every run; if
    ``should_process()`` is falsy, or ``get_tasks()`` yields nothing,
    the run ends after logging that the buffer is empty.

    Returns:
        None. Any exception raised while flushing is logged (with its
        traceback) and not propagated to the caller.
    """
    try:
        logging.info('Flushing buffer')
        buffer = TasksBuffer()
        if not await buffer.should_process():
            logging.info('Buffer is empty')
            return

        product_ids = await buffer.get_tasks()
        # Test for emptiness before calling len(): a None result is then
        # reported as an empty buffer instead of surfacing as a TypeError.
        if not product_ids:
            logging.info('Buffer is empty')
            return

        total_products = len(product_ids)
        logging.info(f'Flushing buffer with {total_products} products')
        await process_update(product_ids)
        logging.info(f'Buffer flushed with {total_products} products')
    except Exception as e:
        # Errors are still swallowed here, as before; logging.exception
        # keeps the same message but also records the traceback.
        logging.exception(f'Error in flush_buffer: {e}')
 |