Files
Fulfillment-Backend/external/marketplace/ozon/core.py
2025-03-20 16:43:03 +03:00

63 lines
1.9 KiB
Python

from typing import AsyncIterator
from external.marketplace.base.core import BaseMarketplaceApi
from models import Marketplace
class OzonMarketplaceApi(BaseMarketplaceApi):
    """Client for the Ozon Seller API bound to a single marketplace record.

    Requests are issued through ``self._method``, which is not defined in
    this class (presumably inherited from ``BaseMarketplaceApi`` -- confirm
    against the base class).
    """

    def __init__(self, marketplace: Marketplace):
        """Validate the marketplace credentials and remember them.

        Args:
            marketplace: Record whose ``auth_data`` mapping must contain
                non-empty ``Client-Id`` and ``Api-Key`` entries.

        Raises:
            ValueError: If either credential is missing or empty.
        """
        # NOTE(review): the base-class initializer is not invoked here;
        # confirm that BaseMarketplaceApi does not require it.
        client_id = marketplace.auth_data.get('Client-Id')
        api_key = marketplace.auth_data.get('Api-Key')
        if not client_id or not api_key:
            raise ValueError(
                f"Client-Id or Api-Key is missing for Marketplace ID: {marketplace.id}. "
                "Please check the marketplace credentials."
            )
        # The whole auth mapping is kept as the header set (same object, not
        # a copy), so any extra keys stored in auth_data are included too.
        self.headers = marketplace.auth_data
        self.marketplace = marketplace

    @property
    def get_headers(self) -> dict:
        """Return the headers mapping stored at construction time."""
        return self.headers

    @property
    def base_url(self) -> str:
        """Return the root URL of the Ozon Seller API."""
        return "https://api-seller.ozon.ru"

    async def get_products(self, data: dict) -> dict:
        """POST ``data`` to ``/v3/product/list`` and return the response.

        Args:
            data: JSON-serializable request body.

        Returns:
            Whatever ``self._method`` returns for the call.
        """
        method = '/v3/product/list'
        response = await self._method('POST', method, json=data)
        return response

    async def get_all_products(self) -> AsyncIterator[dict]:
        """Yield every product item, following ``last_id`` pagination.

        Pages of up to 100 items are requested with ``visibility`` set to
        ``ALL``. Iteration ends on an empty response, a missing ``result``,
        an empty ``items`` list, a missing ``last_id``, or a ``last_id``
        that did not change since the previous request.

        Yields:
            Each element of ``result['items']`` in the order received.
        """
        limit = 100
        last_id = ''
        while True:
            data = {
                'limit': limit,
                'last_id': last_id,
                'filter': {
                    'visibility': 'ALL',
                },
            }
            response = await self.get_products(data)
            if not response:
                break
            result = response.get('result')
            if not result:
                break
            items = result.get('items')
            if not items:
                break
            for item in items:
                yield item
            next_id = result.get('last_id')
            # A cursor that did not advance would make the next request
            # return the same page again; stop instead of looping forever
            # and re-yielding duplicates.
            if not next_id or next_id == last_id:
                break
            last_id = next_id

    async def get_products_info(self, data: dict) -> dict:
        """POST ``data`` to ``/v3/product/info/list`` and return the response.

        Args:
            data: JSON-serializable request body.

        Returns:
            Whatever ``self._method`` returns for the call.
        """
        method = '/v3/product/info/list'
        response = await self._method('POST', method, json=data)
        return response