Skip to content

Commit 0062c29

Browse files
Several improvements around Bytesable and Message. (#601)
* Remove communication/message.rs * Clarify Message as Bincode * Update Bincode with DerefMut and From implementations * Remove Bundle type and have Message implement Bytesable directly
1 parent 1bcb574 commit 0062c29

File tree

15 files changed

+124
-159
lines changed

15 files changed

+124
-159
lines changed

communication/src/message.rs

Lines changed: 0 additions & 54 deletions
This file was deleted.

timely/src/dataflow/channels/mod.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@ pub mod pullers;
1111
/// Parallelization contracts, describing how data must be exchanged between operators.
1212
pub mod pact;
1313

14-
/// The input to and output from timely dataflow communication channels.
15-
pub type Bundle<T, C> = crate::Message<Message<T, C>>;
16-
1714
/// A serializable representation of timestamped data.
1815
#[derive(Clone, Serialize, Deserialize)]
1916
pub struct Message<T, C> {
@@ -44,17 +41,37 @@ impl<T, C: Container> Message<T, C> {
4441
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
4542
/// leaves in place, or the container's default element. The buffer is cleared.
4643
#[inline]
47-
pub fn push_at<P: Push<Bundle<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
44+
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
4845

4946
let data = ::std::mem::take(buffer);
5047
let message = Message::new(time, data, 0, 0);
51-
let mut bundle = Some(Bundle::from_typed(message));
48+
let mut bundle = Some(message);
5249

5350
pusher.push(&mut bundle);
5451

5552
if let Some(message) = bundle {
56-
*buffer = message.payload.data;
53+
*buffer = message.data;
5754
buffer.clear();
5855
}
5956
}
6057
}
58+
59+
// Instructions for serialization of `Message`.
60+
// Intended to swap out the constraint on `C` for `C: Bytesable`.
61+
impl<T, C> crate::communication::Bytesable for Message<T, C>
62+
where
63+
T: Serialize + for<'a> Deserialize<'a>,
64+
C: Serialize + for<'a> Deserialize<'a>,
65+
{
66+
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
67+
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
68+
}
69+
70+
fn length_in_bytes(&self) -> usize {
71+
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
72+
}
73+
74+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
75+
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
76+
}
77+
}

timely/src/dataflow/channels/pact.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1515
use crate::communication::{Push, Pull};
1616
use crate::container::PushPartitioned;
1717
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
18-
use crate::dataflow::channels::Bundle;
18+
use crate::dataflow::channels::Message;
1919
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
2020
use crate::progress::Timestamp;
2121
use crate::worker::AsWorker;
@@ -24,9 +24,9 @@ use crate::ExchangeData;
2424
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
2525
pub trait ParallelizationContract<T, C> {
2626
/// Type implementing `Push` produced by this pact.
27-
type Pusher: Push<Bundle<T, C>>+'static;
27+
type Pusher: Push<Message<T, C>>+'static;
2828
/// Type implementing `Pull` produced by this pact.
29-
type Puller: Pull<Bundle<T, C>>+'static;
29+
type Puller: Pull<Message<T, C>>+'static;
3030
/// Allocates a matched pair of push and pull endpoints implementing the pact.
3131
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
3232
}
@@ -36,10 +36,10 @@ pub trait ParallelizationContract<T, C> {
3636
pub struct Pipeline;
3737

3838
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
39-
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
40-
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
39+
type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
40+
type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
4141
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
42-
let (pusher, puller) = allocator.pipeline::<Bundle<T, C>>(identifier, address);
42+
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
4343
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
4444
LogPuller::new(puller, allocator.index(), identifier, logging))
4545
}
@@ -71,11 +71,11 @@ where
7171
C: ExchangeData + PushPartitioned,
7272
for<'a> H: FnMut(&C::Item<'a>) -> u64
7373
{
74-
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
75-
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;
74+
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Message<T, C>>>>, H>;
75+
type Puller = LogPuller<T, C, Box<dyn Pull<Message<T, C>>>>;
7676

7777
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
78-
let (senders, receiver) = allocator.allocate::<Bundle<T, C>>(identifier, address);
78+
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
7979
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
8080
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
8181
}
@@ -89,7 +89,7 @@ impl<C, F> Debug for ExchangeCore<C, F> {
8989

9090
/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
9191
#[derive(Debug)]
92-
pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
92+
pub struct LogPusher<T, C, P: Push<Message<T, C>>> {
9393
pusher: P,
9494
channel: usize,
9595
counter: usize,
@@ -99,7 +99,7 @@ pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
9999
logging: Option<Logger>,
100100
}
101101

102-
impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
102+
impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
103103
/// Allocates a new pusher.
104104
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
105105
LogPusher {
@@ -114,16 +114,16 @@ impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
114114
}
115115
}
116116

