81 lines
2.7 KiB
Python
81 lines
2.7 KiB
Python
import logging
|
|
import time
|
|
|
|
from redis import asyncio as aioredis
|
|
|
|
import backend.config
|
|
|
|
|
|
class TasksBuffer:
|
|
def __init__(self, key: str = 'tasks_buffer', limit=200, time_limit=60):
|
|
self.redis = aioredis.from_url(backend.config.REDIS_URL)
|
|
self.key = key
|
|
self._time_key = f'{key}_time'
|
|
self._lock_key = f'{key}_lock'
|
|
self.limit = limit
|
|
self.time_limit = time_limit
|
|
self.lock = self.redis.lock(self._lock_key)
|
|
# clear everything, buffer, time, and lock
|
|
|
|
async def clear(self):
|
|
await self.redis.delete(self.key)
|
|
await self.redis.delete(self._time_key)
|
|
await self.redis.delete(self._lock_key)
|
|
|
|
async def append_task(self, product_ids: list[int], update_time=True):
|
|
async with self.lock: # Use async context for lock acquisition and release
|
|
try:
|
|
await self.redis.rpush(self.key, *product_ids) # Unpack list to push individual items
|
|
if update_time:
|
|
await self.redis.set(self._time_key, int(time.time()))
|
|
except Exception as e:
|
|
logging.error(f"Error in append_task: {e}")
|
|
|
|
async def update_time(self):
|
|
async with self.lock:
|
|
try:
|
|
await self.redis.set(self._time_key, int(time.time()))
|
|
except Exception as e:
|
|
logging.error(f"Error in update_time: {e}")
|
|
|
|
async def _should_process(self) -> bool:
|
|
tasks_count = await self.redis.llen(self.key)
|
|
if tasks_count >= self.limit:
|
|
return True
|
|
|
|
last_time = await self.redis.get(self._time_key)
|
|
if last_time:
|
|
last_time = int(last_time)
|
|
time_diff = int(time.time()) - last_time
|
|
if time_diff >= self.time_limit:
|
|
return True
|
|
return False
|
|
|
|
async def should_process(self) -> bool:
|
|
result = False
|
|
async with self.lock:
|
|
try:
|
|
result = await self._should_process()
|
|
except Exception as e:
|
|
logging.error(f"Error in should_process: {e}")
|
|
return result
|
|
|
|
async def _get_tasks(self) -> list[int]:
|
|
try:
|
|
tasks = await self.redis.lrange(self.key, 0, -1)
|
|
await self.redis.delete(self.key)
|
|
await self.redis.set(self._time_key, int(time.time()))
|
|
return [int(task) for task in tasks]
|
|
except Exception as e:
|
|
logging.error(f"Error in _get_tasks: {e}")
|
|
return []
|
|
|
|
async def get_tasks(self) -> list[int]:
|
|
tasks = []
|
|
async with self.lock:
|
|
try:
|
|
tasks = await self._get_tasks()
|
|
except Exception as e:
|
|
logging.error(f"Error in get_tasks: {e}")
|
|
return tasks
|