Files
Sipro-Stocks/background/tasks.py
2025-12-17 03:06:29 +03:00

87 lines
3.0 KiB
Python

import asyncio
import logging
from sqlalchemy import select
import background.update
from backend.session import get_session
from background.broker import taskiq_broker
from buffer.core import TasksBuffer
from constants import DEFAULT_PROCESSING_PRICE
from database import Marketplace, Company
from utils import chunk_list
SMALL_TIMEOUT = 100
LARGE_TIMEOUT = 30 * 60
@taskiq_broker.task(task_name='process_update', timeout=SMALL_TIMEOUT)
async def process_update(product_ids: list[int]):
return await background.update.process_update(product_ids)
@taskiq_broker.task(task_name='update_marketplace', timeout=SMALL_TIMEOUT)
async def update_marketplace(marketplace_id: int):
return await background.update.update_marketplace(marketplace_id)
@taskiq_broker.task(task_name='update_marketplace_products', timeout=SMALL_TIMEOUT)
async def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
return await background.update.update_marketplace_products(marketplace_id, product_ids)
@taskiq_broker.task(task_name='update_marketplaces', timeout=LARGE_TIMEOUT)
async def update_marketplaces(marketplace_ids: list[int]):
return await background.update.update_marketplaces(marketplace_ids)
@taskiq_broker.task(task_name='reset_marketplace', timeout=SMALL_TIMEOUT)
async def reset_marketplace(marketplace_id: int):
return await background.update.reset_marketplace(marketplace_id)
@taskiq_broker.task(schedule=[{"cron": "* * * * *"}])
async def flush_buffer():
try:
logging.info('Flushing buffer')
buffer = TasksBuffer()
should_process = await buffer.should_process()
if not should_process:
logging.info('Buffer is empty')
return
product_ids = await buffer.get_tasks()
total_products = len(product_ids)
if not product_ids:
logging.info('Buffer is empty')
return
logging.info(f'Flushing buffer with {total_products} products')
await process_update(product_ids)
logging.info(f'Buffer flushed with {total_products} products')
except Exception as e:
logging.error(f'Error in flush_buffer: {e}')
@taskiq_broker.task(schedule=[{"cron": "*/30 * * * *"}], timeout=LARGE_TIMEOUT)
async def reset_companies_with_zero_balance():
logging.info(f'Flushing zero balance companies')
async for session in get_session():
marketplaces_stmt = (
select(
Marketplace
)
.join(
Company
)
.where(
Company.balance <= DEFAULT_PROCESSING_PRICE,
Company.is_deleted == False,
Company.is_archived == False,
Marketplace.is_deleted == False
)
)
marketplaces = list((await session.scalars(marketplaces_stmt)).all())
for marketplaces_chunk in chunk_list(marketplaces, 10):
tasks = [background.update.reset_marketplace(marketplace.id) for marketplace in marketplaces_chunk]
await asyncio.gather(*tasks)