Files
Sipro-Stocks/background/tasks.py
2024-11-05 00:44:27 +03:00

33 lines
1.2 KiB
Python

import asyncio
from typing import List, Union
import background.update
from .celery_app import app
def _run_in_fresh_loop(coroutine):
    """Run *coroutine* to completion on a new event loop, then close that loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coroutine)
    finally:
        try:
            # Finalize async generators the coroutine may have left unfinished.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Unset first so this thread never exposes a closed loop as current.
            asyncio.set_event_loop(None)
            loop.close()


def run_async(coroutine):
    """Synchronously run *coroutine* and return its result.

    The coroutine is always executed on a fresh event loop that is closed
    afterwards, so repeated calls do not accumulate unclosed loops.

    * If no event loop is running in the calling thread, the fresh loop runs
      in the calling thread.
    * If an event loop is already running in the calling thread,
      ``run_until_complete`` cannot be used on it (it raises ``RuntimeError``),
      so the coroutine runs on a fresh loop in a helper thread while the
      calling thread blocks until it finishes. The coroutine therefore must
      not wait on objects bound to the caller's loop.

    Args:
        coroutine: The coroutine (or other awaitable accepted by
            ``loop.run_until_complete``) to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine is propagated unchanged.
    """
    try:
        # Only used as a probe: succeeds solely when a loop is running here.
        asyncio.get_running_loop()
    except RuntimeError:  # No event loop is running in this thread.
        return _run_in_fresh_loop(coroutine)

    # Local import keeps this block self-contained; the module is stdlib.
    from concurrent.futures import ThreadPoolExecutor

    # A running loop cannot be re-entered, so hand the work to another thread.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(_run_in_fresh_loop, coroutine).result()
@app.task(name='process_update')
def process_update(product_ids: list[int]):
    """Task 'process_update': run ``background.update.process_update`` synchronously.

    Args:
        product_ids: Passed through unchanged to the async implementation.

    Returns:
        The result of the awaited ``background.update.process_update`` call.
    """
    pending = background.update.process_update(product_ids)
    result = run_async(pending)
    return result
@app.task(name='update_marketplace')
def update_marketplace(marketplace_id: int):
    """Task 'update_marketplace': run ``background.update.update_marketplace`` synchronously.

    Args:
        marketplace_id: Passed through unchanged to the async implementation.

    Returns:
        The result of the awaited ``background.update.update_marketplace`` call.
    """
    pending = background.update.update_marketplace(marketplace_id)
    result = run_async(pending)
    return result
@app.task(name='update_marketplace_products')
def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
    """Task 'update_marketplace_products': synchronous entry point for the async updater.

    Args:
        marketplace_id: Passed through unchanged as the first argument.
        product_ids: Passed through unchanged as the second argument.

    Returns:
        The result of the awaited
        ``background.update.update_marketplace_products`` call.
    """
    pending = background.update.update_marketplace_products(
        marketplace_id,
        product_ids,
    )
    result = run_async(pending)
    return result
@app.task(name='update_marketplaces')
def update_marketplaces(marketplace_ids: Union[List[int], None]):
    """Task 'update_marketplaces': run ``background.update.update_marketplaces`` synchronously.

    Args:
        marketplace_ids: A list of ids or ``None``; forwarded as-is. How
            ``None`` is interpreted is decided by the async implementation.

    Returns:
        The result of the awaited ``background.update.update_marketplaces`` call.
    """
    pending = background.update.update_marketplaces(marketplace_ids)
    result = run_async(pending)
    return result