This commit is contained in:
2024-07-02 08:55:24 +03:00
parent 386ee7e460
commit 7ba3426989
18 changed files with 228 additions and 155 deletions

View File

@@ -1,9 +1,12 @@
import json import asyncio
from asgiref.sync import async_to_sync
from background import celery from background import celery
import background.update
@celery.task(name='test') @celery.task(name='process_update')
def test_task(): def process_update(product_ids: list[int]):
with open('test.json', 'a') as tf: loop = asyncio.get_event_loop()
tf.write(json.dumps({'ok': True})) return loop.run_until_complete(background.update.process_update(product_ids))

14
background/update.py Normal file
View File

@@ -0,0 +1,14 @@
import time
from backend.session import get_session
from schemas.general import StockUpdate
from updaters.stocks_updater import StocksUpdater
async def process_update(product_ids: list[int]):
async for session in get_session():
updates = [StockUpdate(product_id=product_id) for product_id in product_ids]
updater = StocksUpdater(session)
await updater.update(updates)
await session.close()
return {'message': f'Stocks for [{",".join(map(str, product_ids))}] successfully updated'}

View File

@@ -51,6 +51,7 @@ class Marketplace(BaseSiproModel):
sell_from_price: Mapped[bool] = mapped_column() sell_from_price: Mapped[bool] = mapped_column()
warehouses: Mapped[List["Warehouse"]] = relationship(secondary=marketplace_warehouses) warehouses: Mapped[List["Warehouse"]] = relationship(secondary=marketplace_warehouses)
warehouse_id: Mapped[str] = mapped_column()
company_id: Mapped[int] = mapped_column(ForeignKey('companies.id')) company_id: Mapped[int] = mapped_column(ForeignKey('companies.id'))
company: Mapped["Company"] = relationship() company: Mapped["Company"] = relationship()

View File

@@ -24,11 +24,14 @@ class MarketplaceProduct(BaseSiproModel):
product_id: Mapped[int] = mapped_column(ForeignKey("products.id")) product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))
product: Mapped["Product"] = relationship() product: Mapped["Product"] = relationship()
third_additional_article: Mapped[str] = mapped_column()
class SupplierProduct(BaseSiproModel): class SupplierProduct(BaseSiproModel):
__tablename__ = 'supplier_products' __tablename__ = 'supplier_products'
id: Mapped[int] = mapped_column(primary_key=True) id: Mapped[int] = mapped_column(primary_key=True)
supplier_stock: Mapped[int] = mapped_column() supplier_stock: Mapped[int] = mapped_column()
sold_today: Mapped[int] = mapped_column()
supplier_id: Mapped[int] = mapped_column() supplier_id: Mapped[int] = mapped_column()
product_id: Mapped[int] = mapped_column(ForeignKey("products.id")) product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))

54
main.py
View File

@@ -1,43 +1,37 @@
from typing import Annotated from typing import Annotated
from celery.result import AsyncResult from celery.result import AsyncResult
from fastapi import FastAPI, Depends, Body from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.dialects.postgresql import insert from starlette import status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from backend.session import get_session import background.tasks
from database import DailyStock
from database.sipro import *
from queries.general import get_stocks_data
from background.tasks import * from background.tasks import *
from updaters.stocks_updater import StockUpdate from schemas.general import UpdateRequest, UpdateResponse
app = FastAPI() auth_schema = HTTPBearer()
@app.get("/") async def check_auth(token: Annotated[HTTPAuthorizationCredentials, Depends(auth_schema)]):
async def root( if token.credentials != 'vvHh1QNl7lS6c7OVwmxU1TVNd7DLlc9W810csZGf4rkqOrBy6fQwlhIDZsQZd9hQYZYK47yWv33aCq':
session: Annotated[AsyncSession, Depends(get_session)], raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials')
marketplace_id: int
app = FastAPI(
dependencies=[Depends(check_auth)]
)
@app.post(
'/update',
response_model=UpdateResponse
)
async def update(
request: UpdateRequest
): ):
marketplace = await session.get(Marketplace, marketplace_id, options=[ task = background.tasks.process_update.delay(request.product_ids)
joinedload(Marketplace.warehouses).joinedload(Warehouse.suppliers), return UpdateResponse(task_id=task.id)
joinedload(Marketplace.warehouses).joinedload(Warehouse.company_warehouses),
joinedload(Marketplace.company).joinedload(Company.warehouse)
])
data = await get_stocks_data(session, marketplace)
data = sorted(data, key=lambda x: x['denco_article'])
return {"message": data}
@app.post("/tasks", status_code=201)
def run_task(payload=Body(...)):
task_type = payload["type"]
task = test_task.delay()
return JSONResponse({"task_id": task.id})
@app.get("/tasks/{task_id}") @app.get("/tasks/{task_id}")

