85 lines
2.8 KiB
Python
85 lines
2.8 KiB
Python
import asyncio
|
|
import logging
|
|
|
|
from sqlalchemy import select
|
|
|
|
import background.update
|
|
from backend.session import get_session
|
|
from background.broker import taskiq_broker
|
|
from buffer.core import TasksBuffer
|
|
from constants import DEFAULT_PROCESSING_PRICE
|
|
from database import Marketplace, Company
|
|
from utils import chunk_list
|
|
|
|
|
|
@taskiq_broker.task(task_name='process_update')
async def process_update(product_ids: list[int]):
    """Broker task wrapping ``background.update.process_update``.

    Args:
        product_ids: Product ids forwarded unchanged to the underlying
            coroutine.

    Returns:
        Whatever ``background.update.process_update`` returns.
    """
    outcome = await background.update.process_update(product_ids)
    return outcome
|
|
|
|
|
|
@taskiq_broker.task(task_name='update_marketplace')
async def update_marketplace(marketplace_id: int):
    """Broker task wrapping ``background.update.update_marketplace``.

    Args:
        marketplace_id: Id forwarded unchanged to the underlying coroutine.

    Returns:
        Whatever ``background.update.update_marketplace`` returns.
    """
    outcome = await background.update.update_marketplace(marketplace_id)
    return outcome
|
|
|
|
|
|
@taskiq_broker.task(task_name='update_marketplace_products')
async def update_marketplace_products(marketplace_id: int, product_ids: list[int]):
    """Broker task wrapping ``background.update.update_marketplace_products``.

    Args:
        marketplace_id: Id forwarded unchanged as the first argument.
        product_ids: Product ids forwarded unchanged as the second argument.

    Returns:
        Whatever ``background.update.update_marketplace_products`` returns.
    """
    outcome = await background.update.update_marketplace_products(
        marketplace_id,
        product_ids,
    )
    return outcome
|
|
|
|
|
|
@taskiq_broker.task(task_name='update_marketplaces')
async def update_marketplaces(marketplace_ids: list[int]):
    """Broker task wrapping ``background.update.update_marketplaces``.

    Args:
        marketplace_ids: Ids forwarded unchanged to the underlying coroutine.

    Returns:
        Whatever ``background.update.update_marketplaces`` returns.
    """
    outcome = await background.update.update_marketplaces(marketplace_ids)
    return outcome
|
|
|
|
|
|
@taskiq_broker.task(task_name='reset_marketplace')
async def reset_marketplace(marketplace_id: int):
    """Broker task wrapping ``background.update.reset_marketplace``.

    Args:
        marketplace_id: Id forwarded unchanged to the underlying coroutine.

    Returns:
        Whatever ``background.update.reset_marketplace`` returns.
    """
    outcome = await background.update.reset_marketplace(marketplace_id)
    return outcome
|
|
|
|
|
|
@taskiq_broker.task(schedule=[{"cron": "* * * * *"}])
async def flush_buffer():
    """Drain the tasks buffer and run a product update for its contents.

    Registered with the broker under the cron expression ``* * * * *``.
    The function returns early (logging ``Buffer is empty``) when the buffer
    reports that nothing should be processed, or when it yields no product
    ids. Otherwise the ids are passed to ``process_update``.

    Any ``Exception`` raised along the way is logged together with its
    traceback and then swallowed, so this function itself never raises
    ``Exception`` subclasses to the caller.
    """
    try:
        logging.info('Flushing buffer')
        buffer = TasksBuffer()

        if not await buffer.should_process():
            logging.info('Buffer is empty')
            return

        product_ids = await buffer.get_tasks()
        if not product_ids:
            logging.info('Buffer is empty')
            return

        # Only measure the batch once we know there is something to flush.
        total_products = len(product_ids)
        logging.info(f'Flushing buffer with {total_products} products')
        # NOTE(review): this awaits the decorated task object directly rather
        # than going through the broker's enqueue API - confirm that running
        # the update from within this task is intended.
        await process_update(product_ids)
        logging.info(f'Buffer flushed with {total_products} products')
    except Exception as e:
        # logging.exception keeps the same message but also records the
        # traceback, which a plain logging.error(str(e)) would lose.
        logging.exception(f'Error in flush_buffer: {e}')
|
|
|
|
|
|
@taskiq_broker.task(schedule=[{"cron": "0 */3 * * *"}])
async def reset_companies_with_zero_balance():
    """Reset marketplaces whose company balance is at or below the threshold.

    Registered with the broker under the cron expression ``0 */3 * * *``.
    For each session yielded by ``get_session()`` it selects every
    ``Marketplace`` joined to a ``Company`` where the company balance is
    ``<= DEFAULT_PROCESSING_PRICE``, the company is neither deleted nor
    archived, and the marketplace is not deleted. The matches are split with
    ``chunk_list(marketplaces, 10)`` and each chunk's
    ``background.update.reset_marketplace`` calls are awaited concurrently.

    A failing reset no longer prevents the remaining marketplaces from being
    processed: every failure is logged, all chunks are attempted, and the
    first failure is re-raised at the end so the task is still reported as
    failed.

    Raises:
        BaseException: The first exception raised by any
            ``background.update.reset_marketplace`` call, re-raised after all
            chunks have been attempted.
    """
    logging.info('Flushing zero balance companies')
    async for session in get_session():
        marketplaces_stmt = (
            select(Marketplace)
            .join(Company)
            .where(
                Company.balance <= DEFAULT_PROCESSING_PRICE,
                # "== False" builds a SQL expression here; it is not a Python
                # truthiness test and must not be rewritten as "not ...".
                Company.is_deleted == False,  # noqa: E712
                Company.is_archived == False,  # noqa: E712
                Marketplace.is_deleted == False,  # noqa: E712
            )
        )
        marketplaces = list((await session.scalars(marketplaces_stmt)).all())

        first_error = None
        for marketplaces_chunk in chunk_list(marketplaces, 10):
            marketplace_ids = [marketplace.id for marketplace in marketplaces_chunk]
            # return_exceptions=True makes gather wait for every reset in the
            # chunk and hand failures back as values instead of raising on
            # the first one and abandoning the rest.
            results = await asyncio.gather(
                *(
                    background.update.reset_marketplace(marketplace_id)
                    for marketplace_id in marketplace_ids
                ),
                return_exceptions=True,
            )
            for marketplace_id, result in zip(marketplace_ids, results):
                if isinstance(result, BaseException):
                    logging.error(
                        'Failed to reset marketplace %s: %r',
                        marketplace_id,
                        result,
                    )
                    if first_error is None:
                        first_error = result

        if first_error is not None:
            # Preserve the original contract that a failed reset fails the task.
            raise first_error
|
|
|