from collections import defaultdict
from dataclasses import dataclass
from enum import unique, IntEnum
from typing import List, Union

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

import database
from database import Marketplace, MarketplaceProduct, DailyStock


@unique
class StockUpdateType(IntEnum):
    """Kinds of stock change that can be reported for a product."""

    SALE = 0
    SUPPLIER_UPDATE = 1
    WAREHOUSE_UPDATE = 2


@dataclass
class StockUpdate:
    """A single stock change: which product, what kind of change, how many."""

    product_id: int
    type: StockUpdateType
    quantity: int


class StocksUpdater:
    """Apply a batch of stock updates using an async SQLAlchemy session."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def update_marketplace(
        self, marketplace_id: int, updates: list[StockUpdate]
    ) -> None:
        """Push ``updates`` to one marketplace.

        Placeholder: currently does nothing.
        """
        pass

    async def update(self, updates: list[StockUpdate]) -> None:
        """Record sales in ``DailyStock`` and fan updates out to marketplaces.

        For every update of type ``SALE`` the quantity is added to the
        product's ``sold_today`` counter (upsert keyed on ``product_id``).
        Every update, whatever its type, is also grouped under each
        marketplace that lists its product, and ``update_marketplace`` is
        awaited once per marketplace after the session has been committed.

        Args:
            updates: Stock updates to apply, in the order they should be
                delivered to each marketplace.
        """
        sold_by_product: defaultdict[int, int] = defaultdict(int)
        updates_by_marketplace: defaultdict[int, list[StockUpdate]] = (
            defaultdict(list)
        )
        # Marketplace ids already fetched in this call, keyed by product id,
        # so a product that appears several times is only looked up once.
        known_marketplaces: dict[int, list[int]] = {}

        for stock_update in updates:
            product_id = stock_update.product_id

            # Sum sales per product: a single INSERT ... ON CONFLICT DO UPDATE
            # must not carry the same conflict key twice, PostgreSQL rejects it.
            if stock_update.type == StockUpdateType.SALE:
                sold_by_product[product_id] += stock_update.quantity

            marketplace_ids = known_marketplaces.get(product_id)
            if marketplace_ids is None:
                marketplace_ids = await self._marketplace_ids_for(product_id)
                known_marketplaces[product_id] = marketplace_ids

            for marketplace_id in marketplace_ids:
                updates_by_marketplace[marketplace_id].append(stock_update)

        # Skip the upsert entirely when the batch contains no sales; an empty
        # VALUES list would not describe any row to insert.
        if sold_by_product:
            await self._add_sold_today(sold_by_product)
        await self.session.commit()

        # Walk marketplaces in ascending id order for a deterministic sequence.
        # (The lists of StockUpdate themselves are not orderable, so they
        # cannot serve as a sort key.)
        for marketplace_id in sorted(updates_by_marketplace):
            await self.update_marketplace(
                marketplace_id, updates_by_marketplace[marketplace_id]
            )

    async def _marketplace_ids_for(self, product_id: int) -> list[int]:
        """Return the distinct marketplace ids that list ``product_id``."""
        stmt = (
            select(MarketplaceProduct.marketplace_id.distinct())
            .where(MarketplaceProduct.product_id == product_id)
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def _add_sold_today(self, sold_by_product: dict[int, int]) -> None:
        """Add the per-product sold quantities to ``DailyStock.sold_today``."""
        insert_stmt = insert(DailyStock).values(
            [
                {'product_id': product_id, 'sold_today': quantity}
                for product_id, quantity in sold_by_product.items()
            ]
        )
        insert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=['product_id'],
            set_={
                'sold_today': (
                    DailyStock.sold_today + insert_stmt.excluded.sold_today
                ),
            },
        )
        await self.session.execute(insert_stmt)