Skip to content

Commit

Permalink
WIP chunk uploads
Browse files Browse the repository at this point in the history
Signed-off-by: Cédric Foellmi <[email protected]>
  • Loading branch information
onekiloparsec committed Apr 13, 2024
1 parent b6df897 commit c1ec70d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 12 deletions.
18 changes: 10 additions & 8 deletions arcsecond/api/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,47 +34,49 @@ def _build_url(self, *args, **filters):
query = '?' + urlencode(filters) if len(filters) > 0 else ''
return url + query

def get_list_url(self, **filters):
    """Return the collection URL of this endpoint.

    Keyword arguments are forwarded to ``_build_url`` as filters, which
    url-encodes them into the query string of the returned URL.
    """
    return self._build_url(self.__path, **filters)

def _list_url(self, **filters):
    """Backward-compatible alias of ``get_list_url`` (the former private name)."""
    return self.get_list_url(**filters)

def get_detail_url(self, uuid_or_id):
    """Return the URL of a single resource of this endpoint.

    ``uuid_or_id`` is converted with ``str`` before being handed to
    ``_build_url`` together with the endpoint path.
    """
    return self._build_url(self.__path, str(uuid_or_id))

def _detail_url(self, uuid_or_id):
    """Backward-compatible alias of ``get_detail_url`` (the former private name)."""
    return self.get_detail_url(uuid_or_id)

def list(self, **filters):
    """Send a GET request to the collection URL of this endpoint.

    Keyword arguments are passed to ``get_list_url`` as filters. Returns
    whatever ``_perform_request`` returns.
    """
    return self._perform_request(self.get_list_url(**filters), 'get')

def read(self, id_name_uuid, headers=None):
    """Send a GET request to the detail URL of one resource.

    ``id_name_uuid`` identifies the resource and is turned into a URL by
    ``get_detail_url``; ``headers`` are passed through to
    ``_perform_request``. No JSON or form payload is sent. Returns whatever
    ``_perform_request`` returns.
    """
    return self._perform_request(self.get_detail_url(id_name_uuid),
                                 'get',
                                 json=None,
                                 data=None,
                                 headers=headers)

def create(self, json=None, data=None, headers=None):
    """Send a POST request to the collection URL of this endpoint.

    ``json``, ``data`` and ``headers`` are passed through unchanged to
    ``_perform_request``, whose result is returned.
    """
    return self._perform_request(self.get_list_url(),
                                 'post',
                                 json=json,
                                 data=data,
                                 headers=headers)

def update(self, id_name_uuid, json=None, data=None, headers=None):
    """Send a PATCH request to the detail URL of one resource.

    ``id_name_uuid`` identifies the resource and is turned into a URL by
    ``get_detail_url``. ``json``, ``data`` and ``headers`` are passed through
    unchanged to ``_perform_request``, whose result is returned.
    """
    return self._perform_request(self.get_detail_url(id_name_uuid),
                                 'patch',
                                 json=json,
                                 data=data,
                                 headers=headers)

def delete(self, id_name_uuid):
    """Send a DELETE request to the detail URL of one resource.

    ``id_name_uuid`` identifies the resource and is turned into a URL by
    ``get_detail_url``. Returns whatever ``_perform_request`` returns.
    """
    return self._perform_request(self.get_detail_url(id_name_uuid), 'delete')

def _perform_request(self, url, method_name, json=None, data=None, headers=None):
    """Send one HTTP request and return the processed response.

    ``method_name`` is lower-cased and looked up as a function of the
    ``requests`` module (``'get'``, ``'post'``, ``'patch'``, ``'delete'``...).
    When the configuration is verbose, the request is echoed first. The
    headers (an empty dict when none are given) go through
    ``_check_and_set_auth_key`` before sending. Returns the result of
    ``_process_response`` applied to the response.
    """
    if self.__config.verbose:
        click.echo(f'Sending {method_name} request to {url}')

    headers = self._check_and_set_auth_key(headers or {}, url)
    method = getattr(requests, method_name.lower())
    # Issue the request exactly once: sending it a second time would repeat
    # non-idempotent operations such as POST. 180 seconds is the timeout.
    response = method(url, json=json, data=data, headers=headers, timeout=180)
    return self._process_response(response)

def _process_response(self, response):
if isinstance(response, dict):
# Responses of standard JSON payload requests are dict
return response, None
Expand Down
1 change: 1 addition & 0 deletions arcsecond/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, config: ArcsecondConfig, subdomain: str = ''):
self.datapackages = ArcsecondAPIEndpoint(self.config, 'datapackages', self.subdomain)
self.datasets = ArcsecondAPIEndpoint(self.config, 'datasets', self.subdomain)
self.datafiles = ArcsecondAPIEndpoint(self.config, 'datafiles', self.subdomain)
self.datafilechunks = ArcsecondAPIEndpoint(self.config, 'datafilechunks', self.subdomain)

