temp images on products

This commit is contained in:
2024-05-24 09:45:54 +03:00
parent 3a4f91d751
commit 88679089b6
11 changed files with 115 additions and 31 deletions

0
external/__init__.py vendored Normal file
View File

0
external/s3_uploader/__init__.py vendored Normal file
View File

49
external/s3_uploader/uploader.py vendored Normal file
View File

@@ -0,0 +1,49 @@
import uuid
from io import BytesIO
from typing import Union
import aiohttp
class S3Uploader:
def __init__(self, api_key: str):
# constants
self.base_url = 'https://s3.denco.store'
self.prefix = 'crm'
self.api_key = api_key
self.headers = {
'X-API-Key': self.api_key,
}
async def _method(self, http_method, method, **kwargs):
async with aiohttp.ClientSession(headers=self.headers) as session:
async with session.request(http_method, self.base_url + method, **kwargs) as response:
return await response.json()
async def _create(self):
method = '/create'
json_data = {
'prefix': self.prefix,
'immediate': True
}
return await self._method('POST', method, json=json_data)
async def upload(self, file: bytes, file_id: Union[int, None] = None) -> dict:
"""
Args:
file_id: database id created by _create
file: bytes of image to upload
Returns:
url to file
"""
if not file_id:
create_response = await self._create()
file_id = create_response['databaseId']
data = {
'file': BytesIO(file),
}
method = f'/upload/{file_id}'
return await self._method('POST', method, data=data)