diff --git a/backend/config.py b/backend/config.py index c6c0581..01fbccd 100644 --- a/backend/config.py +++ b/backend/config.py @@ -15,7 +15,6 @@ PG_DATABASE = os.environ.get('PG_DATABASE') # CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') # CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') REDIS_URL = os.environ.get('REDIS_URL') -RABBITMQ_URL = os.environ.get('RABBITMQ_URL') # Yandex YANDEX_CLIENT_ID = os.environ.get('YANDEX_CLIENT_ID') diff --git a/background/broker.py b/background/broker.py index f06cd44..15b542c 100644 --- a/background/broker.py +++ b/background/broker.py @@ -1,10 +1,16 @@ from taskiq import TaskiqScheduler from taskiq.schedule_sources import LabelScheduleSource -from taskiq_aio_pika import AioPikaBroker +from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker -import backend.config +# taskiq_broker = AioPikaBroker(backend.config.RABBITMQ_URL) +result_backend = RedisAsyncResultBackend( + redis_url="redis://redis:6379", + result_ex_time=1000 +) +taskiq_broker = RedisStreamBroker( + url="redis://redis:6379" +).with_result_backend(result_backend) -taskiq_broker = AioPikaBroker(backend.config.RABBITMQ_URL) scheduler = TaskiqScheduler( broker=taskiq_broker, sources=[LabelScheduleSource(taskiq_broker)] diff --git a/docker-compose.yml b/docker-compose.yml index 28cf9ec..c9906e3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,27 +5,17 @@ services: restart: unless-stopped env_file: .env command: [ "sh", "./start_taskiq.sh" ] - volumes_from: - - tmp - volumes: - - pg-socket:/run/postgresql networks: - appnet depends_on: redis: condition: service_healthy - rabbitmq: - condition: service_healthy taskiq_scheduler: image: git.denco.store/fakz9/sipro-stocks:latest container_name: stocks_scheduler restart: unless-stopped env_file: .env command: [ "sh", "./start_scheduler.sh" ] - volumes_from: - - tmp - volumes: - - pg-socket:/run/postgresql networks: - appnet depends_on: @@ -36,45 +26,16 @@ services: restart: unless-stopped env_file: .env command: [ "sh", "./start_fastapi.sh" ] - volumes_from: - - tmp - volumes: - - pg-socket:/run/postgresql networks: - appnet depends_on: - taskiq_worker ports: - "8000:8000" - tmp: - image: busybox:latest - command: [ "chmod", "-R","777", "/tmp/docker" ] - volumes: - - /tmp/docker/ - rabbitmq: - image: rabbitmq:latest - container_name: stocks_rabbitmq - restart: unless-stopped - environment: - RABBITMQ_DEFAULT_USER: guest - RABBITMQ_DEFAULT_PASS: guest - RABBITMQ_DEFAULT_VHOST: stocks_vhost - networks: - - appnet - healthcheck: - test: [ "CMD", "rabbitmqctl", "status" ] - interval: 10s - timeout: 5s - retries: 5 redis: image: redis:latest container_name: stocks_redis restart: unless-stopped - volumes_from: - - tmp - environment: - REDIS_PASSWORD: ${REDIS_PASSWORD} - command: [ "redis-server", "--unixsocket","/tmp/docker/redis.sock", "--unixsocketperm", "777", "--requirepass", "${REDIS_PASSWORD}" ] networks: - appnet healthcheck: @@ -85,10 +46,3 @@ services: networks: appnet: -volumes: - pg-socket: - driver: local - driver_opts: - type: none - device: /run/postgresql - o: bind diff --git a/pyproject.toml b/pyproject.toml index 694c420..ba9de78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "taskiq-aio-pika==0.4.2", "taskiq-fastapi==0.3.5", "taskiq-pipelines>=0.1.4", + "taskiq-redis>=1.0.9", "taskiq==0.11.17", "uvicorn[standard]>=0.35.0", ] diff --git a/start_taskiq.sh b/start_taskiq.sh index ae41107..a0462fb 100755 --- a/start_taskiq.sh +++ b/start_taskiq.sh @@ -1,14 +1,3 @@ #!/bin/bash -ulimit -n 97816 +taskiq worker background:taskiq_broker background.tasks -GREEN='\033[0;32m' - -log_info() { - echo -e "${GREEN}[INFO] $1${NC}" -} -# Start the Taskiq worker -log_info "Starting Taskiq worker..." -taskiq worker background:taskiq_broker background.tasks --max-async-task 100 --use-process-pool --max-prefetch 100 --workers 1 - -# Log when the Taskiq worker stops -log_info "Taskiq worker stopped" diff --git a/uv.lock b/uv.lock index 0fb38ba..32434bb 100644 --- a/uv.lock +++ b/uv.lock @@ -896,6 +896,7 @@ dependencies = [ { name = "taskiq-aio-pika" }, { name = "taskiq-fastapi" }, { name = "taskiq-pipelines" }, + { name = "taskiq-redis" }, { name = "uvicorn", extra = ["standard"] }, ] @@ -918,6 +919,7 @@ requires-dist = [ { name = "taskiq-aio-pika", specifier = "==0.4.2" }, { name = "taskiq-fastapi", specifier = "==0.3.5" }, { name = "taskiq-pipelines", specifier = ">=0.1.4" }, + { name = "taskiq-redis", specifier = ">=1.0.9" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.35.0" }, ] @@ -1046,6 +1048,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e3/e2/0ba3c3797f466cebd5431d8ac70146704cc34346332127a19b2d5f6b28ac/taskiq_pipelines-0.1.4-py3-none-any.whl", hash = "sha256:09b4999f79e74552e4a751f4670c1829eaf494a28ee06d3f47ec5f6f5bf5105d", size = 15810, upload-time = "2025-03-05T02:18:39.041Z" }, ] +[[package]] +name = "taskiq-redis" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "redis" }, + { name = "taskiq" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ab/46/fa492736f5b90587e73113022093dfab3a2ae16fddcfeea79b2012f1f598/taskiq_redis-1.0.9.tar.gz", hash = "sha256:d25a5ef1c8a50dab680bf2433b6c0a811909adc100a9d986cc9ca8cf13b11300", size = 15771, upload-time = "2025-06-06T13:26:48.378Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/39/28/6311e2f8c9eef70d80f711be86887fba5e151817553231819a24bcd02fd9/taskiq_redis-1.0.9-py3-none-any.whl", hash = "sha256:a27a4940cfb79fabbe99dba557423ba53728f14517e1554394c9643df2972f3c", size = 20128, upload-time = "2025-06-06T13:26:47.206Z" }, +] + [[package]] name = "typing-extensions" version = "4.14.1"