Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ default = ["getopts"]

[dependencies]
getopts = { version = "0.2.21", optional = true }
bincode = { version = "1.0" }
byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
timely_bytes = { path = "../bytes", version = "0.12" }
Expand Down
27 changes: 23 additions & 4 deletions communication/examples/comm_hello.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
use std::ops::Deref;
use timely_communication::{Message, Allocate};
use timely_communication::{Allocate, Bytesable};

/// A wrapper that indicates the serialization/deserialization strategy.
pub struct Message {
/// Text contents.
pub payload: String,
}

impl Bytesable for Message {
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
}

fn length_in_bytes(&self) -> usize {
self.payload.len()
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
writer.write_all(self.payload.as_bytes()).unwrap();
}
}

fn main() {

Expand All @@ -14,7 +33,7 @@ fn main() {

// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(Message::from_typed(format!("hello, {}", i)));
senders[i].send(Message { payload: format!("hello, {}", i)});
senders[i].done();
}

Expand All @@ -26,7 +45,7 @@ fn main() {
allocator.receive();

if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.deref());
println!("worker {}: received: <{}>", allocator.index(), message.payload);
received += 1;
}

Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use std::cell::RefCell;

use crate::allocator::thread::ThreadBuilder;
use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
use crate::allocator::{Allocate, AllocateBuilder, Thread, Process};
use crate::allocator::{Allocate, AllocateBuilder, Exchangeable, Thread, Process};
use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};

use crate::{Push, Pull, Data, Message};
use crate::{Push, Pull};

