Files
Sipro-Stocks/marketplaces/base.py
2024-07-06 05:47:11 +03:00

44 lines
1.0 KiB
Python

from abc import ABC, abstractmethod
from typing import Literal, Union
import aiohttp
from aiohttp import ClientResponse, TCPConnector, ClientSession
from database import Marketplace
shared_connector = TCPConnector(limit=50000)
class BaseMarketplaceApi(ABC):
session: ClientSession
@abstractmethod
def __init__(self, marketplace: Marketplace):
pass
@abstractmethod
async def update_stocks(self, data: Union[list, dict]):
pass
@abstractmethod
def get_headers(self):
pass
@property
@abstractmethod
def api_url(self):
pass
def init_session(self):
self.session = ClientSession()
async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'],
method: str,
data: dict) -> ClientResponse:
return await self.session.request(
http_method,
f'{self.api_url}{method}',
json=data,
headers=self.get_headers()
)