diff --git a/backend/session.py b/backend/session.py
index c5e2b75..e31de96 100644
--- a/backend/session.py
+++ b/backend/session.py
@@ -15,6 +15,13 @@ database_url = (
     f"{PG_LOGIN}:{PG_PASSWORD}@"
     f"/{PG_DATABASE}?host=/run/postgresql/"
 )
+
+# database_url = (
+#     f"postgresql+asyncpg://"
+#     f"{PG_LOGIN}:{PG_PASSWORD}@"
+#     f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
+# )
+
 engine = create_async_engine(
     database_url,
     pool_timeout=2000
diff --git a/background/__init__.py b/background/__init__.py
index 396497b..01bf4fa 100644
--- a/background/__init__.py
+++ b/background/__init__.py
@@ -1 +1 @@
-from .broker import taskiq_broker
\ No newline at end of file
+from .broker import taskiq_broker, scheduler
\ No newline at end of file
diff --git a/background/broker.py b/background/broker.py
index 5f760f6..f06cd44 100644
--- a/background/broker.py
+++ b/background/broker.py
@@ -1,5 +1,11 @@
+from taskiq import TaskiqScheduler
+from taskiq.schedule_sources import LabelScheduleSource
 from taskiq_aio_pika import AioPikaBroker
 
 import backend.config
 
 taskiq_broker = AioPikaBroker(backend.config.RABBITMQ_URL)
+scheduler = TaskiqScheduler(
+    broker=taskiq_broker,
+    sources=[LabelScheduleSource(taskiq_broker)]
+)
diff --git a/background/tasks.py b/background/tasks.py
index 7c0589f..eb883f5 100644
--- a/background/tasks.py
+++ b/background/tasks.py
@@ -1,6 +1,8 @@
-import background.update
+import logging
+import background.update
 
 from background.broker import taskiq_broker
+from buffer.core import TasksBuffer
 
 
 @taskiq_broker.task(task_name='process_update')
@@ -21,3 +23,22 @@ async def update_marketplace_products(marketplace_id: int, product_ids: list[int
 @taskiq_broker.task(task_name='update_marketplaces')
 async def update_marketplaces(marketplace_ids: list[int]):
     return await background.update.update_marketplaces(marketplace_ids)
+
+
+@taskiq_broker.task(schedule=[{"cron": "* * * * *"}])
+async def flush_buffer():
+    try:
+        logging.info('Flushing buffer')
+        buffer = TasksBuffer()
+        should_process = await buffer.should_process()
+        if not should_process:
+            logging.info('Buffer is empty')
+            return
+
+        product_ids = await buffer.get_tasks()
+        total_products = len(product_ids)
+        logging.info(f'Flushing buffer with {total_products} products')
+        await process_update(product_ids)
+        logging.info(f'Buffer flushed with {total_products} products')
+    except Exception as e:
+        logging.error(f'Error in flush_buffer: {e}')
diff --git a/buffer/__init__.py b/buffer/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/buffer/core.py b/buffer/core.py
new file mode 100644
index 0000000..640645a
--- /dev/null
+++ b/buffer/core.py
@@ -0,0 +1,80 @@
+import logging
+import time
+
+from redis import asyncio as aioredis
+
+import backend.config
+
+
+class TasksBuffer:
+    def __init__(self, key: str = 'tasks_buffer', limit=200, time_limit=60):
+        self.redis = aioredis.from_url(backend.config.REDIS_URL)
+        self.key = key
+        self._time_key = f'{key}_time'
+        self._lock_key = f'{key}_lock'
+        self.limit = limit
+        self.time_limit = time_limit
+        self.lock = self.redis.lock(self._lock_key)
+
+    # clear everything: buffer, time, and lock
+    async def clear(self):
+        await self.redis.delete(self.key)
+        await self.redis.delete(self._time_key)
+        await self.redis.delete(self._lock_key)
+
+    async def append_task(self, product_ids: list[int], update_time=True):
+        async with self.lock:  # Use async context for lock acquisition and release
+            try:
+                await self.redis.rpush(self.key, *product_ids)  # Unpack list to push individual items
+                if update_time:
+                    await self.redis.set(self._time_key, int(time.time()))
+            except Exception as e:
+                logging.error(f"Error in append_task: {e}")
+
+    async def update_time(self):
+        async with self.lock:
+            try:
+                await self.redis.set(self._time_key, int(time.time()))
+            except Exception as e:
+                logging.error(f"Error in update_time: {e}")
+
+    async def _should_process(self) -> bool:
+        tasks_count = await self.redis.llen(self.key)
+        if tasks_count >= self.limit:
+            return True
+
+        last_time = await self.redis.get(self._time_key)
+        if last_time:
+            last_time = int(last_time)
+            time_diff = int(time.time()) - last_time
+            if time_diff >= self.time_limit:
+                return True
+        return False
+
+    async def should_process(self) -> bool:
+        result = False
+        async with self.lock:
+            try:
+                result = await self._should_process()
+            except Exception as e:
+                logging.error(f"Error in should_process: {e}")
+        return result
+
+    async def _get_tasks(self) -> list[int]:
+        try:
+            tasks = await self.redis.lrange(self.key, 0, -1)
+            await self.redis.delete(self.key)
+            await self.redis.set(self._time_key, int(time.time()))
+            return [int(task) for task in tasks]
+        except Exception as e:
+            logging.error(f"Error in _get_tasks: {e}")
+            return []
+
+    async def get_tasks(self) -> list[int]:
+        tasks = []
+        async with self.lock:
+            try:
+                tasks = await self._get_tasks()
+            except Exception as e:
+                logging.error(f"Error in get_tasks: {e}")
+        return tasks
diff --git a/main.py b/main.py
index 69c318d..b0b8b49 100644
--- a/main.py
+++ b/main.py
@@ -9,10 +9,12 @@ from starlette.responses import JSONResponse
 
 import backend.config
 from background import taskiq_broker
+from buffer.core import TasksBuffer
 from schemas.general import *
 import background.tasks
 
 auth_schema = HTTPBearer()
+buffer = TasksBuffer()
 
 
 async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]):
@@ -36,6 +38,7 @@ async def shutdown_broker():
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
+    await buffer.clear()
     await start_broker()
     yield
     await shutdown_broker()
@@ -63,8 +66,14 @@ async def ping():
 async def update(
         request: UpdateRequest
 ):
-    task = await background.tasks.process_update.kiq(request.product_ids)
-    return UpdateResponse(task_id=task.task_id)
+    task_id = "-1"
+    await buffer.append_task(request.product_ids, update_time=False)
+    if await buffer.should_process():
+        product_ids = await buffer.get_tasks()
+        task = await background.tasks.process_update.kiq(product_ids)
+        task_id = task.task_id
+        await buffer.update_time()
+    return UpdateResponse(task_id=task_id)
 
 
 @app.post('/update/marketplace')
@@ -87,9 +96,10 @@
 async def update_marketplace(
         request: UpdateMarketplacesRequest
 ):
-    task =await background.tasks.update_marketplaces.kiq(request.marketplace_ids)
+    task = await background.tasks.update_marketplaces.kiq(request.marketplace_ids)
     return UpdateResponse(task_id=task.task_id)
 
+
 @app.get("/tasks/{task_id}")
 def get_status(task_id):
     task_result = AsyncResult(task_id)
diff --git a/start_taskiq.sh b/start_taskiq.sh
index e0668b9..53b06e9 100755
--- a/start_taskiq.sh
+++ b/start_taskiq.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+ulimit -n 97816
 
 # Load Redis password from .env file
 export REDIS_PASSWORD=$(grep -m 1 'REDIS_PASSWORD' .env | cut -d '=' -f2)
@@ -42,9 +43,18 @@
 else
     log_warning "No keys matched the pattern '*_lock'."
 fi
 
+# Clear RabbitMQ queue
+log_info "Purging RabbitMQ queue 'taskiq' in vhost 'stocks_vhost'..."
+rabbitmqctl purge_queue -p stocks_vhost taskiq
+if [ $? -eq 0 ]; then
+    log_info "RabbitMQ queue purged successfully."
+else
+    log_error "Failed to purge RabbitMQ queue. Please check your RabbitMQ setup."
+fi
+
 # Start the Taskiq worker
 log_info "Starting Taskiq worker..."
-taskiq worker background:taskiq_broker background.tasks
+taskiq worker background:taskiq_broker background.tasks --max-async-task 5000 --max-threadpool-threads 8 --max-prefetch 10000
 # Log when the Taskiq worker stops
-log_info "Taskiq worker stopped"
+log_info "Taskiq worker stopped"
\ No newline at end of file
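
Note on running the new schedule: flush_buffer is registered with schedule=[{"cron": "* * * * *"}] and broker.py now builds a TaskiqScheduler over a LabelScheduleSource, but start_taskiq.sh only launches the worker. Assuming the module layout in this patch, the cron entry only fires if a scheduler process runs alongside the worker, e.g.:

    taskiq scheduler background:scheduler background.tasks

This is a sketch of the standard taskiq CLI invocation (the scheduler's module:object path, followed by the modules that declare scheduled tasks); adjust the paths if the project layout differs.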