feat: new stuff

This commit is contained in:
2024-11-05 04:40:42 +03:00
parent d497bb0465
commit eeac97861c
10 changed files with 138 additions and 78 deletions

View File

@@ -15,7 +15,7 @@ PG_DATABASE = os.environ.get('PG_DATABASE')
# CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL')
# CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
REDIS_URL = os.environ.get('REDIS_URL')
RABBITMQ_URL = os.environ.get('RABBITMQ_URL')
# Yandex
YANDEX_CLIENT_ID = os.environ.get('YANDEX_CLIENT_ID')

1
background/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .broker import taskiq_broker

5
background/broker.py Normal file
View File

@@ -0,0 +1,5 @@
from taskiq_aio_pika import AioPikaBroker
import backend.config
taskiq_broker = AioPikaBroker(backend.config.RABBITMQ_URL)

View File

@@ -1,32 +0,0 @@
from celery import Celery
# from backend.config import CELERY_RESULT_BACKEND, CELERY_BROKER_URL
broker = 'amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost'
backend = 'rpc://'
#
# broker = 'amqp://test:test123@localhost/test_vhost'
# backend = 'rpc://'
app = Celery(
__name__,
# broker='amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost',
# backend='rpc://',
broker=broker,
backend=backend
)
# app.conf.broker_url = 'amqp://stocks:5W3FcvqXnnp1vv04jf673Mf8EY@localhost/stocks_vhost'
# app.conf.result_backend = 'rpc://'
app.conf.broker_url = broker
app.conf.result_backend = backend
app.conf.update(
worker_concurrency=8,
worker_prefetch_multiplier=1,
task_acks_late=True,
broker_pool_limit=10,
task_reject_on_worker_lost=True,
task_publish_retry=True,
broker_heartbeat=10
)
import background.tasks

View File

@@ -1,32 +1,23 @@
import asyncio
from typing import List, Union
import background.update
from .celery_app import app
from background.broker import taskiq_broker
def run_async(coroutine):
try:
# Попытка получить текущий запущенный цикл
loop = asyncio.get_running_loop()
except RuntimeError: # Если не запущен ни один цикл событий
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@taskiq_broker.task(task_name='process_update')
async def process_update(product_ids: list[int]):
return await background.update.process_update(product_ids)
return loop.run_until_complete(coroutine)
@app.task(name='process_update')
def process_update(product_ids: list[int]):
return run_async(background.update.process_update(product_ids))
@taskiq_broker.task(task_name='update_marketplace')
async def update_marketplace(marketplace_id: int):
return await background.update.update_marketplace(marketplace_id)
@app.task(name='update_marketplace')
def update_marketplace(marketplace_id: int):
return run_async(background.update.update_marketplace(marketplace_id))
@app.task(name='update_marketplace_products')
def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
return run_async(background.update.update_marketplace_products(marketplace_id, product_ids))
@taskiq_broker.task(task_name='update_marketplace_products')
async def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
return await background.update.update_marketplace_products(marketplace_id, product_ids)
@app.task(name='update_marketplaces')
def update_marketplaces(marketplace_ids: Union[List[int], None]):
return run_async(background.update.update_marketplaces(marketplace_ids))
@taskiq_broker.task(task_name='update_marketplaces')
async def update_marketplaces(marketplace_ids: list[int]):
return await background.update.update_marketplaces(marketplace_ids)

View File

@@ -1,3 +1,4 @@
import logging
from typing import Union, List
from backend.session import session_factory
@@ -10,26 +11,25 @@ async def process_update(product_ids: list[int]):
updates = [StockUpdate(product_id=product_id) for product_id in product_ids]
updater = StocksUpdater(session)
await updater.update(updates)
return {'message': f'Stocks for products [{",".join(map(str, product_ids))}] successfully updated'}
logging.info(f'Products [{",".join(list(map(str, product_ids)))}] successfully updated')
async def update_marketplace(marketplace_id: int):
async with session_factory() as session:
updater = StocksUpdater(session)
await updater.full_update_marketplace(marketplace_id)
return {'message': f'Stocks for marketplace {marketplace_id} successfully updated'}
logging.info(f'Marketplace {marketplace_id} successfully updated')
async def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
async with session_factory() as session:
updater = StocksUpdater(session)
await updater.update_marketplace_products(marketplace_id, product_ids)
return {
'message': f'Products [{",".join(list(map(str, product_ids)))}] successfully updated for marketplace {marketplace_id}'}
logging.info(f'Products [{",".join(map(str, product_ids))}] for marketplace {marketplace_id} successfully updated')
async def update_marketplaces(marketplace_ids: Union[List[int], None]):
async with session_factory() as session:
updater = StocksUpdater(session)
await updater.full_update_all_marketplaces(marketplace_ids)
return {'message': f'Stocks for marketplaces [{",".join(map(str, marketplace_ids))}] successfully updated'}
logging.info(f'Marketplaces {marketplace_ids} successfully updated')

45
main.py
View File

