# Redis-backed buffer for batching product-id tasks.
import logging
import time

from redis import asyncio as aioredis

import backend.config


class TasksBuffer:
    """Redis-backed buffer that collects product IDs until a batch is due.

    IDs are kept in a Redis list stored under ``key``.  A Unix timestamp
    (whole seconds) is kept under ``<key>_time`` and is refreshed by
    ``append_task`` (optionally), ``update_time`` and ``get_tasks``.  A Redis
    lock named ``<key>_lock`` wraps every public operation except ``clear``.

    A batch is reported as due when the list holds at least ``limit`` entries
    or when at least ``time_limit`` seconds have passed since the stored
    timestamp.

    Errors raised by Redis commands inside the locked sections are logged and
    swallowed (best effort).  Errors raised while acquiring or releasing the
    lock are not caught here and propagate to the caller.
    """

    def __init__(
        self,
        key: str = 'tasks_buffer',
        limit: int = 200,
        time_limit: int = 60,
    ) -> None:
        """Create the Redis client and derive the key names.

        Args:
            key: Name of the Redis list that stores the buffered IDs.
            limit: Number of buffered IDs at which a batch becomes due.
            time_limit: Seconds since the stored timestamp after which a
                batch becomes due.
        """
        self.redis = aioredis.from_url(backend.config.REDIS_URL)
        self.key = key
        self._time_key = f'{key}_time'
        self._lock_key = f'{key}_lock'
        self.limit = limit
        self.time_limit = time_limit
        # NOTE(review): no timeout is passed to the lock; presumably a holder
        # that dies without releasing leaves the lock key in place until
        # clear() is called -- confirm against the redis-py Lock docs.
        self.lock = self.redis.lock(self._lock_key)

    async def clear(self) -> None:
        """Clear everything: the buffer list, the timestamp and the lock key.

        NOTE(review): the lock key is deleted without the lock being held;
        presumably this is meant as a reset while no other worker is active
        -- confirm with callers.
        """
        # One multi-key DEL removes all three keys in a single command.
        await self.redis.delete(self.key, self._time_key, self._lock_key)

    async def append_task(
        self, product_ids: list[int], update_time: bool = True
    ) -> None:
        """Append ``product_ids`` to the buffer list.

        Args:
            product_ids: IDs to push, in order, onto the tail of the list.
                An empty list is a no-op.
            update_time: When true, also store the current time as the
                buffer timestamp after a successful push.
        """
        if not product_ids:
            # RPUSH needs at least one value; without this guard an empty
            # batch would only produce an error log entry.
            return

        async with self.lock:
            try:
                # Unpack so every ID becomes its own list element.
                await self.redis.rpush(self.key, *product_ids)
                if update_time:
                    await self.redis.set(self._time_key, int(time.time()))
            except Exception as e:
                logging.exception("Error in append_task: %s", e)

    async def update_time(self) -> None:
        """Store the current time as the buffer timestamp."""
        async with self.lock:
            try:
                await self.redis.set(self._time_key, int(time.time()))
            except Exception as e:
                logging.exception("Error in update_time: %s", e)

    async def _should_process(self) -> bool:
        """Evaluate the size and age thresholds; the caller holds the lock.

        Returns:
            True when the list length has reached ``limit`` or the stored
            timestamp is at least ``time_limit`` seconds old.  The age check
            does not look at the list length, so it can report True for an
            empty list.  False when neither threshold is met or no timestamp
            is stored.
        """
        if await self.redis.llen(self.key) >= self.limit:
            return True

        last_time = await self.redis.get(self._time_key)
        if not last_time:
            return False
        return int(time.time()) - int(last_time) >= self.time_limit

    async def should_process(self) -> bool:
        """Return whether a batch is due; False if the check itself fails."""
        result = False
        async with self.lock:
            try:
                result = await self._should_process()
            except Exception as e:
                logging.exception("Error in should_process: %s", e)
        return result

    async def _get_tasks(self) -> list[int]:
        """Drain the buffer list and refresh the timestamp; caller holds the lock.

        Returns:
            The buffered IDs as integers, in list order.  An empty list when
            reading, deleting or converting the entries fails.  A failure to
            refresh the timestamp is logged but does not discard the IDs.
        """
        tasks: list[int] = []
        try:
            raw_items = await self.redis.lrange(self.key, 0, -1)
            await self.redis.delete(self.key)
            # After the delete the IDs exist only in memory, so build the
            # result before the timestamp write gets a chance to fail.
            tasks = [int(item) for item in raw_items]
            await self.redis.set(self._time_key, int(time.time()))
        except Exception as e:
            logging.exception("Error in _get_tasks: %s", e)
        return tasks

    async def get_tasks(self) -> list[int]:
        """Return all buffered IDs and empty the buffer, under the lock."""
        async with self.lock:
            # _get_tasks logs and absorbs its own errors, so no extra
            # try/except is needed here.
            return await self._get_tasks()
 |