50 lines
1.4 KiB
Python
50 lines
1.4 KiB
Python
import uuid
|
|
from io import BytesIO
|
|
from typing import Union
|
|
|
|
import aiohttp
|
|
|
|
|
|
class S3Uploader:
    """Async client that uploads files through the HTTP API at ``base_url``.

    Every request is sent with the API key in the ``X-API-Key`` header.
    """

    def __init__(self, api_key: str):
        """Store the API key and build the headers sent with each request.

        Args:
            api_key: value sent in the ``X-API-Key`` header.
        """
        # constants
        self.base_url = 'https://s3.denco.store'
        self.prefix = 'crm'
        self.api_key = api_key

        self.headers = {
            'X-API-Key': self.api_key,
        }

    async def _method(self, http_method, method, **kwargs):
        """Send a single request and return its JSON-decoded body.

        Args:
            http_method: HTTP verb passed to ``session.request`` (e.g. 'POST').
            method: path appended to ``self.base_url`` (e.g. '/create').
            **kwargs: forwarded unchanged to ``session.request``.

        Returns:
            The response body parsed as JSON.
        """
        # A new session is opened and closed for every call.
        async with aiohttp.ClientSession(headers=self.headers) as session:
            async with session.request(http_method, self.base_url + method, **kwargs) as response:
                # NOTE(review): the HTTP status is not checked here; error
                # responses are returned to the caller as parsed JSON (or
                # fail inside ``response.json()`` if the body is not JSON).
                return await response.json()

    async def _create(self):
        """Request a new file record for ``self.prefix``.

        Returns:
            Parsed JSON response of ``POST /create``; ``upload`` reads its
            'databaseId' key.
        """
        method = '/create'
        json_data = {
            'prefix': self.prefix,
            'immediate': True,
        }
        return await self._method('POST', method, json=json_data)

    async def upload(self, file: bytes, file_id: Union[int, None] = None) -> dict:
        """Upload ``file``, creating a new record first when no id is given.

        Args:
            file: bytes of image to upload
            file_id: database id created by _create; when None, a new id is
                obtained by calling _create first.

        Returns:
            Parsed JSON response of ``POST /upload/{file_id}`` (presumably
            contains the url to the file - confirm against the API).
        """
        # Compare against None explicitly: a plain truthiness test would
        # treat a caller-supplied id of 0 as "missing" and silently create
        # a new record instead of uploading to the requested one.
        if file_id is None:
            create_response = await self._create()
            file_id = create_response['databaseId']
        data = {
            'file': BytesIO(file),
        }
        method = f'/upload/{file_id}'
        return await self._method('POST', method, data=data)
|