@@ -1,3 +1,4 @@
from contextlib import asynccontextmanager
from typing import Annotated
from celery.result import AsyncResult
@@ -7,10 +8,9 @@ from starlette import status
from starlette.responses import JSONResponse
import backend.config
from background import taskiq_broker
from schemas.general import *
import background.tasks
from background.tasks import *
from schemas.general import UpdateRequest, UpdateResponse, UpdateMarketplaceRequest, UpdateMarketplacesRequest, \
UpdateMarketplaceProductsRequest
auth_schema = HTTPBearer()
@@ -20,10 +20,32 @@ async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials')
async def start_broker():
if not taskiq_broker.is_worker_process:
await taskiq_broker.startup()
print('Broker started')
return
async def shutdown_broker():
if not taskiq_broker.is_worker_process:
await taskiq_broker.shutdown()
print('Broker shutdown')
return
@asynccontextmanager
async def lifespan(app: FastAPI):
await start_broker()
yield
await shutdown_broker()
app = FastAPI(
dependencies=[
Depends(check_auth)
]
],
lifespan=lifespan
)
@@ -41,7 +63,7 @@ async def ping():
async def update(
request: UpdateRequest
):
task = background.tasks.process_update.delay(request.product_ids)
task = background.tasks.process_update.kiq(request.product_ids)
return UpdateResponse(task_id=task.id)
@@ -49,25 +71,24 @@ async def update(
async def update_marketplace(
request: UpdateMarketplaceRequest
):
task = background.tasks.update_marketplace.delay(request.marketplace_id)
return UpdateResponse(task_id=task.id)
task = await background.tasks.update_marketplace.kiq(request.marketplace_id)
return UpdateResponse(task_id=task.task_id)
@app.post('/update/marketplace/products')
async def update_marketplace_products(
request: UpdateMarketplaceProductsRequest
):
task = background.tasks.update_marketplace_products.delay(request.marketplace_id, request.product_ids)
return UpdateResponse(task_id=task.id)
task = await background.tasks.update_marketplace_products.kiq(request.marketplace_id, request.product_ids)
return UpdateResponse(task_id=task.task_id)
@app.post('/update/marketplaces')
async def update_marketplace(
request: UpdateMarketplacesRequest
):
task = background.tasks.update_marketplaces.delay(request.marketplace_ids)
return UpdateResponse(task_id=task.id)
task =await background.tasks.update_marketplaces.kiq(request.marketplace_ids)
return UpdateResponse(task_id=task.task_id)
@app.get("/tasks/{task_id}")
def get_status(task_id):

View File

@@ -15,4 +15,7 @@ pyjwt
redis
redis[hiredis]
celery[librabbitmq]
gevent
gevent
taskiq-aio-pika
taskiq-fastapi

50
start_taskiq.sh Executable file
View File

@@ -0,0 +1,50 @@
#!/bin/bash
# Load Redis password from .env file
export REDIS_PASSWORD=$(grep -m 1 'REDIS_PASSWORD' .env | cut -d '=' -f2)
# Color definitions for log messages
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
RED='\033[0;31m'
NC='\033[0m' # No Color
# Function to print log messages with colors
log_info() {
echo -e "${GREEN}[INFO] $1${NC}"
}
log_warning() {
echo -e "${YELLOW}[WARNING] $1${NC}"
}
log_error() {
echo -e "${RED}[ERROR] $1${NC}"
}
# Start clearing Redis locks
log_info "Clearing Redis locks..."
# Check if Redis password was set correctly
if [ -z "$REDIS_PASSWORD" ]; then
log_error "REDIS_PASSWORD not set. Please check your .env file."
exit 1
fi
# Deleting keys and getting the total deleted count
TOTAL_DELETED_KEYS=$(redis-cli -a "$REDIS_PASSWORD" --scan --pattern '*_lock' 2>/dev/null | \
xargs -I {} redis-cli -a "$REDIS_PASSWORD" del {} 2>/dev/null | wc -l)
# Log the result
if [ "$TOTAL_DELETED_KEYS" -gt 0 ]; then
log_info "Total deleted keys: $TOTAL_DELETED_KEYS"
else
log_warning "No keys matched the pattern '*_lock'."
fi
# Start the Taskiq worker
log_info "Starting Taskiq worker..."
taskiq worker background:taskiq_broker background.tasks
# Log when the Taskiq worker stops
log_info "Taskiq worker stopped"

27
test.py
View File

@@ -1,9 +1,30 @@
import asyncio
from idlelib.pyparse import trans
from pydoc import browse
import background.tasks
from background import taskiq_broker
def main():
task = background.tasks.update_marketplace.delay(41)
async def main():
# # generate random json dict with strings and integers
# def generate_json():
# import random
# import string
# import json
# data = {}
# for i in range(0, 10):
# key = ''.join(random.choices(string.ascii_lowercase, k=5))
# value = random.randint(0, 100)
# data[key] = value
# return json.dumps(data)
# # generate json dict
# data = generate_json()
await taskiq_broker.startup()
await background.tasks.update_marketplace.kiq(41)
await taskiq_broker.shutdown()
if __name__ == '__main__':
main()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())