diff --git a/backend/config.py b/backend/config.py index 040ddc9..1dfa206 100644 --- a/backend/config.py +++ b/backend/config.py @@ -12,8 +12,8 @@ PG_HOST = os.environ.get('PG_HOST') PG_DATABASE = os.environ.get('PG_DATABASE') # Celery -CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') -CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') +# CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') +# CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') REDIS_URL = os.environ.get('REDIS_URL') # Yandex diff --git a/background/__init__.py b/background/__init__.py deleted file mode 100644 index 645aa30..0000000 --- a/background/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .worker import celery -from .tasks import * diff --git a/background/celery_app.py b/background/celery_app.py new file mode 100644 index 0000000..e3ba1d5 --- /dev/null +++ b/background/celery_app.py @@ -0,0 +1,12 @@ +from celery import Celery +# from backend.config import CELERY_RESULT_BACKEND, CELERY_BROKER_URL + +app = Celery( + __name__, + broker='amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost', + backend='rpc://', + +) +app.conf.broker_url = 'amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost' +app.conf.result_backend = 'rpc://' +import background.tasks \ No newline at end of file diff --git a/background/tasks.py b/background/tasks.py index 0685f4e..b0d842d 100644 --- a/background/tasks.py +++ b/background/tasks.py @@ -1,39 +1,29 @@ import asyncio from typing import List, Union -from celery.signals import worker_ready - import background.update -from background import celery -from limiter import BatchLimiter +from .celery_app import app -@celery.task(name='process_update') +@app.task(name='process_update') def process_update(product_ids: list[int]): loop = asyncio.get_event_loop() return loop.run_until_complete(background.update.process_update(product_ids)) -@celery.task(name='update_marketplace') +@app.task(name='update_marketplace') def update_marketplace(marketplace_id: int): loop = asyncio.get_event_loop() return loop.run_until_complete(background.update.update_marketplace(marketplace_id)) -@celery.task(name='update_marketplace_products') +@app.task(name='update_marketplace_products') def update_marketplace_products(marketplace_id: int, product_ids: list[int]): loop = asyncio.get_event_loop() return loop.run_until_complete(background.update.update_marketplace_products(marketplace_id, product_ids)) -@celery.task(name='update_marketplaces') +@app.task(name='update_marketplaces') def update_marketplaces(marketplace_ids: Union[List[int], None]): loop = asyncio.get_event_loop() return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids)) - - -@worker_ready.connect -def worker_is_ready(sender, **kwargs): - loop = asyncio.get_event_loop() - limiter = BatchLimiter() - loop.run_until_complete(limiter.clear_locks()) diff --git a/background/worker.py b/background/worker.py deleted file mode 100644 index 0357f06..0000000 --- a/background/worker.py +++ /dev/null @@ -1,10 +0,0 @@ -from celery import Celery -from backend.config import CELERY_RESULT_BACKEND, CELERY_BROKER_URL - -celery = Celery( - __name__, - broker=CELERY_BROKER_URL, - backend=CELERY_RESULT_BACKEND, - -) -celery.conf.result_backend_thread_safe = True diff --git a/requirements.txt b/requirements.txt index 9bddf05..e6fd9c4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,5 @@ aiohttp[speedups] celery[redis] pyjwt redis -redis[hiredis] \ No newline at end of file +redis[hiredis] +celery[librabbitmq] \ No newline at end of file