feat: timeouts on tasks
This commit is contained in:
@@ -11,28 +11,31 @@ from constants import DEFAULT_PROCESSING_PRICE
|
||||
from database import Marketplace, Company
|
||||
from utils import chunk_list
|
||||
|
||||
SMALL_TIMEOUT = 100
|
||||
LARGE_TIMEOUT = 500
|
||||
|
||||
@taskiq_broker.task(task_name='process_update')
|
||||
|
||||
@taskiq_broker.task(task_name='process_update', timeout=SMALL_TIMEOUT)
|
||||
async def process_update(product_ids: list[int]):
|
||||
return await background.update.process_update(product_ids)
|
||||
|
||||
|
||||
@taskiq_broker.task(task_name='update_marketplace')
|
||||
@taskiq_broker.task(task_name='update_marketplace', timeout=SMALL_TIMEOUT)
|
||||
async def update_marketplace(marketplace_id: int):
|
||||
return await background.update.update_marketplace(marketplace_id)
|
||||
|
||||
|
||||
@taskiq_broker.task(task_name='update_marketplace_products')
|
||||
@taskiq_broker.task(task_name='update_marketplace_products', timeout=SMALL_TIMEOUT)
|
||||
async def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
|
||||
return await background.update.update_marketplace_products(marketplace_id, product_ids)
|
||||
|
||||
|
||||
@taskiq_broker.task(task_name='update_marketplaces')
|
||||
@taskiq_broker.task(task_name='update_marketplaces', timeout=LARGE_TIMEOUT)
|
||||
async def update_marketplaces(marketplace_ids: list[int]):
|
||||
return await background.update.update_marketplaces(marketplace_ids)
|
||||
|
||||
|
||||
@taskiq_broker.task(task_name='reset_marketplace')
|
||||
@taskiq_broker.task(task_name='reset_marketplace', timeout=SMALL_TIMEOUT)
|
||||
async def reset_marketplace(marketplace_id: int):
|
||||
return await background.update.reset_marketplace(marketplace_id)
|
||||
|
||||
@@ -59,7 +62,7 @@ async def flush_buffer():
|
||||
logging.error(f'Error in flush_buffer: {e}')
|
||||
|
||||
|
||||
@taskiq_broker.task(schedule=[{"cron": "0 */3 * * *"}])
|
||||
@taskiq_broker.task(schedule=[{"cron": "0 */3 * * *"}], timeout=LARGE_TIMEOUT)
|
||||
async def reset_companies_with_zero_balance():
|
||||
logging.info(f'Flushing zero balance companies')
|
||||
async for session in get_session():
|
||||
@@ -81,4 +84,3 @@ async def reset_companies_with_zero_balance():
|
||||
for marketplaces_chunk in chunk_list(marketplaces, 10):
|
||||
tasks = [background.update.reset_marketplace(marketplace.id) for marketplace in marketplaces_chunk]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user