View File

@@ -1,3 +1,3 @@
from .ozon import OzonMarketplace from .ozon import OzonMarketplaceApi
from .wildberries import WildberriesMarketplace from .wildberries import WildberriesMarketplaceApi
from .factory import MarketplaceFactory from .factory import MarketplaceApiFactory

View File

@@ -7,7 +7,7 @@ from aiohttp import ClientResponse
from database import Marketplace from database import Marketplace
class BaseJsonMarketplace(ABC): class BaseMarketplaceApi(ABC):
@abstractmethod @abstractmethod
def __init__(self, marketplace: Marketplace): def __init__(self, marketplace: Marketplace):
pass pass
@@ -20,18 +20,17 @@ class BaseJsonMarketplace(ABC):
def get_headers(self): def get_headers(self):
pass pass
@abstractmethod
@property @property
@abstractmethod
def api_url(self): def api_url(self):
pass pass
async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'], async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'],
method: str, method: str,
data: dict) -> ClientResponse: data: dict) -> ClientResponse:
async with aiohttp.ClientSession as session: async with aiohttp.ClientSession() as session:
async with session.request(http_method, return await session.request(http_method,
f'{self.api_url}{method}', f'{self.api_url}{method}',
json=data, json=data,
headers=self.get_headers() headers=self.get_headers()
) as response: )
return response

View File

@@ -2,18 +2,18 @@ from typing import Union
from database import Marketplace from database import Marketplace
from database.sipro.enums.general import BaseMarketplace from database.sipro.enums.general import BaseMarketplace
from .wildberries import WildberriesMarketplace from .wildberries import WildberriesMarketplaceApi
from .ozon import OzonMarketplace from .ozon import OzonMarketplaceApi
class MarketplaceFactory: class MarketplaceApiFactory:
@staticmethod @staticmethod
def get_marketplace(marketplace: Marketplace) -> Union[ def get_marketplace_api(marketplace: Marketplace) -> Union[
WildberriesMarketplace, WildberriesMarketplaceApi,
OzonMarketplace, OzonMarketplaceApi,
]: ]:
match marketplace.base_marketplace: match marketplace.base_marketplace:
case BaseMarketplace.OZON: case BaseMarketplace.OZON:
return OzonMarketplace(marketplace) return OzonMarketplaceApi(marketplace)
case BaseMarketplace.WILDBERRIES: case BaseMarketplace.WILDBERRIES:
return WildberriesMarketplace(marketplace) return WildberriesMarketplaceApi(marketplace)

View File