/// Enumerates known implementors of `Allocate`.
/// Passes trait method calls on to members.
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Generic {
}
}
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
match self {
Generic::Thread(t) => t.allocate(identifier),
Generic::Process(p) => p.allocate(identifier),
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Generic {
impl Allocate for Generic {
fn index(&self) -> usize { self.index() }
fn peers(&self) -> usize { self.peers() }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
self.allocate(identifier)
}

Expand Down
14 changes: 10 additions & 4 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub mod counters;

pub mod zero_copy;

use crate::{Data, Push, Pull, Message};
use crate::{Bytesable, Push, Pull};

/// A proto-allocator, which implements `Send` and can be completed with `build`.
///
Expand All @@ -32,6 +32,12 @@ pub trait AllocateBuilder : Send {
fn build(self) -> Self::Allocator;
}

use std::any::Any;

/// A type that can be sent along an allocated channel.
pub trait Exchangeable : Send+Any+Bytesable { }
impl<T: Send+Any+Bytesable> Exchangeable for T { }

/// A type capable of allocating channels.
///
/// There is some feature creep, in that this contains several convenience methods about the nature
Expand All @@ -42,7 +48,7 @@ pub trait Allocate {
/// The number of workers in the communication group.
fn peers(&self) -> usize;
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
/// A shared queue of communication events with channel identifier.
///
/// It is expected that users of the channel allocator will regularly
Expand Down Expand Up @@ -85,8 +91,8 @@ pub trait Allocate {
/// By default, this method uses the thread-local channel constructor
/// based on a shared `VecDeque` which updates the event queue.
fn pipeline<T: 'static>(&mut self, identifier: usize) ->
(thread::ThreadPusher<Message<T>>,
thread::ThreadPuller<Message<T>>)
(thread::ThreadPusher<T>,
thread::ThreadPuller<T>)
{
thread::Thread::new_from(identifier, self.events().clone())
}
Expand Down
12 changes: 6 additions & 6 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crossbeam_channel::{Sender, Receiver};

use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Thread};
use crate::{Push, Pull, Message};
use crate::{Push, Pull};
use crate::buzzer::Buzzer;

/// An allocator for inter-thread, intra-process communication
Expand Down Expand Up @@ -110,7 +110,7 @@ impl Process {
impl Allocate for Process {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Any+Send+Sync+'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Any+Send>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

// this is race-y global initialisation of all channels for all workers, performed by the
// first worker that enters this critical section
Expand All @@ -126,7 +126,7 @@ impl Allocate for Process {
let mut pushers = Vec::with_capacity(self.peers);
let mut pullers = Vec::with_capacity(self.peers);
for buzzer in self.buzzers.iter() {
let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = crossbeam_channel::unbounded();
let (s, r): (Sender<T>, Receiver<T>) = crossbeam_channel::unbounded();
// TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter.
pushers.push((Pusher { target: s }, buzzer.clone()));
pullers.push(Puller { source: r, current: None });
Expand All @@ -142,7 +142,7 @@ impl Allocate for Process {

let vector =
entry
.downcast_mut::<Vec<Option<(Vec<(Pusher<Message<T>>, Buzzer)>, Puller<Message<T>>)>>>()
.downcast_mut::<Vec<Option<(Vec<(Pusher<T>, Buzzer)>, Puller<T>)>>>()
.expect("failed to correctly cast channel");

let (sends, recv) =
Expand All @@ -166,10 +166,10 @@ impl Allocate for Process {
sends.into_iter()
.zip(self.counters_send.iter())
.map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b))
.map(|s| Box::new(s) as Box<dyn Push<super::Message<T>>>)
.map(|s| Box::new(s) as Box<dyn Push<T>>)
.collect::<Vec<_>>();

let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box<dyn Pull<super::Message<T>>>;
let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box<dyn Pull<T>>;

(sends, recv)
}
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::VecDeque;
use crate::allocator::{Allocate, AllocateBuilder};
use crate::allocator::counters::Pusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;
use crate::{Push, Pull, Message};
use crate::{Push, Pull};

/// Builder for single-threaded allocator.
pub struct ThreadBuilder;
Expand All @@ -28,7 +28,7 @@ pub struct Thread {
impl Allocate for Thread {
fn index(&self) -> usize { 0 }
fn peers(&self) -> usize { 1 }
fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
(vec![Box::new(pusher)], Box::new(puller))
}
Expand Down Expand Up @@ -62,9 +62,9 @@ impl Thread {

/// Creates a new thread-local channel from an identifier and shared counts.
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
-> (ThreadPusher<T>, ThreadPuller<T>)
{
let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
let shared = Rc::new(RefCell::new((VecDeque::<T>::new(), VecDeque::<T>::new())));
let pusher = Pusher { target: shared.clone() };
let pusher = CountPusher::new(pusher, identifier, events.clone());
let puller = Puller { source: shared, current: None };
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use timely_bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable};
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -135,7 +135,7 @@ pub struct TcpAllocator<A: Allocate> {
impl<A: Allocate> Allocate for TcpAllocator<A> {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

// Assume and enforce in-order identifier allocation.
if let Some(bound) = self.channel_id_bound {
Expand All @@ -144,7 +144,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
self.channel_id_bound = Some(identifier);

// Result list of boxed pushers.
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();
let mut pushes = Vec::<Box<dyn Push<T>>>::new();

// Inner exchange allocations.
let inner_peers = self.inner.peers();
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use timely_bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::{AllocateBuilder};
use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable};
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -119,15 +119,15 @@ pub struct ProcessAllocator {
impl Allocate for ProcessAllocator {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

// Assume and enforce in-order identifier allocation.
if let Some(bound) = self.channel_id_bound {
assert!(bound < identifier);
}
self.channel_id_bound = Some(identifier);

let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::with_capacity(self.peers());
let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.peers());

for target_index in 0 .. self.peers() {

Expand Down
32 changes: 15 additions & 17 deletions communication/src/allocator/zero_copy/push_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use timely_bytes::arc::Bytes;

use crate::allocator::canary::Canary;
use crate::networking::MessageHeader;

use crate::{Data, Push, Pull};
use crate::allocator::Message;
use crate::{Bytesable, Push, Pull};

use super::bytes_exchange::{BytesPush, SendEndpoint};

Expand All @@ -35,9 +33,9 @@ impl<T, P: BytesPush> Pusher<T, P> {
}
}

impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
impl<T: Bytesable, P: BytesPush> Push<T> for Pusher<T, P> {
#[inline]
fn push(&mut self, element: &mut Option<Message<T>>) {
fn push(&mut self, element: &mut Option<T>) {
if let Some(ref mut element) = *element {

// determine byte lengths and build header.
Expand Down Expand Up @@ -68,11 +66,11 @@ impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
/// allocation.
pub struct Puller<T> {
_canary: Canary,
current: Option<Message<T>>,
current: Option<T>,
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
}

impl<T:Data> Puller<T> {
impl<T: Bytesable> Puller<T> {
/// Creates a new `Puller` instance from a shared queue.
pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
Puller {
Expand All @@ -83,14 +81,14 @@ impl<T:Data> Puller<T> {
}
}

impl<T:Data> Pull<Message<T>> for Puller<T> {
impl<T: Bytesable> Pull<T> for Puller<T> {
#[inline]
fn pull(&mut self) -> &mut Option<Message<T>> {
fn pull(&mut self) -> &mut Option<T> {
self.current =
self.receiver
.borrow_mut()
.pop_front()
.map(Message::from_bytes);
.map(T::from_bytes);

&mut self.current
}
Expand All @@ -103,15 +101,15 @@ impl<T:Data> Pull<Message<T>> for Puller<T> {
/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
/// allocation.
pub struct PullerInner<T> {
inner: Box<dyn Pull<Message<T>>>, // inner pullable (e.g. intra-process typed queue)
inner: Box<dyn Pull<T>>, // inner pullable (e.g. intra-process typed queue)
_canary: Canary,
current: Option<Message<T>>,
current: Option<T>,
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
}

impl<T:Data> PullerInner<T> {
impl<T: Bytesable> PullerInner<T> {
/// Creates a new `PullerInner` instance from a shared queue.
pub fn new(inner: Box<dyn Pull<Message<T>>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
PullerInner {
inner,
_canary,
Expand All @@ -121,9 +119,9 @@ impl<T:Data> PullerInner<T> {
}
}

impl<T:Data> Pull<Message<T>> for PullerInner<T> {
impl<T: Bytesable> Pull<T> for PullerInner<T> {
#[inline]
fn pull(&mut self) -> &mut Option<Message<T>> {
fn pull(&mut self) -> &mut Option<T> {

let inner = self.inner.pull();
if inner.is_some() {
Expand All @@ -134,7 +132,7 @@ impl<T:Data> Pull<Message<T>> for PullerInner<T> {
self.receiver
.borrow_mut()
.pop_front()
.map(Message::from_bytes);
.map(T::from_bytes);

&mut self.current
}
Expand Down
Loading