From 2e0d5f0944f86efbc1784a1f29e5ba0e45f25c0c Mon Sep 17 00:00:00 2001 From: admin Date: Tue, 5 Nov 2024 00:38:48 +0300 Subject: [PATCH] feat: crap --- background/celery_app.py | 11 ++++++++++- background/tasks.py | 27 +++++++++++++++++++-------- limiter/batch_limiter.py | 2 ++ requirements.txt | 3 ++- start_celery.sh | 2 +- test.py | 11 +++++++++-- 6 files changed, 43 insertions(+), 13 deletions(-) diff --git a/background/celery_app.py b/background/celery_app.py index e3ba1d5..bd00d82 100644 --- a/background/celery_app.py +++ b/background/celery_app.py @@ -5,8 +5,17 @@ app = Celery( __name__, broker='amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost', backend='rpc://', - ) app.conf.broker_url = 'amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost' app.conf.result_backend = 'rpc://' +app.conf.update( + worker_concurrency=8, + worker_prefetch_multiplier=1, + task_acks_late=True, + broker_pool_limit=10, + task_reject_on_worker_lost=True, + task_publish_retry=True, + broker_heartbeat=10 +) + import background.tasks \ No newline at end of file diff --git a/background/tasks.py b/background/tasks.py index b0d842d..e72b3ef 100644 --- a/background/tasks.py +++ b/background/tasks.py @@ -1,29 +1,40 @@ import asyncio from typing import List, Union +from celery.signals import worker_ready + import background.update +from limiter import BatchLimiter from .celery_app import app +def run_async(coroutine): + """Запускает асинхронную корутину в синхронном контексте.""" + return asyncio.run(coroutine) + + @app.task(name='process_update') def process_update(product_ids: list[int]): - loop = asyncio.get_event_loop() - return loop.run_until_complete(background.update.process_update(product_ids)) + # Запускаем асинхронную функцию + return run_async(background.update.process_update(product_ids)) @app.task(name='update_marketplace') def update_marketplace(marketplace_id: int): - loop = asyncio.get_event_loop() - return loop.run_until_complete(background.update.update_marketplace(marketplace_id)) + return run_async(background.update.update_marketplace(marketplace_id)) @app.task(name='update_marketplace_products') def update_marketplace_products(marketplace_id: int, product_ids: list[int]): - loop = asyncio.get_event_loop() - return loop.run_until_complete(background.update.update_marketplace_products(marketplace_id, product_ids)) + return run_async(background.update.update_marketplace_products(marketplace_id, product_ids)) @app.task(name='update_marketplaces') def update_marketplaces(marketplace_ids: Union[List[int], None]): - loop = asyncio.get_event_loop() - return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids)) + return run_async(background.update.update_marketplaces(marketplace_ids)) + + +@worker_ready.connect +def worker_is_ready(sender, **kwargs): + limiter = BatchLimiter() + return run_async(limiter.clear_locks()) diff --git a/limiter/batch_limiter.py b/limiter/batch_limiter.py index bc30dea..aae043f 100644 --- a/limiter/batch_limiter.py +++ b/limiter/batch_limiter.py @@ -68,6 +68,8 @@ class BatchLimiter: keys = [] async for key in redis.scan_iter('*_lock*'): keys.append(key) + if not keys: + return await redis.delete(*keys) async def acquire_wildberries(self, key): diff --git a/requirements.txt b/requirements.txt index e6fd9c4..5f2ee9f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,4 +14,5 @@ celery[redis] pyjwt redis redis[hiredis] -celery[librabbitmq] \ No newline at end of file +celery[librabbitmq] +gevent \ No newline at end of file diff --git a/start_celery.sh b/start_celery.sh index 0290a5f..db52d2d 100755 --- a/start_celery.sh +++ b/start_celery.sh @@ -1,3 +1,3 @@ #!/bin/bash ulimit -n 97816 -/var/www/sipro/Sipro-Stocks/venv/bin/celery -A background.celery worker --loglevel=info --logfile=/var/www/sipro/Sipro-Stocks/celery.log +/var/www/sipro/Sipro-Stocks/venv/bin/celery -A background.celery_app worker -P gevent -c 20 --loglevel=info --logfile=/var/www/sipro/Sipro-Stocks/celery.log \ No newline at end of file diff --git a/test.py b/test.py index 715e793..45bc242 100644 --- a/test.py +++ b/test.py @@ -1,2 +1,9 @@ -a = [1] -print(a[:1], a[1:]) +import background.tasks + + +def main(): + task = background.tasks.update_marketplace.delay(41) + + +if __name__ == '__main__': + main()