diff --git a/communication/Cargo.toml b/communication/Cargo.toml index fb7375221..dce2a0efe 100644 --- a/communication/Cargo.toml +++ b/communication/Cargo.toml @@ -18,7 +18,6 @@ default = ["getopts"] [dependencies] getopts = { version = "0.2.21", optional = true } -bincode = { version = "1.0" } byteorder = "1.5" serde = { version = "1.0", features = ["derive"] } timely_bytes = { path = "../bytes", version = "0.12" } diff --git a/communication/examples/comm_hello.rs b/communication/examples/comm_hello.rs index 08fdd2335..309a52e94 100644 --- a/communication/examples/comm_hello.rs +++ b/communication/examples/comm_hello.rs @@ -1,5 +1,24 @@ -use std::ops::Deref; -use timely_communication::{Message, Allocate}; +use timely_communication::{Allocate, Bytesable}; + +/// A wrapper that indicates the serialization/deserialization strategy. +pub struct Message { + /// Text contents. + pub payload: String, +} + +impl Bytesable for Message { + fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { + Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } + } + + fn length_in_bytes(&self) -> usize { + self.payload.len() + } + + fn into_bytes(&self, writer: &mut W) { + writer.write_all(self.payload.as_bytes()).unwrap(); + } +} fn main() { @@ -14,7 +33,7 @@ fn main() { // send typed data along each channel for i in 0 .. 
allocator.peers() { - senders[i].send(Message::from_typed(format!("hello, {}", i))); + senders[i].send(Message { payload: format!("hello, {}", i)}); senders[i].done(); } @@ -26,7 +45,7 @@ fn main() { allocator.receive(); if let Some(message) = receiver.recv() { - println!("worker {}: received: <{}>", allocator.index(), message.deref()); + println!("worker {}: received: <{}>", allocator.index(), message.payload); received += 1; } diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs index 49f014072..605ade7a1 100644 --- a/communication/src/allocator/generic.rs +++ b/communication/src/allocator/generic.rs @@ -8,11 +8,11 @@ use std::cell::RefCell; use crate::allocator::thread::ThreadBuilder; use crate::allocator::process::ProcessBuilder as TypedProcessBuilder; -use crate::allocator::{Allocate, AllocateBuilder, Thread, Process}; +use crate::allocator::{Allocate, AllocateBuilder, Exchangeable, Thread, Process}; use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator}; use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator}; -use crate::{Push, Pull, Data, Message}; +use crate::{Push, Pull}; /// Enumerates known implementors of `Allocate`. /// Passes trait method calls on to members. @@ -47,7 +47,7 @@ impl Generic { } } /// Constructs several send endpoints and one receive endpoint. 
- fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { match self { Generic::Thread(t) => t.allocate(identifier), Generic::Process(p) => p.allocate(identifier), @@ -86,7 +86,7 @@ impl Generic { impl Allocate for Generic { fn index(&self) -> usize { self.index() } fn peers(&self) -> usize { self.peers() } - fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { self.allocate(identifier) } diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index e5b858f69..0b04c348f 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -17,7 +17,7 @@ pub mod counters; pub mod zero_copy; -use crate::{Data, Push, Pull, Message}; +use crate::{Bytesable, Push, Pull}; /// A proto-allocator, which implements `Send` and can be completed with `build`. /// @@ -32,6 +32,12 @@ pub trait AllocateBuilder : Send { fn build(self) -> Self::Allocator; } +use std::any::Any; + +/// A type that can be sent along an allocated channel. +pub trait Exchangeable : Send+Any+Bytesable { } +impl Exchangeable for T { } + /// A type capable of allocating channels. /// /// There is some feature creep, in that this contains several convenience methods about the nature @@ -42,7 +48,7 @@ pub trait Allocate { /// The number of workers in the communication group. fn peers(&self) -> usize; /// Constructs several send endpoints and one receive endpoint. - fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>); + fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>); /// A shared queue of communication events with channel identifier. /// /// It is expected that users of the channel allocator will regularly @@ -85,8 +91,8 @@ pub trait Allocate { /// By default, this method uses the thread-local channel constructor /// based on a shared `VecDeque` which updates the event queue. 
fn pipeline(&mut self, identifier: usize) -> - (thread::ThreadPusher>, - thread::ThreadPuller>) + (thread::ThreadPusher, + thread::ThreadPuller) { thread::Thread::new_from(identifier, self.events().clone()) } diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index 07d793684..f8ac3322a 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -10,7 +10,7 @@ use crossbeam_channel::{Sender, Receiver}; use crate::allocator::thread::{ThreadBuilder}; use crate::allocator::{Allocate, AllocateBuilder, Thread}; -use crate::{Push, Pull, Message}; +use crate::{Push, Pull}; use crate::buzzer::Buzzer; /// An allocator for inter-thread, intra-process communication @@ -110,7 +110,7 @@ impl Process { impl Allocate for Process { fn index(&self) -> usize { self.index } fn peers(&self) -> usize { self.peers } - fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { // this is race-y global initialisation of all channels for all workers, performed by the // first worker that enters this critical section @@ -126,7 +126,7 @@ impl Allocate for Process { let mut pushers = Vec::with_capacity(self.peers); let mut pullers = Vec::with_capacity(self.peers); for buzzer in self.buzzers.iter() { - let (s, r): (Sender>, Receiver>) = crossbeam_channel::unbounded(); + let (s, r): (Sender, Receiver) = crossbeam_channel::unbounded(); // TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter. 
pushers.push((Pusher { target: s }, buzzer.clone())); pullers.push(Puller { source: r, current: None }); @@ -142,7 +142,7 @@ impl Allocate for Process { let vector = entry - .downcast_mut::>, Buzzer)>, Puller>)>>>() + .downcast_mut::, Buzzer)>, Puller)>>>() .expect("failed to correctly cast channel"); let (sends, recv) = @@ -166,10 +166,10 @@ impl Allocate for Process { sends.into_iter() .zip(self.counters_send.iter()) .map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b)) - .map(|s| Box::new(s) as Box>>) + .map(|s| Box::new(s) as Box>) .collect::>(); - let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box>>; + let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box>; (sends, recv) } diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index f46e3532b..2f1fca9b6 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -8,7 +8,7 @@ use std::collections::VecDeque; use crate::allocator::{Allocate, AllocateBuilder}; use crate::allocator::counters::Pusher as CountPusher; use crate::allocator::counters::Puller as CountPuller; -use crate::{Push, Pull, Message}; +use crate::{Push, Pull}; /// Builder for single-threaded allocator. pub struct ThreadBuilder; @@ -28,7 +28,7 @@ pub struct Thread { impl Allocate for Thread { fn index(&self) -> usize { 0 } fn peers(&self) -> usize { 1 } - fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { let (pusher, puller) = Thread::new_from(identifier, self.events.clone()); (vec![Box::new(pusher)], Box::new(puller)) } @@ -62,9 +62,9 @@ impl Thread { /// Creates a new thread-local channel from an identifier and shared counts. 
pub fn new_from(identifier: usize, events: Rc>>) - -> (ThreadPusher>, ThreadPuller>) + -> (ThreadPusher, ThreadPuller) { - let shared = Rc::new(RefCell::new((VecDeque::>::new(), VecDeque::>::new()))); + let shared = Rc::new(RefCell::new((VecDeque::::new(), VecDeque::::new()))); let pusher = Pusher { target: shared.clone() }; let pusher = CountPusher::new(pusher, identifier, events.clone()); let puller = Puller { source: shared, current: None }; diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 632c5be0e..dffc40380 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -8,8 +8,8 @@ use timely_bytes::arc::Bytes; use crate::networking::MessageHeader; -use crate::{Allocate, Message, Data, Push, Pull}; -use crate::allocator::AllocateBuilder; +use crate::{Allocate, Push, Pull}; +use crate::allocator::{AllocateBuilder, Exchangeable}; use crate::allocator::canary::Canary; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; @@ -135,7 +135,7 @@ pub struct TcpAllocator { impl Allocate for TcpAllocator { fn index(&self) -> usize { self.index } fn peers(&self) -> usize { self.peers } - fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { // Assume and enforce in-order identifier allocation. if let Some(bound) = self.channel_id_bound { @@ -144,7 +144,7 @@ impl Allocate for TcpAllocator { self.channel_id_bound = Some(identifier); // Result list of boxed pushers. - let mut pushes = Vec::>>>::new(); + let mut pushes = Vec::>>::new(); // Inner exchange allocations. 
let inner_peers = self.inner.peers(); diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index cd5c92a21..eb81c20ec 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -9,8 +9,8 @@ use timely_bytes::arc::Bytes; use crate::networking::MessageHeader; -use crate::{Allocate, Message, Data, Push, Pull}; -use crate::allocator::{AllocateBuilder}; +use crate::{Allocate, Push, Pull}; +use crate::allocator::{AllocateBuilder, Exchangeable}; use crate::allocator::canary::Canary; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; @@ -119,7 +119,7 @@ pub struct ProcessAllocator { impl Allocate for ProcessAllocator { fn index(&self) -> usize { self.index } fn peers(&self) -> usize { self.peers } - fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { // Assume and enforce in-order identifier allocation. if let Some(bound) = self.channel_id_bound { @@ -127,7 +127,7 @@ impl Allocate for ProcessAllocator { } self.channel_id_bound = Some(identifier); - let mut pushes = Vec::>>>::with_capacity(self.peers()); + let mut pushes = Vec::>>::with_capacity(self.peers()); for target_index in 0 .. 
self.peers() { diff --git a/communication/src/allocator/zero_copy/push_pull.rs b/communication/src/allocator/zero_copy/push_pull.rs index 9f2469265..758aad014 100644 --- a/communication/src/allocator/zero_copy/push_pull.rs +++ b/communication/src/allocator/zero_copy/push_pull.rs @@ -8,9 +8,7 @@ use timely_bytes::arc::Bytes; use crate::allocator::canary::Canary; use crate::networking::MessageHeader; - -use crate::{Data, Push, Pull}; -use crate::allocator::Message; +use crate::{Bytesable, Push, Pull}; use super::bytes_exchange::{BytesPush, SendEndpoint}; @@ -35,9 +33,9 @@ impl Pusher { } } -impl Push> for Pusher { +impl Push for Pusher { #[inline] - fn push(&mut self, element: &mut Option>) { + fn push(&mut self, element: &mut Option) { if let Some(ref mut element) = *element { // determine byte lengths and build header. @@ -68,11 +66,11 @@ impl Push> for Pusher { /// allocation. pub struct Puller { _canary: Canary, - current: Option>, + current: Option, receiver: Rc>>, // source of serialized buffers } -impl Puller { +impl Puller { /// Creates a new `Puller` instance from a shared queue. pub fn new(receiver: Rc>>, _canary: Canary) -> Puller { Puller { @@ -83,14 +81,14 @@ impl Puller { } } -impl Pull> for Puller { +impl Pull for Puller { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option { self.current = self.receiver .borrow_mut() .pop_front() - .map(Message::from_bytes); + .map(T::from_bytes); &mut self.current } @@ -103,15 +101,15 @@ impl Pull> for Puller { /// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared /// allocation. pub struct PullerInner { - inner: Box>>, // inner pullable (e.g. intra-process typed queue) + inner: Box>, // inner pullable (e.g. intra-process typed queue) _canary: Canary, - current: Option>, + current: Option, receiver: Rc>>, // source of serialized buffers } -impl PullerInner { +impl PullerInner { /// Creates a new `PullerInner` instance from a shared queue. 
- pub fn new(inner: Box>>, receiver: Rc>>, _canary: Canary) -> Self { + pub fn new(inner: Box>, receiver: Rc>>, _canary: Canary) -> Self { PullerInner { inner, _canary, @@ -121,9 +119,9 @@ impl PullerInner { } } -impl Pull> for PullerInner { +impl Pull for PullerInner { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option { let inner = self.inner.pull(); if inner.is_some() { @@ -134,7 +132,7 @@ impl Pull> for PullerInner { self.receiver .borrow_mut() .pop_front() - .map(Message::from_bytes); + .map(T::from_bytes); &mut self.current } diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index 11f04e7bc..040b5f06d 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -182,47 +182,71 @@ impl Config { /// /// # Examples /// ``` -/// use timely_communication::Allocate; -/// -/// // configure for two threads, just one process. -/// let config = timely_communication::Config::Process(2); -/// -/// // initializes communication, spawns workers -/// let guards = timely_communication::initialize(config, |mut allocator| { -/// println!("worker {} started", allocator.index()); -/// -/// // allocates a pair of senders list and one receiver. -/// let (mut senders, mut receiver) = allocator.allocate(0); -/// -/// // send typed data along each channel -/// use timely_communication::Message; -/// senders[0].send(Message::from_typed(format!("hello, {}", 0))); -/// senders[1].send(Message::from_typed(format!("hello, {}", 1))); -/// -/// // no support for termination notification, -/// // we have to count down ourselves. 
-/// let mut expecting = 2; -/// while expecting > 0 { -/// allocator.receive(); -/// if let Some(message) = receiver.recv() { -/// use std::ops::Deref; -/// println!("worker {}: received: <{}>", allocator.index(), message.deref()); -/// expecting -= 1; -/// } -/// allocator.release(); +/// use timely_communication::{Allocate, Bytesable}; +/// +/// /// A wrapper that indicates the serialization/deserialization strategy. +/// pub struct Message { +/// /// Text contents. +/// pub payload: String, +/// } +/// +/// impl Bytesable for Message { +/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { +/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } /// } -/// -/// // optionally, return something -/// allocator.index() -/// }); -/// -/// // computation runs until guards are joined or dropped. -/// if let Ok(guards) = guards { -/// for guard in guards.join() { -/// println!("result: {:?}", guard); +/// +/// fn length_in_bytes(&self) -> usize { +/// self.payload.len() +/// } +/// +/// fn into_bytes(&self, writer: &mut W) { +/// writer.write_all(self.payload.as_bytes()).unwrap(); +/// } +/// } +/// +/// fn main() { +/// +/// // extract the configuration from user-supplied arguments, initialize the computation. +/// let config = timely_communication::Config::from_args(std::env::args()).unwrap(); +/// let guards = timely_communication::initialize(config, |mut allocator| { +/// +/// println!("worker {} of {} started", allocator.index(), allocator.peers()); +/// +/// // allocates a pair of senders list and one receiver. +/// let (mut senders, mut receiver) = allocator.allocate(0); +/// +/// // send typed data along each channel +/// for i in 0 .. allocator.peers() { +/// senders[i].send(Message { payload: format!("hello, {}", i)}); +/// senders[i].done(); +/// } +/// +/// // no support for termination notification, +/// // we have to count down ourselves. 
+/// let mut received = 0; +/// while received < allocator.peers() { +/// +/// allocator.receive(); +/// +/// if let Some(message) = receiver.recv() { +/// println!("worker {}: received: <{}>", allocator.index(), message.payload); +/// received += 1; +/// } +/// +/// allocator.release(); +/// } +/// +/// allocator.index() +/// }); +/// +/// // computation runs until guards are joined or dropped. +/// if let Ok(guards) = guards { +/// for guard in guards.join() { +/// println!("result: {:?}", guard); +/// } /// } +/// else { println!("error in computation"); } /// } -/// else { println!("error in computation"); } /// ``` /// /// The should produce output like: @@ -254,47 +278,71 @@ pub fn initializeT+Send+Sync+'static>( /// /// # Examples /// ``` -/// use timely_communication::Allocate; -/// -/// // configure for two threads, just one process. -/// let builders = timely_communication::allocator::process::Process::new_vector(2); -/// -/// // initializes communication, spawns workers -/// let guards = timely_communication::initialize_from(builders, Box::new(()), |mut allocator| { -/// println!("worker {} started", allocator.index()); -/// -/// // allocates a pair of senders list and one receiver. -/// let (mut senders, mut receiver) = allocator.allocate(0); -/// -/// // send typed data along each channel -/// use timely_communication::Message; -/// senders[0].send(Message::from_typed(format!("hello, {}", 0))); -/// senders[1].send(Message::from_typed(format!("hello, {}", 1))); -/// -/// // no support for termination notification, -/// // we have to count down ourselves. 
-/// let mut expecting = 2; -/// while expecting > 0 { -/// allocator.receive(); -/// if let Some(message) = receiver.recv() { -/// use std::ops::Deref; -/// println!("worker {}: received: <{}>", allocator.index(), message.deref()); -/// expecting -= 1; -/// } -/// allocator.release(); +/// use timely_communication::{Allocate, Bytesable}; +/// +/// /// A wrapper that indicates the serialization/deserialization strategy. +/// pub struct Message { +/// /// Text contents. +/// pub payload: String, +/// } +/// +/// impl Bytesable for Message { +/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { +/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } /// } -/// -/// // optionally, return something -/// allocator.index() -/// }); -/// -/// // computation runs until guards are joined or dropped. -/// if let Ok(guards) = guards { -/// for guard in guards.join() { -/// println!("result: {:?}", guard); +/// +/// fn length_in_bytes(&self) -> usize { +/// self.payload.len() +/// } +/// +/// fn into_bytes(&self, writer: &mut W) { +/// writer.write_all(self.payload.as_bytes()).unwrap(); +/// } +/// } +/// +/// fn main() { +/// +/// // extract the configuration from user-supplied arguments, initialize the computation. +/// let config = timely_communication::Config::from_args(std::env::args()).unwrap(); +/// let guards = timely_communication::initialize(config, |mut allocator| { +/// +/// println!("worker {} of {} started", allocator.index(), allocator.peers()); +/// +/// // allocates a pair of senders list and one receiver. +/// let (mut senders, mut receiver) = allocator.allocate(0); +/// +/// // send typed data along each channel +/// for i in 0 .. allocator.peers() { +/// senders[i].send(Message { payload: format!("hello, {}", i)}); +/// senders[i].done(); +/// } +/// +/// // no support for termination notification, +/// // we have to count down ourselves. 
+/// let mut received = 0; +/// while received < allocator.peers() { +/// +/// allocator.receive(); +/// +/// if let Some(message) = receiver.recv() { +/// println!("worker {}: received: <{}>", allocator.index(), message.payload); +/// received += 1; +/// } +/// +/// allocator.release(); +/// } +/// +/// allocator.index() +/// }); +/// +/// // computation runs until guards are joined or dropped. +/// if let Ok(guards) = guards { +/// for guard in guards.join() { +/// println!("result: {:?}", guard); +/// } /// } +/// else { println!("error in computation"); } /// } -/// else { println!("error in computation"); } /// ``` pub fn initialize_from( builders: Vec, diff --git a/communication/src/lib.rs b/communication/src/lib.rs index 224708bf9..16fe73eeb 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -8,55 +8,78 @@ //! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker, //! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees. //! -//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait. +//! To be communicated, a type must implement the [`Bytesable`] trait. //! //! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`] and [`Pull`] //! traits), which is used for more precise control of resources. //! //! # Examples //! ``` -//! use timely_communication::Allocate; -//! -//! // configure for two threads, just one process. -//! let config = timely_communication::Config::Process(2); -//! -//! // initializes communication, spawns workers -//! let guards = timely_communication::initialize(config, |mut allocator| { -//! println!("worker {} started", allocator.index()); -//! -//! // allocates a pair of senders list and one receiver. -//! let (mut senders, mut receiver) = allocator.allocate(0); -//! -//! // send typed data along each channel -//! 
use timely_communication::Message; -//! senders[0].send(Message::from_typed(format!("hello, {}", 0))); -//! senders[1].send(Message::from_typed(format!("hello, {}", 1))); -//! -//! // no support for termination notification, -//! // we have to count down ourselves. -//! let mut expecting = 2; -//! while expecting > 0 { -//! -//! allocator.receive(); -//! if let Some(message) = receiver.recv() { -//! use std::ops::Deref; -//! println!("worker {}: received: <{}>", allocator.index(), message.deref()); -//! expecting -= 1; -//! } -//! allocator.release(); +//! use timely_communication::{Allocate, Bytesable}; +//! +//! /// A wrapper that indicates the serialization/deserialization strategy. +//! pub struct Message { +//! /// Text contents. +//! pub payload: String, +//! } +//! +//! impl Bytesable for Message { +//! fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { +//! Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } //! } -//! -//! // optionally, return something -//! allocator.index() -//! }); -//! -//! // computation runs until guards are joined or dropped. -//! if let Ok(guards) = guards { -//! for guard in guards.join() { -//! println!("result: {:?}", guard); +//! +//! fn length_in_bytes(&self) -> usize { +//! self.payload.len() +//! } +//! +//! fn into_bytes(&self, writer: &mut W) { +//! writer.write_all(self.payload.as_bytes()).unwrap(); //! } //! } -//! else { println!("error in computation"); } +//! +//! fn main() { +//! +//! // extract the configuration from user-supplied arguments, initialize the computation. +//! let config = timely_communication::Config::from_args(std::env::args()).unwrap(); +//! let guards = timely_communication::initialize(config, |mut allocator| { +//! +//! println!("worker {} of {} started", allocator.index(), allocator.peers()); +//! +//! // allocates a pair of senders list and one receiver. +//! let (mut senders, mut receiver) = allocator.allocate(0); +//! +//! 
// send typed data along each channel +//! for i in 0 .. allocator.peers() { +//! senders[i].send(Message { payload: format!("hello, {}", i)}); +//! senders[i].done(); +//! } +//! +//! // no support for termination notification, +//! // we have to count down ourselves. +//! let mut received = 0; +//! while received < allocator.peers() { +//! +//! allocator.receive(); +//! +//! if let Some(message) = receiver.recv() { +//! println!("worker {}: received: <{}>", allocator.index(), message.payload); +//! received += 1; +//! } +//! +//! allocator.release(); +//! } +//! +//! allocator.index() +//! }); +//! +//! // computation runs until guards are joined or dropped. +//! if let Ok(guards) = guards { +//! for guard in guards.join() { +//! println!("result: {:?}", guard); +//! } +//! } +//! else { println!("error in computation"); } +//! } //! ``` //! //! This should produce output like: @@ -78,21 +101,25 @@ pub mod allocator; pub mod networking; pub mod initialize; pub mod logging; -pub mod message; pub mod buzzer; -use std::any::Any; - -use serde::{Serialize, Deserialize}; - pub use allocator::Generic as Allocator; -pub use allocator::Allocate; +pub use allocator::{Allocate, Exchangeable}; pub use initialize::{initialize, initialize_from, Config, WorkerGuards}; -pub use message::Message; -/// A composite trait for types that may be used with channels. -pub trait Data : Send+Sync+Any+Serialize+for<'a>Deserialize<'a>+'static { } -implDeserialize<'a>+'static> Data for T { } +use timely_bytes::arc::Bytes; + +/// A type that can be serialized and deserialized through `Bytes`. +pub trait Bytesable { + /// Wrap bytes as `Self`. + fn from_bytes(bytes: Bytes) -> Self; + + /// The number of bytes required to serialize the data. + fn length_in_bytes(&self) -> usize; + + /// Writes the binary representation into `writer`. + fn into_bytes(&self, writer: &mut W); +} /// Pushing elements of type `T`. 
/// diff --git a/communication/src/message.rs b/communication/src/message.rs index b2b590143..38f5596b1 100644 --- a/communication/src/message.rs +++ b/communication/src/message.rs @@ -3,33 +3,45 @@ use timely_bytes::arc::Bytes; use crate::Data; -/// A wrapped message which supports serialization and deserialization. +/// A type that can be serialized and deserialized through `Bytes`. +pub trait Bytesable { + /// Wrap bytes as a Message. + fn from_bytes(bytes: Bytes) -> Self; + + /// The number of bytes required to serialize the data. + fn length_in_bytes(&self) -> usize; + + /// Writes the binary representation into `writer`. + fn into_bytes(&self, writer: &mut W); +} + +/// A wrapped Message which supports serialization and deserialization. pub struct Message { /// Message contents. pub payload: T, } impl Message { - /// Wrap a typed item as a message. + /// Wrap a typed item as a Message. pub fn from_typed(typed: T) -> Self { Message { payload: typed } } } -impl Message { - /// Wrap bytes as a message. - pub fn from_bytes(bytes: Bytes) -> Self { +impl Bytesable for Message { + /// Wrap bytes as a Message. + fn from_bytes(bytes: Bytes) -> Self { let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed"); Message { payload: typed } } /// The number of bytes required to serialize the data. - pub fn length_in_bytes(&self) -> usize { + fn length_in_bytes(&self) -> usize { ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize } /// Writes the binary representation into `writer`. 
- pub fn into_bytes(&self, writer: &mut W) { + fn into_bytes(&self, writer: &mut W) { ::bincode::serialize_into(writer, &self.payload).expect("bincode::serialize_into() failed"); } } diff --git a/mdbook/src/chapter_5/chapter_5_1.md b/mdbook/src/chapter_5/chapter_5_1.md index 707ae35e8..6110ff570 100644 --- a/mdbook/src/chapter_5/chapter_5_1.md +++ b/mdbook/src/chapter_5/chapter_5_1.md @@ -7,17 +7,35 @@ Before continuing, I want to remind you that this is the *internals* section; yo That being said, let's take a look at the example from the `timely_communication` documentation, which is not brief but shouldn't be wildly surprising either. ```rust,no_run +extern crate timely_bytes; extern crate timely_communication; -use std::ops::Deref; +use timely_communication::{Allocate, Bytesable}; -use timely_communication::{Allocate, Message}; +/// A wrapper that indicates the serialization/deserialization strategy. +pub struct Message { + /// Text contents. + pub payload: String, +} + +impl Bytesable for Message { + fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { + Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } + } + + fn length_in_bytes(&self) -> usize { + self.payload.len() + } + + fn into_bytes(&self, writer: &mut W) { + writer.write_all(self.payload.as_bytes()).unwrap(); + } +} fn main() { // extract the configuration from user-supplied arguments, initialize the computation. - // configure for two threads, just one process. - let config = timely_communication::Config::Process(2); + let config = timely_communication::Config::from_args(std::env::args()).unwrap(); let guards = timely_communication::initialize(config, |mut allocator| { println!("worker {} of {} started", allocator.index(), allocator.peers()); @@ -27,7 +45,7 @@ fn main() { // send typed data along each channel for i in 0 .. 
allocator.peers() { - senders[i].send(Message::from_typed(format!("hello, {}", i))); + senders[i].send(Message { payload: format!("hello, {}", i)}); senders[i].done(); } @@ -35,10 +53,15 @@ fn main() { // we have to count down ourselves. let mut received = 0; while received < allocator.peers() { + + allocator.receive(); + if let Some(message) = receiver.recv() { - println!("worker {}: received: <{}>", allocator.index(), message.deref()); + println!("worker {}: received: <{}>", allocator.index(), message.payload); received += 1; } + + allocator.release(); } allocator.index() @@ -92,25 +115,26 @@ The channels are various and interesting, but should be smartly arranged. The ch One crucial assumption made in this design is that the channels can be identified by their order of creation. If two workers start executing in different processes, allocating multiple channels, the only way we will know how to align these channels is by identifiers handed out as the channels are allocated. I strongly recommend against non-deterministic channel construction, or "optimizing out" some channels from some workers. -### The Data Trait +### The Bytesable Trait -The `Data` trait that we impose on all types that we exchange is a "marker trait": it wraps several constraints together, like so +The `Exchangeable` trait that we impose on all types that we exchange is a "marker trait": it wraps several constraints together, like so ```rust,ignore -pub trait Data : Send+Any+Serialize+Clone+'static { } -impl Data for T { } +pub trait Exchangeable : Send+Any+Bytesable { } +impl Exchangeable for T { } ``` -These traits are all Rust traits, except for `Serialize`, and they mostly just say that we can clone and send the data around. -The `Serialize` trait is something we introduce, and asks for methods to get into and out of a sequence of bytes. +These traits are all Rust traits, except for `Bytesable`, and they mostly just say that we can send the data around. 
The `Bytesable` trait is something we introduce, and asks for methods to get into and out of a sequence of bytes. ```rust,ignore -pub trait Serialize { +pub trait Bytesable { fn into_bytes(&mut self, &mut Vec); fn from_bytes(&mut Vec) -> Self; } ``` -We have a blanket implementation of `Serialize` for any type that implements `Abomonation`. Ideally, you shouldn't have to worry about this, unless you are introducing a new type and need an `Abomonation` implementation or you are hoping to move some types containing fields that do not satisfy those Rust traits. +The timely crate has a `Bincode` wrapper type that implements `Bytesable` for any types that implement `serde::Serialize + for<'a> serde::Deserialize<'a>`. +You can also implement them on your own though, as we have done in the example. ## Push and Pull diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index dc22e9c75..9ff40fde1 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -12,7 +12,7 @@ pub mod pullers; pub mod pact; /// The input to and output from timely dataflow communication channels. -pub type Bundle = crate::communication::Message>; +pub type Bundle = crate::Message>; /// A serializable representation of timestamped data. 
#[derive(Clone, Serialize, Deserialize)] diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index b6c97ef7d..00ef24058 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -12,13 +12,14 @@ use std::rc::Rc; use crate::Container; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; -use crate::communication::{Push, Pull, Data}; +use crate::communication::{Push, Pull}; use crate::container::PushPartitioned; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::Bundle; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; use crate::worker::AsWorker; +use crate::ExchangeData; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { @@ -38,7 +39,7 @@ impl ParallelizationContract for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (pusher, puller) = allocator.pipeline::>(identifier, address); + let (pusher, puller) = allocator.pipeline::>(identifier, address); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), LogPuller::new(puller, allocator.index(), identifier, logging)) } @@ -67,14 +68,14 @@ where // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. 
impl ParallelizationContract for ExchangeCore where - C: Data + PushPartitioned, + C: ExchangeData + PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { type Pusher = ExchangePusher>>>, H>; type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); + let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } diff --git a/timely/src/dataflow/channels/pullers/mod.rs b/timely/src/dataflow/channels/pullers/mod.rs index 422593cf2..2f63e197e 100644 --- a/timely/src/dataflow/channels/pullers/mod.rs +++ b/timely/src/dataflow/channels/pullers/mod.rs @@ -1,11 +1,2 @@ pub use self::counter::Counter; pub mod counter; - - -// pub trait Pullable { -// fn pull(&mut self) -> Option<(&T, &mut Message)>; -// } -// -// impl> Pullable for Box

{ -// fn pull(&mut self) -> Option<(&T, &mut Message)> { (**self).pull() } -// } diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 4b4928b5e..277466100 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::communication::{Data, Push, Pull}; +use crate::communication::{Exchangeable, Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::scheduling::Scheduler; use crate::scheduling::activate::Activations; @@ -58,10 +58,10 @@ where fn config(&self) -> &Config { self.parent.config() } fn index(&self) -> usize { self.parent.index() } fn peers(&self) -> usize { self.parent.peers() } - fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>) { self.parent.allocate(identifier, address) } - fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher>, ThreadPuller>) { + fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { self.parent.pipeline(identifier, address) } fn new_identifier(&mut self) -> usize { @@ -148,8 +148,6 @@ where } } -use crate::communication::Message; - impl<'a, G, T> Clone for Child<'a, G, T> where G: ScopeParent, diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 988de7388..c268f17da 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -108,9 +108,59 @@ impl Data for T { } /// /// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication` /// `Data` trait, which describes requirements for communication along channels. 
-pub trait ExchangeData: Data + communication::Data { } -impl ExchangeData for T { } +pub trait ExchangeData: Data + encoding::Data { } +impl ExchangeData for T { } #[doc = include_str!("../../README.md")] #[cfg(doctest)] pub struct ReadmeDoctests; + +/// A wrapper that indicates a serialization/deserialization strategy. +use encoding::Bincode as Message; + +mod encoding { + + use std::any::Any; + use serde::{Serialize, Deserialize}; + use timely_bytes::arc::Bytes; + use timely_communication::Bytesable; + + /// A composite trait for types that may be used with channels. + pub trait Data : Send+Any+Serialize+for<'a>Deserialize<'a> { } + implDeserialize<'a>> Data for T { } + + /// A wrapper that indicates `bincode` as the serialization/deserialization strategy. + pub struct Bincode { + /// Bincode contents. + pub payload: T, + } + + impl Bincode { + /// Wrap a typed item as a Bincode. + pub fn from_typed(typed: T) -> Self { + Bincode { payload: typed } + } + } + + impl Bytesable for Bincode { + fn from_bytes(bytes: Bytes) -> Self { + let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed"); + Bincode { payload: typed } + } + + fn length_in_bytes(&self) -> usize { + ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize + } + + fn into_bytes(&self, writer: &mut W) { + ::bincode::serialize_into(writer, &self.payload).expect("bincode::serialize_into() failed"); + } + } + + impl ::std::ops::Deref for Bincode { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.payload + } + } +} diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index b2ed501b6..4af9d274a 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -3,9 +3,10 @@ use std::rc::Rc; use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::{Location, Port}; -use crate::communication::{Message, Push, Pull}; +use crate::communication::{Push, Pull}; use 
crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; +use crate::Message; /// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)` pub type ProgressVec = Vec<((Location, T), i64)>; diff --git a/timely/src/progress/timestamp.rs b/timely/src/progress/timestamp.rs index 3a7b79eb9..f6441b75c 100644 --- a/timely/src/progress/timestamp.rs +++ b/timely/src/progress/timestamp.rs @@ -5,11 +5,11 @@ use std::any::Any; use std::default::Default; use std::hash::Hash; -use crate::communication::Data; +use crate::ExchangeData; use crate::order::PartialOrder; /// A composite trait for types that serve as timestamps in timely dataflow. -pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+Data+Hash+Ord { +pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+ExchangeData+Hash+Ord { /// A type summarizing action on a timestamp along a dataflow path. type Summary : PathSummary + 'static; /// A minimum value suitable as a default. diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 64c79b8f1..949efe605 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::Arc; -use crate::communication::{Allocate, Data, Push, Pull}; +use crate::communication::{Allocate, Exchangeable, Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::scheduling::{Schedule, Scheduler, Activations}; use crate::progress::timestamp::{Refines}; @@ -191,12 +191,12 @@ pub trait AsWorker : Scheduler { /// scheduled in response to the receipt of records on the channel. /// Most commonly, this would be the address of the *target* of the /// channel. 
- fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>>, Box>>); + fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>); /// Constructs a pipeline channel from the worker to itself. /// /// By default this method uses the native channel allocation mechanism, but the expectation is /// that this behavior will be overridden to be more efficient. - fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher>, ThreadPuller>); + fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller); /// Allocates a new worker-unique identifier. fn new_identifier(&mut self) -> usize; @@ -233,14 +233,14 @@ impl AsWorker for Worker { fn config(&self) -> &Config { &self.config } fn index(&self) -> usize { self.allocator.borrow().index() } fn peers(&self) -> usize { self.allocator.borrow().peers() } - fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>) { if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); paths.insert(identifier, address); self.temp_channel_ids.borrow_mut().push(identifier); self.allocator.borrow_mut().allocate(identifier) } - fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher>, ThreadPuller>) { + fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); paths.insert(identifier, address); @@ -719,8 +719,6 @@ impl Worker { } } -use crate::communication::Message; - impl Clone for Worker { fn clone(&self) -> Self { Worker {