import asyncio import json import logging import time import redis.asyncio from collections import defaultdict from typing import List, Union from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, selectinload from backend.config import REDIS_URL from backend.session import session_factory from database import Marketplace, MarketplaceProduct, Warehouse, Company from database.sipro.enums.general import BaseMarketplace from queries.general import StockData from schemas.general import StockUpdate from updaters.factory import UpdaterFactory class StocksUpdater: def __init__(self, session: AsyncSession): self.session = session async def get_marketplace(self, marketplace_id: int) -> Marketplace: marketplace = await self.session.get(Marketplace, marketplace_id, options=[ joinedload(Marketplace.warehouses).joinedload(Warehouse.suppliers), joinedload(Marketplace.warehouses).joinedload(Warehouse.company_warehouses), joinedload(Marketplace.company).joinedload(Company.warehouse) ]) return marketplace async def get_marketplaces(self, marketplace_ids: Union[list[int], None] = None) -> List[Marketplace]: if not marketplace_ids: marketplace_ids = [] stmt = ( select( Marketplace ) .join( Company ) .options( selectinload(Marketplace.warehouses).selectinload(Warehouse.suppliers), selectinload(Marketplace.warehouses).selectinload(Warehouse.company_warehouses), joinedload(Marketplace.company).joinedload(Company.warehouse) ) .where( Company.is_deleted == False, Company.is_archived == False, Marketplace.is_deleted == False, Marketplace.is_paused == False, Marketplace.send_stocks == True, Marketplace.base_marketplace.in_([ BaseMarketplace.OZON, BaseMarketplace.WILDBERRIES, BaseMarketplace.YANDEX_MARKET ]), or_( marketplace_ids == [], Marketplace.id.in_(marketplace_ids) ) ) ) query_result = await self.session.scalars(stmt) return query_result.all() async def full_update_marketplace(self, marketplace_id: int): marketplace = await self.get_marketplace(marketplace_id) if not marketplace.send_stocks: return start = time.time() updater = UpdaterFactory.get_updater(self.session, marketplace) await updater.update_all() logging.info( f"{marketplace.name} successfully fully updated in {round(time.time() - start, 2)} seconds.") async def get_all_stocks_for_marketplace(self, marketplace_id: int, only_available: bool) -> List[StockData]: cache_key = f"marketplace_stocks_{marketplace_id}_{only_available}" client = redis.asyncio.from_url(REDIS_URL) if cached := await client.get(cache_key): return json.loads(cached) marketplace = await self.get_marketplace(marketplace_id) updater = UpdaterFactory.get_updater(self.session, marketplace) response = await updater.get_all_stocks(only_available) await client.set(cache_key, json.dumps(response), ex=600) return response async def full_update_all_marketplaces(self, marketplace_ids: Union[List[int], None] = None): marketplaces = await self.get_marketplaces(marketplace_ids) async def update_marketplace(marketplace): async with session_factory() as session: start = time.time() updater = UpdaterFactory.get_updater(session, marketplace) await updater.update_all() logging.info( f"{marketplace.name} successfully fully updated in {round(time.time() - start, 2)} seconds.") tasks = [update_marketplace(marketplace) for marketplace in marketplaces] await asyncio.gather(*tasks) async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]): marketplace = await self.get_marketplace(marketplace_id) async with session_factory() as session: start = time.time() updater = UpdaterFactory.get_updater(session, marketplace) if not updater: return await updater.update(updates) logging.info( f"Successfully uploaded {len(updates)} updates to {marketplace.name} in {round(time.time() - start, 2)} seconds.") async def update_marketplace_products(self, marketplace_id: int, product_ids: list[int]): marketplace = await self.get_marketplace(marketplace_id) if not marketplace.send_stocks: return start = time.time() updater = UpdaterFactory.get_updater(self.session, marketplace) await updater.update_products(product_ids) logging.info( f"Successfully updated {len(product_ids)} products for {marketplace.name} in {round(time.time() - start, 2)} seconds.") async def update(self, updates: list[StockUpdate]): updates_dict = defaultdict(list) for update in updates: stmt = ( select( MarketplaceProduct.marketplace_id.distinct() ) .join(Marketplace) .join(Company) .where( MarketplaceProduct.product_id == update.product_id, Marketplace.is_deleted == False, Company.is_deleted == False, Company.is_archived == False, Marketplace.is_paused == False, Marketplace.send_stocks == True ) ) stmt_result = await self.session.execute(stmt) marketplace_ids = stmt_result.scalars().all() if not marketplace_ids: continue for marketplace_id in marketplace_ids: updates_dict[marketplace_id].append(update) updates_list = list(updates_dict.items()) updates_list = sorted(updates_list, key=lambda x: len(x[1])) tasks = [] for marketplace_id, marketplace_updates in updates_list: tasks.append(self.update_marketplace(marketplace_id, marketplace_updates)) await asyncio.gather(*tasks) async def reset_marketplace(self, marketplace_id: int): marketplace = await self.get_marketplace(marketplace_id) start = time.time() updater = UpdaterFactory.get_updater(self.session, marketplace) await updater.reset() logging.info( f"{marketplace.name} successfully updated in {round(time.time() - start, 2)} seconds.")