
Support custom serialization #606


Merged: mrocklin merged 27 commits into dask:master on Nov 2, 2016

Conversation

@mrocklin (Member) commented Oct 25, 2016

This changes the protocol to support custom serialization methods on a per-type basis.

Fixes #604, which also includes an explanation of the approach taken here.

As a proof of concept I implemented custom serialization for numpy ndarrays that both uses blosc (if available) and avoids the two memory copies incurred by pickle.dumps.
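For illustration, the per-type hooks described above have roughly the following shape; the registry dicts and the numpy handlers below are a sketch, not the exact code in this PR:

import numpy as np

serializers = {}      # 'module.classname' -> serialize function
deserializers = {}    # 'module.classname' -> deserialize function

def register_serialization(cls, serialize_fn, deserialize_fn):
    # Key the registry by the 'modulename.classname' string so the
    # header can carry the identifier over the wire
    name = cls.__module__ + '.' + cls.__name__
    serializers[name] = serialize_fn
    deserializers[name] = deserialize_fn

def serialize_numpy_ndarray(x):
    header = {'dtype': x.dtype.str, 'shape': x.shape}
    frames = [x.tobytes()]   # the real implementation avoids this copy
                             # and may compress the frame with blosc
    return header, frames

def deserialize_numpy_ndarray(header, frames):
    return np.frombuffer(frames[0], dtype=header['dtype']).reshape(header['shape'])

register_serialization(np.ndarray,
                       serialize_numpy_ndarray,
                       deserialize_numpy_ndarray)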


    header['keys'].append(key)
    out_frames.extend(frames)

out_frames = [bytes(f) for f in out_frames]
Member Author

This possible copy should become unnecessary if we can get something like tornadoweb/tornado#1691 into Tornado.



default = config.get('compression', 'auto')
if default != 'auto':
Member

'auto' doesn't do anything?

Member Author

If config['compression'] == 'auto' then we use the default_compression determined above during the import process.
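In other words, resolution would look roughly like this (get_default_compression and the 'lz4' placeholder are illustrative; config is assumed to be the dict loaded from ~/.dask at import time):

from distributed.config import config   # assumption: the config dict from distributed/config.py

default_compression = 'lz4'   # stand-in for whatever import-time detection chose; may be None

def get_default_compression():
    default = config.get('compression', 'auto')
    if default != 'auto':
        return default            # an explicit user setting always wins
    return default_compression    # 'auto' defers to the import-time default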

    yield write(stream, response)
    yield gen.sleep(interval)
except (OSError, IOError, StreamClosedError):
    with log_errors(pdb=True):
Member

I suppose we shouldn't keep this in production code.

    else:
        return result
else:
    if isinstance(x, pickle_types) or b'__main__' not in result:
Member

I'm unsure what pickle_types is here?

Member Author

Originally these were types for which pickle definitely worked and for which we didn't want to use cloudpickle; this notably included numpy arrays. I must have lost the variable while moving code around. Clearly this code path isn't covered by tests, though, and needs to be fixed.

Member Author

Fixed

    return header, frames


def deserialize_bytes(header, frames):
    return b''.join(frames)  # the frames may be cut up in transit
Member

When you say "in transit", is that in case some level of compression is enabled?

Member Author

There are various reasons to cut up large bytestrings. I think in the current case we do need this for compression, yes.

Arguably, though, we should be able to handle this internally without the custom functions being aware of it.
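For illustration, a large payload might be cut into fixed-size frames before compression and rejoined on the other side; frame_split and the 64 MB block size below are invented for the sketch:

def frame_split(data, blocksize=2**26):
    # Cut a large bytestring into ~64 MB pieces so that each piece can be
    # compressed and shipped independently
    return [data[i:i + blocksize] for i in range(0, len(data), blocksize)]

def deserialize_bytes(header, frames):
    return b''.join(frames)   # undo the splitting on the receiving end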

Member Author

This may be fixed in 05198a8.

    header['keys'].append(key)
    out_frames.extend(frames)

for key, (head, frames) in pre.items():
Member

Why don't pre-serialized items get automatic compression?

Member Author

Ideally the frames should already be compressed

Member Author

This is now tested in 9d68847


def deserialize_numpy_ndarray(header, frames):
    with log_errors():
        assert len(frames) == 1
Member

This doesn't seem to match distributed.protocol.core.decompress(), which returns an arbitrary number of frames.

@pitrou (Member) commented Oct 31, 2016

I'm a bit confused trying to follow the layers of protocol encoding. Whose responsibility is it to call to_serialize()?

@mrocklin (Member Author)

I'm a bit confused trying to follow the layers of protocol encoding. Whose responsibility is it to call to_serialize()?

Application code, like the worker and client functions. For example:

class Worker:
    def get_data(self, keys):
        return {key: to_serialize(self.data[key]) for key in keys}

@pitrou (Member) commented Nov 1, 2016

Application code like the worker and client functions.

What happens if a dask graph produces, e.g., intermediate results as Numpy arrays (think of rechunk())? Do those get the special serialization treatment as well when shuffled around between workers?

@mrocklin (Member Author) commented Nov 1, 2016

Yes, worker-worker communication is actually handled by exactly the function I used above:

class Worker:
    def get_data(self, keys):
        return {key: to_serialize(self.data[key]) for key in keys}

So those messages contain data in custom-serialized form. Any time you don't wrap part of a message in to_serialize, you are implying that it can be serialized with msgpack.
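For example, a typical message mixes plain msgpack-friendly metadata with wrapped payloads; the message contents and the import path below are illustrative:

from distributed.protocol import to_serialize   # assumed import path
import numpy as np

big_numpy_array = np.random.random((1000, 1000))

msg = {'op': 'update-data',                     # small metadata: msgpack encodes this
       'who': 'worker-1',
       'data': to_serialize(big_numpy_array)}   # large payload: gets the custom path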

@mrocklin (Member Author) commented Nov 1, 2016

I've added a test to demonstrate that inter-worker communication works fine.

    return typ.__module__ + '.' + typ.__name__


def serialize(x):
@pitrou (Member) Nov 1, 2016

I suppose the case where x is e.g. a tuple of Numpy arrays (instead of a single array) isn't covered by this... Does such a case never happen?

Member Author

That's a good point. We would fall back to pickle in that case. Perhaps we should dive into standard containers.
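Container handling along these lines is what the suggestion points at; this is a hypothetical sketch, not something in this PR, and the follow-up comment below defers it:

def typename(typ):
    return typ.__module__ + '.' + typ.__name__

def serialize_collection(xs):
    # Hypothetical: give each element its own per-type treatment and record
    # how many frames it contributed, so the pieces can be sliced back apart
    headers, all_frames = [], []
    for x in xs:
        header, frames = serialize(x)       # serialize() is the per-type dispatch above;
        header['num-frames'] = len(frames)  # it falls back to pickle as needed
        headers.append(header)
        all_frames.extend(frames)
    return {'is-collection': True, 'sub-headers': headers}, all_frames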

Member Author

I think I'm going to wait on this until it comes up in practice. There are some tricky things to consider (how to avoid type-checking every entry of a long list of text, for example) and I'd like to ensure that what's in here handles itself well.

@minrk (Contributor) commented Nov 1, 2016

Nice! This is a lot like IPython's custom serialization. How do you propagate the custom deserializer to peers, so they know how to deserialize a custom-serialized message?

@mrocklin (Member Author) commented Nov 1, 2016

Each piece of serialized data comes along with a header that records the kind of serialization used, the compression per frame, the length per frame, etc. Currently we select serialization by type and use the 'modulename.classname' string as an identifier.
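Such a header for a numpy array might look roughly like this (field names and values are illustrative, not the exact wire format):

header = {'type': 'numpy.ndarray',       # the 'modulename.classname' identifier
          'dtype': '<f8',
          'shape': (1000, 1000),
          'compression': ['blosc'],      # compression applied to each frame
          'lengths': [8000000]}          # byte length of each frame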

@minrk (Contributor) commented Nov 1, 2016

Right - I think I meant how does the field in deserializers get populated on the destination? I get how the key gets there, just not the value.

@mrocklin (Member Author) commented Nov 1, 2016

Ah, we currently assume that it is imported on the client.

This works fine for types that we manage, like numpy arrays.

We could probably use something like worker environments (in a stalled PR) to distribute custom serialization functions.

@mrocklin mrocklin closed this Nov 1, 2016
@mrocklin mrocklin reopened this Nov 1, 2016
@mrocklin (Member Author) commented Nov 1, 2016

I would like to merge this soon.

@@ -76,8 +77,6 @@ def gather_from_workers(who_has, deserialize=True, rpc=rpc, close=True,
    bad_addresses |= {v for k, v in rev.items() if k not in response}
    results.update(merge(response))

    if deserialize:
Member

Does this mean the deserialize parameter is now obsolete on this function?

Member Author

Removed

@@ -492,6 +492,10 @@ def ensure_bytes(s):
    """
    if isinstance(s, bytes):
        return s
    if isinstance(s, memoryview):
        return s.tobytes()
    if isinstance(s, buffer):
Member

buffer doesn't exist on Python 3.

Member Author

Guarded. Also added this to the tests.
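A sketch of what such a guard could look like; the _buffer_types tuple is invented here, the actual diff may differ, and the real ensure_bytes has more cases:

import sys

if sys.version_info[0] == 2:
    _buffer_types = (buffer,)   # noqa: F821 -- only evaluated on Python 2
else:
    _buffer_types = ()

def ensure_bytes(s):
    """Coerce a bytes-like object into plain bytes."""
    if isinstance(s, bytes):
        return s
    if isinstance(s, memoryview):
        return s.tobytes()
    if _buffer_types and isinstance(s, _buffer_types):
        return bytes(s)
    return bytes(s)   # fall back for other bytes-like objects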

@mrocklin (Member Author) commented Nov 2, 2016

Merging.

@mrocklin mrocklin merged commit a9bb18a into dask:master Nov 2, 2016
@mrocklin mrocklin deleted the protocol-serialize branch November 2, 2016 12:46

try:
    import blosc
    n = blosc.set_nthreads(2)
Member

Why was this hard-coded to 2?

Member Author

We want some benefit from threads, but don't want to take them all (dask itself does parallelism). Two seemed like a decent default (more decent than "all"). Do you have other thoughts on what this should be?

@jakirkham (Member) Nov 4, 2016

Having a sensible default is fine. Having it hard-coded can be problematic, though.

Is there some way for the user to override things like this for dask already? If so, my suggestion would be to expose this through the same mechanism. If not, maybe we should discuss how this and any other hard-coded values can be tuned through a config file or similar.

Member Author

Yeah, we could place this in the ~/.dask/config file. Users can also call blosc.set_nthreads themselves. It's possible that blosc itself should have some place to make this configurable instead.

Member

Yeah, we could place this in the ~/.dask/config file.

This seems like the most desirable option.

Member Author

Can I interest you in raising an issue or perhaps a short PR? The relevant file is distributed/config.py and an example is here:

distributed/scheduler.py:BANDWIDTH = config.get('bandwidth', 100e6)
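Following that pattern, a config-driven thread count could look roughly like this; the 'compression-threads' key is made up here, and config is assumed to be the dict exposed by distributed/config.py:

from distributed.config import config

try:
    import blosc
    # let users tune the thread count the same way as other dask settings
    n = blosc.set_nthreads(config.get('compression-threads', 2))
except ImportError:
    blosc = None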

Member

Agreed. Done in issue #1054.
