Files
Sipro-Stocks/marketplaces/wildberries.py
2025-11-04 17:09:19 +03:00

46 lines
1.4 KiB
Python

import json
import logging
from typing import Tuple, Union

import jwt
from aiohttp import ClientSession, ClientResponse

from backend import config
from database import Marketplace
from marketplaces.base import BaseMarketplaceApi
class WildberriesMarketplaceApi(BaseMarketplaceApi):
    """Client for the Wildberries marketplace API.

    The seller token is read from the ``token`` key of the marketplace's
    JSON-encoded ``auth_data``. Its claims are decoded without verifying the
    signature, so a successful decode only shows that the token is a
    well-formed JWT, not that it is authentic.

    Attributes:
        marketplace: The ``Marketplace`` record this client works on.
        is_valid: ``False`` when the token could not be decoded.
        limiter_key: ``company_id`` concatenated with the token's ``sid``
            claim (presumably consumed by a rate limiter elsewhere -- not
            visible in this file). Only set when ``is_valid`` is ``True``.
        headers: HTTP headers for API requests. Only set when ``is_valid``
            is ``True``.
    """

    def __init__(self, marketplace: Marketplace):
        """Build the client state from ``marketplace.auth_data``.

        If the token cannot be decoded, the error is logged, ``is_valid`` is
        set to ``False`` and initialisation stops early: ``limiter_key`` and
        ``headers`` are NOT assigned in that case, so callers must check
        ``is_valid`` before using the instance.

        Raises:
            Whatever ``json.loads`` raises for malformed ``auth_data``; that
            call is intentionally left outside the ``try`` block, as in the
            original implementation.
        """
        self.marketplace = marketplace
        auth_data = json.loads(marketplace.auth_data)
        token = auth_data.get('token')
        self.is_valid = True
        try:
            # Signature verification is disabled: only the claims are needed.
            decoded_token = jwt.decode(
                token,
                algorithms=["HS256"],
                options={"verify_signature": False},
            )
        except Exception:
            # Best-effort: any decoding failure marks the client as invalid
            # instead of propagating to the caller.
            logging.error("Couldn't decode token for %s", marketplace.id)
            self.is_valid = False
            return
        self.limiter_key = str(marketplace.company_id) + str(decoded_token.get('sid'))
        self.headers = {
            'Authorization': token,
            'Content-Type': 'application/json',
            'User-Agent': 'wbas_seller.denco.store3547',
            'X-Client-Secret': config.WB_SECRET_TOKEN
        }

    def get_headers(self):
        """Return the request headers built in ``__init__``.

        Raises ``AttributeError`` if the token failed to decode, because
        ``headers`` is never assigned on that path.
        """
        return self.headers

    @property
    def api_url(self):
        """Base URL of the Wildberries marketplace API."""
        return 'https://marketplace-api.wildberries.ru'

    async def update_stocks(
        self, data: Union[list, dict]
    ) -> Tuple[ClientSession, ClientResponse]:
        """Send a stock update for this marketplace's warehouse.

        Issues a ``PUT`` to ``/api/v3/stocks/{warehouse_id}`` through the
        inherited ``_method`` helper, passing ``data`` under the ``stocks``
        key. Surrounding whitespace is stripped from the warehouse id before
        it is placed in the URL path.

        Args:
            data: Stock entries to send.

        Returns:
            Whatever ``self._method`` returns; annotated as a
            ``(ClientSession, ClientResponse)`` pair (``_method`` is defined
            in the base class and not visible here).
        """
        warehouse_id = self.marketplace.warehouse_id.strip()
        return await self._method('PUT', f'/api/v3/stocks/{warehouse_id}', data={'stocks': data})