This commit is contained in:
2024-07-03 08:11:08 +03:00
parent 7ba3426989
commit c9ddfaf8b4
17 changed files with 751 additions and 42 deletions

View File

@@ -3,11 +3,17 @@ import os
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
# Database
PG_LOGIN = os.environ.get('PG_LOGIN')
PG_PASSWORD = os.environ.get('PG_PASSWORD')
PG_PORT = os.environ.get('PG_PORT')
PG_HOST = os.environ.get('PG_HOST')
PG_DATABASE = os.environ.get('PG_DATABASE')
# Celery
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
# Yandex
YANDEX_CLIENT_ID = os.environ.get('YANDEX_CLIENT_ID')

View File

@@ -15,8 +15,16 @@ database_url = (
f"{PG_LOGIN}:{PG_PASSWORD}@"
f"{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
)
engine = create_async_engine(database_url)
session_factory = async_sessionmaker(engine, expire_on_commit=False, autoflush=False, autocommit=False)
engine = create_async_engine(database_url,
pool_size=20,
max_overflow=10
)
session_factory = async_sessionmaker(
engine,
expire_on_commit=False,
autoflush=False,
autocommit=False
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:

View File

@@ -1,4 +1,5 @@
import asyncio
from typing import List, Union
from asgiref.sync import async_to_sync
@@ -10,3 +11,15 @@ import background.update
def process_update(product_ids: list[int]):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.process_update(product_ids))
@celery.task(name='update_marketplace')
def update_marketplace(marketplace_id: int):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.update_marketplace(marketplace_id))
@celery.task(name='update_marketplaces')
async def update_marketplaces(marketplace_ids: Union[List[int], None]):
loop = asyncio.get_event_loop()
return loop.run_until_complete(background.update.update_marketplaces(marketplace_ids))

View File

@@ -1,14 +1,27 @@
import time
from typing import Union, List
from backend.session import get_session
from backend.session import session_factory
from schemas.general import StockUpdate
from updaters.stocks_updater import StocksUpdater
async def process_update(product_ids: list[int]):
async for session in get_session():
async with session_factory() as session:
updates = [StockUpdate(product_id=product_id) for product_id in product_ids]
updater = StocksUpdater(session)
await updater.update(updates)
await session.close()
return {'message': f'Stocks for [{",".join(map(str, product_ids))}] successfully updated'}
return {'message': f'Stocks for products [{",".join(map(str, product_ids))}] successfully updated'}
async def update_marketplace(marketplace_id: int):
async with session_factory() as session:
updater = StocksUpdater(session)
await updater.full_update_marketplace(marketplace_id)
return {'message': f'Stocks for marketplace {marketplace_id} successfully updated'}
async def update_marketplaces(marketplace_ids: Union[List[int], None]):
async with session_factory() as session:
updater = StocksUpdater(session)
await updater.full_update_all_marketplaces(marketplace_ids)
return {'message': f'Stocks for marketplaces [{",".join(map(str, marketplace_ids))}] successfully updated'}

View File

@@ -16,6 +16,9 @@ class Company(BaseSiproModel):
is_denco: Mapped[bool] = mapped_column()
balance: Mapped[int] = mapped_column()
is_deleted: Mapped[bool] = mapped_column()
is_archived: Mapped[bool] = mapped_column()
class CompanyWarehouse(BaseSiproModel):
__tablename__ = 'company_warehouses'
@@ -49,9 +52,12 @@ class Marketplace(BaseSiproModel):
sell_blocks: Mapped[bool] = mapped_column()
sell_warehouse_products: Mapped[bool] = mapped_column()
sell_from_price: Mapped[bool] = mapped_column()
is_deleted: Mapped[bool] = mapped_column()
warehouses: Mapped[List["Warehouse"]] = relationship(secondary=marketplace_warehouses)
warehouse_id: Mapped[str] = mapped_column()
company_id: Mapped[int] = mapped_column(ForeignKey('companies.id'))
company: Mapped["Company"] = relationship()
campaign_id: Mapped[str] = mapped_column()

18
main.py
View File