117-
impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T, C, P> {
117+
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
118118
#[inline]
119-
fn push(&mut self, pair: &mut Option<Bundle<T, C>>) {
119+
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
120120
if let Some(bundle) = pair {
121121
self.counter += 1;
122122

123123
// Stamp the sequence number and source.
124124
// FIXME: Awkward moment/logic.
125-
bundle.payload.seq = self.counter - 1;
126-
bundle.payload.from = self.source;
125+
bundle.seq = self.counter - 1;
126+
bundle.from = self.source;
127127

128128
if let Some(logger) = self.logging.as_ref() {
129129
logger.log(MessagesEvent {
@@ -143,15 +143,15 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T,
143143

144144
/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
145145
#[derive(Debug)]
146-
pub struct LogPuller<T, C, P: Pull<Bundle<T, C>>> {
146+
pub struct LogPuller<T, C, P: Pull<Message<T, C>>> {
147147
puller: P,
148148
channel: usize,
149149
index: usize,
150150
phantom: PhantomData<(T, C)>,
151151
logging: Option<Logger>,
152152
}
153153

154-
impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
154+
impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
155155
/// Allocates a new `Puller`.
156156
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
157157
LogPuller {
@@ -164,9 +164,9 @@ impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
164164
}
165165
}
166166

167-
impl<T, C: Container, P: Pull<Bundle<T, C>>> Pull<Bundle<T, C>> for LogPuller<T, C, P> {
167+
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
168168
#[inline]
169-
fn pull(&mut self) -> &mut Option<Bundle<T, C>> {
169+
fn pull(&mut self) -> &mut Option<Message<T, C>> {
170170
let result = self.puller.pull();
171171
if let Some(bundle) = result {
172172
let channel = self.channel;

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55

6-
use crate::dataflow::channels::Bundle;
6+
use crate::dataflow::channels::Message;
77
use crate::progress::ChangeBatch;
88
use crate::communication::Pull;
99
use crate::Container;
1010

1111
/// A wrapper which accounts records pulled past in a shared count map.
12-
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> {
12+
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
1313
pullable: P,
1414
consumed: Rc<RefCell<ChangeBatch<T>>>,
1515
phantom: ::std::marker::PhantomData<C>,
@@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
3636
}
3737
}
3838

39-
impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
39+
impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
4040
/// Retrieves the next timestamp and batch of data.
4141
#[inline]
42-
pub fn next(&mut self) -> Option<&mut Bundle<T, C>> {
42+
pub fn next(&mut self) -> Option<&mut Message<T, C>> {
4343
self.next_guarded().map(|(_guard, bundle)| bundle)
4444
}
4545

4646
#[inline]
47-
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, C>)> {
47+
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Message<T, C>)> {
4848
if let Some(message) = self.pullable.pull() {
4949
let guard = ConsumedGuard {
5050
consumed: Rc::clone(&self.consumed),
@@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P>
5757
}
5858
}
5959

60-
impl<T:Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
60+
impl<T:Ord+Clone+'static, C, P: Pull<Message<T, C>>> Counter<T, C, P> {
6161
/// Allocates a new `Counter` from a boxed puller.
6262
pub fn new(pullable: P) -> Self {
6363
Counter {

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use crate::communication::Push;
55
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
6-
use crate::dataflow::channels::{Bundle, Message};
6+
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
99
use crate::Container;
@@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
4444
}
4545
}
4646

47-
impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
47+
impl<T, C: Container, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
4848
/// Returns a `Session`, which accepts data to send at the associated time
4949
#[inline]
5050
pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
@@ -66,7 +66,7 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<
6666
}
6767
}
6868

69-
impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
69+
impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
7070
/// Returns a `Session`, which accepts data to send at the associated time
7171
pub fn session_with_builder(&mut self, time: &T) -> Session<T, CB, P> {
7272
if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
@@ -85,7 +85,7 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P
8585
}
8686
}
8787

88-
impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
88+
impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
8989
/// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush.
9090
pub fn cease(&mut self) {
9191
self.flush();
@@ -115,7 +115,7 @@ impl<T, CB, P, D> PushInto<D> for Buffer<T, CB, P>
115115
where
116116
T: Eq+Clone,
117117
CB: ContainerBuilder + PushInto<D>,
118-
P: Push<Bundle<T, CB::Container>>
118+
P: Push<Message<T, CB::Container>>
119119
{
120120
#[inline]
121121
fn push_into(&mut self, item: D) {
@@ -136,7 +136,7 @@ pub struct Session<'a, T, CB, P> {
136136
impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder<C>, P>
137137
where
138138
T: Eq + Clone + 'a,
139-
P: Push<Bundle<T, C>> + 'a,
139+
P: Push<Message<T, C>> + 'a,
140140
{
141141
/// Provide a container at the time specified by the [Session].
142142
pub fn give_container(&mut self, container: &mut C) {
@@ -148,7 +148,7 @@ impl<'a, T, CB, P> Session<'a, T, CB, P>
148148
where
149149
T: Eq + Clone + 'a,
150150
CB: ContainerBuilder + 'a,
151-
P: Push<Bundle<T, CB::Container>> + 'a
151+
P: Push<Message<T, CB::Container>> + 'a
152152
{
153153
/// Access the builder. Immutable access to prevent races with flushing
154154
/// the underlying buffer.
@@ -179,7 +179,7 @@ impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
179179
where
180180
T: Eq + Clone + 'a,
181181
CB: ContainerBuilder + PushInto<D> + 'a,
182-
P: Push<Bundle<T, CB::Container>> + 'a,
182+
P: Push<Message<T, CB::Container>> + 'a,
183183
{
184184
#[inline]
185185
fn push_into(&mut self, item: D) {
@@ -192,7 +192,7 @@ pub struct AutoflushSession<'a, T, CB, P>
192192
where
193193
T: Timestamp + 'a,
194194
CB: ContainerBuilder + 'a,
195-
P: Push<Bundle<T, CB::Container>> + 'a,
195+
P: Push<Message<T, CB::Container>> + 'a,
196196
{
197197
/// A reference to the underlying buffer.
198198
buffer: &'a mut Buffer<T, CB, P>,
@@ -204,7 +204,7 @@ impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P>
204204
where
205205
T: Timestamp + 'a,
206206
CB: ContainerBuilder + 'a,
207-
P: Push<Bundle<T, CB::Container>> + 'a,
207+
P: Push<Message<T, CB::Container>> + 'a,
208208
{
209209
/// Transmits a single record.
210210
#[inline]
@@ -231,7 +231,7 @@ impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
231231
where
232232
T: Timestamp + 'a,
233233
CB: ContainerBuilder + PushInto<D> + 'a,
234-
P: Push<Bundle<T, CB::Container>> + 'a,
234+
P: Push<Message<T, CB::Container>> + 'a,
235235
{
236236
#[inline]
237237
fn push_into(&mut self, item: D) {
@@ -243,7 +243,7 @@ impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
243243
where
244244
T: Timestamp + 'a,
245245
CB: ContainerBuilder + 'a,
246-
P: Push<Bundle<T, CB::Container>> + 'a,
246+
P: Push<Message<T, CB::Container>> + 'a,
247247
{
248248
fn drop(&mut self) {
249249
self.buffer.cease();

0 commit comments

Comments
 (0)