import time from abc import ABC, abstractmethod from typing import List from sqlalchemy.ext.asyncio import AsyncSession import queries.general from database import Marketplace from marketplaces import MarketplaceApiFactory from marketplaces.base import BaseMarketplaceApi from queries.general import StockData from schemas.general import StockUpdate class BaseMarketplaceUpdater(ABC): marketplace: Marketplace marketplace_api: BaseMarketplaceApi session: AsyncSession def __init__(self, marketplace: Marketplace, session: AsyncSession): self.marketplace = marketplace self.session = session self.marketplace_api = MarketplaceApiFactory.get_marketplace_api(marketplace) @abstractmethod def get_update_for_marketplace(self, stock_data: StockData) -> dict: pass async def update(self, updates: List[StockUpdate]): if not self.marketplace_api: return product_ids = list(set([update.product_id for update in updates])) await self.update_products(product_ids) async def update_products(self, product_ids: list[int]): stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace, product_ids=product_ids ) marketplace_updates = [] for stock_data in stock_data_list: marketplace_update = self.get_update_for_marketplace(stock_data) marketplace_updates.append(marketplace_update) await self.marketplace_api.update_stocks(marketplace_updates) async def update_all(self): if not self.marketplace_api: return stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace, ) marketplace_updates = [] for stock_data in stock_data_list: marketplace_update = self.get_update_for_marketplace(stock_data) marketplace_updates.append(marketplace_update) await self.marketplace_api.update_stocks(marketplace_updates) async def get_all_stocks(self, only_available: bool) -> List[StockData]: if not self.marketplace_api: return [] stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace ) if only_available: stock_data_list = list(filter(lambda x: x["full_stock"] > 0, stock_data_list)) for idx, stock_data in enumerate(stock_data_list): stock_data['product_id'] = stock_data['marketplace_product'].product_id del stock_data["marketplace_product"] stock_data_list[idx] = stock_data return stock_data_list async def reset(self): if not self.marketplace_api: return stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace ) marketplace_updates = [] for stock_data in stock_data_list: marketplace_update = self.get_update_for_marketplace(stock_data) marketplace_updates.append(marketplace_update) await self.marketplace_api.update_stocks(marketplace_updates)