feat: switch to redis backend

This commit is contained in:
2025-11-25 18:26:43 +03:00
parent ca2157d71a
commit 3bf182c7db
6 changed files with 26 additions and 62 deletions

View File

@@ -15,7 +15,6 @@ PG_DATABASE = os.environ.get('PG_DATABASE')
# CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL')
# CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
REDIS_URL = os.environ.get('REDIS_URL')
RABBITMQ_URL = os.environ.get('RABBITMQ_URL')
# Yandex
YANDEX_CLIENT_ID = os.environ.get('YANDEX_CLIENT_ID')

View File

@@ -1,10 +1,16 @@
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
import backend.config
# taskiq_broker = AioPikaBroker(backend.config.RABBITMQ_URL)
result_backend = RedisAsyncResultBackend(
redis_url="redis://redis:6379",
result_ex_time=1000
)
taskiq_broker = RedisStreamBroker(
url="redis://redis:6379"
).with_result_backend(result_backend)
taskiq_broker = AioPikaBroker(backend.config.RABBITMQ_URL)
scheduler = TaskiqScheduler(
broker=taskiq_broker,
sources=[LabelScheduleSource(taskiq_broker)]

View File

@@ -5,27 +5,17 @@ services:
restart: unless-stopped
env_file: .env
command: [ "sh", "./start_taskiq.sh" ]
volumes_from:
- tmp
volumes:
- pg-socket:/run/postgresql
networks:
- appnet
depends_on:
redis:
condition: service_healthy
rabbitmq:
condition: service_healthy
taskiq_scheduler:
image: git.denco.store/fakz9/sipro-stocks:latest
container_name: stocks_scheduler
restart: unless-stopped
env_file: .env
command: [ "sh", "./start_scheduler.sh" ]
volumes_from:
- tmp
volumes:
- pg-socket:/run/postgresql
networks:
- appnet
depends_on:
@@ -36,45 +26,16 @@ services:
restart: unless-stopped
env_file: .env
command: [ "sh", "./start_fastapi.sh" ]
volumes_from:
- tmp
volumes:
- pg-socket:/run/postgresql
networks:
- appnet
depends_on:
- taskiq_worker
ports:
- "8000:8000"
tmp:
image: busybox:latest
command: [ "chmod", "-R","777", "/tmp/docker" ]
volumes:
- /tmp/docker/
rabbitmq:
image: rabbitmq:latest
container_name: stocks_rabbitmq
restart: unless-stopped
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
RABBITMQ_DEFAULT_VHOST: stocks_vhost
networks:
- appnet
healthcheck:
test: [ "CMD", "rabbitmqctl", "status" ]
interval: 10s
timeout: 5s
retries: 5
redis:
image: redis:latest
container_name: stocks_redis
restart: unless-stopped
volumes_from:
- tmp
environment:
REDIS_PASSWORD: ${REDIS_PASSWORD}
command: [ "redis-server", "--unixsocket","/tmp/docker/redis.sock", "--unixsocketperm", "777", "--requirepass", "${REDIS_PASSWORD}" ]
networks:
- appnet
healthcheck:
@@ -85,10 +46,3 @@ services:
networks:
appnet:
volumes:
pg-socket:
driver: local
driver_opts:
type: none
device: /run/postgresql
o: bind

View File

@@ -21,6 +21,7 @@ dependencies = [
"taskiq-aio-pika==0.4.2",
"taskiq-fastapi==0.3.5",
"taskiq-pipelines>=0.1.4",
"taskiq-redis>=1.0.9",
"taskiq==0.11.17",
"uvicorn[standard]>=0.35.0",
]

View File

@@ -1,14 +1,3 @@
#!/bin/bash
ulimit -n 97816
taskiq worker background:taskiq_broker background.tasks
GREEN='\033[0;32m'
log_info() {
echo -e "${GREEN}[INFO] $1${NC}"
}
# Start the Taskiq worker
log_info "Starting Taskiq worker..."
taskiq worker background:taskiq_broker background.tasks --max-async-task 100 --use-process-pool --max-prefetch 100 --workers 1
# Log when the Taskiq worker stops
log_info "Taskiq worker stopped"

15
uv.lock generated
View File

@@ -896,6 +896,7 @@ dependencies = [
{ name = "taskiq-aio-pika" },
{ name = "taskiq-fastapi" },
{ name = "taskiq-pipelines" },
{ name = "taskiq-redis" },
{ name = "uvicorn", extra = ["standard"] },
]
@@ -918,6 +919,7 @@ requires-dist = [
{ name = "taskiq-aio-pika", specifier = "==0.4.2" },
{ name = "taskiq-fastapi", specifier = "==0.3.5" },
{ name = "taskiq-pipelines", specifier = ">=0.1.4" },
{ name = "taskiq-redis", specifier = ">=1.0.9" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.35.0" },
]
@@ -1046,6 +1048,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e3/e2/0ba3c3797f466cebd5431d8ac70146704cc34346332127a19b2d5f6b28ac/taskiq_pipelines-0.1.4-py3-none-any.whl", hash = "sha256:09b4999f79e74552e4a751f4670c1829eaf494a28ee06d3f47ec5f6f5bf5105d", size = 15810, upload-time = "2025-03-05T02:18:39.041Z" },
]
[[package]]
name = "taskiq-redis"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "redis" },
{ name = "taskiq" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ab/46/fa492736f5b90587e73113022093dfab3a2ae16fddcfeea79b2012f1f598/taskiq_redis-1.0.9.tar.gz", hash = "sha256:d25a5ef1c8a50dab680bf2433b6c0a811909adc100a9d986cc9ca8cf13b11300", size = 15771, upload-time = "2025-06-06T13:26:48.378Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/39/28/6311e2f8c9eef70d80f711be86887fba5e151817553231819a24bcd02fd9/taskiq_redis-1.0.9-py3-none-any.whl", hash = "sha256:a27a4940cfb79fabbe99dba557423ba53728f14517e1554394c9643df2972f3c", size = 20128, upload-time = "2025-06-06T13:26:47.206Z" },
]
[[package]]
name = "typing-extensions"
version = "4.14.1"