diff --git a/backend/config.py b/backend/config.py index 3e0f027..476808d 100644 --- a/backend/config.py +++ b/backend/config.py @@ -3,11 +3,17 @@ import os from dotenv import load_dotenv, find_dotenv load_dotenv(find_dotenv()) + +# Database PG_LOGIN = os.environ.get('PG_LOGIN') PG_PASSWORD = os.environ.get('PG_PASSWORD') PG_PORT = os.environ.get('PG_PORT') PG_HOST = os.environ.get('PG_HOST') PG_DATABASE = os.environ.get('PG_DATABASE') +# Celery CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') + +# Yandex +YANDEX_CLIENT_ID = os.environ.get('YANDEX_CLIENT_ID') diff --git a/backend/session.py b/backend/session.py index f6af392..c5a4b43 100644 --- a/backend/session.py +++ b/backend/session.py @@ -15,8 +15,16 @@ database_url = ( f"{PG_LOGIN}:{PG_PASSWORD}@" f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}" ) -engine = create_async_engine(database_url) -session_factory = async_sessionmaker(engine, expire_on_commit=False, autoflush=False, autocommit=False) +engine = create_async_engine(database_url, + pool_size=20, + max_overflow=10 + ) +session_factory = async_sessionmaker( + engine, + expire_on_commit=False, + autoflush=False, + autocommit=False +) async def get_session() -> AsyncGenerator[AsyncSession, None]: diff --git a/background/tasks.py b/background/tasks.py index a95b664..b1f7a17 100644 --- a/background/tasks.py +++ b/background/tasks.py @@ -1,4 +1,5 @@ import asyncio +from typing import List, Union from asgiref.sync import async_to_sync @@ -10,3 +11,15 @@ import background.update def process_update(product_ids: list[int]): loop = asyncio.get_event_loop() return loop.run_until_complete(background.update.process_update(product_ids)) + + +@celery.task(name='update_marketplace') +def update_marketplace(marketplace_id: int): + loop = asyncio.get_event_loop() + return loop.run_until_complete(background.update.update_marketplace(marketplace_id)) + + +@celery.task(name='update_marketplaces') +async def update_marketplaces(marketplace_ids: Union[List[int], None]): + loop = asyncio.get_event_loop() + return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids)) diff --git a/background/update.py b/background/update.py index a04ab83..c074d2d 100644 --- a/background/update.py +++ b/background/update.py @@ -1,14 +1,27 @@ -import time +from typing import Union, List -from backend.session import get_session +from backend.session import session_factory from schemas.general import StockUpdate from updaters.stocks_updater import StocksUpdater async def process_update(product_ids: list[int]): - async for session in get_session(): + async with session_factory() as session: updates = [StockUpdate(product_id=product_id) for product_id in product_ids] updater = StocksUpdater(session) await updater.update(updates) - await session.close() - return {'message': f'Stocks for [{",".join(map(str, product_ids))}] successfully updated'} + return {'message': f'Stocks for products [{",".join(map(str, product_ids))}] successfully updated'} + + +async def update_marketplace(marketplace_id: int): + async with session_factory() as session: + updater = StocksUpdater(session) + await updater.full_update_marketplace(marketplace_id) + return {'message': f'Stocks for marketplace {marketplace_id} successfully updated'} + + +async def update_marketplaces(marketplace_ids: Union[List[int], None]): + async with session_factory() as session: + updater = StocksUpdater(session) + await updater.full_update_all_marketplaces(marketplace_ids) + return {'message': f'Stocks for marketplaces [{",".join(map(str, marketplace_ids))}] successfully updated'} diff --git a/database/sipro/models/general.py b/database/sipro/models/general.py index e33d5db..0cfcff1 100644 --- a/database/sipro/models/general.py +++ b/database/sipro/models/general.py @@ -16,6 +16,9 @@ class Company(BaseSiproModel): is_denco: Mapped[bool] = mapped_column() balance: Mapped[int] = mapped_column() + is_deleted: Mapped[bool] = mapped_column() + is_archived: Mapped[bool] = mapped_column() + class CompanyWarehouse(BaseSiproModel): __tablename__ = 'company_warehouses' @@ -49,9 +52,12 @@ class Marketplace(BaseSiproModel): sell_blocks: Mapped[bool] = mapped_column() sell_warehouse_products: Mapped[bool] = mapped_column() sell_from_price: Mapped[bool] = mapped_column() + is_deleted: Mapped[bool] = mapped_column() warehouses: Mapped[List["Warehouse"]] = relationship(secondary=marketplace_warehouses) warehouse_id: Mapped[str] = mapped_column() company_id: Mapped[int] = mapped_column(ForeignKey('companies.id')) company: Mapped["Company"] = relationship() + + campaign_id: Mapped[str] = mapped_column() diff --git a/main.py b/main.py index 758437a..6073fb1 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ from starlette.responses import JSONResponse import background.tasks from background.tasks import * -from schemas.general import UpdateRequest, UpdateResponse +from schemas.general import UpdateRequest, UpdateResponse, UpdateMarketplaceRequest, UpdateMarketplacesRequest auth_schema = HTTPBearer() @@ -34,6 +34,22 @@ async def update( return UpdateResponse(task_id=task.id) +@app.post('/update/marketplace') +async def update_marketplace( + request: UpdateMarketplaceRequest +): + task = background.tasks.update_marketplace.delay(request.marketplace_id) + return UpdateResponse(task_id=task.id) + + +@app.post('/update/marketplaces') +async def update_marketplace( + request: UpdateMarketplacesRequest +): + task = background.tasks.update_marketplaces.delay(request.marketplace_ids) + return UpdateResponse(task_id=task.id) + + @app.get("/tasks/{task_id}") def get_status(task_id): task_result = AsyncResult(task_id) diff --git a/marketplaces/base.py b/marketplaces/base.py index 5e501cd..1d73d77 100644 --- a/marketplaces/base.py +++ b/marketplaces/base.py @@ -28,6 +28,7 @@ class BaseMarketplaceApi(ABC): async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'], method: str, data: dict) -> ClientResponse: + return async with aiohttp.ClientSession() as session: return await session.request(http_method, f'{self.api_url}{method}', diff --git a/marketplaces/factory.py b/marketplaces/factory.py index c45b653..b34a6ff 100644 --- a/marketplaces/factory.py +++ b/marketplaces/factory.py @@ -4,6 +4,7 @@ from database import Marketplace from database.sipro.enums.general import BaseMarketplace from .wildberries import WildberriesMarketplaceApi from .ozon import OzonMarketplaceApi +from .yandexmarket import YandexmarketMarketplaceApi class MarketplaceApiFactory: @@ -11,9 +12,12 @@ class MarketplaceApiFactory: def get_marketplace_api(marketplace: Marketplace) -> Union[ WildberriesMarketplaceApi, OzonMarketplaceApi, + YandexmarketMarketplaceApi ]: match marketplace.base_marketplace: case BaseMarketplace.OZON: return OzonMarketplaceApi(marketplace) case BaseMarketplace.WILDBERRIES: return WildberriesMarketplaceApi(marketplace) + case BaseMarketplace.YANDEX_MARKET: + return YandexmarketMarketplaceApi(marketplace) diff --git a/marketplaces/yandexmarket.py b/marketplaces/yandexmarket.py new file mode 100644 index 0000000..c839565 --- /dev/null +++ b/marketplaces/yandexmarket.py @@ -0,0 +1,58 @@ +import asyncio +import json +import logging +from typing import Union + +from backend.config import YANDEX_CLIENT_ID +from database import Marketplace +from limiter import BatchLimiter +from marketplaces.base import BaseMarketplaceApi +from utils import chunk_list + + +class YandexmarketMarketplaceApi(BaseMarketplaceApi): + def __init__(self, marketplace: Marketplace): + self.marketplace = marketplace + auth_data = json.loads(marketplace.auth_data) + access_token = auth_data.get('accessToken') + + self.headers = { + 'Authorization': f'OAuth oauth_token="{access_token}", oauth_client_id="{YANDEX_CLIENT_ID}"' + } + + def get_headers(self): + return self.headers + + @property + def api_url(self): + return 'https://api.partner.market.yandex.ru/v2' + + async def update_stocks(self, data: Union[list, dict]): + if type(data) is not list: + return + campaign_id = self.marketplace.campaign_id + max_stocks = 2000 + chunks = chunk_list(data, max_stocks) + limiter = BatchLimiter(max_requests=50, period=60) + + async def send_stock_chunk(chunk): + try: + await limiter.acquire() + request_data = { + 'skus': chunk + } + response = await self._method('PUT', + f'/campaigns/{campaign_id}/offers/stocks', + data=request_data) + print(request_data) + rsp = await response.json() + print(rsp) + if response.status != 200: + logging.warning( + f'Error occurred when sending stocks to [{self.marketplace.id}]') + except Exception as e: + logging.error( + f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') + + tasks = [send_stock_chunk(chunk) for chunk in chunks] + await asyncio.gather(*tasks) diff --git a/queries/general.py b/queries/general.py index 134e633..1f70e87 100644 --- a/queries/general.py +++ b/queries/general.py @@ -289,10 +289,6 @@ async def get_stocks_data( slaves_stock_subquery.c.product_id == MarketplaceProduct.product_id ) ) - print('-------------------------') - print(stmt.compile(compile_kwargs={ - 'literal_binds': True - })) result = await session.execute(stmt) marketplace_products = result.all() response: List[StockData] = [] diff --git a/schemas/general.py b/schemas/general.py index bab97b9..118ea75 100644 --- a/schemas/general.py +++ b/schemas/general.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Union, List from pydantic import BaseModel @@ -13,7 +14,15 @@ class BaseSchema(BaseModel): class UpdateRequest(BaseSchema): - product_ids: list[int] + product_ids: List[int] + + +class UpdateMarketplaceRequest(BaseSchema): + marketplace_id: int + + +class UpdateMarketplacesRequest(BaseSchema): + marketplace_ids: Union[List[int], None] = None class UpdateResponse(BaseSchema): diff --git a/test.py b/test.py index 68a4aa6..8784e14 100644 --- a/test.py +++ b/test.py @@ -1,7 +1,509 @@ -a = [ - (1, [1, 4, 5, ]), - (2, [1, 2, 3, 4, 5, ]), -] +import asyncio +import time -updates_list = sorted(a, key=lambda x: x[1], reverse=True) -print(updates_list) +from background.update import process_update + +A = [46806, + 50457, + 5294, + 50455, + 1263604, + 3153, + 1281166, + 60075, + 1266284, + 1255288, + 1430906, + 1430905, + 1430909, + 1430911, + 1430908, + 1233617, + 1274809, + 1269081, + 1216052, + 5281, + 1240583, + 1215881, + 1262205, + 1263362, + 1260217, + 1269043, + 1261160, + 1262930, + 1199011, + 1244985, + 1256197, + 1259971, + 1275109, + 1277923, + 1266093, + 1269905, + 1262068, + 1272526, + 1269918, + 1272247, + 14802, + 1690206, + 1265012, + 1268765, + 1258451, + 1199922, + 1264652, + 1206165, + 1276134, + 1276758, + 1277343, + 1277346, + 1262974, + 1323227, + 1280821, + 1276512, + 1688415, + 1274390, + 1273690, + 1268750, + 1268054, + 1266939, + 1262254, + 1262238, + 1261898, + 1260348, + 1258610, + 1256746, + 1255706, + 1255696, + 1255692, + 1255500, + 1255496, + 1677398, + 1255053, + 1255052, + 1255027, + 1255024, + 1255020, + 1254967, + 1254966, + 1254964, + 1254751, + 1254750, + 1253996, + 1253739, + 1252160, + 1252157, + 1242300, + 1256578, + 1256194, + 1255177, + 1278124, + 1267797, + 1266537, + 1277673, + 1235438, + 1256819, + 62484, + 1260874, + 1257620, + 1273799, + 53141, + 1257730, + 1272776, + 1268129, + 1257068, + 1256919, + 1234979, + 1254460, + 1254578, + 1258563, + 1253242, + 1263602, + 1254576, + 1255015, + 1254981, + 1253646, + 1261321, + 1273813, + 1255242, + 1274598, + 1214897, + 1273279, + 2172, + 1231462, + 1257310, + 1253645, + 1242885, + 1747377, + 1233273, + 1220800, + 1257333, + 1256133, + 1253956, + 1663, + 1771, + 1272628, + 1270851, + 57977, + 1257159, + 1258366, + 1274675, + 1228267, + 1267046, + 1277962, + 1255028, + 1273500, + 57994, + 58014, + 57995, + 1425848, + 1255218, + 1255032, + 1257132, + 1276552, + 1272983, + 1272984, + 1270101, + 1257311, + 1266716, + 1255420, + 1265283, + 1269404, + 1265309, + 50749, + 57941, + 62110, + 1265942, + 1255744, + 52862, + 1274446, + 41851, + 1263009, + 1253446, + 4633, + 21392, + 1264372, + 1264231, + 53052, + 1264656, + 1262555, + 1262009, + 1265287, + 1278725, + 1268653, + 1268652, + 1268651, + 1278044, + 1353805, + 1270730, + 1253802, + 1255266, + 1230815, + 1269405, + 1274444, + 1274445, + 1205670, + 1268767, + 1268761, + 1255679, + 1252820, + 1276125, + 1253954, + 1268790, + 1268789, + 1254508, + 1254507, + 1271531, + 1430596, + 1430901, + 1265975, + 1201855, + 1237377, + 1290777, + 1290779, + 1290845, + 1301023, + 1313042, + 1313044, + 1486866, + 1486865, + 1482040, + 1482039, + 1486868, + 1677395, + 1693204, + 1785113, + 1730179, + 1746619, + 57778, + 58549, + 1274911, + 5238, + 59939, + 48130, + 51725, + 3324, + 3315, + 42937, + 1250692, + 1691268, + 65691, + 65679, + 1266715, + 1261585, + 1547, + 58237, + 5267, + 13728, + 57985, + 1430910, + 46996, + 49071, + 49778, + 1199528, + 18420, + 1357948, + 13436, + 1201667, + 1371508, + 60577, + 47106, + 1213493, + 62574, + 46808, + 51130, + 3100, + 47108, + 52536, + 1272396, + 1266321, + 1280034, + 1409383, + 1272575, + 1246252, + 1267713, + 1265479, + 4726, + 1268028, + 1277070, + 1278743, + 1254882, + 1270158, + 1254749, + 1255416, + 1253839, + 1268655, + 1750701, + 3173, + 1725771, + 4646, + 1801937, + 1837677, + 1442421, + 1811379, + 1821818, + 1838555, + 1752948, + 1579762, + 61747, + 1838556, + 5087, + 1838552, + 1270564, + 1838547, + 1815657, + 1821809, + 1752947, + 1821806, + 1546488, + 1536001, + 1838554, + 1752946, + 1562500, + 1838698, + 1821821, + 1815220, + 1213805, + 1821796, + 1546102, + 1173684, + 1849044, + 1821776, + 61453, + 61465, + 1694439, + 1548447, + 1796332, + 1204445, + 1266798, + 1236596, + 1252685, + 1252631, + 1548800, + 1838561, + 1810009, + 1546753, + 1838562, + 61439, + 61466, + 1811405, + 1811396, + 21475, + 1821764, + 1812378, + 1548797, + 1838553, + 37, + 517, + 49124, + 1814818, + 1546149, + 58180, + 1837889, + 4667, + 61354, + 1821769, + 61595, + 1694473, + 1821802, + 4370, + 48406, + 1838559, + 1778238, + 1546760, + 1838558, + 1821752, + 61429, + 754, + 1265, + 1837956, + 1821800, + 1821747, + 1546834, + 16675, + 1214135, + 1791293, + 61341, + 1837887, + 47168, + 1224282, + 1938274, + 1837886, + 1254630, + 1546849, + 49122, + 4825, + 43589, + 1838560, + 1687827, + 1838551, + 4807, + 1694419, + 1548750, + 53220, + 2032683, + 1822337, + 1821728, + 1260032, + 61600, + 1546423, + 1811399, + 49121, + 519, + 1821730, + 1247748, + 4717, + 1808931, + 43583, + 43584, + 1838692, + 1268073, + 1821741, + 1845065, + 1838699, + 1200388, + 1937568, + 4359, + 45049, + 66803, + 1814848, + 49667, + 1821737, + 1821713, + 4156, + 1546421, + 1546419, + 1789644, + 1270222, + 1821717, + 1812381, + 1821742, + 1254254, + 3691, + 1546424, + 1259, + 1253389, + 1457, + 1838557, + 1224277, + 1838548, + 4824, + 1814870, + 1562490, + 1249319, + 1937717, + 1838549, + 4411, + 1562476, + 59199, + 1845056, + 4028, + 1822211, + 47056, + 1201201, + 62192, + 1815646, + 1694418, + 1546549, + 1254472, + 1814112, + 1258777, + 2032682, + 1546763, + 1658549, + 1838546, + 61758, + 1744312, + 1518567, + 1271323, + 1814821, + 4412, + 1938272, + 2032521, + 4196, + 57643, + 49126, + 49127, + 1815644, + 50812, + 1821642, + 1823200, + 66933, + 1268915, + 1837969, + 53209, + 1546794, + 1272267, + 47147, + 43512, + 1849014, + 1838550, + 1368860, + 2609, + 1821727, + 2021536, + 1276589, + 1936567, + 1269506 + ] +start = time.time() +asyncio.run(process_update(A)) +print(time.time() - start) diff --git a/test_main.http b/test_main.http deleted file mode 100644 index a2d81a9..0000000 --- a/test_main.http +++ /dev/null @@ -1,11 +0,0 @@ -# Test your FastAPI endpoints - -GET http://127.0.0.1:8000/ -Accept: application/json - -### - -GET http://127.0.0.1:8000/hello/User -Accept: application/json - -### diff --git a/updaters/base.py b/updaters/base.py index 2bb5e92..bedd449 100644 --- a/updaters/base.py +++ b/updaters/base.py @@ -1,3 +1,4 @@ +import time from abc import ABC, abstractmethod from typing import List @@ -33,7 +34,17 @@ class BaseMarketplaceUpdater(ABC): marketplace=self.marketplace, product_ids=product_ids ) - return + marketplace_updates = [] + for stock_data in stock_data_list: + marketplace_update = self.get_update_for_marketplace(stock_data) + marketplace_updates.append(marketplace_update) + await self.marketplace_api.update_stocks(marketplace_updates) + + async def update_all(self): + stock_data_list = await queries.general.get_stocks_data( + session=self.session, + marketplace=self.marketplace, + ) marketplace_updates = [] for stock_data in stock_data_list: marketplace_update = self.get_update_for_marketplace(stock_data) diff --git a/updaters/factory.py b/updaters/factory.py index 19d20eb..5c336d5 100644 --- a/updaters/factory.py +++ b/updaters/factory.py @@ -6,13 +6,20 @@ from database import Marketplace from database.sipro.enums.general import BaseMarketplace from updaters.ozon_updater import OzonUpdater from updaters.wildberries_updater import WildberriesUpdater +from updaters.yandexmarket_updater import YandexmarketUpdater class UpdaterFactory: @staticmethod - def get_updater(session: AsyncSession, marketplace: Marketplace) -> Union[OzonUpdater, WildberriesUpdater]: + def get_updater(session: AsyncSession, marketplace: Marketplace) -> Union[ + OzonUpdater, + WildberriesUpdater, + YandexmarketUpdater + ]: match marketplace.base_marketplace: case BaseMarketplace.WILDBERRIES: return WildberriesUpdater(marketplace, session) case BaseMarketplace.OZON: return OzonUpdater(marketplace, session) + case BaseMarketplace.YANDEX_MARKET: + return YandexmarketUpdater(marketplace, session) diff --git a/updaters/stocks_updater.py b/updaters/stocks_updater.py index 17a1efc..aaab852 100644 --- a/updaters/stocks_updater.py +++ b/updaters/stocks_updater.py @@ -1,13 +1,16 @@ import asyncio +import time from collections import defaultdict from enum import unique, IntEnum -from typing import List +from typing import List, Union -from sqlalchemy import select +from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import joinedload +from sqlalchemy.orm import joinedload, selectinload +from backend.session import get_session, session_factory from database import Marketplace, MarketplaceProduct, Warehouse, Company +from database.sipro.enums.general import BaseMarketplace from schemas.general import StockUpdate from updaters.factory import UpdaterFactory @@ -24,24 +27,77 @@ class StocksUpdater: ]) return marketplace - async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]): + async def get_marketplaces(self, marketplace_ids: Union[list[int], None] = None) -> List[Marketplace]: + if not marketplace_ids: + marketplace_ids = [] + stmt = ( + select( + Marketplace + ) + .join( + Company + ) + .options( + selectinload(Marketplace.warehouses).selectinload(Warehouse.suppliers), + selectinload(Marketplace.warehouses).selectinload(Warehouse.company_warehouses), + joinedload(Marketplace.company).joinedload(Company.warehouse) + ) + .where( + Company.is_deleted == False, + Company.is_archived == False, + Marketplace.is_deleted == False, + Marketplace.base_marketplace.in_([ + BaseMarketplace.OZON, + BaseMarketplace.WILDBERRIES, + BaseMarketplace.YANDEX_MARKET + ]), + or_( + marketplace_ids == [], + Marketplace.id.in_(marketplace_ids) + ) + ) + ) + query_result = await self.session.scalars(stmt) + return query_result.all() + + async def full_update_marketplace(self, marketplace_id: int): marketplace = await self.get_marketplace(marketplace_id) updater = UpdaterFactory.get_updater(self.session, marketplace) - if not updater: - return - await updater.update(updates) + await updater.update_all() + + async def full_update_all_marketplaces(self, marketplace_ids: Union[List[int], None] = None): + marketplaces = await self.get_marketplaces(marketplace_ids) + + async def update_marketplace(marketplace): + async with session_factory() as session: + updater = UpdaterFactory.get_updater(session, marketplace) + await updater.update_all() + + tasks = [update_marketplace(marketplace) for marketplace in marketplaces] + await asyncio.gather(*tasks) + + async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]): + marketplace = await self.get_marketplace(marketplace_id) + async with session_factory() as session: + updater = UpdaterFactory.get_updater(session, marketplace) + if not updater: + return + await updater.update(updates) async def update(self, updates: list[StockUpdate]): updates_dict = defaultdict(list) for update in updates: - # Working with marketplaces stmt = ( select( MarketplaceProduct.marketplace_id.distinct() ) + .join(Marketplace) + .join(Company) .where( MarketplaceProduct.product_id == update.product_id, - MarketplaceProduct.marketplace_id.in_([9, 41]) + Marketplace.is_deleted == False, + Company.is_deleted == False, + Company.is_archived == False ) ) stmt_result = await self.session.execute(stmt) @@ -52,7 +108,8 @@ class StocksUpdater: updates_dict[marketplace_id].append(update) updates_list = list(updates_dict.items()) updates_list = sorted(updates_list, key=lambda x: len(x[1])) - + print(updates_list) + return tasks = [] for marketplace_id, marketplace_updates in updates_list: tasks.append(self.update_marketplace(marketplace_id, marketplace_updates)) diff --git a/updaters/yandexmarket_updater.py b/updaters/yandexmarket_updater.py new file mode 100644 index 0000000..7e0e693 --- /dev/null +++ b/updaters/yandexmarket_updater.py @@ -0,0 +1,13 @@ +from queries.general import StockData +from updaters.base import BaseMarketplaceUpdater + + +class YandexmarketUpdater(BaseMarketplaceUpdater): + def get_update_for_marketplace(self, stock_data: StockData) -> dict: + return { + 'sku': str(stock_data['article']), + 'warehouseId': self.marketplace.warehouse_id, + 'items': [{ + 'count': stock_data['full_stock'], + }] + }