from celery.result import AsyncResult from fastapi import APIRouter import background.tasks.marketplace from background.celery_app import celery from schemas.task import * task_router = APIRouter( prefix='/task', tags=["task"], ) @task_router.post( '/synchronize-marketplace', operation_id='create_synchronize_marketplace_task', response_model=CreateTaskResponse ) async def synchronize_marketplace( request: SynchronizeMarketplaceRequest ): marketplace_id = request.marketplace_id task: AsyncResult = background.tasks.marketplace.synchronize_marketplace.delay(marketplace_id) return CreateTaskResponse(task_id=task.id) @task_router.get( '/info/{task_id}', operation_id='get_task_info', response_model=TaskInfoResponse ) def task_info(task_id: str): task = AsyncResult(task_id, app=celery) return TaskInfoResponse( task_id=task_id, status=task.status )