feat: deadlock clear
This commit is contained in:
@@ -1,8 +1,11 @@
|
||||
import asyncio
|
||||
from typing import List, Union
|
||||
|
||||
from celery.signals import worker_ready
|
||||
|
||||
import background.update
|
||||
from background import celery
|
||||
from limiter import BatchLimiter
|
||||
|
||||
|
||||
@celery.task(name='process_update')
|
||||
@@ -27,3 +30,10 @@ def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
|
||||
def update_marketplaces(marketplace_ids: Union[List[int], None]):
|
||||
loop = asyncio.get_event_loop()
|
||||
return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids))
|
||||
|
||||
|
||||
@worker_ready.connect
|
||||
def worker_is_ready(sender, **kwargs):
|
||||
loop = asyncio.get_event_loop()
|
||||
limiter = BatchLimiter()
|
||||
loop.run_until_complete(limiter.clear_locks())
|
||||
|
||||
@@ -26,7 +26,6 @@ class BatchLimiter:
|
||||
async def acquire(self, key, max_requests, period):
|
||||
redis = await RedisConnectionManager.get_redis_connection()
|
||||
while True:
|
||||
logging.info(f'Getting key: {key}')
|
||||
async with redis.lock(f"{key}_lock"):
|
||||
try:
|
||||
start_time = await redis.get(f"{key}:start_time")
|
||||
@@ -56,6 +55,13 @@ class BatchLimiter:
|
||||
logging.error(f"Redis error: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def clear_locks(self):
|
||||
redis = await RedisConnectionManager.get_redis_connection()
|
||||
keys = []
|
||||
async for key in redis.scan_iter('*_lock*'):
|
||||
keys.append(key)
|
||||
await redis.delete(*keys)
|
||||
|
||||
async def acquire_wildberries(self, key):
|
||||
max_requests = 300
|
||||
period = 60
|
||||
|
||||
@@ -11,4 +11,6 @@ python-dotenv
|
||||
aiohttp
|
||||
aiohttp[speedups]
|
||||
celery[redis]
|
||||
pyjwt
|
||||
pyjwt
|
||||
redis
|
||||
redis[hiredis]
|
||||
Reference in New Issue
Block a user