@@ -8,7 +8,7 @@ from starlette.responses import JSONResponse
import background.tasks
from background.tasks import *
from schemas.general import UpdateRequest, UpdateResponse
from schemas.general import UpdateRequest, UpdateResponse, UpdateMarketplaceRequest, UpdateMarketplacesRequest
auth_schema = HTTPBearer()
@@ -34,6 +34,22 @@ async def update(
return UpdateResponse(task_id=task.id)
@app.post('/update/marketplace')
async def update_marketplace(
request: UpdateMarketplaceRequest
):
task = background.tasks.update_marketplace.delay(request.marketplace_id)
return UpdateResponse(task_id=task.id)
@app.post('/update/marketplaces')
async def update_marketplace(
request: UpdateMarketplacesRequest
):
task = background.tasks.update_marketplaces.delay(request.marketplace_ids)
return UpdateResponse(task_id=task.id)
@app.get("/tasks/{task_id}")
def get_status(task_id):
task_result = AsyncResult(task_id)

View File

@@ -28,6 +28,7 @@ class BaseMarketplaceApi(ABC):
async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'],
method: str,
data: dict) -> ClientResponse:
return
async with aiohttp.ClientSession() as session:
return await session.request(http_method,
f'{self.api_url}{method}',

View File

@@ -4,6 +4,7 @@ from database import Marketplace
from database.sipro.enums.general import BaseMarketplace
from .wildberries import WildberriesMarketplaceApi
from .ozon import OzonMarketplaceApi
from .yandexmarket import YandexmarketMarketplaceApi
class MarketplaceApiFactory:
@@ -11,9 +12,12 @@ class MarketplaceApiFactory:
def get_marketplace_api(marketplace: Marketplace) -> Union[
WildberriesMarketplaceApi,
OzonMarketplaceApi,
YandexmarketMarketplaceApi
]:
match marketplace.base_marketplace:
case BaseMarketplace.OZON:
return OzonMarketplaceApi(marketplace)
case BaseMarketplace.WILDBERRIES:
return WildberriesMarketplaceApi(marketplace)
case BaseMarketplace.YANDEX_MARKET:
return YandexmarketMarketplaceApi(marketplace)

View File

@@ -0,0 +1,58 @@
import asyncio
import json
import logging
from typing import Union
from backend.config import YANDEX_CLIENT_ID
from database import Marketplace
from limiter import BatchLimiter
from marketplaces.base import BaseMarketplaceApi
from utils import chunk_list
class YandexmarketMarketplaceApi(BaseMarketplaceApi):
def __init__(self, marketplace: Marketplace):
self.marketplace = marketplace
auth_data = json.loads(marketplace.auth_data)
access_token = auth_data.get('accessToken')
self.headers = {
'Authorization': f'OAuth oauth_token="{access_token}", oauth_client_id="{YANDEX_CLIENT_ID}"'
}
def get_headers(self):
return self.headers
@property
def api_url(self):
return 'https://api.partner.market.yandex.ru/v2'
async def update_stocks(self, data: Union[list, dict]):
if type(data) is not list:
return
campaign_id = self.marketplace.campaign_id
max_stocks = 2000
chunks = chunk_list(data, max_stocks)
limiter = BatchLimiter(max_requests=50, period=60)
async def send_stock_chunk(chunk):
try:
await limiter.acquire()
request_data = {
'skus': chunk
}
response = await self._method('PUT',
f'/campaigns/{campaign_id}/offers/stocks',
data=request_data)
print(request_data)
rsp = await response.json()
print(rsp)
if response.status != 200:
logging.warning(
f'Error occurred when sending stocks to [{self.marketplace.id}]')
except Exception as e:
logging.error(
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
tasks = [send_stock_chunk(chunk) for chunk in chunks]
await asyncio.gather(*tasks)

View File

@@ -289,10 +289,6 @@ async def get_stocks_data(
slaves_stock_subquery.c.product_id == MarketplaceProduct.product_id
)
)
print('-------------------------')
print(stmt.compile(compile_kwargs={
'literal_binds': True
}))
result = await session.execute(stmt)
marketplace_products = result.all()
response: List[StockData] = []

View File

@@ -1,4 +1,5 @@
from dataclasses import dataclass
from typing import Union, List
from pydantic import BaseModel
@@ -13,7 +14,15 @@ class BaseSchema(BaseModel):
class UpdateRequest(BaseSchema):
product_ids: list[int]
product_ids: List[int]
class UpdateMarketplaceRequest(BaseSchema):
marketplace_id: int
class UpdateMarketplacesRequest(BaseSchema):
marketplace_ids: Union[List[int], None] = None
class UpdateResponse(BaseSchema):

514
test.py
View File

@@ -1,7 +1,509 @@
a = [
(1, [1, 4, 5, ]),
(2, [1, 2, 3, 4, 5, ]),
]
import asyncio
import time
updates_list = sorted(a, key=lambda x: x[1], reverse=True)
print(updates_list)
from background.update import process_update
A = [46806,
50457,
5294,
50455,
1263604,
3153,
1281166,
60075,
1266284,
1255288,
1430906,
1430905,
1430909,
1430911,
1430908,
1233617,
1274809,
1269081,
1216052,
5281,
1240583,
1215881,
1262205,
1263362,
1260217,
1269043,
1261160,
1262930,
1199011,
1244985,
1256197,
1259971,
1275109,
1277923,
1266093,
1269905,
1262068,
1272526,
1269918,
1272247,
14802,
1690206,
1265012,
1268765,
1258451,
1199922,
1264652,
1206165,
1276134,
1276758,
1277343,
1277346,
1262974,
1323227,
1280821,
1276512,
1688415,
1274390,
1273690,
1268750,
1268054,
1266939,
1262254,
1262238,
1261898,
1260348,
1258610,
1256746,
1255706,
1255696,
1255692,
1255500,
1255496,
1677398,
1255053,
1255052,
1255027,
1255024,
1255020,
1254967,
1254966,
1254964,
1254751,
1254750,
1253996,
1253739,
1252160,
1252157,
1242300,
1256578,
1256194,
1255177,
1278124,
1267797,
1266537,
1277673,
1235438,
1256819,
62484,
1260874,
1257620,
1273799,
53141,
1257730,
1272776,
1268129,
1257068,
1256919,
1234979,
1254460,
1254578,
1258563,
1253242,
1263602,
1254576,
1255015,
1254981,
1253646,
1261321,
1273813,
1255242,
1274598,
1214897,
1273279,
2172,
1231462,
1257310,
1253645,
1242885,
1747377,
1233273,
1220800,
1257333,
1256133,
1253956,
1663,
1771,
1272628,
1270851,
57977,
1257159,
1258366,
1274675,
1228267,
1267046,
1277962,
1255028,
1273500,
57994,
58014,
57995,
1425848,
1255218,
1255032,
1257132,
1276552,
1272983,
1272984,
1270101,
1257311,
1266716,
1255420,
1265283,
1269404,
1265309,
50749,
57941,
62110,
1265942,
1255744,
52862,
1274446,
41851,
1263009,
1253446,
4633,
21392,
1264372,
1264231,
53052,
1264656,
1262555,
1262009,
1265287,
1278725,
1268653,
1268652,
1268651,
1278044,
1353805,
1270730,
1253802,
1255266,
1230815,
1269405,
1274444,
1274445,
1205670,
1268767,
1268761,
1255679,
1252820,
1276125,
1253954,
1268790,
1268789,
1254508,
1254507,
1271531,
1430596,
1430901,
1265975,
1201855,
1237377,
1290777,
1290779,
1290845,
1301023,
1313042,
1313044,
1486866,
1486865,
1482040,
1482039,
1486868,
1677395,
1693204,
1785113,
1730179,
1746619,
57778,
58549,
1274911,
5238,
59939,
48130,
51725,
3324,
3315,
42937,
1250692,
1691268,
65691,
65679,
1266715,
1261585,
1547,
58237,
5267,
13728,
57985,
1430910,
46996,
49071,
49778,
1199528,
18420,
1357948,
13436,
1201667,
1371508,
60577,
47106,
1213493,
62574,
46808,
51130,
3100,
47108,
52536,
1272396,
1266321,
1280034,
1409383,
1272575,
1246252,
1267713,
1265479,
4726,
1268028,
1277070,
1278743,
1254882,
1270158,
1254749,
1255416,
1253839,
1268655,
1750701,
3173,
1725771,
4646,
1801937,
1837677,
1442421,
1811379,
1821818,
1838555,
1752948,
1579762,
61747,
1838556,
5087,
1838552,
1270564,
1838547,
1815657,
1821809,
1752947,
1821806,
1546488,
1536001,
1838554,
1752946,
1562500,
1838698,
1821821,
1815220,
1213805,
1821796,
1546102,
1173684,
1849044,
1821776,
61453,
61465,
1694439,
1548447,
1796332,
1204445,
1266798,
1236596,
1252685,
1252631,
1548800,
1838561,
1810009,
1546753,
1838562,
61439,
61466,
1811405,
1811396,
21475,
1821764,
1812378,
1548797,
1838553,
37,
517,
49124,
1814818,
1546149,
58180,
1837889,
4667,
61354,
1821769,
61595,
1694473,
1821802,
4370,
48406,
1838559,
1778238,
1546760,
1838558,
1821752,
61429,
754,
1265,
1837956,
1821800,
1821747,
1546834,
16675,
1214135,
1791293,
61341,
1837887,
47168,
1224282,
1938274,
1837886,
1254630,
1546849,
49122,
4825,
43589,
1838560,
1687827,
1838551,
4807,
1694419,
1548750,
53220,
2032683,
1822337,
1821728,
1260032,
61600,
1546423,
1811399,
49121,
519,
1821730,
1247748,
4717,
1808931,
43583,
43584,
1838692,
1268073,
1821741,
1845065,
1838699,
1200388,
1937568,
4359,
45049,
66803,
1814848,
49667,
1821737,
1821713,
4156,
1546421,
1546419,
1789644,
1270222,
1821717,
1812381,
1821742,
1254254,
3691,
1546424,
1259,
1253389,
1457,
1838557,
1224277,
1838548,
4824,
1814870,
1562490,
1249319,
1937717,
1838549,
4411,
1562476,
59199,
1845056,
4028,
1822211,
47056,
1201201,
62192,
1815646,
1694418,
1546549,
1254472,
1814112,
1258777,
2032682,
1546763,
1658549,
1838546,
61758,
1744312,
1518567,
1271323,
1814821,
4412,
1938272,
2032521,
4196,
57643,
49126,
49127,
1815644,
50812,
1821642,
1823200,
66933,
1268915,
1837969,
53209,
1546794,
1272267,
47147,
43512,
1849014,
1838550,
1368860,
2609,
1821727,
2021536,
1276589,
1936567,
1269506
]
start = time.time()
asyncio.run(process_update(A))
print(time.time() - start)

View File

@@ -1,11 +0,0 @@
# Test your FastAPI endpoints
GET http://127.0.0.1:8000/
Accept: application/json
###
GET http://127.0.0.1:8000/hello/User
Accept: application/json
###

View File

@@ -1,3 +1,4 @@
import time
from abc import ABC, abstractmethod
from typing import List
@@ -33,7 +34,17 @@ class BaseMarketplaceUpdater(ABC):
marketplace=self.marketplace,
product_ids=product_ids
)
return
marketplace_updates = []
for stock_data in stock_data_list:
marketplace_update = self.get_update_for_marketplace(stock_data)
marketplace_updates.append(marketplace_update)
await self.marketplace_api.update_stocks(marketplace_updates)
async def update_all(self):
stock_data_list = await queries.general.get_stocks_data(
session=self.session,
marketplace=self.marketplace,
)
marketplace_updates = []
for stock_data in stock_data_list:
marketplace_update = self.get_update_for_marketplace(stock_data)

