feat: crap

This commit is contained in:
2024-11-05 00:38:48 +03:00
parent 1ac664362e
commit 2e0d5f0944
6 changed files with 43 additions and 13 deletions

View File

@@ -5,8 +5,17 @@ app = Celery(
__name__,
broker='amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost',
backend='rpc://',
)
app.conf.broker_url = 'amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost'
app.conf.result_backend = 'rpc://'
app.conf.update(
worker_concurrency=8,
worker_prefetch_multiplier=1,
task_acks_late=True,
broker_pool_limit=10,
task_reject_on_worker_lost=True,
task_publish_retry=True,
broker_heartbeat=10
)
import background.tasks

View File

@@ -1,29 +1,40 @@
import asyncio
from typing import List, Union
from celery.signals import worker_ready
import background.update
from limiter import BatchLimiter
from .celery_app import app
def run_async(coroutine):
"""Запускает асинхронную корутину в синхронном контексте."""
return asyncio.run(coroutine)
@app.task(name='process_update')
def process_update(product_ids: list[int]):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.process_update(product_ids))
# Запускаем асинхронную функцию
return run_async(background.update.process_update(product_ids))
@app.task(name='update_marketplace')
def update_marketplace(marketplace_id: int):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.update_marketplace(marketplace_id))
return run_async(background.update.update_marketplace(marketplace_id))
@app.task(name='update_marketplace_products')
def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.update_marketplace_products(marketplace_id, product_ids))
return run_async(background.update.update_marketplace_products(marketplace_id, product_ids))
@app.task(name='update_marketplaces')
def update_marketplaces(marketplace_ids: Union[List[int], None]):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids))
return run_async(background.update.update_marketplaces(marketplace_ids))
@worker_ready.connect
def worker_is_ready(sender, **kwargs):
limiter = BatchLimiter()
return run_async(limiter.clear_locks())

View File

@@ -68,6 +68,8 @@ class BatchLimiter:
keys = []
async for key in redis.scan_iter('*_lock*'):
keys.append(key)
if not keys:
return
await redis.delete(*keys)
async def acquire_wildberries(self, key):

View File

@@ -14,4 +14,5 @@ celery[redis]
pyjwt
redis
redis[hiredis]
celery[librabbitmq]
celery[librabbitmq]
gevent

View File

@@ -1,3 +1,3 @@
#!/bin/bash
ulimit -n 97816
/var/www/sipro/Sipro-Stocks/venv/bin/celery -A background.celery worker --loglevel=info --logfile=/var/www/sipro/Sipro-Stocks/celery.log
/var/www/sipro/Sipro-Stocks/venv/bin/celery -A background.celery_app worker -P gevent -c 20 --loglevel=info --logfile=/var/www/sipro/Sipro-Stocks/celery.log

11
test.py
View File

@@ -1,2 +1,9 @@
a = [1]
print(a[:1], a[1:])
import background.tasks
def main():
task = background.tasks.update_marketplace.delay(41)
if __name__ == '__main__':
main()