feat: buffer
This commit is contained in:
@@ -15,6 +15,13 @@ database_url = (
|
||||
f"{PG_LOGIN}:{PG_PASSWORD}@"
|
||||
f"/{PG_DATABASE}?host=/run/postgresql/"
|
||||
)
|
||||
|
||||
# database_url = (
|
||||
# f"postgresql+asyncpg://"
|
||||
# f"{PG_LOGIN}:{PG_PASSWORD}@"
|
||||
# f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
|
||||
# )
|
||||
|
||||
engine = create_async_engine(
|
||||
database_url,
|
||||
pool_timeout=2000
|
||||
|
||||
@@ -1 +1 @@
|
||||
from .broker import taskiq_broker
|
||||
from .broker import taskiq_broker, scheduler
|
||||
@@ -1,5 +1,11 @@
|
||||
from taskiq import TaskiqScheduler
|
||||
from taskiq.schedule_sources import LabelScheduleSource
|
||||
from taskiq_aio_pika import AioPikaBroker
|
||||
|
||||
import backend.config
|
||||
|
||||
taskiq_broker = AioPikaBroker(backend.config.RABBITMQ_URL)
|
||||
scheduler = TaskiqScheduler(
|
||||
broker=taskiq_broker,
|
||||
sources=[LabelScheduleSource(taskiq_broker)]
|
||||
)
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import background.update
|
||||
import logging
|
||||
|
||||
import background.update
|
||||
from background.broker import taskiq_broker
|
||||
from buffer.core import TasksBuffer
|
||||
|
||||
|
||||
@taskiq_broker.task(task_name='process_update')
|
||||
@@ -21,3 +23,22 @@ async def update_marketplace_products(marketplace_id: int, product_ids: list[int
|
||||
@taskiq_broker.task(task_name='update_marketplaces')
|
||||
async def update_marketplaces(marketplace_ids: list[int]):
|
||||
return await background.update.update_marketplaces(marketplace_ids)
|
||||
|
||||
|
||||
@taskiq_broker.task(schedule=[{"cron": "* * * * *"}])
|
||||
async def flush_buffer():
|
||||
try:
|
||||
logging.info('Flushing buffer')
|
||||
buffer = TasksBuffer()
|
||||
should_process = await buffer.should_process()
|
||||
if not should_process:
|
||||
logging.info('Buffer is empty')
|
||||
return
|
||||
|
||||
product_ids = await buffer.get_tasks()
|
||||
total_products = len(product_ids)
|
||||
logging.info(f'Flushing buffer with {total_products} products')
|
||||
await process_update(product_ids)
|
||||
logging.info(f'Buffer flushed with {total_products} products')
|
||||
except Exception as e:
|
||||
logging.error(f'Error in flush_buffer: {e}')
|
||||
|
||||
0
buffer/__init__.py
Normal file
0
buffer/__init__.py
Normal file
80
buffer/core.py
Normal file
80
buffer/core.py
Normal file
@@ -0,0 +1,80 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
from redis import asyncio as aioredis
|
||||
|
||||
import backend.config
|
||||
|
||||
|
||||
class TasksBuffer:
|
||||
def __init__(self, key: str = 'tasks_buffer', limit=200, time_limit=60):
|
||||
self.redis = aioredis.from_url(backend.config.REDIS_URL)
|
||||
self.key = key
|
||||
self._time_key = f'{key}_time'
|
||||
self._lock_key = f'{key}_lock'
|
||||
self.limit = limit
|
||||
self.time_limit = time_limit
|
||||
self.lock = self.redis.lock(self._lock_key)
|
||||
# clear everything, buffer, time, and lock
|
||||
|
||||
async def clear(self):
|
||||
await self.redis.delete(self.key)
|
||||
await self.redis.delete(self._time_key)
|
||||
await self.redis.delete(self._lock_key)
|
||||
|
||||
async def append_task(self, product_ids: list[int], update_time=True):
|
||||
async with self.lock: # Use async context for lock acquisition and release
|
||||
try:
|
||||
await self.redis.rpush(self.key, *product_ids) # Unpack list to push individual items
|
||||
if update_time:
|
||||
await self.redis.set(self._time_key, int(time.time()))
|
||||
except Exception as e:
|
||||
logging.error(f"Error in append_task: {e}")
|
||||
|
||||
async def update_time(self):
|
||||
async with self.lock:
|
||||
try:
|
||||
await self.redis.set(self._time_key, int(time.time()))
|
||||
except Exception as e:
|
||||
logging.error(f"Error in update_time: {e}")
|
||||
|
||||
async def _should_process(self) -> bool:
|
||||
tasks_count = await self.redis.llen(self.key)
|
||||
if tasks_count >= self.limit:
|
||||
return True
|
||||
|
||||
last_time = await self.redis.get(self._time_key)
|
||||
if last_time:
|
||||
last_time = int(last_time)
|
||||
time_diff = int(time.time()) - last_time
|
||||
if time_diff >= self.time_limit:
|
||||
return True
|
||||
return False
|
||||
|
||||
async def should_process(self) -> bool:
|
||||
result = False
|
||||
async with self.lock:
|
||||
try:
|
||||
result = await self._should_process()
|
||||
except Exception as e:
|
||||
logging.error(f"Error in should_process: {e}")
|
||||
return result
|
||||
|
||||
async def _get_tasks(self) -> list[int]:
|
||||
try:
|
||||
tasks = await self.redis.lrange(self.key, 0, -1)
|
||||
await self.redis.delete(self.key)
|
||||
await self.redis.set(self._time_key, int(time.time()))
|
||||
return [int(task) for task in tasks]
|
||||
except Exception as e:
|
||||
logging.error(f"Error in _get_tasks: {e}")
|
||||
return []
|
||||
|
||||
async def get_tasks(self) -> list[int]:
|
||||
tasks = []
|
||||
async with self.lock:
|
||||
try:
|
||||
tasks = await self._get_tasks()
|
||||
except Exception as e:
|
||||
logging.error(f"Error in get_tasks: {e}")
|
||||
return tasks
|
||||
16
main.py
16
main.py
@@ -9,10 +9,12 @@ from starlette.responses import JSONResponse
|
||||
|
||||
import backend.config
|
||||
from background import taskiq_broker
|
||||
from buffer.core import TasksBuffer
|
||||
from schemas.general import *
|
||||
import background.tasks
|
||||
|
||||
auth_schema = HTTPBearer()
|
||||
buffer = TasksBuffer()
|
||||
|
||||
|
||||
async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]):
|
||||
@@ -36,6 +38,7 @@ async def shutdown_broker():
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
await buffer.clear()
|
||||
await start_broker()
|
||||
yield
|
||||
await shutdown_broker()
|
||||
@@ -63,8 +66,14 @@ async def ping():
|
||||
async def update(
|
||||
request: UpdateRequest
|
||||
):
|
||||
task = await background.tasks.process_update.kiq(request.product_ids)
|
||||
return UpdateResponse(task_id=task.task_id)
|
||||
task_id = "-1"
|
||||
await buffer.append_task(request.product_ids, update_time=False)
|
||||
if await buffer.should_process():
|
||||
product_ids = await buffer.get_tasks()
|
||||
task = await background.tasks.process_update.kiq(product_ids)
|
||||
task_id = task.task_id
|
||||
await buffer.update_time()
|
||||
return UpdateResponse(task_id=task_id)
|
||||
|
||||
|
||||
@app.post('/update/marketplace')
|
||||
@@ -87,9 +96,10 @@ async def update_marketplace_products(
|
||||
async def update_marketplace(
|
||||
request: UpdateMarketplacesRequest
|
||||
):
|
||||
task =await background.tasks.update_marketplaces.kiq(request.marketplace_ids)
|
||||
task = await background.tasks.update_marketplaces.kiq(request.marketplace_ids)
|
||||
return UpdateResponse(task_id=task.task_id)
|
||||
|
||||
|
||||
@app.get("/tasks/{task_id}")
|
||||
def get_status(task_id):
|
||||
task_result = AsyncResult(task_id)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#!/bin/bash
|
||||
ulimit -n 97816
|
||||
|
||||
# Load Redis password from .env file
|
||||
export REDIS_PASSWORD=$(grep -m 1 'REDIS_PASSWORD' .env | cut -d '=' -f2)
|
||||
@@ -42,9 +43,18 @@ else
|
||||
log_warning "No keys matched the pattern '*_lock'."
|
||||
fi
|
||||
|
||||
# Clear RabbitMQ queue
|
||||
log_info "Purging RabbitMQ queue 'taskiq' in vhost 'stocks_vhost'..."
|
||||
rabbitmqctl purge_queue -p stocks_vhost taskiq
|
||||
if [ $? -eq 0 ]; then
|
||||
log_info "RabbitMQ queue purged successfully."
|
||||
else
|
||||
log_error "Failed to purge RabbitMQ queue. Please check your RabbitMQ setup."
|
||||
fi
|
||||
|
||||
# Start the Taskiq worker
|
||||
log_info "Starting Taskiq worker..."
|
||||
taskiq worker background:taskiq_broker background.tasks
|
||||
taskiq worker background:taskiq_broker background.tasks --max-async-task 5000 --max-threadpool-threads 8 --max-prefetch 10000
|
||||
|
||||
# Log when the Taskiq worker stops
|
||||
log_info "Taskiq worker stopped"
|
||||
log_info "Taskiq worker stopped"
|
||||
Reference in New Issue
Block a user