import logging import time from redis import asyncio as aioredis import backend.config class TasksBuffer: def __init__(self, key: str = 'tasks_buffer', limit=200, time_limit=60): self.redis = aioredis.from_url(backend.config.REDIS_URL) self.key = key self._time_key = f'{key}_time' self._lock_key = f'{key}_lock' self.limit = limit self.time_limit = time_limit self.lock = self.redis.lock(self._lock_key) # clear everything, buffer, time, and lock async def clear(self): await self.redis.delete(self.key) await self.redis.delete(self._time_key) await self.redis.delete(self._lock_key) async def append_task(self, product_ids: list[int], update_time=True): async with self.lock: # Use async context for lock acquisition and release try: await self.redis.rpush(self.key, *product_ids) # Unpack list to push individual items if update_time: await self.redis.set(self._time_key, int(time.time())) except Exception as e: logging.error(f"Error in append_task: {e}") async def update_time(self): async with self.lock: try: await self.redis.set(self._time_key, int(time.time())) except Exception as e: logging.error(f"Error in update_time: {e}") async def _should_process(self) -> bool: tasks_count = await self.redis.llen(self.key) if tasks_count >= self.limit: return True last_time = await self.redis.get(self._time_key) if last_time: last_time = int(last_time) time_diff = int(time.time()) - last_time if time_diff >= self.time_limit: return True return False async def should_process(self) -> bool: result = False async with self.lock: try: result = await self._should_process() except Exception as e: logging.error(f"Error in should_process: {e}") return result async def _get_tasks(self) -> list[int]: try: tasks = await self.redis.lrange(self.key, 0, -1) await self.redis.delete(self.key) await self.redis.set(self._time_key, int(time.time())) return [int(task) for task in tasks] except Exception as e: logging.error(f"Error in _get_tasks: {e}") return [] async def get_tasks(self) -> list[int]: tasks = [] async with self.lock: try: tasks = await self._get_tasks() except Exception as e: logging.error(f"Error in get_tasks: {e}") return tasks