Files
Sipro-Stocks/background/update.py

42 lines
1.7 KiB
Python

import logging
from typing import Union, List
from backend.session import session_factory
from schemas.general import StockUpdate
from updaters.stocks_updater import StocksUpdater
async def process_update(product_ids: list[int]) -> None:
    """Update stocks for the given products and log the outcome.

    Each id is wrapped in a ``StockUpdate`` and the resulting batch is
    passed to ``StocksUpdater.update`` using a session obtained from
    ``session_factory``.

    Args:
        product_ids: Identifiers of the products to update.
    """
    async with session_factory() as session:
        updates = [StockUpdate(product_id=product_id) for product_id in product_ids]
        updater = StocksUpdater(session)
        await updater.update(updates)
        # str.join accepts any iterable, so the map object is joined directly
        # instead of being copied into a temporary list first.
        joined_ids = ",".join(map(str, product_ids))
        logging.info(f'Products [{joined_ids}] successfully updated')
async def update_marketplace(marketplace_id: int) -> None:
    """Run ``StocksUpdater.full_update_marketplace`` for one marketplace.

    A log record is emitted once the updater call has returned.

    Args:
        marketplace_id: Identifier forwarded to the updater.
    """
    async with session_factory() as active_session:
        await StocksUpdater(active_session).full_update_marketplace(marketplace_id)
        message = f'Marketplace {marketplace_id} successfully updated'
        logging.info(message)
async def update_marketplace_products(marketplace_id: int, product_ids: list[int]) -> None:
    """Run ``StocksUpdater.update_marketplace_products`` and log the result.

    Args:
        marketplace_id: Identifier of the marketplace, forwarded to the updater.
        product_ids: Identifiers of the products, forwarded to the updater and
            listed comma-separated in the log message.
    """
    async with session_factory() as active_session:
        stocks_updater = StocksUpdater(active_session)
        await stocks_updater.update_marketplace_products(marketplace_id, product_ids)
        message = (
            f'Products [{",".join(map(str, product_ids))}] for marketplace {marketplace_id} successfully updated'
        )
        logging.info(message)
async def update_marketplaces(marketplace_ids: Union[List[int], None]) -> None:
    """Run ``StocksUpdater.full_update_all_marketplaces`` and log the result.

    Args:
        marketplace_ids: Marketplace identifiers forwarded unchanged to the
            updater; ``None`` is accepted. NOTE(review): presumably ``None``
            means "all marketplaces" - confirm against ``StocksUpdater``.
    """
    async with session_factory() as active_session:
        await StocksUpdater(active_session).full_update_all_marketplaces(marketplace_ids)
        message = f'Marketplaces {marketplace_ids} successfully updated'
        logging.info(message)
async def reset_marketplace(marketplace_id: int) -> None:
    """Run ``StocksUpdater.reset_marketplace`` for one marketplace.

    A log record is emitted once the updater call has returned.

    Args:
        marketplace_id: Identifier forwarded to the updater.
    """
    async with session_factory() as active_session:
        stocks_updater = StocksUpdater(active_session)
        await stocks_updater.reset_marketplace(marketplace_id)
        message = f'Marketplace {marketplace_id} successfully reset'
        logging.info(message)