View File

@@ -6,13 +6,20 @@ from database import Marketplace
from database.sipro.enums.general import BaseMarketplace
from updaters.ozon_updater import OzonUpdater
from updaters.wildberries_updater import WildberriesUpdater
from updaters.yandexmarket_updater import YandexmarketUpdater
class UpdaterFactory:
@staticmethod
def get_updater(session: AsyncSession, marketplace: Marketplace) -> Union[OzonUpdater, WildberriesUpdater]:
def get_updater(session: AsyncSession, marketplace: Marketplace) -> Union[
OzonUpdater,
WildberriesUpdater,
YandexmarketUpdater
]:
match marketplace.base_marketplace:
case BaseMarketplace.WILDBERRIES:
return WildberriesUpdater(marketplace, session)
case BaseMarketplace.OZON:
return OzonUpdater(marketplace, session)
case BaseMarketplace.YANDEX_MARKET:
return YandexmarketUpdater(marketplace, session)

View File

@@ -1,13 +1,16 @@
import asyncio
import time
from collections import defaultdict
from enum import unique, IntEnum
from typing import List
from typing import List, Union
from sqlalchemy import select
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import joinedload, selectinload
from backend.session import get_session, session_factory
from database import Marketplace, MarketplaceProduct, Warehouse, Company
from database.sipro.enums.general import BaseMarketplace
from schemas.general import StockUpdate
from updaters.factory import UpdaterFactory
@@ -24,24 +27,77 @@ class StocksUpdater:
])
return marketplace
async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]):
async def get_marketplaces(self, marketplace_ids: Union[list[int], None] = None) -> List[Marketplace]:
if not marketplace_ids:
marketplace_ids = []
stmt = (
select(
Marketplace
)
.join(
Company
)
.options(
selectinload(Marketplace.warehouses).selectinload(Warehouse.suppliers),
selectinload(Marketplace.warehouses).selectinload(Warehouse.company_warehouses),
joinedload(Marketplace.company).joinedload(Company.warehouse)
)
.where(
Company.is_deleted == False,
Company.is_archived == False,
Marketplace.is_deleted == False,
Marketplace.base_marketplace.in_([
BaseMarketplace.OZON,
BaseMarketplace.WILDBERRIES,
BaseMarketplace.YANDEX_MARKET
]),
or_(
marketplace_ids == [],
Marketplace.id.in_(marketplace_ids)
)
)
)
query_result = await self.session.scalars(stmt)
return query_result.all()
async def full_update_marketplace(self, marketplace_id: int):
marketplace = await self.get_marketplace(marketplace_id)
updater = UpdaterFactory.get_updater(self.session, marketplace)
if not updater:
return
await updater.update(updates)
await updater.update_all()
async def full_update_all_marketplaces(self, marketplace_ids: Union[List[int], None] = None):
marketplaces = await self.get_marketplaces(marketplace_ids)
async def update_marketplace(marketplace):
async with session_factory() as session:
updater = UpdaterFactory.get_updater(session, marketplace)
await updater.update_all()
tasks = [update_marketplace(marketplace) for marketplace in marketplaces]
await asyncio.gather(*tasks)
async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]):
marketplace = await self.get_marketplace(marketplace_id)
async with session_factory() as session:
updater = UpdaterFactory.get_updater(session, marketplace)
if not updater:
return
await updater.update(updates)
async def update(self, updates: list[StockUpdate]):
updates_dict = defaultdict(list)
for update in updates:
# Working with marketplaces
stmt = (
select(
MarketplaceProduct.marketplace_id.distinct()
)
.join(Marketplace)
.join(Company)
.where(
MarketplaceProduct.product_id == update.product_id,
MarketplaceProduct.marketplace_id.in_([9, 41])
Marketplace.is_deleted == False,
Company.is_deleted == False,
Company.is_archived == False
)
)
stmt_result = await self.session.execute(stmt)
@@ -52,7 +108,8 @@ class StocksUpdater:
updates_dict[marketplace_id].append(update)
updates_list = list(updates_dict.items())
updates_list = sorted(updates_list, key=lambda x: len(x[1]))
print(updates_list)
return
tasks = []
for marketplace_id, marketplace_updates in updates_list:
tasks.append(self.update_marketplace(marketplace_id, marketplace_updates))

View File

@@ -0,0 +1,13 @@
from queries.general import StockData
from updaters.base import BaseMarketplaceUpdater
class YandexmarketUpdater(BaseMarketplaceUpdater):
def get_update_for_marketplace(self, stock_data: StockData) -> dict:
return {
'sku': str(stock_data['article']),
'warehouseId': self.marketplace.warehouse_id,
'items': [{
'count': stock_data['full_stock'],
}]
}