This commit is contained in:
2024-07-04 05:22:28 +03:00
parent c9ddfaf8b4
commit c75017c1a8
13 changed files with 189 additions and 553 deletions

View File

@@ -23,7 +23,6 @@ if config.config_file_name is not None:
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = BaseModel.metadata
print(target_metadata.schema)
# other values from the config, defined by the needs of env.py,

View File

@@ -17,3 +17,6 @@ CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
# Yandex
YANDEX_CLIENT_ID = os.environ.get('YANDEX_CLIENT_ID')
# Security
API_KEY = os.environ.get('API_KEY')

View File

@@ -15,10 +15,12 @@ database_url = (
f"{PG_LOGIN}:{PG_PASSWORD}@"
f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
)
engine = create_async_engine(database_url,
engine = create_async_engine(
database_url,
pool_size=20,
max_overflow=10
)
max_overflow=10,
pool_timeout=1000
)
session_factory = async_sessionmaker(
engine,
expire_on_commit=False,

View File

@@ -1,10 +1,8 @@
import asyncio
from typing import List, Union
from asgiref.sync import async_to_sync
from background import celery
import background.update
from background import celery
@celery.task(name='process_update')
@@ -20,6 +18,6 @@ def update_marketplace(marketplace_id: int):
@celery.task(name='update_marketplaces')
async def update_marketplaces(marketplace_ids: Union[List[int], None]):
def update_marketplaces(marketplace_ids: Union[List[int], None]):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids))

View File

@@ -1,29 +1,70 @@
import asyncio
from datetime import datetime
from redis import asyncio as aioredis
import backend.config
class RedisConnectionManager:
_redis_connection = None
_redis_lock = asyncio.Lock()
@classmethod
async def get_redis_connection(cls):
async with cls._redis_lock:
if cls._redis_connection is None:
cls._redis_connection = await aioredis.from_url(backend.config.CELERY_BROKER_URL)
return cls._redis_connection
class BatchLimiter:
def __init__(self, max_requests, period):
self.max_requests = max_requests
self.period = period
self.current_requests = 0
self.start_time = None
def __init__(self):
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
if self.current_requests == 0:
self.start_time = datetime.now()
async def acquire(self, key, max_requests, period):
redis = await RedisConnectionManager.get_redis_connection()
while True:
async with redis.lock(f"{key}_lock"):
try:
start_time = await redis.get(f"{key}:start_time")
if start_time:
start_time = datetime.fromisoformat(start_time.decode())
current_requests = await redis.get(key)
current_requests = int(current_requests) if current_requests else 0
if self.current_requests < self.max_requests:
self.current_requests += 1
return
elapsed_time = (datetime.now() - self.start_time).total_seconds()
if elapsed_time < self.period:
await asyncio.sleep(self.period - elapsed_time)
self.current_requests = 1
self.start_time = datetime.now()
if start_time:
elapsed_time = (datetime.now() - start_time).total_seconds()
else:
self.current_requests = 1
self.start_time = datetime.now()
elapsed_time = period
if elapsed_time >= period:
await redis.set(key, 1)
await redis.set(f"{key}:start_time", datetime.now().isoformat())
return
else:
if current_requests < max_requests:
await redis.incr(key)
return
else:
await asyncio.sleep(period - elapsed_time)
await redis.set(key, 1)
await redis.set(f"{key}:start_time", datetime.now().isoformat())
except aioredis.RedisError as e:
print(f"Redis error: {e}")
await asyncio.sleep(1)
async def acquire_wildberries(self, key):
max_requests = 300
period = 60
await self.acquire('wildberries:' + key, max_requests, period)
async def acquire_ozon(self, key):
max_requests = 80
period = 60
await self.acquire('ozon:' + key, max_requests, period)
async def acquire_yandexmarket(self, key):
max_requests = 50
period = 60
await self.acquire('yandexmarket:' + key, max_requests, period)

View File

@@ -6,6 +6,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
from starlette.responses import JSONResponse
import backend.config
import background.tasks
from background.tasks import *
from schemas.general import UpdateRequest, UpdateResponse, UpdateMarketplaceRequest, UpdateMarketplacesRequest
@@ -14,7 +15,7 @@ auth_schema = HTTPBearer()
async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]):
if token.credentials != 'vvHh1QNl7lS6c7OVwmxU1TVNd7DLlc9W810csZGf4rkqOrBy6fQwlhIDZsQZd9hQYZYK47yWv33aCq':
if token.credentials != backend.config.API_KEY:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials')

View File