def login(self, username, access_key=None, upload_key=None):
assert access_key or upload_key
Expand Down
52 changes: 48 additions & 4 deletions arcsecond/uploader/uploader.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import hashlib
import os
import socket
from datetime import datetime
from pathlib import Path

import requests
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

from arcsecond import ArcsecondAPI
Expand All @@ -12,6 +14,8 @@
from .errors import UploadRemoteDatasetCheckError, UploadRemoteFileError, UploadRemoteFileTagsError
from .logger import get_oort_logger

# 10 MB in binary units (10 * 2**20 bytes): used both as the size threshold
# between regular and chunked uploads and as the size of each chunk read.
SIZE_10MB = 10 * 1024 * 1024


class FileUploader(object):
def __init__(self, context: Context, root_path: Path, file_path: Path, display_progress: bool = False):
Expand Down Expand Up @@ -78,14 +82,54 @@ def percent_printer(monitor):
return m

def _perform_upload(self):
    """Start the upload, choosing the strategy from the file size.

    Records the start time, logs the upload start once and sets the status to
    uploading. Files of at most ``SIZE_10MB`` bytes go through
    ``_perform_regular_upload``; larger ones through
    ``_perform_chunked_upload``. Returns the result of the chosen method.
    """
    self._started = datetime.now()
    self._logger.info(f'{self.log_prefix} Starting upload to Arcsecond ({self._file_size} bytes)')
    self._status = [Status.UPLOADING, Substatus.UPLOADING, None]
    # 10MB in binary = 10 * pow(2, 20) = 10 * pow(1024, 2)
    if self._file_size <= SIZE_10MB:
        return self._perform_regular_upload()
    return self._perform_chunked_upload()

def _perform_regular_upload(self):
    """Upload the file with a single ``create`` call on the datafiles endpoint.

    The upload data object supplies both the request body and the value of
    the ``Content-Type`` header. The resulting error, if any, is handed to
    ``_finish_upload``.
    """
    payload = self.__get_upload_data()
    content_type_header = {"Content-Type": payload.content_type}
    self._datafile, error = self._api.datafiles.create(data=payload, headers=content_type_header)
    self._finish_upload(error)

def _perform_chunked_upload(self):
    """Upload the file in successive chunks of at most ``SIZE_10MB`` bytes.

    Each chunk is sent with a PUT request carrying a ``Content-Range``
    header. The JSON reply of every PUT supplies the URL to use for the next
    request and the offset reached so far. Once the loop ends, the MD5 digest
    of the content read is POSTed to the last URL received; that reply goes
    through ``_process_response`` and the resulting error, if any, is handed
    to ``_finish_upload``.
    """
    self._hash_md5 = hashlib.md5()

    upload_url = self._api.datafilechunks.get_list_url()
    headers = self._api.datafiles._check_and_set_auth_key({}, upload_url)
    form_fields = {"filename": self._file_path.name, 'dataset': self._context.dataset_uuid}
    offset = 0
    with open(self._file_path, 'rb') as f:
        while offset < self._file_size:
            chunk = f.read(SIZE_10MB)
            if not chunk:
                # The file ended before the recorded size was reached: stop
                # here instead of sending empty chunks forever.
                break
            self._hash_md5.update(chunk)
            # The range end is inclusive and must describe the bytes actually
            # read; the last chunk is usually shorter than SIZE_10MB.
            last_byte = offset + len(chunk) - 1
            headers["Content-Range"] = f"bytes {offset}-{last_byte}/{self._file_size}"
            response = requests.put(
                upload_url,
                headers=headers,
                data=form_fields,
                files={'file': chunk},
                timeout=180,
            )
            if not response.ok:
                # A rejected chunk carries no usable url/offset. Report the
                # failure through the same path as the final request.
                # NOTE(review): assumes _process_response yields an error for
                # non-2xx responses - confirm against its implementation.
                self._datafile, error = self._api.datafiles._process_response(response)
                self._finish_upload(error)
                return
            reply = response.json()
            upload_url = reply.get('url')
            offset = reply.get('offset')

    # NOTE(review): this request is sent without the auth headers used for the
    # chunks, as in the original code - confirm the server accepts that.
    response = requests.post(upload_url, data={"md5": self._hash_md5.hexdigest()}, timeout=180)
    self._datafile, error = self._api.datafiles._process_response(response)
    self._finish_upload(error)

def _finish_upload(self, error):
if not error:
seconds = (datetime.now() - self._started).total_seconds()
self._logger.info(f'{self.log_prefix} Upload duration is {seconds} seconds.')
Expand Down

0 comments on commit c1ec70d

Please sign in to comment.