diff --git a/background/tasks.py b/background/tasks.py index 932e523..28c827a 100644 --- a/background/tasks.py +++ b/background/tasks.py @@ -11,28 +11,31 @@ from constants import DEFAULT_PROCESSING_PRICE from database import Marketplace, Company from utils import chunk_list +SMALL_TIMEOUT = 100 +LARGE_TIMEOUT = 500 -@taskiq_broker.task(task_name='process_update') + +@taskiq_broker.task(task_name='process_update', timeout=SMALL_TIMEOUT) async def process_update(product_ids: list[int]): return await background.update.process_update(product_ids) -@taskiq_broker.task(task_name='update_marketplace') +@taskiq_broker.task(task_name='update_marketplace', timeout=SMALL_TIMEOUT) async def update_marketplace(marketplace_id: int): return await background.update.update_marketplace(marketplace_id) -@taskiq_broker.task(task_name='update_marketplace_products') +@taskiq_broker.task(task_name='update_marketplace_products', timeout=SMALL_TIMEOUT) async def update_marketplace_products(marketplace_id: int, product_ids: list[int]): return await background.update.update_marketplace_products(marketplace_id, product_ids) -@taskiq_broker.task(task_name='update_marketplaces') +@taskiq_broker.task(task_name='update_marketplaces', timeout=LARGE_TIMEOUT) async def update_marketplaces(marketplace_ids: list[int]): return await background.update.update_marketplaces(marketplace_ids) -@taskiq_broker.task(task_name='reset_marketplace') +@taskiq_broker.task(task_name='reset_marketplace', timeout=SMALL_TIMEOUT) async def reset_marketplace(marketplace_id: int): return await background.update.reset_marketplace(marketplace_id) @@ -59,7 +62,7 @@ async def flush_buffer(): logging.error(f'Error in flush_buffer: {e}') -@taskiq_broker.task(schedule=[{"cron": "0 */3 * * *"}]) +@taskiq_broker.task(schedule=[{"cron": "0 */3 * * *"}], timeout=LARGE_TIMEOUT) async def reset_companies_with_zero_balance(): logging.info(f'Flushing zero balance companies') async for session in get_session(): @@ -81,4 +84,3 @@ async def reset_companies_with_zero_balance(): for marketplaces_chunk in chunk_list(marketplaces, 10): tasks = [background.update.reset_marketplace(marketplace.id) for marketplace in marketplaces_chunk] await asyncio.gather(*tasks) -