Skip to content

Commit 1850d8e

Browse files
committed
Move opinions about encoding from communication to timely
1 parent 64a52db commit 1850d8e

File tree

15 files changed

+297
-208
lines changed

15 files changed

+297
-208
lines changed

communication/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ default = ["getopts"]
1818

1919
[dependencies]
2020
getopts = { version = "0.2.14", optional = true }
21-
bincode = { version = "1.0" }
2221
byteorder = "1.5"
2322
serde = { version = "1.0", features = ["derive"] }
2423
timely_bytes = { path = "../bytes", version = "0.12" }

communication/examples/comm_hello.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,26 @@
11
extern crate timely_communication;
22

3-
use std::ops::Deref;
4-
use timely_communication::{Message, Allocate};
3+
use timely_communication::{Allocate, Bytesable};
4+
5+
/// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
6+
pub struct Message {
7+
/// Text contents.
8+
pub payload: String,
9+
}
10+
11+
impl Bytesable for Message {
12+
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
13+
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
14+
}
15+
16+
fn length_in_bytes(&self) -> usize {
17+
self.payload.len()
18+
}
19+
20+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
21+
writer.write_all(self.payload.as_bytes()).unwrap();
22+
}
23+
}
524

625
fn main() {
726

@@ -16,7 +35,7 @@ fn main() {
1635

1736
// send typed data along each channel
1837
for i in 0 .. allocator.peers() {
19-
senders[i].send(Message::from_typed(format!("hello, {}", i)));
38+
senders[i].send(Message { payload: format!("hello, {}", i)});
2039
senders[i].done();
2140
}
2241

@@ -28,7 +47,7 @@ fn main() {
2847
allocator.receive();
2948

3049
if let Some(message) = receiver.recv() {
31-
println!("worker {}: received: <{}>", allocator.index(), message.deref());
50+
println!("worker {}: received: <{}>", allocator.index(), message.payload);
3251
received += 1;
3352
}
3453

communication/src/allocator/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub mod counters;
1717

1818
pub mod zero_copy;
1919

20-
use crate::{Push, Pull};
20+
use crate::{Bytesable, Push, Pull};
2121

2222
/// A proto-allocator, which implements `Send` and can be completed with `build`.
2323
///
@@ -33,11 +33,10 @@ pub trait AllocateBuilder : Send {
3333
}
3434

3535
use std::any::Any;
36-
use crate::message::Bytesable;
3736

3837
/// A type that can be sent along an allocated channel.
39-
pub trait Exchangeable : Send+Sync+Any+Bytesable+'static { }
40-
impl<T: Send+Sync+Any+Bytesable+'static> Exchangeable for T { }
38+
pub trait Exchangeable : Send+Any+Bytesable+'static { }
39+
impl<T: Send+Any+Bytesable+'static> Exchangeable for T { }
4140

4241
/// A type capable of allocating channels.
4342
///

communication/src/allocator/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl Process {
110110
impl Allocate for Process {
111111
fn index(&self) -> usize { self.index }
112112
fn peers(&self) -> usize { self.peers }
113-
fn allocate<T: Any+Send+Sync+'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
113+
fn allocate<T: Any+Send+'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
114114

115115
// this is race-y global initialisation of all channels for all workers, performed by the
116116
// first worker that enters this critical section

communication/src/allocator/zero_copy/push_pull.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use bytes::arc::Bytes;
88

99
use crate::allocator::canary::Canary;
1010
use crate::networking::MessageHeader;
11-
use crate::message::Bytesable;
12-
use crate::{Push, Pull};
11+
use crate::{Bytesable, Push, Pull};
1312

1413
use super::bytes_exchange::{BytesPush, SendEndpoint};
1514

communication/src/initialize.rs

Lines changed: 124 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -182,47 +182,71 @@ impl Config {
182182
///
183183
/// # Examples
184184
/// ```
185-
/// use timely_communication::Allocate;
186-
///
187-
/// // configure for two threads, just one process.
188-
/// let config = timely_communication::Config::Process(2);
189-
///
190-
/// // initializes communication, spawns workers
191-
/// let guards = timely_communication::initialize(config, |mut allocator| {
192-
/// println!("worker {} started", allocator.index());
193-
///
194-
/// // allocates a pair of senders list and one receiver.
195-
/// let (mut senders, mut receiver) = allocator.allocate(0);
196-
///
197-
/// // send typed data along each channel
198-
/// use timely_communication::Message;
199-
/// senders[0].send(Message::from_typed(format!("hello, {}", 0)));
200-
/// senders[1].send(Message::from_typed(format!("hello, {}", 1)));
201-
///
202-
/// // no support for termination notification,
203-
/// // we have to count down ourselves.
204-
/// let mut expecting = 2;
205-
/// while expecting > 0 {
206-
/// allocator.receive();
207-
/// if let Some(message) = receiver.recv() {
208-
/// use std::ops::Deref;
209-
/// println!("worker {}: received: <{}>", allocator.index(), message.deref());
210-
/// expecting -= 1;
211-
/// }
212-
/// allocator.release();
185+
/// use timely_communication::{Allocate, Bytesable};
186+
///
187+
/// /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
188+
/// pub struct Message {
189+
/// /// Text contents.
190+
/// pub payload: String,
191+
/// }
192+
///
193+
/// impl Bytesable for Message {
194+
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
195+
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
213196
/// }
214-
///
215-
/// // optionally, return something
216-
/// allocator.index()
217-
/// });
218-
///
219-
/// // computation runs until guards are joined or dropped.
220-
/// if let Ok(guards) = guards {
221-
/// for guard in guards.join() {
222-
/// println!("result: {:?}", guard);
197+
///
198+
/// fn length_in_bytes(&self) -> usize {
199+
/// self.payload.len()
200+
/// }
201+
///
202+
/// fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
203+
/// writer.write_all(self.payload.as_bytes()).unwrap();
204+
/// }
205+
/// }
206+
///
207+
/// fn main() {
208+
///
209+
/// // extract the configuration from user-supplied arguments, initialize the computation.
210+
/// let config = timely_communication::Config::from_args(std::env::args()).unwrap();
211+
/// let guards = timely_communication::initialize(config, |mut allocator| {
212+
///
213+
/// println!("worker {} of {} started", allocator.index(), allocator.peers());
214+
///
215+
/// // allocates a pair of senders list and one receiver.
216+
/// let (mut senders, mut receiver) = allocator.allocate(0);
217+
///
218+
/// // send typed data along each channel
219+
/// for i in 0 .. allocator.peers() {
220+
/// senders[i].send(Message { payload: format!("hello, {}", i)});
221+
/// senders[i].done();
222+
/// }
223+
///
224+
/// // no support for termination notification,
225+
/// // we have to count down ourselves.
226+
/// let mut received = 0;
227+
/// while received < allocator.peers() {
228+
///
229+
/// allocator.receive();
230+
///
231+
/// if let Some(message) = receiver.recv() {
232+
/// println!("worker {}: received: <{}>", allocator.index(), message.payload);
233+
/// received += 1;
234+
/// }
235+
///
236+
/// allocator.release();
237+
/// }
238+
///
239+
/// allocator.index()
240+
/// });
241+
///
242+
/// // computation runs until guards are joined or dropped.
243+
/// if let Ok(guards) = guards {
244+
/// for guard in guards.join() {
245+
/// println!("result: {:?}", guard);
246+
/// }
223247
/// }
248+
/// else { println!("error in computation"); }
224249
/// }
225-
/// else { println!("error in computation"); }
226250
/// ```
227251
///
228252
/// The should produce output like:
@@ -254,47 +278,71 @@ pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
254278
///
255279
/// # Examples
256280
/// ```
257-
/// use timely_communication::Allocate;
258-
///
259-
/// // configure for two threads, just one process.
260-
/// let builders = timely_communication::allocator::process::Process::new_vector(2);
261-
///
262-
/// // initializes communication, spawns workers
263-
/// let guards = timely_communication::initialize_from(builders, Box::new(()), |mut allocator| {
264-
/// println!("worker {} started", allocator.index());
265-
///
266-
/// // allocates a pair of senders list and one receiver.
267-
/// let (mut senders, mut receiver) = allocator.allocate(0);
268-
///
269-
/// // send typed data along each channel
270-
/// use timely_communication::Message;
271-
/// senders[0].send(Message::from_typed(format!("hello, {}", 0)));
272-
/// senders[1].send(Message::from_typed(format!("hello, {}", 1)));
273-
///
274-
/// // no support for termination notification,
275-
/// // we have to count down ourselves.
276-
/// let mut expecting = 2;
277-
/// while expecting > 0 {
278-
/// allocator.receive();
279-
/// if let Some(message) = receiver.recv() {
280-
/// use std::ops::Deref;
281-
/// println!("worker {}: received: <{}>", allocator.index(), message.deref());
282-
/// expecting -= 1;
283-
/// }
284-
/// allocator.release();
281+
/// use timely_communication::{Allocate, Bytesable};
282+
///
283+
/// /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
284+
/// pub struct Message {
285+
/// /// Text contents.
286+
/// pub payload: String,
287+
/// }
288+
///
289+
/// impl Bytesable for Message {
290+
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
291+
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
285292
/// }
286-
///
287-
/// // optionally, return something
288-
/// allocator.index()
289-
/// });
290-
///
291-
/// // computation runs until guards are joined or dropped.
292-
/// if let Ok(guards) = guards {
293-
/// for guard in guards.join() {
294-
/// println!("result: {:?}", guard);
293+
///
294+
/// fn length_in_bytes(&self) -> usize {
295+
/// self.payload.len()
296+
/// }
297+
///
298+
/// fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
299+
/// writer.write_all(self.payload.as_bytes()).unwrap();
300+
/// }
301+
/// }
302+
///
303+
/// fn main() {
304+
///
305+
/// // extract the configuration from user-supplied arguments, initialize the computation.
306+
/// let config = timely_communication::Config::from_args(std::env::args()).unwrap();
307+
/// let guards = timely_communication::initialize(config, |mut allocator| {
308+
///
309+
/// println!("worker {} of {} started", allocator.index(), allocator.peers());
310+
///
311+
/// // allocates a pair of senders list and one receiver.
312+
/// let (mut senders, mut receiver) = allocator.allocate(0);
313+
///
314+
/// // send typed data along each channel
315+
/// for i in 0 .. allocator.peers() {
316+
/// senders[i].send(Message { payload: format!("hello, {}", i)});
317+
/// senders[i].done();
318+
/// }
319+
///
320+
/// // no support for termination notification,
321+
/// // we have to count down ourselves.
322+
/// let mut received = 0;
323+
/// while received < allocator.peers() {
324+
///
325+
/// allocator.receive();
326+
///
327+
/// if let Some(message) = receiver.recv() {
328+
/// println!("worker {}: received: <{}>", allocator.index(), message.payload);
329+
/// received += 1;
330+
/// }
331+
///
332+
/// allocator.release();
333+
/// }
334+
///
335+
/// allocator.index()
336+
/// });
337+
///
338+
/// // computation runs until guards are joined or dropped.
339+
/// if let Ok(guards) = guards {
340+
/// for guard in guards.join() {
341+
/// println!("result: {:?}", guard);
342+
/// }
295343
/// }
344+
/// else { println!("error in computation"); }
296345
/// }
297-
/// else { println!("error in computation"); }
298346
/// ```
299347
pub fn initialize_from<A, T, F>(
300348
builders: Vec<A>,

0 commit comments

Comments
 (0)