Compare commits

...

9 Commits

16 changed files with 1472 additions and 114 deletions

1 .dockerignore Normal file

@@ -0,0 +1 @@
+.venv

1 .python-version Normal file

@@ -0,0 +1 @@
+3.13

31 Dockerfile Normal file

@@ -0,0 +1,31 @@
+FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS builder
+ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy
+
+# Disable Python downloads, because we want to use the system interpreter
+# across both images. If using a managed Python version, it needs to be
+# copied from the build image into the final image; see `standalone.Dockerfile`
+# for an example.
+ENV UV_PYTHON_DOWNLOADS=0
+
+# Install git
+RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+RUN --mount=type=cache,target=/root/.cache/uv \
+    --mount=type=bind,source=uv.lock,target=uv.lock \
+    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
+    uv sync --locked --no-install-project --no-dev
+COPY . /app
+RUN --mount=type=cache,target=/root/.cache/uv \
+    uv sync --locked --no-dev
+
+# Then, use a final image without uv
+FROM python:3.13-slim-bookworm
+# It is important to use the image that matches the builder, as the path to the
+# Python executable must be the same, e.g., using `python:3.11-slim-bookworm`
+# will fail.
+
+# Copy the application from the builder
+COPY --from=builder --chown=app:app /app /app
+ENV PATH="/app/.venv/bin:$PATH"
+WORKDIR /app
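As the comments in the final stage note, the venv built in the builder symlinks to the system interpreter, so both stages must resolve Python at the identical path. A tiny sanity check, purely illustrative and not part of this PR, that could be run inside the final image:

# sanity_check.py: illustrative only, not part of this PR.
import sys

# With ENV PATH="/app/.venv/bin:$PATH", `python` resolves inside the venv;
# its symlinks must point at the same interpreter path as in the builder.
assert sys.executable.startswith("/app/.venv"), sys.executable
assert sys.version_info[:2] == (3, 13), sys.version_info
print("venv interpreter OK:", sys.executable)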

0 README.md Normal file


@@ -10,21 +10,23 @@ from .config import (
     PG_PASSWORD,
 )
 
-database_url = (
-    f"postgresql+asyncpg://"
-    f"{PG_LOGIN}:{PG_PASSWORD}@"
-    f"/{PG_DATABASE}?host=/run/postgresql/"
-)
 # database_url = (
 #     f"postgresql+asyncpg://"
 #     f"{PG_LOGIN}:{PG_PASSWORD}@"
-#     f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
+#     f"/{PG_DATABASE}?host=/run/postgresql/"
 # )
+database_url = (
+    f"postgresql+asyncpg://"
+    f"{PG_LOGIN}:{PG_PASSWORD}@"
+    f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
+)
 
 engine = create_async_engine(
     database_url,
-    pool_timeout=2000
+    pool_timeout=2000,
+    pool_size=10,
+    pool_pre_ping=True,
 )
 session_factory = async_sessionmaker(
     engine,
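This hunk swaps which database_url is live, commenting out the unix-socket form (the socket directory stays mounted at /run/postgresql by docker-compose.yml below) and activating the TCP form, while hardening the engine pool. A minimal sketch of the two asyncpg URL styles and the pool options, where myuser, mypassword, mydb, and db.example are placeholders rather than values from the repo:

# Illustrative sketch of the two URL styles this hunk toggles between;
# credentials, host, and database name are placeholders.
from sqlalchemy.ext.asyncio import create_async_engine

# TCP: host and port in the authority part of the URL.
tcp_url = "postgresql+asyncpg://myuser:mypassword@db.example:5432/mydb"

# Unix socket: empty host in the URL, socket directory passed via ?host=.
socket_url = "postgresql+asyncpg://myuser:mypassword@/mydb?host=/run/postgresql"

engine = create_async_engine(
    tcp_url,
    pool_timeout=2000,   # seconds to wait for a free pooled connection
    pool_size=10,        # steady-state connections kept open
    pool_pre_ping=True,  # test connections before use, dropping stale ones
)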

2 build-docker.sh Executable file

@@ -0,0 +1,2 @@
+docker build -t git.denco.store/fakz9/sipro-stocks:latest .
+docker push git.denco.store/fakz9/sipro-stocks:latest


@@ -20,6 +20,7 @@ class MarketplaceProduct(BaseSiproModel):
     mp_price_bought: Mapped[int] = mapped_column()
     price_recommended: Mapped[int] = mapped_column()
     is_archived: Mapped[bool] = mapped_column()
+    is_deleted: Mapped[bool] = mapped_column()
     product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))
     product: Mapped["Product"] = relationship()
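The new is_deleted column implies a schema migration; alembic is already a dependency in pyproject.toml. A hypothetical migration sketch, in which the table name marketplace_products, the revision ids, and the server_default are all assumptions rather than anything shown in this PR:

# Hypothetical Alembic migration for the new column; revision ids, table
# name, and server_default are assumptions, the PR does not show them.
import sqlalchemy as sa
from alembic import op

revision = "xxxx_add_is_deleted"  # placeholder
down_revision = None              # placeholder

def upgrade() -> None:
    op.add_column(
        "marketplace_products",
        sa.Column("is_deleted", sa.Boolean(), nullable=False,
                  server_default=sa.false()),
    )

def downgrade() -> None:
    op.drop_column("marketplace_products", "is_deleted")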

94 docker-compose.yml Normal file

@@ -0,0 +1,94 @@
+services:
+  taskiq_worker:
+    image: git.denco.store/fakz9/sipro-stocks:latest
+    container_name: stocks_worker
+    restart: unless-stopped
+    env_file: .env
+    command: [ "sh", "./start_taskiq.sh" ]
+    volumes_from:
+      - tmp
+    volumes:
+      - pg-socket:/run/postgresql
+    networks:
+      - appnet
+    depends_on:
+      redis:
+        condition: service_healthy
+      rabbitmq:
+        condition: service_healthy
+  taskiq_scheduler:
+    image: git.denco.store/fakz9/sipro-stocks:latest
+    container_name: stocks_scheduler
+    restart: unless-stopped
+    env_file: .env
+    command: [ "sh", "./start_scheduler.sh" ]
+    volumes_from:
+      - tmp
+    volumes:
+      - pg-socket:/run/postgresql
+    networks:
+      - appnet
+    depends_on:
+      - fastapi
+  fastapi:
+    image: git.denco.store/fakz9/sipro-stocks:latest
+    container_name: stocks_fastapi
+    restart: unless-stopped
+    env_file: .env
+    command: [ "sh", "./start_fastapi.sh" ]
+    volumes_from:
+      - tmp
+    volumes:
+      - pg-socket:/run/postgresql
+    networks:
+      - appnet
+    depends_on:
+      - taskiq_worker
+    ports:
+      - "8000:8000"
+  tmp:
+    image: busybox:latest
+    command: [ "chmod", "-R", "777", "/tmp/docker" ]
+    volumes:
+      - /tmp/docker/
+  rabbitmq:
+    image: rabbitmq:latest
+    container_name: stocks_rabbitmq
+    restart: unless-stopped
+    environment:
+      RABBITMQ_DEFAULT_USER: guest
+      RABBITMQ_DEFAULT_PASS: guest
+      RABBITMQ_DEFAULT_VHOST: stocks_vhost
+    networks:
+      - appnet
+    healthcheck:
+      test: [ "CMD", "rabbitmqctl", "status" ]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+  redis:
+    image: redis:latest
+    container_name: stocks_redis
+    restart: unless-stopped
+    volumes_from:
+      - tmp
+    environment:
+      REDIS_PASSWORD: ${REDIS_PASSWORD}
+    command: [ "redis-server", "--unixsocket", "/tmp/docker/redis.sock", "--unixsocketperm", "777", "--requirepass", "${REDIS_PASSWORD}" ]
+    networks:
+      - appnet
+    healthcheck:
+      test: [ "CMD", "redis-cli", "ping" ]
+      interval: 5s
+      timeout: 2s
+      retries: 5
+
+networks:
+  appnet:
+volumes:
+  pg-socket:
+    driver: local
+    driver_opts:
+      type: none
+      device: /run/postgresql
+      o: bind
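The pg-socket named volume bind-mounts the host's /run/postgresql directory into the app containers, so PostgreSQL stays reachable over its unix socket without publishing a TCP port. A quick connectivity sketch from inside a container, where the user, password, and database names are placeholders:

# Illustrative check that the bind-mounted socket works from inside a
# container; user/password/database are placeholders, not repo values.
import asyncio
import asyncpg

async def main() -> None:
    # asyncpg treats a host starting with "/" as a unix-socket directory.
    conn = await asyncpg.connect(
        host="/run/postgresql", user="myuser", password="mypassword",
        database="mydb",
    )
    print(await conn.fetchval("SELECT version()"))
    await conn.close()

asyncio.run(main())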


@@ -125,9 +125,10 @@ def get_status(task_id):
 @app.get('/marketplace/{marketplace_id}/stocks')
 async def get_marketplace_stocks(
+        marketplace_id: int,
         session: SessionDependency,
-        marketplace_id: int,
         only_available: bool = False
 ):
     updater = StocksUpdater(session)
     return await updater.get_all_stocks_for_marketplace(int(marketplace_id), only_available)
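With the reorder, the path parameter is declared before the injected session; the endpoint's behavior is unchanged. For reference, a client call against the port published in docker-compose.yml, with host and marketplace id as placeholders (aiohttp is already a project dependency):

# Placeholder host and marketplace id; the path and query parameter
# names come from the diff above.
import asyncio
import aiohttp

async def main() -> None:
    async with aiohttp.ClientSession() as http:
        async with http.get(
            "http://localhost:8000/marketplace/41/stocks",
            params={"only_available": "true"},
        ) as resp:
            print(resp.status, await resp.json())

asyncio.run(main())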

25 pyproject.toml Normal file

@@ -0,0 +1,25 @@
+[project]
+name = "sipro-stocks"
+version = "0.1.0"
+description = "Add your description here"
+readme = "README.md"
+requires-python = ">=3.13"
+dependencies = [
+    "aiohttp[speedups]>=3.12.15",
+    "alembic>=1.16.4",
+    "asyncpg>=0.30.0",
+    "celery[librabbitmq,redis]>=5.5.3",
+    "fastapi>=0.116.1",
+    "gevent>=25.5.1",
+    "gunicorn>=23.0.0",
+    "pydantic>=2.11.7",
+    "pyjwt>=2.10.1",
+    "python-dotenv>=1.1.1",
+    "redis[hiredis]>=5.2.1",
+    "sqlalchemy[asyncio]>=2.0.43",
+    "taskiq-aio-pika==0.4.2",
+    "taskiq-fastapi==0.3.5",
+    "taskiq-pipelines>=0.1.4",
+    "taskiq==0.11.17",
+    "uvicorn[standard]>=0.35.0",
+]


@@ -256,7 +256,7 @@ async def get_stocks_data(
             func.coalesce(is_master_subquery.c.is_master, False).label('is_master'),
             func.coalesce(slaves_stock_subquery.c.slaves_stock, 0).label('slaves_stock'),
             MarketplaceProduct.price_recommended.label('price_recommended'),
-            MarketplaceProduct.is_archived.label('is_archived'),
+            MarketplaceProduct.is_deleted.label('is_deleted'),
             func.coalesce(fbo_stock_subquery.c.quantity, 0).label('fbo_stock')
         )
         .select_from(
@@ -318,25 +318,24 @@ async def get_stocks_data(
         is_master,
         slaves_stock,
         price_recommended,
-        is_archived,
+        is_deleted,
         fbo_stock
     ) in marketplace_products:
-        if is_archived or (sell_from_price > price_recommended) or is_paused or reset:
-            response.append({
-                'article': denco_article,
-                'full_stock': 0,
-                'marketplace_product': marketplace_product,
-                'product_id': marketplace_product.product_id
-            })
-            continue
-        if fbo_stock > 0 and prefer_fbo_over_fbs:
-            response.append({
-                'article': denco_article,
-                'full_stock': 0,
-                'marketplace_product': marketplace_product,
-                'product_id': marketplace_product.product_id
-            })
+        base_dict: StockData = {
+            'article': denco_article,
+            'marketplace_product': marketplace_product,
+            'product_id': marketplace_product.product_id,
+            'full_stock': 0
+        }
+        zero_stock = any([
+            is_deleted,
+            sell_from_price > price_recommended,
+            is_paused,
+            fbo_stock > 0 and prefer_fbo_over_fbs,
+            45 > company.balance
+        ])
+        if zero_stock:
+            response.append(base_dict)
             continue
 
         is_mix = mix_stock is not None
@@ -359,15 +358,7 @@ async def get_stocks_data(
             full_stock = warehouse_stock
         if (not sell_mixes) and is_mix:
             full_stock = warehouse_stock
-        if 45 > company.balance:
-            full_stock = 0
         full_stock = max([0, full_stock])
-        response.append({
-            'article': denco_article,
-            'full_stock': full_stock,
-            'marketplace_product': marketplace_product,
-            'product_id': marketplace_product.product_id
-        })
+        base_dict['full_stock'] = full_stock
+        response.append(base_dict)
 
     return response
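The refactor builds one base_dict per product and collapses the early-exit branches into a single any([...]) test; note that the balance cutoff (45 > company.balance) moves up from the bottom of the loop, and the old reset flag drops out of the condition. The StockData annotation is not defined in this diff; a plausible shape inferred from the keys used here, with field types as assumptions:

# Inferred sketch of StockData based on the keys in the diff; the real
# definition is not part of this PR and field types are assumptions.
from typing import Any, TypedDict

class StockData(TypedDict):
    article: str                  # denco_article
    marketplace_product: Any      # MarketplaceProduct ORM instance
    product_id: int
    full_stock: int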

1 start_fastapi.sh Executable file

@@ -0,0 +1 @@
+gunicorn -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000 main:app


@@ -1,57 +1,11 @@
 #!/bin/bash
 ulimit -n 97816
 
-# Load Redis password from .env file
-export REDIS_PASSWORD=$(grep -m 1 'REDIS_PASSWORD' .env | cut -d '=' -f2)
-
-# Color definitions for log messages
 GREEN='\033[0;32m'
-YELLOW='\033[0;33m'
-RED='\033[0;31m'
-NC='\033[0m' # No Color
 
-# Function to print log messages with colors
 log_info() {
     echo -e "${GREEN}[INFO] $1${NC}"
 }
 
-log_warning() {
-    echo -e "${YELLOW}[WARNING] $1${NC}"
-}
-
-log_error() {
-    echo -e "${RED}[ERROR] $1${NC}"
-}
-
-# Start clearing Redis locks
-log_info "Clearing Redis locks..."
-
-# Check if Redis password was set correctly
-if [ -z "$REDIS_PASSWORD" ]; then
-    log_error "REDIS_PASSWORD not set. Please check your .env file."
-    exit 1
-fi
-
-# Deleting keys and getting the total deleted count
-TOTAL_DELETED_KEYS=$(redis-cli -a "$REDIS_PASSWORD" --scan --pattern '*_lock' 2>/dev/null | \
-    xargs -I {} redis-cli -a "$REDIS_PASSWORD" del {} 2>/dev/null | wc -l)
-
-# Log the result
-if [ "$TOTAL_DELETED_KEYS" -gt 0 ]; then
-    log_info "Total deleted keys: $TOTAL_DELETED_KEYS"
-else
-    log_warning "No keys matched the pattern '*_lock'."
-fi
-
-# Clear RabbitMQ queue
-log_info "Purging RabbitMQ queue 'taskiq' in vhost 'stocks_vhost'..."
-rabbitmqctl purge_queue -p stocks_vhost taskiq
-if [ $? -eq 0 ]; then
-    log_info "RabbitMQ queue purged successfully."
-else
-    log_error "Failed to purge RabbitMQ queue. Please check your RabbitMQ setup."
-fi
-
 # Start the Taskiq worker
 log_info "Starting Taskiq worker..."
 taskiq worker background:taskiq_broker background.tasks --max-async-task 1000 --max-threadpool-threads 8 --max-prefetch 10000 --workers 1

30 test.py

@@ -1,30 +0,0 @@
import asyncio
from idlelib.pyparse import trans
from pydoc import browse
import background.tasks
from background import taskiq_broker
async def main():
# # generate random json dict with strings and integers
# def generate_json():
# import random
# import string
# import json
# data = {}
# for i in range(0, 10):
# key = ''.join(random.choices(string.ascii_lowercase, k=5))
# value = random.randint(0, 100)
# data[key] = value
# return json.dumps(data)
# # generate json dict
# data = generate_json()
await taskiq_broker.startup()
await background.tasks.update_marketplace.kiq(41)
await taskiq_broker.shutdown()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())


@@ -1,6 +1,8 @@
 import asyncio
+import json
 import logging
 import time
 
+import redis.asyncio
 from collections import defaultdict
 from typing import List, Union
@@ -8,6 +10,7 @@ from sqlalchemy import select, or_
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.orm import joinedload, selectinload
 
+from backend.config import REDIS_URL
 from backend.session import session_factory
 from database import Marketplace, MarketplaceProduct, Warehouse, Company
 from database.sipro.enums.general import BaseMarketplace
@@ -74,9 +77,16 @@ class StocksUpdater:
             f"{marketplace.name} successfully fully updated in {round(time.time() - start, 2)} seconds.")
 
     async def get_all_stocks_for_marketplace(self, marketplace_id: int, only_available: bool) -> List[StockData]:
+        cache_key = f"marketplace_stocks_{marketplace_id}_{only_available}"
+        client = redis.asyncio.from_url(REDIS_URL)
+        if cached := await client.get(cache_key):
+            return json.loads(cached)
         marketplace = await self.get_marketplace(marketplace_id)
         updater = UpdaterFactory.get_updater(self.session, marketplace)
-        return await updater.get_all_stocks(only_available)
+        response = await updater.get_all_stocks(only_available)
+        await client.set(cache_key, json.dumps(response), ex=600)
+        return response
 
     async def full_update_all_marketplaces(self, marketplace_ids: Union[List[int], None] = None):
         marketplaces = await self.get_marketplaces(marketplace_ids)
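This adds a 10-minute read-through Redis cache in front of the per-marketplace stocks query; because the hit path round-trips through json, the cached response must be JSON-serializable. The same pattern as a standalone sketch, with illustrative names that are not from the repo:

# Generic read-through cache sketch mirroring the pattern in this hunk;
# names are illustrative. Cached values must be JSON-serializable.
import json
from typing import Any, Awaitable, Callable

import redis.asyncio

async def cached_call(
    redis_url: str,
    key: str,
    compute: Callable[[], Awaitable[Any]],
    ttl_seconds: int = 600,
) -> Any:
    client = redis.asyncio.from_url(redis_url)
    try:
        if (hit := await client.get(key)) is not None:
            return json.loads(hit)
        value = await compute()
        await client.set(key, json.dumps(value), ex=ttl_seconds)
        return value
    finally:
        await client.aclose()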

1274 uv.lock generated Normal file

File diff suppressed because it is too large