from contextlib import asynccontextmanager from typing import Annotated from celery.result import AsyncResult from fastapi import FastAPI, Depends, HTTPException from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy.ext.asyncio import AsyncSession from starlette import status from starlette.responses import JSONResponse import backend.config from backend.session import get_session from background import taskiq_broker from buffer.core import TasksBuffer from schemas.general import * import background.tasks from updaters.stocks_updater import StocksUpdater auth_schema = HTTPBearer() buffer = TasksBuffer(limit=1000) SessionDependency = Annotated[AsyncSession, Depends(get_session)] async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]): if token.credentials != backend.config.API_KEY: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials') async def start_broker(): if not taskiq_broker.is_worker_process: await taskiq_broker.startup() print('Broker started') return async def shutdown_broker(): if not taskiq_broker.is_worker_process: await taskiq_broker.shutdown() print('Broker shutdown') return @asynccontextmanager async def lifespan(app: FastAPI): await buffer.clear() await start_broker() yield await shutdown_broker() app = FastAPI( dependencies=[ Depends(check_auth) ], lifespan=lifespan ) @app.get( '/ping' ) async def ping(): return {'pong': True} @app.post( '/update', response_model=UpdateResponse ) async def update( request: UpdateRequest ): task_id = "-1" await buffer.append_task(request.product_ids, update_time=False) if await buffer.should_process(): product_ids = await buffer.get_tasks() task = await background.tasks.process_update.kiq(product_ids) task_id = task.task_id await buffer.update_time() return UpdateResponse(task_id=task_id) @app.post('/update/marketplace') async def update_marketplace( request: UpdateMarketplaceRequest ): task = await background.tasks.update_marketplace.kiq(request.marketplace_id) return UpdateResponse(task_id=task.task_id) @app.post('/update/marketplace/products') async def update_marketplace_products( request: UpdateMarketplaceProductsRequest ): task = await background.tasks.update_marketplace_products.kiq(request.marketplace_id, request.product_ids) return UpdateResponse(task_id=task.task_id) @app.post('/update/marketplaces') async def update_marketplace( request: UpdateMarketplacesRequest ): task = await background.tasks.update_marketplaces.kiq(request.marketplace_ids) return UpdateResponse(task_id=task.task_id) @app.post('/reset/marketplace') async def reset_marketplace( request: ResetMarketplaceRequest ): task = await background.tasks.reset_marketplace.kiq(request.marketplace_id) return UpdateResponse(task_id=task.task_id) @app.get("/tasks/{task_id}") def get_status(task_id): task_result = AsyncResult(task_id) result = { "task_id": task_id, "task_status": task_result.status, "task_result": task_result.result } return JSONResponse(result) @app.get('/marketplace/{marketplace_id}/stocks') async def get_marketplace_stocks( session: SessionDependency, marketplace_id: int, only_available: bool = False ): updater = StocksUpdater(session) return await updater.get_all_stocks_for_marketplace(int(marketplace_id), only_available)