"""HTTP API that enqueues product / marketplace update tasks.

Every route is protected by a bearer-token check against
``backend.config.API_KEY``. Handlers enqueue work by calling ``.kiq(...)``
on tasks from ``background.tasks`` and answer with the id of the enqueued
task.
"""
import secrets
from contextlib import asynccontextmanager
from typing import Annotated

from celery.result import AsyncResult
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
from starlette.responses import JSONResponse

import backend.config
from background import taskiq_broker
from buffer.core import TasksBuffer
from schemas.general import *  # noqa: F401,F403 - request/response schemas
import background.tasks

auth_schema = HTTPBearer()
buffer = TasksBuffer()


async def check_auth(
        token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]
) -> None:
    """Reject the request unless the bearer token equals the configured API key.

    Raises:
        HTTPException: with status 400 when the token does not match.
    """
    expected = backend.config.API_KEY
    # Constant-time comparison so the key cannot be probed through response
    # timing. Both sides are encoded because ``compare_digest`` only accepts
    # ASCII ``str``; a non-string API_KEY can never match, as before.
    is_valid = isinstance(expected, str) and secrets.compare_digest(
        token.credentials.encode(), expected.encode()
    )
    if not is_valid:
        # NOTE(review): 401 is the conventional status for bad credentials;
        # 400 is kept so existing clients see no change in behaviour.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Invalid credentials'
        )


async def start_broker() -> None:
    """Start the taskiq broker unless this process is a taskiq worker."""
    if not taskiq_broker.is_worker_process:
        await taskiq_broker.startup()
        print('Broker started')


async def shutdown_broker() -> None:
    """Shut the taskiq broker down unless this process is a taskiq worker."""
    if not taskiq_broker.is_worker_process:
        await taskiq_broker.shutdown()
        print('Broker shutdown')


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: clear the buffer, then run the broker."""
    await buffer.clear()
    await start_broker()
    try:
        yield
    finally:
        # Runs even when an exception is thrown into the context manager,
        # so a started broker is always shut down.
        await shutdown_broker()


app = FastAPI(
    dependencies=[
        Depends(check_auth)
    ],
    lifespan=lifespan
)


@app.get(
    '/ping'
)
async def ping():
    """Liveness probe."""
    return {'pong': True}


@app.post(
    '/update',
    response_model=UpdateResponse
)
async def update(
        request: UpdateRequest
):
    """Buffer the requested product ids and flush them when the buffer says so.

    Returns an ``UpdateResponse`` whose ``task_id`` is the id of the enqueued
    task, or ``"-1"`` when nothing was enqueued by this request.
    """
    task_id = "-1"
    await buffer.append_task(request.product_ids, update_time=False)
    if await buffer.should_process():
        product_ids = await buffer.get_tasks()
        task = await background.tasks.process_update.kiq(product_ids)
        task_id = task.task_id
    # NOTE(review): the nesting of this call was ambiguous in the source this
    # was reformatted from; it is assumed to run on every request - confirm.
    await buffer.update_time()
    return UpdateResponse(task_id=task_id)


@app.post(
    '/update/marketplace',
    response_model=UpdateResponse
)
async def update_marketplace(
        request: UpdateMarketplaceRequest
):
    """Enqueue an update of a single marketplace."""
    task = await background.tasks.update_marketplace.kiq(request.marketplace_id)
    return UpdateResponse(task_id=task.task_id)


@app.post(
    '/update/marketplace/products',
    response_model=UpdateResponse
)
async def update_marketplace_products(
        request: UpdateMarketplaceProductsRequest
):
    """Enqueue an update of the given products within one marketplace."""
    task = await background.tasks.update_marketplace_products.kiq(
        request.marketplace_id,
        request.product_ids
    )
    return UpdateResponse(task_id=task.task_id)


@app.post(
    '/update/marketplaces',
    response_model=UpdateResponse
)
async def update_marketplaces(
        request: UpdateMarketplacesRequest
):
    """Enqueue an update of several marketplaces."""
    # Previously also named ``update_marketplace``, which shadowed the
    # single-marketplace handler at module level.
    task = await background.tasks.update_marketplaces.kiq(request.marketplace_ids)
    return UpdateResponse(task_id=task.task_id)


@app.post(
    '/reset/marketplace',
    response_model=UpdateResponse
)
async def reset_marketplace(
        request: ResetMarketplaceRequest
):
    """Enqueue a reset of a single marketplace."""
    task = await background.tasks.reset_marketplace.kiq(request.marketplace_id)
    # The field is ``task_id`` (as in every other handler), not ``task``.
    return UpdateResponse(task_id=task.task_id)


@app.get("/tasks/{task_id}")
def get_status(task_id: str):
    """Return the status and result recorded for ``task_id``.

    NOTE(review): tasks are enqueued through taskiq, but this lookup goes
    through Celery's ``AsyncResult`` - confirm that a Celery result backend
    actually holds these ids.
    """
    task_result = AsyncResult(task_id)
    outcome = task_result.result
    if isinstance(outcome, BaseException):
        # For a failed task Celery exposes the exception instance, which
        # JSONResponse cannot serialise.
        outcome = str(outcome)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": outcome
    }
    return JSONResponse(result)