from functools import wraps
from typing import Any, Callable

import redis

import utils.hashing_utils
from backend import config

# Module-level Redis client shared by every lock created through `lock`;
# it is built from config.CELERY_BROKER_URL.
redis_client = redis.from_url(
    config.CELERY_BROKER_URL,
)


class LockAlreadyAcquiredError(Exception):
    """Raised when the Redis lock guarding a decorated call is already held.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working, while new callers can catch lock contention specifically.
    """


def lock(lock_key: str, include_args_in_key: bool = False) -> Callable:
    """Build a decorator that runs a function under a non-blocking Redis lock.

    Args:
        lock_key: Base name of the Redis lock.
        include_args_in_key: When true, the lock name is extended with ``'_'``
            plus the value returned by
            ``utils.hashing_utils.hash_args_and_kwargs_with_crc32`` for the
            call's arguments, so the lock name depends on the arguments as
            well as on ``lock_key``.

    Returns:
        A decorator. The decorated function returns whatever the wrapped
        function returns.

    Raises:
        LockAlreadyAcquiredError: Raised by the decorated function when the
            lock cannot be acquired immediately. The wrapped function is not
            called in that case.

    NOTE(review): no ``timeout`` is passed to ``redis_client.lock``; per the
    redis-py docs the lock then stays held until ``release()`` runs, so a
    process that dies mid-call would leave it held — confirm this is intended.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            key = lock_key
            if include_args_in_key:
                key += '_' + utils.hashing_utils.hash_args_and_kwargs_with_crc32(*args, **kwargs)

            redis_lock = redis_client.lock(key)
            # Fail fast instead of waiting for the current holder to finish.
            if not redis_lock.acquire(blocking=False):
                raise LockAlreadyAcquiredError(f'Lock {key} is already acquired')

            # Acquisition happens outside the try so that a failed acquire
            # never triggers a release of a lock this call does not own.
            try:
                return func(*args, **kwargs)
            finally:
                redis_lock.release()

        return wrapper

    return decorator