Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,25 @@ When using `call_action` you must pass file objects separately:
```python
mysite.call_action('resource_create',
{'package_id': 'my-dataset-with-files'},
files={'upload': open('/path/to/file/to/upload.csv', 'rb')})
files={'upload': open('/path/to/file/to/upload.csv', 'rb')},
progress=make_callback)
```

If 'resource_create' is called using `call_action`, the file will be
streamed and not fully loaded into memory before being uploaded to the
server.

With this usage the additional keyword argument `progress` can be used
to pass in a callable that takes an instance of
`requests_toolbelt.MultipartEncoder` as parameter and returns a
callback function. The callback function will be called every time data
is read from the file-to-be-sent and it will be passed the instance of
`requests_toolbelt.MultipartEncoderMonitor`. This monitor has the
attribute `bytes_read` that can be used to display a progress bar. An
example is implemented in
[ckanapi.cli.progressbar](https://github.com/eawag-rdm/ckanapi/blob/streaming_upload/ckanapi/cli/progressbar.py).


```

### Session Control
Expand Down
6 changes: 5 additions & 1 deletion ckanapi/cli/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ckanapi.cli.utils import compact_json, pretty_json
from ckanapi.errors import CLIError
from ckanapi.cli.progressbar import mkprogress


def action(ckan, arguments, stdin=None):
Expand All @@ -18,6 +19,7 @@ def action(ckan, arguments, stdin=None):
stdin = getattr(sys.stdin, 'buffer', sys.stdin)

file_args = {}
progress = None
requests_kwargs = None
if arguments['--insecure']:
requests_kwargs = {'verify': False}
Expand Down Expand Up @@ -52,12 +54,14 @@ def action(ckan, arguments, stdin=None):
raise CLIError("Error opening %r: %s" %
(expanduser(fvalue), e.args[1]))
file_args[fkey] = f
if arguments['--progressbar']:
progress = mkprogress
else:
raise CLIError("argument not in the form KEY=STRING, "
"KEY:JSON or KEY@FILE %r" % kv)

result = ckan.call_action(arguments['ACTION_NAME'], action_args,
files=file_args, requests_kwargs=requests_kwargs)
progress=progress, files=file_args, requests_kwargs=requests_kwargs)

if arguments['--output-jsonl']:
if isinstance(result, list):
Expand Down
3 changes: 2 additions & 1 deletion ckanapi/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Usage:
ckanapi action ACTION_NAME
[(KEY=STRING | KEY:JSON | KEY@FILE ) ... | -i | -I JSON_INPUT]
[-j | -J]
[-j | -J] [-P]
[[-c CONFIG] [-u USER] | -r SITE_URL [-a APIKEY] [-g] [--insecure]]
ckanapi load datasets
[--upload-resources] [-I JSONL_INPUT] [-s START] [-m MAX]
Expand Down Expand Up @@ -57,6 +57,7 @@
-o --update-only update existing records, don't create new records
-O --output=JSONL_OUTPUT output to json lines file instead of stdout
-p --processes=PROCESSES set the number of worker processes [default: 1]
-P --progressbar display progress bar for file uploads
-q --quiet don't display progress messages
-r --remote=URL URL of CKAN server for remote actions
-s --start-record=START start from record number START, where the first
Expand Down
33 changes: 33 additions & 0 deletions ckanapi/cli/progressbar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# _*_ coding: utf-8 _*_

"""
Implements the progress bar for use in
ckanapi.cli.action:action(RemoteCKAN, resource_create, ...)
"""

from clint.textui.progress import Bar

def mkprogress(encoder):
    """
    Build a progress-bar callback for
    :class:`requests_toolbelt.MultipartEncoderMonitor`.

    :param encoder: instance of :class:`requests_toolbelt.MultipartEncoder`;
        its ``len`` attribute is used as the expected total size of the bar
    :returns: a callable accepting the monitor and redrawing the bar
    """
    total = encoder.len
    progress_bar = Bar(expected_size=total, filled_char='=')
    # Mutable cell instead of ``nonlocal`` so the closure can flip the flag
    # on Python 2 as well.
    state = {'finished': False}

    def callback(monitor):
        """
        Redraw the progress bar from the monitor's ``bytes_read``.

        :param monitor: instance of
            :class:`requests_toolbelt.MultipartEncoderMonitor`
        """
        if monitor.bytes_read < total:
            progress_bar.show(monitor.bytes_read)
            return
        if state['finished']:
            # The completed bar and the notice were already printed once.
            return
        progress_bar.show(total)
        state['finished'] = True
        print('\nwaiting for server-side processing ...')

    return callback
21 changes: 20 additions & 1 deletion ckanapi/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import json
import sys

from ckanapi.errors import (CKANAPIError, NotAuthorized, NotFound,
ValidationError, SearchQueryError, SearchError, SearchIndexError,
Expand Down Expand Up @@ -62,6 +63,23 @@ def is_file_like(v):
isinstance(v, tuple) and len(v) >= 2 and hasattr(v[1], 'read'))


def _encodehack(s):
"""
:param s: input string
:type s: :class:str or :class:bytes: or :class:unicode
Yields either UTF-8 decodeable bytestring (PY2) or PY3 unicode str.
"""
if sys.version_info[0] > 2:
if type(s) == str:
return s # type(s) is PY3<str>
else:
return s.decode('utf-8') # type(s) is PY3<bytes> or illegal
elif type(s) == unicode:
return s.encode('utf-8') # It is PY2 :class:unicode
else:
return s.decode('utf-8').encode('utf-8') #type(s) is PY2<str> or illegal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just to raise an error for invalid types?



def prepare_action(action, data_dict=None, apikey=None, files=None):
"""
Return action_url, data_json, http_headers
Expand All @@ -79,7 +97,8 @@ def prepare_action(action, data_dict=None, apikey=None, files=None):
continue # assuming missing will work the same as None
if isinstance(v, (int, float)):
v = str(v)
data_dict[k.encode('utf-8')] = v.encode('utf-8')
#data_dict[k.encode('utf-8')] = v.encode('utf-8')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#data_dict[k.encode('utf-8')] = v.encode('utf-8')

data_dict[_encodehack(k)] = _encodehack(v)
else:
data_dict = json.dumps(data_dict).encode('ascii')
headers['Content-Type'] = 'application/json'
Expand Down
34 changes: 29 additions & 5 deletions ckanapi/remoteckan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
PARALLEL_LIMIT = 3

import requests
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor


class RemoteCKAN(object):
Expand Down Expand Up @@ -53,7 +54,7 @@ def __init__(self, address, apikey=None, user_agent=None, get_only=False, sessio
self.parallel_limit = PARALLEL_LIMIT

def call_action(self, action, data_dict=None, context=None, apikey=None,
files=None, requests_kwargs=None):
files=None, requests_kwargs=None, progress=None):
"""
:param action: the action name, e.g. 'package_create'
:param data_dict: the dict to pass to the action as JSON,
Expand All @@ -62,6 +63,13 @@ def call_action(self, action, data_dict=None, context=None, apikey=None,
:param apikey: API key for authentication
:param files: None or {field-name: file-to-be-sent, ...}
:param requests_kwargs: kwargs for requests get/post calls
:param progress: A callable that takes an instance of
:class:`requests_toolbelt.MultipartEncoder` as parameter and returns
a callback function. The callback function will be called every time
data is read from the file-to-be-sent and it will be passed the
instance of :class:`requests_toolbelt.MultipartEncoderMonitor`. This
monitor has the attribute `bytes_read` that can be used to display
a progress bar. An example is implemented in ckanapi.cli.progressbar.

This function parses the response from the server as JSON and
returns the decoded value. When an error is returned this
Expand All @@ -84,12 +92,28 @@ def call_action(self, action, data_dict=None, context=None, apikey=None,
if self.get_only:
status, response = self._request_fn_get(url, data_dict, headers, requests_kwargs)
else:
status, response = self._request_fn(url, data, headers, files, requests_kwargs)
status, response = self._request_fn(url, data, headers, files, requests_kwargs, progress)
return reverse_apicontroller_action(url, status, response)

def _request_fn(self, url, data, headers, files, requests_kwargs):
r = self.session.post(url, data=data, headers=headers, files=files,
allow_redirects=False, **requests_kwargs)
def _request_fn(self, url, data, headers, files, requests_kwargs, progress):
if files: # use streaming
newfiles = dict([(k, (getattr(files[k], 'name', 'upload_filename'),
files[k])) for k in files])
intersect = set(data.keys()) & set(newfiles.keys())
if intersect:
raise CKANAPIError('field-name for files ("{}")'
.format(', '.join(list(intersect))) +
' cannot also be field name in data_dict.')
data.update(newfiles)
m = MultipartEncoder(data)
if progress:
m = MultipartEncoderMonitor(m, progress(m))
headers.update({'Content-Type': m.content_type})
r = self.session.post(url, data=m, headers=headers,
allow_redirects=False, **requests_kwargs)
else:
r = self.session.post(url, data=data, headers=headers, files=files,
allow_redirects=False, **requests_kwargs)
# allow_redirects=False because: if a post is redirected (e.g. 301 due
# to a http to https redirect), then the second request is made to the
# new URL, but *without* the data. This gives a confusing "No request
Expand Down
4 changes: 2 additions & 2 deletions ckanapi/tests/test_cli_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ def __init__(self, expected_name, expected_args, response,
self._expected_files = expected_files or {}
self._response = response

def call_action(self, name, args, context=None, apikey=None, files=None,
requests_kwargs=None):
def call_action(self, name, args, context=None, apikey=None, progress=None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new parameters at the end, just in case

files=None, requests_kwargs=None):
if name != self._expected_name:
return ["wrong name", name, self._expected_name]
if args != self._expected_args:
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
setuptools
docopt
requests
requests-toolbelt
simplejson
clint
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
'setuptools',
'docopt',
'requests',
'requests-toolbelt',
'python-slugify>=1.0',
'six>=1.9,<2.0',
'clint',
]
tests_require=[
'pyfakefs==3.6.1',
Expand Down