@@ -1,17 +1,15 @@
import asyncio
import json import json
import logging import logging
from typing import Union from typing import Union
from aiolimiter import AsyncLimiter
from asynciolimiter import StrictLimiter
import utils import utils
from database import Marketplace from database import Marketplace
from limiter import BatchLimiter from limiter import BatchLimiter
from marketplaces.base import BaseJsonMarketplace from marketplaces.base import BaseMarketplaceApi
class OzonMarketplace(BaseJsonMarketplace): class OzonMarketplaceApi(BaseMarketplaceApi):
def __init__(self, marketplace: Marketplace): def __init__(self, marketplace: Marketplace):
self.marketplace = marketplace self.marketplace = marketplace
@@ -25,6 +23,7 @@ class OzonMarketplace(BaseJsonMarketplace):
def get_headers(self): def get_headers(self):
return self.headers return self.headers
@property
def api_url(self): def api_url(self):
return 'https://api-seller.ozon.ru' return 'https://api-seller.ozon.ru'
@@ -33,22 +32,23 @@ class OzonMarketplace(BaseJsonMarketplace):
return return
max_stocks = 100 max_stocks = 100
chunks = utils.chunk_list(data, max_stocks) chunks = utils.chunk_list(data, max_stocks)
limiter = BatchLimiter(max_requests=80, limiter = BatchLimiter(max_requests=80, period=60)
period=60)
for chunk in chunks: async def send_stock_chunk(chunk):
try: try:
await limiter.acquire() await limiter.acquire()
response = await self._method('POST', request_data = {'stocks': chunk}
'/v2/products/stocks', response = await self._method('POST', '/v2/products/stocks', data=request_data)
data=chunk) print(request_data)
response = await response.json() response = await response.json()
# response = await
error_message = response.get('message') error_message = response.get('message')
error_code = response.get('code') error_code = response.get('code')
if error_message: if error_message:
logging.warning( logging.warning(
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})') f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
break
except Exception as e: except Exception as e:
logging.error( logging.error(
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
tasks = [send_stock_chunk(chunk) for chunk in chunks]
await asyncio.gather(*tasks)

View File

@@ -1,3 +1,4 @@
import asyncio
import json import json
import logging import logging
from typing import Union from typing import Union
@@ -5,10 +6,10 @@ from typing import Union
import utils import utils
from database import Marketplace from database import Marketplace
from limiter import BatchLimiter from limiter import BatchLimiter
from marketplaces.base import BaseJsonMarketplace from marketplaces.base import BaseMarketplaceApi
class WildberriesMarketplace(BaseJsonMarketplace): class WildberriesMarketplaceApi(BaseMarketplaceApi):
def __init__(self, marketplace: Marketplace): def __init__(self, marketplace: Marketplace):
self.marketplace = marketplace self.marketplace = marketplace
auth_data = json.loads(marketplace.auth_data) auth_data = json.loads(marketplace.auth_data)
@@ -21,6 +22,7 @@ class WildberriesMarketplace(BaseJsonMarketplace):
def get_headers(self): def get_headers(self):
return self.headers return self.headers
@property
def api_url(self): def api_url(self):
return 'https://suppliers-api.wildberries.ru' return 'https://suppliers-api.wildberries.ru'
@@ -29,21 +31,24 @@ class WildberriesMarketplace(BaseJsonMarketplace):
return return
max_stocks = 1000 max_stocks = 1000
chunks = utils.chunk_list(data, max_stocks) chunks = utils.chunk_list(data, max_stocks)
limiter = BatchLimiter(max_requests=300, limiter = BatchLimiter(max_requests=300, period=60)
period=60)
for chunk in chunks: async def send_stock_chunk(chunk):
try: try:
await limiter.acquire() await limiter.acquire()
response = await self._method('PUT', request_data = {'stocks': chunk}
'/api/v3/stocks/{warehouseId}', response = await self._method('PUT', f'/api/v3/stocks/{self.marketplace.warehouse_id}',
chunk) data=request_data)
if response.status != 204: print(request_data)
if response.status not in [204, 409]:
response = await response.json() response = await response.json()
error_message = response.get('message') error_message = response.get('message')
error_code = response.get('code') error_code = response.get('code')
logging.warning( logging.warning(
f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})') f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})')
break
except Exception as e: except Exception as e:
logging.error( logging.error(
f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}')
tasks = [send_stock_chunk(chunk) for chunk in chunks]
await asyncio.gather(*tasks)

View File

@@ -1,4 +1,5 @@
from typing import Union from dataclasses import dataclass
from typing import Union, TypedDict
from sqlalchemy import select, func, and_, cast, String, case, or_ from sqlalchemy import select, func, and_, cast, String, case, or_
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@@ -9,6 +10,12 @@ from database.sipro import *
from database.sipro.enums.product import ProductRelationType from database.sipro.enums.product import ProductRelationType
class StockData(TypedDict):
full_stock: int
article: Union[str, int]
marketplace_product: MarketplaceProduct
def get_marketplace_suppliers_and_company_warehouses(marketplace: Marketplace): def get_marketplace_suppliers_and_company_warehouses(marketplace: Marketplace):
company = marketplace.company company = marketplace.company
suppliers = set() suppliers = set()
@@ -30,7 +37,7 @@ async def get_stocks_data(
session: AsyncSession, session: AsyncSession,
marketplace: Marketplace, marketplace: Marketplace,
product_ids: Union[list[int], None] = None product_ids: Union[list[int], None] = None
): ) -> List[StockData]:
if not product_ids: if not product_ids:
product_ids = [] product_ids = []
company = marketplace.company company = marketplace.company
@@ -46,7 +53,7 @@ async def get_stocks_data(
supplier_stock_subquery = ( supplier_stock_subquery = (
select( select(
func.greatest( func.greatest(
func.sum(SupplierProduct.supplier_stock) - func.coalesce(DailyStock.sold_today, 0), func.sum(SupplierProduct.supplier_stock - SupplierProduct.sold_today),
0 0
) )
.label('supplier_stock'), .label('supplier_stock'),
@@ -58,10 +65,6 @@ async def get_stocks_data(
.join( .join(
Product Product
) )
.outerjoin(
DailyStock,
DailyStock.product_id == SupplierProduct.product_id
)
.where( .where(
SupplierProduct.supplier_id.in_(supplier_ids) SupplierProduct.supplier_id.in_(supplier_ids)
) )
@@ -286,9 +289,13 @@ async def get_stocks_data(
slaves_stock_subquery.c.product_id == MarketplaceProduct.product_id slaves_stock_subquery.c.product_id == MarketplaceProduct.product_id
) )
) )
print('-------------------------')
print(stmt.compile(compile_kwargs={
'literal_binds': True
}))
result = await session.execute(stmt) result = await session.execute(stmt)
marketplace_products = result.all() marketplace_products = result.all()
result = [] response: List[StockData] = []
for (marketplace_product, for (marketplace_product,
denco_article, denco_article,
price_purchase, price_purchase,
@@ -301,8 +308,8 @@ async def get_stocks_data(
price_recommended, price_recommended,
is_archived) in marketplace_products: is_archived) in marketplace_products:
if is_archived or (sell_from_price > price_recommended): if is_archived or (sell_from_price > price_recommended):
result.append({ response.append({
'denco_article': denco_article, 'article': denco_article,
'full_stock': 0, 'full_stock': 0,
'marketplace_product': marketplace_product, 'marketplace_product': marketplace_product,
}) })
@@ -328,10 +335,10 @@ async def get_stocks_data(
full_stock = 0 full_stock = 0
full_stock = max([0, full_stock]) full_stock = max([0, full_stock])
result.append({ response.append({
'denco_article': denco_article, 'article': denco_article,
'full_stock': full_stock, 'full_stock': full_stock,
'marketplace_product': marketplace_product, 'marketplace_product': marketplace_product,
}) })
return result return response

0
schemas/__init__.py Normal file
View File

20
schemas/general.py Normal file
View File

@@ -0,0 +1,20 @@
from dataclasses import dataclass
from pydantic import BaseModel
@dataclass
class StockUpdate:
product_id: int
class BaseSchema(BaseModel):
pass
class UpdateRequest(BaseSchema):
product_ids: list[int]
class UpdateResponse(BaseSchema):
task_id: str

View File

@@ -1,15 +1,41 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List from typing import List
from sqlalchemy.ext.asyncio import AsyncSession
import queries.general
from database import Marketplace from database import Marketplace
from updaters.stocks_updater import StockUpdate from marketplaces import MarketplaceApiFactory
from marketplaces.base import BaseMarketplaceApi
from queries.general import StockData
from schemas.general import StockUpdate
class BaseMarketplaceUpdater(ABC): class BaseMarketplaceUpdater(ABC):
@abstractmethod marketplace: Marketplace
def __init__(self, marketplace: Marketplace): marketplace_api: BaseMarketplaceApi
pass session: AsyncSession
def __init__(self, marketplace: Marketplace, session: AsyncSession):
self.marketplace = marketplace
self.session = session
self.marketplace_api = MarketplaceApiFactory.get_marketplace_api(marketplace)
@abstractmethod @abstractmethod
async def update(self, updates: List[StockUpdate]): def get_update_for_marketplace(self,
stock_data: StockData) -> dict:
pass pass
async def update(self, updates: List[StockUpdate]):
product_ids = list(set([update.product_id for update in updates]))
stock_data_list = await queries.general.get_stocks_data(
session=self.session,
marketplace=self.marketplace,
product_ids=product_ids
)
return
marketplace_updates = []
for stock_data in stock_data_list:
marketplace_update = self.get_update_for_marketplace(stock_data)
marketplace_updates.append(marketplace_update)
await self.marketplace_api.update_stocks(marketplace_updates)

18
updaters/factory.py Normal file
View File

@@ -0,0 +1,18 @@
from typing import Union
from sqlalchemy.ext.asyncio import AsyncSession
from database import Marketplace
from database.sipro.enums.general import BaseMarketplace
from updaters.ozon_updater import OzonUpdater
from updaters.wildberries_updater import WildberriesUpdater
class UpdaterFactory:
@staticmethod
def get_updater(session: AsyncSession, marketplace: Marketplace) -> Union[OzonUpdater, WildberriesUpdater]:
match marketplace.base_marketplace:
case BaseMarketplace.WILDBERRIES:
return WildberriesUpdater(marketplace, session)
case BaseMarketplace.OZON:
return OzonUpdater(marketplace, session)

View File

@@ -1,14 +1,11 @@
from typing import List from queries.general import StockData
from database import Marketplace
from marketplaces import MarketplaceFactory, OzonMarketplace
from updaters.base import BaseMarketplaceUpdater from updaters.base import BaseMarketplaceUpdater
from updaters.stocks_updater import StockUpdate
class OzonUpdater(BaseMarketplaceUpdater): class OzonUpdater(BaseMarketplaceUpdater):
def __init__(self, marketplace: Marketplace): def get_update_for_marketplace(self, data: StockData) -> dict:
self.ozon_marketplace: OzonMarketplace = MarketplaceFactory.get_marketplace(marketplace) return {
'offer_id': str(data['article']),
async def update(self, updates: List[StockUpdate]): 'stock': 0, # $data['full_stock'],
pass 'warehouse_id': self.marketplace.warehouse_id
}

View File

@@ -1,53 +1,47 @@
import asyncio
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass
from enum import unique, IntEnum from enum import unique, IntEnum
from typing import List, Union from typing import List
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
import database from database import Marketplace, MarketplaceProduct, Warehouse, Company
from database import Marketplace, MarketplaceProduct, DailyStock from schemas.general import StockUpdate
from updaters.factory import UpdaterFactory
@unique
class StockUpdateType(IntEnum):
SALE = 0
SUPPLIER_UPDATE = 1
WAREHOUSE_UPDATE = 2
@dataclass
class StockUpdate:
product_id: int
type: StockUpdateType
quantity: int
class StocksUpdater: class StocksUpdater:
def __init__(self, session: AsyncSession): def __init__(self, session: AsyncSession):
self.session = session self.session = session
async def get_marketplace(self, marketplace_id: int):
marketplace = await self.session.get(Marketplace, marketplace_id, options=[
joinedload(Marketplace.warehouses).joinedload(Warehouse.suppliers),
joinedload(Marketplace.warehouses).joinedload(Warehouse.company_warehouses),
joinedload(Marketplace.company).joinedload(Company.warehouse)
])
return marketplace
async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]): async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]):
pass marketplace = await self.get_marketplace(marketplace_id)
updater = UpdaterFactory.get_updater(self.session, marketplace)
if not updater:
return
await updater.update(updates)
async def update(self, updates: list[StockUpdate]): async def update(self, updates: list[StockUpdate]):
updates_dict = defaultdict(list) updates_dict = defaultdict(list)
stock_update_values = []
for update in updates: for update in updates:
# Working with sold today
if update.type == StockUpdateType.SALE:
stock_update_values.append({
'product_id': update.product_id,
'sold_today': update.quantity
})
# Working with marketplaces # Working with marketplaces
stmt = ( stmt = (
select( select(
MarketplaceProduct.marketplace_id.distinct() MarketplaceProduct.marketplace_id.distinct()
) )
.where( .where(
MarketplaceProduct.product_id == update.product_id MarketplaceProduct.product_id == update.product_id,
MarketplaceProduct.marketplace_id.in_([9, 41])
) )
) )
stmt_result = await self.session.execute(stmt) stmt_result = await self.session.execute(stmt)
@@ -57,27 +51,9 @@ class StocksUpdater:
for marketplace_id in marketplace_ids: for marketplace_id in marketplace_ids:
updates_dict[marketplace_id].append(update) updates_dict[marketplace_id].append(update)
updates_list = list(updates_dict.items()) updates_list = list(updates_dict.items())
updates_list = sorted(updates_list, key=lambda x: x[1]) updates_list = sorted(updates_list, key=lambda x: len(x[1]))
# Updating DailyStock-s
insert_stmt = (
insert(
DailyStock
)
.values(
stock_update_values
)
)
insert_stmt = (
insert_stmt.on_conflict_do_update(
index_elements=['product_id'],
set_={
'sold_today': DailyStock.sold_today + insert_stmt.excluded.sold_today
}
)
)
await self.session.execute(insert_stmt)
await self.session.commit()
tasks = []
for marketplace_id, marketplace_updates in updates_list: for marketplace_id, marketplace_updates in updates_list:
await self.update_marketplace(marketplace_id, marketplace_updates) tasks.append(self.update_marketplace(marketplace_id, marketplace_updates))
await asyncio.gather(*tasks)

View File

@@ -1 +1,11 @@
from queries.general import StockData
from updaters.base import BaseMarketplaceUpdater
class WildberriesUpdater(BaseMarketplaceUpdater):
def get_update_for_marketplace(self, stock_data: StockData) -> dict:
return {
'sku': stock_data['marketplace_product'].third_additional_article,
'amount': 0 # stock_data['full_stock']
}