feat: rabbit instead redis
This commit is contained in:
@@ -12,8 +12,8 @@ PG_HOST = os.environ.get('PG_HOST')
|
|||||||
PG_DATABASE = os.environ.get('PG_DATABASE')
|
PG_DATABASE = os.environ.get('PG_DATABASE')
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL')
|
# CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL')
|
||||||
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
|
# CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
|
||||||
REDIS_URL = os.environ.get('REDIS_URL')
|
REDIS_URL = os.environ.get('REDIS_URL')
|
||||||
|
|
||||||
# Yandex
|
# Yandex
|
||||||
|
|||||||
@@ -1,2 +0,0 @@
|
|||||||
from .worker import celery
|
|
||||||
from .tasks import *
|
|
||||||
12
background/celery_app.py
Normal file
12
background/celery_app.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
from celery import Celery
|
||||||
|
# from backend.config import CELERY_RESULT_BACKEND, CELERY_BROKER_URL
|
||||||
|
|
||||||
|
app = Celery(
|
||||||
|
__name__,
|
||||||
|
broker='amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost',
|
||||||
|
backend='rpc://',
|
||||||
|
|
||||||
|
)
|
||||||
|
app.conf.broker_url = 'amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost'
|
||||||
|
app.conf.result_backend = 'rpc://'
|
||||||
|
import background.tasks
|
||||||
@@ -1,39 +1,29 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from typing import List, Union
|
from typing import List, Union
|
||||||
|
|
||||||
from celery.signals import worker_ready
|
|
||||||
|
|
||||||
import background.update
|
import background.update
|
||||||
from background import celery
|
from .celery_app import app
|
||||||
from limiter import BatchLimiter
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name='process_update')
|
@app.task(name='process_update')
|
||||||
def process_update(product_ids: list[int]):
|
def process_update(product_ids: list[int]):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
return loop.run_until_complete(background.update.process_update(product_ids))
|
return loop.run_until_complete(background.update.process_update(product_ids))
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name='update_marketplace')
|
@app.task(name='update_marketplace')
|
||||||
def update_marketplace(marketplace_id: int):
|
def update_marketplace(marketplace_id: int):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
return loop.run_until_complete(background.update.update_marketplace(marketplace_id))
|
return loop.run_until_complete(background.update.update_marketplace(marketplace_id))
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name='update_marketplace_products')
|
@app.task(name='update_marketplace_products')
|
||||||
def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
|
def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
return loop.run_until_complete(background.update.update_marketplace_products(marketplace_id, product_ids))
|
return loop.run_until_complete(background.update.update_marketplace_products(marketplace_id, product_ids))
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name='update_marketplaces')
|
@app.task(name='update_marketplaces')
|
||||||
def update_marketplaces(marketplace_ids: Union[List[int], None]):
|
def update_marketplaces(marketplace_ids: Union[List[int], None]):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids))
|
return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids))
|
||||||
|
|
||||||
|
|
||||||
@worker_ready.connect
|
|
||||||
def worker_is_ready(sender, **kwargs):
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
limiter = BatchLimiter()
|
|
||||||
loop.run_until_complete(limiter.clear_locks())
|
|
||||||
|
|||||||
@@ -1,10 +0,0 @@
|
|||||||
from celery import Celery
|
|
||||||
from backend.config import CELERY_RESULT_BACKEND, CELERY_BROKER_URL
|
|
||||||
|
|
||||||
celery = Celery(
|
|
||||||
__name__,
|
|
||||||
broker=CELERY_BROKER_URL,
|
|
||||||
backend=CELERY_RESULT_BACKEND,
|
|
||||||
|
|
||||||
)
|
|
||||||
celery.conf.result_backend_thread_safe = True
|
|
||||||
@@ -13,4 +13,5 @@ aiohttp[speedups]
|
|||||||
celery[redis]
|
celery[redis]
|
||||||
pyjwt
|
pyjwt
|
||||||
redis
|
redis
|
||||||
redis[hiredis]
|
redis[hiredis]
|
||||||
|
celery[librabbitmq]
|
||||||
Reference in New Issue
Block a user