From 2e989eb32ad4e62f2a28f8a47dff4950a512450e Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 28 Dec 2020 17:00:49 -0500 Subject: [PATCH 1/3] Applying streaming upload PR from https://github.com/ckan/ckanapi/pull/109 --- README.md | 20 ++++++++++++++++++- ckanapi/cli/main.py | 3 ++- ckanapi/cli/progressbar.py | 32 ++++++++++++++++++++++++++++++ ckanapi/common.py | 21 +++++++++++++++++++- ckanapi/remoteckan.py | 34 +++++++++++++++++++++++++++----- ckanapi/tests/test_cli_action.py | 4 ++-- requirements.txt | 2 ++ setup.py | 2 ++ 8 files changed, 108 insertions(+), 10 deletions(-) create mode 100644 ckanapi/cli/progressbar.py diff --git a/README.md b/README.md index 4362cf8..755399d 100644 --- a/README.md +++ b/README.md @@ -310,7 +310,25 @@ When using `call_action` you must pass file objects separately: ```python mysite.call_action('resource_create', {'package_id': 'my-dataset-with-files'}, - files={'upload': open('/path/to/file/to/upload.csv', 'rb')}) + files={'upload': open('/path/to/file/to/upload.csv', 'rb')}, + progress=make_callback) +``` + +If 'resource_create' is called using `call_action`, the file will be +streamed and not fully loaded into memory before being uploaded to the +server. + +With this usage the additional keyword argument `progress` can be used +to pass in a callable that takes an instance of +`requests_toolbelt.MultipartEncoder` as parameter and returns a +callback function. The callback function will be called every time data +is read from the file-to-be-sent and it will be passed the instance of +`requests_toolbelt.MultipartEncoderMonitor`. This monitor has the +attribute `bytes_read` that can be used to display a progress bar. An +example is implemented in +[ckanapi.cli.progressbar](https://github.com/eawag-rdm/ckanapi/blob/streaming_upload/ckanapi/cli/progressbar.py). 
+ + ``` ### Session Control diff --git a/ckanapi/cli/main.py b/ckanapi/cli/main.py index cc13250..41f9ced 100644 --- a/ckanapi/cli/main.py +++ b/ckanapi/cli/main.py @@ -3,7 +3,7 @@ Usage: ckanapi action ACTION_NAME [(KEY=STRING | KEY:JSON | KEY@FILE ) ... | -i | -I JSON_INPUT] - [-j | -J] + [-j | -J] [-P] [[-c CONFIG] [-u USER] | -r SITE_URL [-a APIKEY] [-g] [--insecure]] ckanapi load datasets [--upload-resources] [-I JSONL_INPUT] [-s START] [-m MAX] @@ -57,6 +57,7 @@ -o --update-only update existing records, don't create new records -O --output=JSONL_OUTPUT output to json lines file instead of stdout -p --processes=PROCESSES set the number of worker processes [default: 1] + -P --progressbar display progress bar for file uploads -q --quiet don't display progress messages -r --remote=URL URL of CKAN server for remote actions -s --start-record=START start from record number START, where the first diff --git a/ckanapi/cli/progressbar.py b/ckanapi/cli/progressbar.py new file mode 100644 index 0000000..8ca654b --- /dev/null +++ b/ckanapi/cli/progressbar.py @@ -0,0 +1,32 @@ +# _*_ coding: utf-8 _*_ + +""" +Implements the progress bar for use in +ckanapi.cli.action:action(RemoteCKAN, resource_create, ...) +""" + +from clint.textui.progress import Bar + +def mkprogress(encoder): + """ + Returns a function that can be used as a callback for + :class:`requests_toolbelt.MultipartEncoderMonitor`. 
+ :param encoder: instance of :class:`requests_toolbelt.MultipartEncoder` + """ + expected_size = encoder.len + bar = Bar(expected_size=expected_size, filled_char = '=') + waiting = [False] + + def callback(monitor): + """ + prints progress bar + :param monitor: instance of + :class:`requests_toolbelt.MultipartEncoderMonitor` + """ + if monitor.bytes_read < expected_size: + bar.show(monitor.bytes_read) + elif not waiting[0]: + waiting[0] = True + print ('\nwaiting for server-side processing ...') + + return callback \ No newline at end of file diff --git a/ckanapi/common.py b/ckanapi/common.py index 9870b6f..31b1926 100644 --- a/ckanapi/common.py +++ b/ckanapi/common.py @@ -3,6 +3,7 @@ """ import json +import sys from ckanapi.errors import (CKANAPIError, NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, SearchIndexError, @@ -62,6 +63,23 @@ def is_file_like(v): isinstance(v, tuple) and len(v) >= 2 and hasattr(v[1], 'read')) +def _encodehack(s): + """ + :param s: input string + :type s: :class:str or :class:bytes: or :class:unicode + Yields either UTF-8 decodable bytestring (PY2) or PY3 unicode str. 
+ """ + if sys.version_info[0] > 2: + if type(s) == str: + return s # type(s) is PY3 str + else: + return s.decode('utf-8') # type(s) is PY3 bytes or illegal + elif type(s) == unicode: + return s.encode('utf-8') # It is PY2 :class:unicode + else: + return s.decode('utf-8').encode('utf-8') #type(s) is PY2 str or illegal + + def prepare_action(action, data_dict=None, apikey=None, files=None): """ Return action_url, data_json, http_headers @@ -79,7 +97,8 @@ def prepare_action(action, data_dict=None, apikey=None, files=None): continue # assuming missing will work the same as None if isinstance(v, (int, float)): v = str(v) - data_dict[k.encode('utf-8')] = v.encode('utf-8') + #data_dict[k.encode('utf-8')] = v.encode('utf-8') + data_dict[_encodehack(k)] = _encodehack(v) else: data_dict = json.dumps(data_dict).encode('ascii') headers['Content-Type'] = 'application/json' diff --git a/ckanapi/remoteckan.py b/ckanapi/remoteckan.py index 42d7d9e..598d254 100644 --- a/ckanapi/remoteckan.py +++ b/ckanapi/remoteckan.py @@ -17,6 +17,7 @@ PARALLEL_LIMIT = 3 import requests +from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor class RemoteCKAN(object): @@ -53,7 +54,7 @@ def __init__(self, address, apikey=None, user_agent=None, get_only=False, sessio self.parallel_limit = PARALLEL_LIMIT def call_action(self, action, data_dict=None, context=None, apikey=None, - files=None, requests_kwargs=None): + files=None, requests_kwargs=None, progress=None): """ :param action: the action name, e.g. 'package_create' :param data_dict: the dict to pass to the action as JSON, @@ -62,6 +63,13 @@ def call_action(self, action, data_dict=None, context=None, apikey=None, :param apikey: API key for authentication :param files: None or {field-name: file-to-be-sent, ...} :param requests_kwargs: kwargs for requests get/post calls + :param progress: A callable that takes an instance of + :class:`requests_toolbelt.MultipartEncoder` as parameter and returns + a callback function. 
The callback function will be called every time + data is read from the file-to-be-sent and it will be passed the + instance of :class:`requests_toolbelt.MultipartEncoderMonitor`. This + monitor has the attribute `bytes_read` that can be used to display + a progress bar. An example is implemented in ckanapi.cli. This function parses the response from the server as JSON and returns the decoded value. When an error is returned this @@ -84,12 +92,28 @@ def call_action(self, action, data_dict=None, context=None, apikey=None, if self.get_only: status, response = self._request_fn_get(url, data_dict, headers, requests_kwargs) else: - status, response = self._request_fn(url, data, headers, files, requests_kwargs) + status, response = self._request_fn(url, data, headers, files, requests_kwargs, progress) return reverse_apicontroller_action(url, status, response) - def _request_fn(self, url, data, headers, files, requests_kwargs): - r = self.session.post(url, data=data, headers=headers, files=files, - allow_redirects=False, **requests_kwargs) + def _request_fn(self, url, data, headers, files, requests_kwargs, progress): + if files: # use streaming + newfiles = dict([(k, (getattr(files[k], 'name', 'upload_filename'), + files[k])) for k in files]) + intersect = set(data.keys()) & set(newfiles.keys()) + if intersect: + raise CKANAPIError('field-name for files ("{}")' + .format(', '.join(list(intersect))) + + ' cannot also be field name in data_dict.') + data.update(newfiles) + m = MultipartEncoder(data) + if progress: + m = MultipartEncoderMonitor(m, progress(m)) + headers.update({'Content-Type': m.content_type}) + r = self.session.post(url, data=m, headers=headers, + allow_redirects=False, **requests_kwargs) + else: + r = self.session.post(url, data=data, headers=headers, files=files, + allow_redirects=False, **requests_kwargs) # allow_redirects=False because: if a post is redirected (e.g. 
301 due # to a http to https redirect), then the second request is made to the # new URL, but *without* the data. This gives a confusing "No request diff --git a/ckanapi/tests/test_cli_action.py b/ckanapi/tests/test_cli_action.py index a1c1d5a..78c8add 100644 --- a/ckanapi/tests/test_cli_action.py +++ b/ckanapi/tests/test_cli_action.py @@ -16,8 +16,8 @@ def __init__(self, expected_name, expected_args, response, self._expected_files = expected_files or {} self._response = response - def call_action(self, name, args, context=None, apikey=None, files=None, - requests_kwargs=None): + def call_action(self, name, args, context=None, apikey=None, progress=None, + files=None, requests_kwargs=None): if name != self._expected_name: return ["wrong name", name, self._expected_name] if args != self._expected_args: diff --git a/requirements.txt b/requirements.txt index 24d8d54..9da43bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ setuptools docopt requests +requests-toolbelt simplejson +clint diff --git a/setup.py b/setup.py index 267526b..58ee55f 100644 --- a/setup.py +++ b/setup.py @@ -8,8 +8,10 @@ 'setuptools', 'docopt', 'requests', + 'requests-toolbelt', 'python-slugify>=1.0', 'six>=1.9,<2.0', + 'clint', ] tests_require=[ 'pyfakefs==3.6.1', From 554170f62e8e1ef32bd5939da5a18b4ada8c8607 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 29 Dec 2020 02:13:48 -0500 Subject: [PATCH 2/3] progressbar --- ckanapi/cli/action.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ckanapi/cli/action.py b/ckanapi/cli/action.py index 8c2b335..25d8c2e 100644 --- a/ckanapi/cli/action.py +++ b/ckanapi/cli/action.py @@ -8,6 +8,7 @@ from ckanapi.cli.utils import compact_json, pretty_json from ckanapi.errors import CLIError +from ckanapi.cli.progressbar import mkprogress def action(ckan, arguments, stdin=None): @@ -18,6 +19,7 @@ def action(ckan, arguments, stdin=None): stdin = getattr(sys.stdin, 
'buffer', sys.stdin) file_args = {} + progress = None requests_kwargs = None if arguments['--insecure']: requests_kwargs = {'verify': False} @@ -52,12 +54,14 @@ def action(ckan, arguments, stdin=None): raise CLIError("Error opening %r: %s" % (expanduser(fvalue), e.args[1])) file_args[fkey] = f + if arguments['--progressbar']: + progress = mkprogress else: raise CLIError("argument not in the form KEY=STRING, " "KEY:JSON or KEY@FILE %r" % kv) result = ckan.call_action(arguments['ACTION_NAME'], action_args, - files=file_args, requests_kwargs=requests_kwargs) + progress=progress, files=file_args, requests_kwargs=requests_kwargs) if arguments['--output-jsonl']: if isinstance(result, list): From 1af22a7d6d91dcb7af92ea2ac05bd8ca21cd2460 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 29 Dec 2020 08:59:54 -0500 Subject: [PATCH 3/3] finish progress don't leave it hanging... --- ckanapi/cli/progressbar.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ckanapi/cli/progressbar.py b/ckanapi/cli/progressbar.py index 8ca654b..39990b2 100644 --- a/ckanapi/cli/progressbar.py +++ b/ckanapi/cli/progressbar.py @@ -26,6 +26,7 @@ def callback(monitor): if monitor.bytes_read < expected_size: bar.show(monitor.bytes_read) elif not waiting[0]: + bar.show(expected_size) waiting[0] = True print ('\nwaiting for server-side processing ...')