@@ -14,9 +14,11 @@ class OzonMarketplaceApi(BaseMarketplaceApi):
def __init__(self, marketplace: Marketplace):
self.marketplace = marketplace
auth_data = json.loads(marketplace.auth_data)
client_id = auth_data.get('clientId')
self.limiter_key = str(marketplace.company_id) + str(client_id)
self.headers = {
'Client-Id': auth_data.get('clientId'),
'Client-Id': client_id,
'Api-Key': auth_data.get('clientToken')
}
@@ -32,14 +34,13 @@ class OzonMarketplaceApi(BaseMarketplaceApi):
return
max_stocks = 100
chunks = utils.chunk_list(data, max_stocks)
limiter = BatchLimiter(max_requests=80, period=60)
limiter = BatchLimiter()
async def send_stock_chunk(chunk):
try:
await limiter.acquire()
await limiter.acquire_ozon(self.limiter_key)
request_data = {'stocks': chunk}
response = await self._method('POST', '/v2/products/stocks', data=request_data)
print(request_data)
response = await response.json()
error_message = response.get('message')
error_code = response.get('code')

View File

@@ -3,6 +3,8 @@ import json
import logging
from typing import Union
import jwt
import utils
from database import Marketplace
from limiter import BatchLimiter
@@ -13,9 +15,16 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi):
def __init__(self, marketplace: Marketplace):
self.marketplace = marketplace
auth_data = json.loads(marketplace.auth_data)
token = auth_data.get('token')
try:
decoded_token = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False})
except Exception:
logging.error(f"Couldn't decode token for {marketplace.id}")
return None
self.limiter_key = str(marketplace.company_id) + str(decoded_token.get('sid'))
self.headers = {
'Authorization': auth_data.get('token'),
'Authorization': token,
'Content-Type': 'application/json'
}
@@ -31,15 +40,14 @@ class WildberriesMarketplaceApi(BaseMarketplaceApi):
return
max_stocks = 1000
chunks = utils.chunk_list(data, max_stocks)
limiter = BatchLimiter(max_requests=300, period=60)
limiter = BatchLimiter()
async def send_stock_chunk(chunk):
try:
await limiter.acquire()
await limiter.acquire_wildberries(self.limiter_key)
request_data = {'stocks': chunk}
response = await self._method('PUT', f'/api/v3/stocks/{self.marketplace.warehouse_id}',
data=request_data)
print(request_data)
if response.status not in [204, 409]:
response = await response.json()
error_message = response.get('message')

View File

@@ -16,6 +16,8 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi):
auth_data = json.loads(marketplace.auth_data)
access_token = auth_data.get('accessToken')
self.limiter_key = str(marketplace.company_id) + str(access_token) + str(self.marketplace.campaign_id)
self.headers = {
'Authorization': f'OAuth oauth_token="{access_token}", oauth_client_id="{YANDEX_CLIENT_ID}"'
}
@@ -33,20 +35,17 @@ class YandexmarketMarketplaceApi(BaseMarketplaceApi):
campaign_id = self.marketplace.campaign_id
max_stocks = 2000
chunks = chunk_list(data, max_stocks)
limiter = BatchLimiter(max_requests=50, period=60)
limiter = BatchLimiter()
async def send_stock_chunk(chunk):
try:
await limiter.acquire()
await limiter.acquire_yandexmarket(self.limiter_key)
request_data = {
'skus': chunk
}
response = await self._method('PUT',
f'/campaigns/{campaign_id}/offers/stocks',
data=request_data)
print(request_data)
rsp = await response.json()
print(rsp)
if response.status != 200:
logging.warning(
f'Error occurred when sending stocks to [{self.marketplace.id}]')

View File

@@ -11,3 +11,4 @@ python-dotenv
aiohttp
aiohttp[speedups]
celery[redis]
pyjwt

581
test.py
View File

