135 lines
3.6 KiB
Python
135 lines
3.6 KiB
Python
from contextlib import asynccontextmanager
|
|
from typing import Annotated
|
|
|
|
from celery.result import AsyncResult
|
|
from fastapi import FastAPI, Depends, HTTPException
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from starlette import status
|
|
from starlette.responses import JSONResponse
|
|
|
|
import backend.config
|
|
from backend.session import get_session
|
|
from background import taskiq_broker
|
|
from buffer.core import TasksBuffer
|
|
from schemas.general import *
|
|
import background.tasks
|
|
from updaters.stocks_updater import StocksUpdater
|
|
|
|
# Bearer-token security scheme consumed by the `check_auth` dependency.
auth_schema = HTTPBearer()

# Module-level TasksBuffer shared by the `/update` handler and `lifespan`.
buffer = TasksBuffer(limit=1000)

# Annotated alias that injects the value provided by `get_session`.
SessionDependency = Annotated[AsyncSession, Depends(get_session)]
|
|
|
|
|
|
async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]):
    """Reject the request unless its bearer token equals the configured API key.

    Args:
        token: Bearer credentials extracted from the request by ``auth_schema``.

    Raises:
        HTTPException: Status 400 with detail ``'Invalid credentials'`` when
            the supplied token does not match ``backend.config.API_KEY``.
    """
    # Function-scope import keeps this block self-contained; `secrets` is stdlib.
    import secrets

    expected = backend.config.API_KEY
    # Compare in constant time so response timing does not reveal how much of
    # the key a guess got right. Both sides are encoded to bytes because
    # compare_digest() rejects non-ASCII str values. A non-str key can never
    # equal a str token, which matches what the plain `!=` check did.
    authorized = isinstance(expected, str) and secrets.compare_digest(
        token.credentials.encode('utf-8'),
        expected.encode('utf-8'),
    )
    if not authorized:
        # NOTE(review): 401 Unauthorized is the conventional status for bad
        # credentials; 400 is kept so existing clients observe no change.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials')
|
|
|
|
|
|
async def start_broker():
    """Start the taskiq broker when ``taskiq_broker.is_worker_process`` is falsy."""
    # NOTE(review): the original indentation was lost; the print is assumed to
    # sit in the same branch as startup() — confirm.
    if taskiq_broker.is_worker_process:
        return None
    await taskiq_broker.startup()
    print('Broker started')
    return None
