import uuid
from io import BytesIO
from typing import Union

import aiohttp


class S3Uploader:
    """Small async client for the file-storage HTTP API.

    Every request is sent with an ``X-API-Key`` header and the response body
    is returned as parsed JSON.
    """

    def __init__(
        self,
        api_key: str,
        *,
        base_url: str = 'https://s3.denco.store',
        prefix: str = 'crm',
    ):
        """
        Args:
            api_key: value sent in the ``X-API-Key`` header of every request
            base_url: root URL that endpoint paths are appended to
            prefix: value sent as ``prefix`` when creating a new file record
        """
        self.base_url = base_url
        self.prefix = prefix
        self.api_key = api_key
        self.headers = {
            'X-API-Key': self.api_key,
        }

    async def _method(self, http_method: str, method: str, **kwargs):
        """Send one request and return the JSON-decoded response body.

        Args:
            http_method: HTTP verb, e.g. ``'POST'``
            method: endpoint path appended to ``base_url`` (e.g. ``'/create'``)
            **kwargs: forwarded unchanged to ``ClientSession.request``

        Returns:
            whatever ``response.json()`` decodes the body to
        """
        # A fresh session is opened (and closed) for every call.
        # NOTE(review): the HTTP status is not checked here; error responses
        # are decoded and returned like successful ones.
        async with aiohttp.ClientSession(headers=self.headers) as session:
            url = self.base_url + method
            async with session.request(http_method, url, **kwargs) as response:
                return await response.json()

    async def _create(self):
        """POST to ``/create`` with this uploader's prefix.

        Returns:
            the decoded JSON response; ``upload`` reads ``'databaseId'``
            from it
        """
        method = '/create'
        json_data = {
            'prefix': self.prefix,
            'immediate': True,
        }
        return await self._method('POST', method, json=json_data)

    async def upload(self, file: bytes, file_id: Union[int, None] = None) -> dict:
        """Upload file content, creating a new record first if needed.

        Args:
            file: bytes of image to upload
            file_id: database id created by ``_create``; when ``None`` a new
                record is created and its ``databaseId`` is used

        Returns:
            the decoded JSON response of the upload request (presumably
            contains the url to the file -- confirm against the service)

        Raises:
            KeyError: if a record had to be created and the create response
                has no ``'databaseId'`` key
        """
        # Compare against None explicitly so that a legitimate id of 0 is not
        # mistaken for "no id supplied".
        if file_id is None:
            create_response = await self._create()
            file_id = create_response['databaseId']

        data = {
            'file': BytesIO(file),
        }
        method = f'/upload/{file_id}'
        return await self._method('POST', method, data=data)