@@ -1,509 +1,80 @@
import asyncio
import time
from background.update import process_update
from limiter import BatchLimiter
A = [46806,
50457,
5294,
50455,
1263604,
3153,
1281166,
60075,
1266284,
1255288,
1430906,
1430905,
1430909,
1430911,
1430908,
1233617,
1274809,
1269081,
1216052,
5281,
1240583,
1215881,
1262205,
1263362,
1260217,
1269043,
1261160,
1262930,
1199011,
1244985,
1256197,
1259971,
1275109,
1277923,
1266093,
1269905,
1262068,
1272526,
1269918,
1272247,
14802,
1690206,
1265012,
1268765,
1258451,
1199922,
1264652,
1206165,
1276134,
1276758,
1277343,
1277346,
1262974,
1323227,
1280821,
1276512,
1688415,
1274390,
1273690,
1268750,
1268054,
1266939,
1262254,
1262238,
1261898,
1260348,
1258610,
1256746,
1255706,
1255696,
1255692,
1255500,
1255496,
1677398,
1255053,
1255052,
1255027,
1255024,
1255020,
1254967,
1254966,
1254964,
1254751,
1254750,
1253996,
1253739,
1252160,
1252157,
1242300,
1256578,
1256194,
1255177,
1278124,
1267797,
1266537,
1277673,
1235438,
1256819,
62484,
1260874,
1257620,
1273799,
53141,
1257730,
1272776,
1268129,
1257068,
1256919,
1234979,
1254460,
1254578,
1258563,
1253242,
1263602,
1254576,
1255015,
1254981,
1253646,
1261321,
1273813,
1255242,
1274598,
1214897,
1273279,
2172,
1231462,
1257310,
1253645,
1242885,
1747377,
1233273,
1220800,
1257333,
1256133,
1253956,
1663,
1771,
1272628,
1270851,
57977,
1257159,
1258366,
1274675,
1228267,
1267046,
1277962,
1255028,
1273500,
57994,
58014,
57995,
1425848,
1255218,
1255032,
1257132,
1276552,
1272983,
1272984,
1270101,
1257311,
1266716,
1255420,
1265283,
1269404,
1265309,
50749,
57941,
62110,
1265942,
1255744,
52862,
1274446,
41851,
1263009,
1253446,
4633,
21392,
1264372,
1264231,
53052,
1264656,
1262555,
1262009,
1265287,
1278725,
1268653,
1268652,
1268651,
1278044,
1353805,
1270730,
1253802,
1255266,
1230815,
1269405,
1274444,
1274445,
1205670,
1268767,
1268761,
1255679,
1252820,
1276125,
1253954,
1268790,
1268789,
1254508,
1254507,
1271531,
1430596,
1430901,
1265975,
1201855,
1237377,
1290777,
1290779,
1290845,
1301023,
1313042,
1313044,
1486866,
1486865,
1482040,
1482039,
1486868,
1677395,
1693204,
1785113,
1730179,
1746619,
57778,
58549,
1274911,
5238,
59939,
48130,
51725,
3324,
3315,
42937,
1250692,
1691268,
65691,
65679,
1266715,
1261585,
1547,
58237,
5267,
13728,
57985,
1430910,
46996,
49071,
49778,
1199528,
18420,
1357948,
13436,
1201667,
1371508,
60577,
47106,
1213493,
62574,
46808,
51130,
3100,
47108,
52536,
1272396,
1266321,
1280034,
1409383,
1272575,
1246252,
1267713,
1265479,
4726,
1268028,
1277070,
1278743,
1254882,
1270158,
1254749,
1255416,
1253839,
1268655,
1750701,
3173,
1725771,
4646,
1801937,
1837677,
1442421,
1811379,
1821818,
1838555,
1752948,
1579762,
61747,
1838556,
5087,
1838552,
1270564,
1838547,
1815657,
1821809,
1752947,
1821806,
1546488,
1536001,
1838554,
1752946,
1562500,
1838698,
1821821,
1815220,
1213805,
1821796,
1546102,
1173684,
1849044,
1821776,
61453,
61465,
1694439,
1548447,
1796332,
1204445,
1266798,
1236596,
1252685,
1252631,
1548800,
1838561,
1810009,
1546753,
1838562,
61439,
61466,
1811405,
1811396,
21475,
1821764,
1812378,
1548797,
1838553,
37,
517,
49124,
1814818,
1546149,
58180,
1837889,
4667,
61354,
1821769,
61595,
1694473,
1821802,
4370,
48406,
1838559,
1778238,
1546760,
1838558,
1821752,
61429,
754,
1265,
1837956,
1821800,
1821747,
1546834,
16675,
1214135,
1791293,
61341,
1837887,
47168,
1224282,
1938274,
1837886,
1254630,
1546849,
49122,
4825,
43589,
1838560,
1687827,
1838551,
4807,
1694419,
1548750,
53220,
2032683,
1822337,
1821728,
1260032,
61600,
1546423,
1811399,
49121,
519,
1821730,
1247748,
4717,
1808931,
43583,
43584,
1838692,
1268073,
1821741,
1845065,
1838699,
1200388,
1937568,
4359,
45049,
66803,
1814848,
49667,
1821737,
1821713,
4156,
1546421,
1546419,
1789644,
1270222,
1821717,
1812381,
1821742,
1254254,
3691,
1546424,
1259,
1253389,
1457,
1838557,
1224277,
1838548,
4824,
1814870,
1562490,
1249319,
1937717,
1838549,
4411,
1562476,
59199,
1845056,
4028,
1822211,
47056,
1201201,
62192,
1815646,
1694418,
1546549,
1254472,
1814112,
1258777,
2032682,
1546763,
1658549,
1838546,
61758,
1744312,
1518567,
1271323,
1814821,
4412,
1938272,
2032521,
4196,
57643,
49126,
49127,
1815644,
50812,
1821642,
1823200,
66933,
1268915,
1837969,
53209,
1546794,
1272267,
47147,
43512,
1849014,
1838550,
1368860,
2609,
1821727,
2021536,
1276589,
1936567,
1269506
]
start = time.time()
asyncio.run(process_update(A))
print(time.time() - start)
async def test1():
limiter = BatchLimiter()
cnt = 0
for i in range(100):
await limiter.acquire_ozon('denco')
cnt += 1
print('ozon denco-1', cnt)
async def test2():
limiter = BatchLimiter()
cnt = 0
for i in range(100):
await limiter.acquire_ozon('denco')
cnt += 1
print('ozon denco-2', cnt)
async def test3():
limiter = BatchLimiter()
cnt = 0
for i in range(100):
await limiter.acquire_wildberries('denco')
cnt += 1
print('wb denco-1', cnt)
async def test4():
limiter = BatchLimiter()
cnt = 0
for i in range(100):
await limiter.acquire_wildberries('denco')
cnt += 1
print('wb denco-2', cnt)
async def test5():
limiter = BatchLimiter()
cnt = 0
for i in range(100):
await limiter.acquire_wildberries('denco')
cnt += 1
print('wb denco-2', cnt)
async def test6():
limiter = BatchLimiter()
cnt = 0
for i in range(100):
await limiter.acquire_ozon('bolgov')
cnt += 1
print('wb bolgov-1', cnt)
async def test():
start = time.time()
await asyncio.gather(*[
test1(),
test2(),
test3(),
test4(),
# test5(),
# test6(),
])
print(time.time() - start)
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
if __name__ == '__main__':
main()

