diff --git a/alembic/env.py b/alembic/env.py index 1588fd1..0b82da9 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -23,7 +23,6 @@ if config.config_file_name is not None: # from myapp import mymodel # target_metadata = mymodel.Base.metadata target_metadata = BaseModel.metadata -print(target_metadata.schema) # other values from the config, defined by the needs of env.py, diff --git a/backend/config.py b/backend/config.py index 476808d..23fcbf3 100644 --- a/backend/config.py +++ b/backend/config.py @@ -17,3 +17,6 @@ CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') # Yandex YANDEX_CLIENT_ID = os.environ.get('YANDEX_CLIENT_ID') + +# Security +API_KEY = os.environ.get('API_KEY') diff --git a/backend/session.py b/backend/session.py index c5a4b43..5fd22d1 100644 --- a/backend/session.py +++ b/backend/session.py @@ -15,10 +15,12 @@ database_url = ( f"{PG_LOGIN}:{PG_PASSWORD}@" f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}" ) -engine = create_async_engine(database_url, - pool_size=20, - max_overflow=10 - ) +engine = create_async_engine( + database_url, + pool_size=20, + max_overflow=10, + pool_timeout=1000 +) session_factory = async_sessionmaker( engine, expire_on_commit=False, diff --git a/background/tasks.py b/background/tasks.py index b1f7a17..3e16b20 100644 --- a/background/tasks.py +++ b/background/tasks.py @@ -1,10 +1,8 @@ import asyncio from typing import List, Union -from asgiref.sync import async_to_sync - -from background import celery import background.update +from background import celery @celery.task(name='process_update') @@ -20,6 +18,6 @@ def update_marketplace(marketplace_id: int): @celery.task(name='update_marketplaces') -async def update_marketplaces(marketplace_ids: Union[List[int], None]): +def update_marketplaces(marketplace_ids: Union[List[int], None]): loop = asyncio.get_event_loop() return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids)) diff --git a/limiter/batch_limiter.py b/limiter/batch_limiter.py index bf205e9..df730e3 100644 --- a/limiter/batch_limiter.py +++ b/limiter/batch_limiter.py @@ -1,29 +1,70 @@ import asyncio from datetime import datetime +from redis import asyncio as aioredis + +import backend.config + + +class RedisConnectionManager: + _redis_connection = None + _redis_lock = asyncio.Lock() + + @classmethod + async def get_redis_connection(cls): + async with cls._redis_lock: + if cls._redis_connection is None: + cls._redis_connection = await aioredis.from_url(backend.config.CELERY_BROKER_URL) + return cls._redis_connection + class BatchLimiter: - def __init__(self, max_requests, period): - self.max_requests = max_requests - self.period = period - self.current_requests = 0 - self.start_time = None + def __init__(self): self.lock = asyncio.Lock() - async def acquire(self): - async with self.lock: - if self.current_requests == 0: - self.start_time = datetime.now() + async def acquire(self, key, max_requests, period): + redis = await RedisConnectionManager.get_redis_connection() + while True: + async with redis.lock(f"{key}_lock"): + try: + start_time = await redis.get(f"{key}:start_time") + if start_time: + start_time = datetime.fromisoformat(start_time.decode()) + current_requests = await redis.get(key) + current_requests = int(current_requests) if current_requests else 0 - if self.current_requests < self.max_requests: - self.current_requests += 1 - return + if start_time: + elapsed_time = (datetime.now() - start_time).total_seconds() + else: + elapsed_time = period - elapsed_time = (datetime.now() - self.start_time).total_seconds() - if elapsed_time < self.period: - await asyncio.sleep(self.period - elapsed_time) - self.current_requests = 1 - self.start_time = datetime.now() - else: - self.current_requests = 1 - self.start_time = datetime.now() + if elapsed_time >= period: + await redis.set(key, 1) + await redis.set(f"{key}:start_time", datetime.now().isoformat()) + return + else: + if current_requests < max_requests: + await redis.incr(key) + return + else: + await asyncio.sleep(period - elapsed_time) + await redis.set(key, 1) + await redis.set(f"{key}:start_time", datetime.now().isoformat()) + except aioredis.RedisError as e: + print(f"Redis error: {e}") + await asyncio.sleep(1) + + async def acquire_wildberries(self, key): + max_requests = 300 + period = 60 + await self.acquire('wildberries:' + key, max_requests, period) + + async def acquire_ozon(self, key): + max_requests = 80 + period = 60 + await self.acquire('ozon:' + key, max_requests, period) + + async def acquire_yandexmarket(self, key): + max_requests = 50 + period = 60 + await self.acquire('yandexmarket:' + key, max_requests, period) diff --git a/main.py b/main.py index 6073fb1..96364e3 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from starlette import status from starlette.responses import JSONResponse +import backend.config import background.tasks from background.tasks import * from schemas.general import UpdateRequest, UpdateResponse, UpdateMarketplaceRequest, UpdateMarketplacesRequest @@ -14,7 +15,7 @@ auth_schema = HTTPBearer() async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]): - if token.credentials != 'vvHh1QNl7lS6c7OVwmxU1TVNd7DLlc9W810csZGf4rkqOrBy6fQwlhIDZsQZd9hQYZYK47yWv33aCq': + if token.credentials != backend.config.API_KEY: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials') diff --git a/marketplaces/ozon.py b/marketplaces/ozon.py index 1c45a31..677f764 100644 --- a/marketplaces/ozon.py +++ b/marketplaces/ozon.py @@ -14,9 +14,11 @@ class OzonMarketplaceApi(BaseMarketplaceApi): def __init__(self, marketplace: Marketplace): self.marketplace = marketplace auth_data = json.loads(marketplace.auth_data) + client_id = auth_data.get('clientId') + self.limiter_key = str(marketplace.company_id) + str(client_id) self.headers = { - 'Client-Id': auth_data.get('clientId'), + 'Client-Id': client_id, 'Api-Key': auth_data.get('clientToken') } @@ -32,14 +34,13 @@ class OzonMarketplaceApi(BaseMarketplaceApi): return max_stocks = 100 chunks = utils.chunk_list(data, max_stocks) - limiter = BatchLimiter(max_requests=80, period=60) + limiter = BatchLimiter() async def send_stock_chunk(chunk): try: - await limiter.acquire() + await limiter.acquire_ozon(self.limiter_key) request_data = {'stocks': chunk} response = await self._method('POST', '/v2/products/stocks', data=request_data) - print(request_data) response = await response.json() error_message = response.get('message') error_code = response.get('code') @@ -48,7 +49,7 @@ class OzonMarketplaceApi(BaseMarketplaceApi): f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})') except Exception as e: logging.error( - f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') + f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') tasks = [send_stock_chunk(chunk) for chunk in chunks] await asyncio.gather(*tasks) diff --git a/marketplaces/wildberries.py b/marketplaces/wildberries.py index 5bd87d9..2901ea8 100644 --- a/marketplaces/wildberries.py +++ b/marketplaces/wildberries.py @@ -3,6 +3,8 @@ import json import logging from typing import Union +import jwt + import utils from database import Marketplace from limiter import BatchLimiter @@ -13,9 +15,16 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi): def __init__(self, marketplace: Marketplace): self.marketplace = marketplace auth_data = json.loads(marketplace.auth_data) + token = auth_data.get('token') + try: + decoded_token = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False}) + except Exception: + logging.error(f"Couldn't decode token for {marketplace.id}") + return None + self.limiter_key = str(marketplace.company_id) + str(decoded_token.get('sid')) self.headers = { - 'Authorization': auth_data.get('token'), + 'Authorization': token, 'Content-Type': 'application/json' } @@ -31,15 +40,14 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi): return max_stocks = 1000 chunks = utils.chunk_list(data, max_stocks) - limiter = BatchLimiter(max_requests=300, period=60) + limiter = BatchLimiter() async def send_stock_chunk(chunk): try: - await limiter.acquire() + await limiter.acquire_wildberries(self.limiter_key) request_data = {'stocks': chunk} response = await self._method('PUT', f'/api/v3/stocks/{self.marketplace.warehouse_id}', data=request_data) - print(request_data) if response.status not in [204, 409]: response = await response.json() error_message = response.get('message') diff --git a/marketplaces/yandexmarket.py b/marketplaces/yandexmarket.py index c839565..3813616 100644 --- a/marketplaces/yandexmarket.py +++ b/marketplaces/yandexmarket.py @@ -16,6 +16,8 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi): auth_data = json.loads(marketplace.auth_data) access_token = auth_data.get('accessToken') + self.limiter_key = str(marketplace.company_id) + str(access_token) + str(self.marketplace.campaign_id) + self.headers = { 'Authorization': f'OAuth oauth_token="{access_token}", oauth_client_id="{YANDEX_CLIENT_ID}"' } @@ -33,20 +35,17 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi): campaign_id = self.marketplace.campaign_id max_stocks = 2000 chunks = chunk_list(data, max_stocks) - limiter = BatchLimiter(max_requests=50, period=60) + limiter = BatchLimiter() async def send_stock_chunk(chunk): try: - await limiter.acquire() + await limiter.acquire_yandexmarket(self.limiter_key) request_data = { 'skus': chunk } response = await self._method('PUT', f'/campaigns/{campaign_id}/offers/stocks', data=request_data) - print(request_data) - rsp = await response.json() - print(rsp) if response.status != 200: logging.warning( f'Error occurred when sending stocks to [{self.marketplace.id}]') diff --git a/requirements.txt b/requirements.txt index 4fe2c9a..657677e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ python-dotenv aiohttp aiohttp[speedups] celery[redis] +pyjwt \ No newline at end of file diff --git a/test.py b/test.py index 8784e14..27aed15 100644 --- a/test.py +++ b/test.py @@ -1,509 +1,80 @@ import asyncio import time -from background.update import process_update +from limiter import BatchLimiter -A = [46806, - 50457, - 5294, - 50455, - 1263604, - 3153, - 1281166, - 60075, - 1266284, - 1255288, - 1430906, - 1430905, - 1430909, - 1430911, - 1430908, - 1233617, - 1274809, - 1269081, - 1216052, - 5281, - 1240583, - 1215881, - 1262205, - 1263362, - 1260217, - 1269043, - 1261160, - 1262930, - 1199011, - 1244985, - 1256197, - 1259971, - 1275109, - 1277923, - 1266093, - 1269905, - 1262068, - 1272526, - 1269918, - 1272247, - 14802, - 1690206, - 1265012, - 1268765, - 1258451, - 1199922, - 1264652, - 1206165, - 1276134, - 1276758, - 1277343, - 1277346, - 1262974, - 1323227, - 1280821, - 1276512, - 1688415, - 1274390, - 1273690, - 1268750, - 1268054, - 1266939, - 1262254, - 1262238, - 1261898, - 1260348, - 1258610, - 1256746, - 1255706, - 1255696, - 1255692, - 1255500, - 1255496, - 1677398, - 1255053, - 1255052, - 1255027, - 1255024, - 1255020, - 1254967, - 1254966, - 1254964, - 1254751, - 1254750, - 1253996, - 1253739, - 1252160, - 1252157, - 1242300, - 1256578, - 1256194, - 1255177, - 1278124, - 1267797, - 1266537, - 1277673, - 1235438, - 1256819, - 62484, - 1260874, - 1257620, - 1273799, - 53141, - 1257730, - 1272776, - 1268129, - 1257068, - 1256919, - 1234979, - 1254460, - 1254578, - 1258563, - 1253242, - 1263602, - 1254576, - 1255015, - 1254981, - 1253646, - 1261321, - 1273813, - 1255242, - 1274598, - 1214897, - 1273279, - 2172, - 1231462, - 1257310, - 1253645, - 1242885, - 1747377, - 1233273, - 1220800, - 1257333, - 1256133, - 1253956, - 1663, - 1771, - 1272628, - 1270851, - 57977, - 1257159, - 1258366, - 1274675, - 1228267, - 1267046, - 1277962, - 1255028, - 1273500, - 57994, - 58014, - 57995, - 1425848, - 1255218, - 1255032, - 1257132, - 1276552, - 1272983, - 1272984, - 1270101, - 1257311, - 1266716, - 1255420, - 1265283, - 1269404, - 1265309, - 50749, - 57941, - 62110, - 1265942, - 1255744, - 52862, - 1274446, - 41851, - 1263009, - 1253446, - 4633, - 21392, - 1264372, - 1264231, - 53052, - 1264656, - 1262555, - 1262009, - 1265287, - 1278725, - 1268653, - 1268652, - 1268651, - 1278044, - 1353805, - 1270730, - 1253802, - 1255266, - 1230815, - 1269405, - 1274444, - 1274445, - 1205670, - 1268767, - 1268761, - 1255679, - 1252820, - 1276125, - 1253954, - 1268790, - 1268789, - 1254508, - 1254507, - 1271531, - 1430596, - 1430901, - 1265975, - 1201855, - 1237377, - 1290777, - 1290779, - 1290845, - 1301023, - 1313042, - 1313044, - 1486866, - 1486865, - 1482040, - 1482039, - 1486868, - 1677395, - 1693204, - 1785113, - 1730179, - 1746619, - 57778, - 58549, - 1274911, - 5238, - 59939, - 48130, - 51725, - 3324, - 3315, - 42937, - 1250692, - 1691268, - 65691, - 65679, - 1266715, - 1261585, - 1547, - 58237, - 5267, - 13728, - 57985, - 1430910, - 46996, - 49071, - 49778, - 1199528, - 18420, - 1357948, - 13436, - 1201667, - 1371508, - 60577, - 47106, - 1213493, - 62574, - 46808, - 51130, - 3100, - 47108, - 52536, - 1272396, - 1266321, - 1280034, - 1409383, - 1272575, - 1246252, - 1267713, - 1265479, - 4726, - 1268028, - 1277070, - 1278743, - 1254882, - 1270158, - 1254749, - 1255416, - 1253839, - 1268655, - 1750701, - 3173, - 1725771, - 4646, - 1801937, - 1837677, - 1442421, - 1811379, - 1821818, - 1838555, - 1752948, - 1579762, - 61747, - 1838556, - 5087, - 1838552, - 1270564, - 1838547, - 1815657, - 1821809, - 1752947, - 1821806, - 1546488, - 1536001, - 1838554, - 1752946, - 1562500, - 1838698, - 1821821, - 1815220, - 1213805, - 1821796, - 1546102, - 1173684, - 1849044, - 1821776, - 61453, - 61465, - 1694439, - 1548447, - 1796332, - 1204445, - 1266798, - 1236596, - 1252685, - 1252631, - 1548800, - 1838561, - 1810009, - 1546753, - 1838562, - 61439, - 61466, - 1811405, - 1811396, - 21475, - 1821764, - 1812378, - 1548797, - 1838553, - 37, - 517, - 49124, - 1814818, - 1546149, - 58180, - 1837889, - 4667, - 61354, - 1821769, - 61595, - 1694473, - 1821802, - 4370, - 48406, - 1838559, - 1778238, - 1546760, - 1838558, - 1821752, - 61429, - 754, - 1265, - 1837956, - 1821800, - 1821747, - 1546834, - 16675, - 1214135, - 1791293, - 61341, - 1837887, - 47168, - 1224282, - 1938274, - 1837886, - 1254630, - 1546849, - 49122, - 4825, - 43589, - 1838560, - 1687827, - 1838551, - 4807, - 1694419, - 1548750, - 53220, - 2032683, - 1822337, - 1821728, - 1260032, - 61600, - 1546423, - 1811399, - 49121, - 519, - 1821730, - 1247748, - 4717, - 1808931, - 43583, - 43584, - 1838692, - 1268073, - 1821741, - 1845065, - 1838699, - 1200388, - 1937568, - 4359, - 45049, - 66803, - 1814848, - 49667, - 1821737, - 1821713, - 4156, - 1546421, - 1546419, - 1789644, - 1270222, - 1821717, - 1812381, - 1821742, - 1254254, - 3691, - 1546424, - 1259, - 1253389, - 1457, - 1838557, - 1224277, - 1838548, - 4824, - 1814870, - 1562490, - 1249319, - 1937717, - 1838549, - 4411, - 1562476, - 59199, - 1845056, - 4028, - 1822211, - 47056, - 1201201, - 62192, - 1815646, - 1694418, - 1546549, - 1254472, - 1814112, - 1258777, - 2032682, - 1546763, - 1658549, - 1838546, - 61758, - 1744312, - 1518567, - 1271323, - 1814821, - 4412, - 1938272, - 2032521, - 4196, - 57643, - 49126, - 49127, - 1815644, - 50812, - 1821642, - 1823200, - 66933, - 1268915, - 1837969, - 53209, - 1546794, - 1272267, - 47147, - 43512, - 1849014, - 1838550, - 1368860, - 2609, - 1821727, - 2021536, - 1276589, - 1936567, - 1269506 - ] -start = time.time() -asyncio.run(process_update(A)) -print(time.time() - start) + +async def test1(): + limiter = BatchLimiter() + cnt = 0 + for i in range(100): + await limiter.acquire_ozon('denco') + cnt += 1 + print('ozon denco-1', cnt) + + +async def test2(): + limiter = BatchLimiter() + cnt = 0 + for i in range(100): + await limiter.acquire_ozon('denco') + cnt += 1 + print('ozon denco-2', cnt) + + +async def test3(): + limiter = BatchLimiter() + cnt = 0 + for i in range(100): + await limiter.acquire_wildberries('denco') + cnt += 1 + print('wb denco-1', cnt) + + +async def test4(): + limiter = BatchLimiter() + cnt = 0 + for i in range(100): + await limiter.acquire_wildberries('denco') + cnt += 1 + print('wb denco-2', cnt) + + +async def test5(): + limiter = BatchLimiter() + cnt = 0 + for i in range(100): + await limiter.acquire_wildberries('denco') + cnt += 1 + print('wb denco-2', cnt) + + +async def test6(): + limiter = BatchLimiter() + cnt = 0 + for i in range(100): + await limiter.acquire_ozon('bolgov') + cnt += 1 + print('wb bolgov-1', cnt) + + +async def test(): + start = time.time() + await asyncio.gather(*[ + test1(), + test2(), + test3(), + test4(), + # test5(), + # test6(), + ]) + print(time.time() - start) + + +def main(): + loop = asyncio.get_event_loop() + loop.run_until_complete(test()) + + +if __name__ == '__main__': + main() diff --git a/updaters/base.py b/updaters/base.py index bedd449..135e3b1 100644 --- a/updaters/base.py +++ b/updaters/base.py @@ -28,6 +28,8 @@ class BaseMarketplaceUpdater(ABC): pass async def update(self, updates: List[StockUpdate]): + if not self.marketplace_api: + return product_ids = list(set([update.product_id for update in updates])) stock_data_list = await queries.general.get_stocks_data( session=self.session, @@ -41,6 +43,8 @@ class BaseMarketplaceUpdater(ABC): await self.marketplace_api.update_stocks(marketplace_updates) async def update_all(self): + if not self.marketplace_api: + return stock_data_list = await queries.general.get_stocks_data( session=self.session, marketplace=self.marketplace, diff --git a/updaters/stocks_updater.py b/updaters/stocks_updater.py index aaab852..65ef552 100644 --- a/updaters/stocks_updater.py +++ b/updaters/stocks_updater.py @@ -1,14 +1,14 @@ import asyncio +import logging import time from collections import defaultdict -from enum import unique, IntEnum from typing import List, Union from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, selectinload -from backend.session import get_session, session_factory +from backend.session import session_factory from database import Marketplace, MarketplaceProduct, Warehouse, Company from database.sipro.enums.general import BaseMarketplace from schemas.general import StockUpdate @@ -62,16 +62,23 @@ class StocksUpdater: async def full_update_marketplace(self, marketplace_id: int): marketplace = await self.get_marketplace(marketplace_id) + start = time.time() updater = UpdaterFactory.get_updater(self.session, marketplace) await updater.update_all() + logging.info( + f"{marketplace.name} successfully fully updated in {round(time.time() - start, 2)} seconds.") async def full_update_all_marketplaces(self, marketplace_ids: Union[List[int], None] = None): marketplaces = await self.get_marketplaces(marketplace_ids) async def update_marketplace(marketplace): async with session_factory() as session: + start = time.time() + updater = UpdaterFactory.get_updater(session, marketplace) await updater.update_all() + logging.info( + f"{marketplace.name} successfully fully updated in {round(time.time() - start, 2)} seconds.") tasks = [update_marketplace(marketplace) for marketplace in marketplaces] await asyncio.gather(*tasks) @@ -79,10 +86,13 @@ class StocksUpdater: async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]): marketplace = await self.get_marketplace(marketplace_id) async with session_factory() as session: + start = time.time() updater = UpdaterFactory.get_updater(session, marketplace) if not updater: return await updater.update(updates) + logging.info( + f"Successfully uploaded {len(updates)} updates to {marketplace.name} in {round(time.time() - start, 2)} seconds.") async def update(self, updates: list[StockUpdate]): updates_dict = defaultdict(list) @@ -108,8 +118,6 @@ class StocksUpdater: updates_dict[marketplace_id].append(update) updates_list = list(updates_dict.items()) updates_list = sorted(updates_list, key=lambda x: len(x[1])) - print(updates_list) - return tasks = [] for marketplace_id, marketplace_updates in updates_list: tasks.append(self.update_marketplace(marketplace_id, marketplace_updates))