diff --git a/background/tasks.py b/background/tasks.py index 936ff07..6e42cce 100644 --- a/background/tasks.py +++ b/background/tasks.py @@ -25,6 +25,11 @@ async def update_marketplaces(marketplace_ids: list[int]): return await background.update.update_marketplaces(marketplace_ids) +@taskiq_broker.task(task_name='reset_marketplace') +async def reset_marketplace(marketplace_id: int): + return await background.update.reset_marketplace(marketplace_id) + + @taskiq_broker.task(schedule=[{"cron": "* * * * *"}]) async def flush_buffer(): try: diff --git a/background/update.py b/background/update.py index 3b6d939..7dce923 100644 --- a/background/update.py +++ b/background/update.py @@ -33,3 +33,9 @@ async def update_marketplaces(marketplace_ids: Union[List[int], None]): updater = StocksUpdater(session) await updater.full_update_all_marketplaces(marketplace_ids) logging.info(f'Marketplaces {marketplace_ids} successfully updated') + +async def reset_marketplace(marketplace_id:int): + async with session_factory() as session: + updater = StocksUpdater(session) + await updater.reset_marketplace(marketplace_id) + logging.info(f'Marketplace {marketplace_id} successfully reset') diff --git a/main.py b/main.py index b0b8b49..d3e07f6 100644 --- a/main.py +++ b/main.py @@ -100,6 +100,14 @@ async def update_marketplace( return UpdateResponse(task_id=task.task_id) +@app.post('/reset/marketplace') +async def reset_marketplace( + request: ResetMarketplaceRequest +): + task = await background.tasks.reset_marketplace.kiq(request.marketplace_id) + return UpdateResponse(task=task.task_id) + + @app.get("/tasks/{task_id}") def get_status(task_id): task_result = AsyncResult(task_id) diff --git a/queries/general.py b/queries/general.py index 85e788d..ec0f01d 100644 --- a/queries/general.py +++ b/queries/general.py @@ -36,7 +36,8 @@ def get_marketplace_suppliers_and_company_warehouses(marketplace: Marketplace): async def get_stocks_data( session: AsyncSession, marketplace: Marketplace, - product_ids: Union[list[int], None] = None + product_ids: Union[list[int], None] = None, + reset: bool = False ) -> List[StockData]: if not product_ids: product_ids = [] @@ -302,7 +303,7 @@ async def get_stocks_data( slaves_stock, price_recommended, is_archived) in marketplace_products: - if is_archived or (sell_from_price > price_recommended) or is_paused: + if is_archived or (sell_from_price > price_recommended) or is_paused or reset: response.append({ 'article': denco_article, 'full_stock': 0, diff --git a/schemas/general.py b/schemas/general.py index eb440f4..c8ed24e 100644 --- a/schemas/general.py +++ b/schemas/general.py @@ -29,5 +29,9 @@ class UpdateMarketplacesRequest(BaseSchema): marketplace_ids: Union[List[int], None] = None +class ResetMarketplaceRequest(BaseSchema): + marketplace_id: int + + class UpdateResponse(BaseSchema): task_id: str diff --git a/updaters/base.py b/updaters/base.py index 51e7ecc..04954d6 100644 --- a/updaters/base.py +++ b/updaters/base.py @@ -57,3 +57,16 @@ class BaseMarketplaceUpdater(ABC): marketplace_update = self.get_update_for_marketplace(stock_data) marketplace_updates.append(marketplace_update) await self.marketplace_api.update_stocks(marketplace_updates) + + async def reset(self): + if not self.marketplace_api: + return + stock_data_list = await queries.general.get_stocks_data( + session=self.session, + marketplace=self.marketplace + ) + marketplace_updates = [] + for stock_data in stock_data_list: + marketplace_update = self.get_update_for_marketplace(stock_data) + marketplace_updates.append(marketplace_update) + await self.marketplace_api.update_stocks(marketplace_updates) diff --git a/updaters/stocks_updater.py b/updaters/stocks_updater.py index 97f060b..1b8a166 100644 --- a/updaters/stocks_updater.py +++ b/updaters/stocks_updater.py @@ -132,3 +132,11 @@ class StocksUpdater: for marketplace_id, marketplace_updates in updates_list: tasks.append(self.update_marketplace(marketplace_id, marketplace_updates)) await asyncio.gather(*tasks) + + async def reset_marketplace(self, marketplace_id:int): + marketplace = await self.get_marketplace(marketplace_id) + start = time.time() + updater = UpdaterFactory.get_updater(self.session,marketplace) + await updater.reset() + logging.info( + f"{marketplace.name} successfully updated in {round(time.time() - start, 2)} seconds.")