View File

@@ -28,6 +28,8 @@ class BaseMarketplaceUpdater(ABC):
pass
async def update(self, updates: List[StockUpdate]):
if not self.marketplace_api:
return
product_ids = list(set([update.product_id for update in updates]))
stock_data_list = await queries.general.get_stocks_data(
session=self.session,
@@ -41,6 +43,8 @@ class BaseMarketplaceUpdater(ABC):
await self.marketplace_api.update_stocks(marketplace_updates)
async def update_all(self):
if not self.marketplace_api:
return
stock_data_list = await queries.general.get_stocks_data(
session=self.session,
marketplace=self.marketplace,

View File

@@ -1,14 +1,14 @@
import asyncio
import logging
import time
from collections import defaultdict
from enum import unique, IntEnum
from typing import List, Union
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from backend.session import get_session, session_factory
from backend.session import session_factory
from database import Marketplace, MarketplaceProduct, Warehouse, Company
from database.sipro.enums.general import BaseMarketplace
from schemas.general import StockUpdate
@@ -62,16 +62,23 @@ class StocksUpdater:
async def full_update_marketplace(self, marketplace_id: int):
marketplace = await self.get_marketplace(marketplace_id)
start = time.time()
updater = UpdaterFactory.get_updater(self.session, marketplace)
await updater.update_all()
logging.info(
f"{marketplace.name} successfully fully updated in {round(time.time() - start, 2)} seconds.")
async def full_update_all_marketplaces(self, marketplace_ids: Union[List[int], None] = None):
marketplaces = await self.get_marketplaces(marketplace_ids)
async def update_marketplace(marketplace):
async with session_factory() as session:
start = time.time()
updater = UpdaterFactory.get_updater(session, marketplace)
await updater.update_all()
logging.info(
f"{marketplace.name} successfully fully updated in {round(time.time() - start, 2)} seconds.")
tasks = [update_marketplace(marketplace) for marketplace in marketplaces]
await asyncio.gather(*tasks)
@@ -79,10 +86,13 @@ class StocksUpdater:
async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]):
marketplace = await self.get_marketplace(marketplace_id)
async with session_factory() as session:
start = time.time()
updater = UpdaterFactory.get_updater(session, marketplace)
if not updater:
return
await updater.update(updates)
logging.info(
f"Successfully uploaded {len(updates)} updates to {marketplace.name} in {round(time.time() - start, 2)} seconds.")
async def update(self, updates: list[StockUpdate]):
updates_dict = defaultdict(list)
@@ -108,8 +118,6 @@ class StocksUpdater:
updates_dict[marketplace_id].append(update)
updates_list = list(updates_dict.items())
updates_list = sorted(updates_list, key=lambda x: len(x[1]))
print(updates_list)
return
tasks = []
for marketplace_id, marketplace_updates in updates_list:
tasks.append(self.update_marketplace(marketplace_id, marketplace_updates))