|
|
|
|
|
|
async def shutdown_broker():
    """Shut the taskiq broker down when ``taskiq_broker.is_worker_process`` is falsy."""
    # NOTE(review): the original indentation was lost; the print is assumed to
    # sit in the same branch as shutdown() — confirm.
    if taskiq_broker.is_worker_process:
        return None
    await taskiq_broker.shutdown()
    print('Broker shutdown')
    return None
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up shared state before the app serves requests and tear it down after.

    On entry the module-level task buffer is cleared and the broker is
    started; on exit the broker is shut down again.

    Args:
        app: The FastAPI application this lifespan is attached to (unused here).
    """
    await buffer.clear()
    await start_broker()
    try:
        yield
    finally:
        # Run the shutdown even when an exception or cancellation is thrown in
        # at the yield point, so the broker is not left running.
        await shutdown_broker()
|
|
|
|
|
|
# Application instance: `check_auth` is applied to every route as a global
# dependency, and `lifespan` is registered as the lifespan handler.
app = FastAPI(lifespan=lifespan, dependencies=[Depends(check_auth)])
|
|
|
|
|
|
@app.get('/ping')
async def ping():
    """Return the fixed payload ``{'pong': True}``."""
    return dict(pong=True)
|
|
|
|
|
|
@app.post('/update', response_model=UpdateResponse)
async def update(request: UpdateRequest):
    """Add the requested product ids to the buffer and dispatch it when due.

    Args:
        request: Body carrying the ``product_ids`` to enqueue.

    Returns:
        UpdateResponse whose ``task_id`` is the id of the dispatched
        ``process_update`` task, or ``"-1"`` when nothing was dispatched.
    """
    await buffer.append_task(request.product_ids, update_time=False)

    dispatched_id = "-1"
    if await buffer.should_process():
        pending = await buffer.get_tasks()
        job = await background.tasks.process_update.kiq(pending)
        dispatched_id = job.task_id

    # NOTE(review): the original indentation was lost; update_time() is assumed
    # to run on every request rather than only after a dispatch — confirm.
    await buffer.update_time()
    return UpdateResponse(task_id=dispatched_id)
|
|
|
|
|
|
@app.post('/update/marketplace', response_model=UpdateResponse)
async def update_marketplace(request: UpdateMarketplaceRequest):
    """Enqueue the ``update_marketplace`` background task for one marketplace.

    The response model is declared explicitly, as on ``/update``, so the
    generated API schema documents the payload this handler returns.

    Args:
        request: Body carrying the ``marketplace_id`` passed to the task.

    Returns:
        UpdateResponse holding the id of the enqueued task.
    """
    task = await background.tasks.update_marketplace.kiq(request.marketplace_id)
    return UpdateResponse(task_id=task.task_id)
|
|
|
|
|
|
@app.post('/update/marketplace/products', response_model=UpdateResponse)
async def update_marketplace_products(request: UpdateMarketplaceProductsRequest):
    """Enqueue the ``update_marketplace_products`` background task.

    The response model is declared explicitly, as on ``/update``, so the
    generated API schema documents the payload this handler returns.

    Args:
        request: Body carrying the ``marketplace_id`` and ``product_ids``
            passed to the task.

    Returns:
        UpdateResponse holding the id of the enqueued task.
    """
    task = await background.tasks.update_marketplace_products.kiq(
        request.marketplace_id,
        request.product_ids,
    )
    return UpdateResponse(task_id=task.task_id)
|
|
|
|
|
|
@app.post('/update/marketplaces', response_model=UpdateResponse)
async def update_marketplaces(request: UpdateMarketplacesRequest):
    """Enqueue the ``update_marketplaces`` background task for several marketplaces.

    This handler was previously also named ``update_marketplace``, which
    rebound the module-level name of the ``/update/marketplace`` handler.
    The route itself is unchanged. The response model is declared explicitly,
    as on ``/update``.

    Args:
        request: Body carrying the ``marketplace_ids`` passed to the task.

    Returns:
        UpdateResponse holding the id of the enqueued task.
    """
    task = await background.tasks.update_marketplaces.kiq(request.marketplace_ids)
    return UpdateResponse(task_id=task.task_id)
|
|
|
|
|
|
@app.post('/reset/marketplace', response_model=UpdateResponse)
async def reset_marketplace(request: ResetMarketplaceRequest):
    """Enqueue the ``reset_marketplace`` background task for one marketplace.

    The response model is declared explicitly, as on ``/update``, so the
    generated API schema documents the payload this handler returns.

    Args:
        request: Body carrying the ``marketplace_id`` passed to the task.

    Returns:
        UpdateResponse holding the id of the enqueued task.
    """
    task = await background.tasks.reset_marketplace.kiq(request.marketplace_id)
    return UpdateResponse(task_id=task.task_id)
|
|
|
|
|
|
@app.get("/tasks/{task_id}")
def get_status(task_id):
    """Report the status and result recorded for ``task_id``.

    Args:
        task_id: Task identifier taken from the URL path.

    Returns:
        JSONResponse with ``task_id``, ``task_status`` and ``task_result`` keys.
        When the stored result is an exception instance, its string form is
        returned instead.
    """
    # NOTE(review): tasks in this module are enqueued through taskiq (`.kiq`),
    # while this lookup uses Celery's AsyncResult — confirm the ids issued by
    # the update endpoints can actually be resolved here.
    task_result = AsyncResult(task_id)
    task_status = task_result.status
    outcome = task_result.result
    if isinstance(outcome, BaseException):
        # A failed Celery task exposes the raised exception as its result;
        # exceptions are not JSON-serializable, so send the message text.
        outcome = str(outcome)
    return JSONResponse({
        "task_id": task_id,
        "task_status": task_status,
        "task_result": outcome,
    })
|
|
|
|
|
|
@app.get('/marketplace/{marketplace_id}/stocks')
async def get_marketplace_stocks(
        marketplace_id: int,
        session: SessionDependency,
        only_available: bool = False
):
    """Return what ``StocksUpdater.get_all_stocks_for_marketplace`` yields.

    Args:
        marketplace_id: Marketplace identifier taken from the URL path.
        session: Database session injected through ``SessionDependency``.
        only_available: Flag forwarded unchanged to the updater call.
    """
    stocks = await StocksUpdater(session).get_all_stocks_for_marketplace(
        int(marketplace_id),
        only_available,
    )
    return stocks
|
|
|