Skip to content

Commit 85b126c

Browse files
authored
Pass data from batcher to builder by chunk (#491)
* Pass data from batcher to builder by chain Currently, the data shared between the batcher and the builder are individual tuples, either moved or by reference. This limits flexibility around what kind of data can be provided to a builder, i.e., it has to be in the form of tuples, either owned or a reference to a fully-formed one. This works fine for vector-like structures, but will not work for containers that like to arrange their data differently. This change alters the contract between the batcher and the builder to provide chunks instead of individual items (it does not require _chains_.) The data in the chunks must be sorted, and subsequent calls must maintain order, too. The input containers need to implement `BuilderInput`, a type that describes how a container's items can be broken into key, value, time, and diff, where key and value can be references or owned data, as long as they can be pushed into the underlying key and value containers. The change has some quirks around comparing keys to keys already in the builder. The types can differ, and the best solution I could come up with was to add two explicit comparison functions to `BuilderInput` to compare keys and values. While it is not elegant, it allows us to move forward with this change, without adding nightmare-inducing trait bounds all-over. Signed-off-by: Moritz Hoffmann <[email protected]> * Address feedback Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 9731d7f commit 85b126c

File tree

9 files changed

+298
-190
lines changed

9 files changed

+298
-190
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ where
7575

7676
use ::timely::dataflow::scopes::Child;
7777
use ::timely::progress::timestamp::Refines;
78+
use timely::Container;
79+
use timely::container::PushInto;
7880

7981
impl<G, Tr> Arranged<G, Tr>
8082
where
@@ -292,7 +294,8 @@ where
292294
F: Fn(T2::Val<'_>) -> V + 'static,
293295
T2::Diff: Abelian,
294296
T2::Batch: Batch,
295-
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
297+
<T2::Builder as Builder>::Input: Container,
298+
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
296299
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
297300
{
298301
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
@@ -311,7 +314,8 @@ where
311314
V: Data,
312315
F: Fn(T2::Val<'_>) -> V + 'static,
313316
T2::Batch: Batch,
314-
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
317+
<T2::Builder as Builder>::Input: Container,
318+
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
315319
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
316320
{
317321
use crate::operators::reduce::reduce_trace;

src/operators/arrange/upsert.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ where
138138
F: Fn(Tr::Val<'_>) -> V + 'static,
139139
Tr::Time: TotalOrder+ExchangeData,
140140
Tr::Batch: Batch,
141-
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
141+
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
142142
{
143143
let mut reader: Option<TraceAgent<Tr>> = None;
144144

@@ -282,9 +282,7 @@ where
282282
}
283283
// Must insert updates in (key, val, time) order.
284284
updates.sort();
285-
for update in updates.drain(..) {
286-
builder.push(update);
287-
}
285+
builder.push(&mut updates);
288286
}
289287
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
290288
prev_frontier.clone_from(&upper);

src/operators/reduce.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
//! to the key and the list of values.
66
//! The function is expected to populate a list of output values.
77
8+
use timely::Container;
9+
use timely::container::PushInto;
810
use crate::hashable::Hashable;
911
use crate::{Data, ExchangeData, Collection};
1012
use crate::difference::{Semigroup, Abelian};
@@ -252,7 +254,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
252254
F: Fn(T2::Val<'_>) -> V + 'static,
253255
T2::Diff: Abelian,
254256
T2::Batch: Batch,
255-
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
257+
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
256258
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
257259
{
258260
self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
@@ -274,7 +276,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
274276
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
275277
F: Fn(T2::Val<'_>) -> V + 'static,
276278
T2::Batch: Batch,
277-
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
279+
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
278280
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
279281
;
280282
}
@@ -293,7 +295,7 @@ where
293295
F: Fn(T2::Val<'_>) -> V + 'static,
294296
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
295297
T2::Batch: Batch,
296-
T2::Builder: Builder<Input = ((K, V), T2::Time, T2::Diff)>,
298+
T2::Builder: Builder<Input = Vec<((K, V), T2::Time, T2::Diff)>>,
297299
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
298300
{
299301
self.arrange_by_key_named(&format!("Arrange: {}", name))
@@ -312,7 +314,8 @@ where
312314
V: Data,
313315
F: Fn(T2::Val<'_>) -> V + 'static,
314316
T2::Batch: Batch,
315-
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
317+
<T2::Builder as Builder>::Input: Container,
318+
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
316319
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
317320
{
318321
let mut result_trace = None;
@@ -454,6 +457,8 @@ where
454457
builders.push(T2::Builder::new());
455458
}
456459

460+
let mut buffer = Default::default();
461+
457462
// cursors for navigating input and output traces.
458463
let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
459464
let source_storage = &source_storage;
@@ -531,7 +536,9 @@ where
531536
for index in 0 .. buffers.len() {
532537
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
533538
for (val, time, diff) in buffers[index].1.drain(..) {
534-
builders[index].push(((key.into_owned(), val), time, diff));
539+
((key.into_owned(), val), time, diff).push_into(&mut buffer);
540+
builders[index].push(&mut buffer);
541+
buffer.clear();
535542
}
536543
}
537544
}
@@ -648,7 +655,7 @@ where
648655
where
649656
F: Fn(C2::Val<'_>) -> V,
650657
L: FnMut(
651-
C1::Key<'a>,
658+
C1::Key<'a>,
652659
&[(C1::Val<'a>, C1::Diff)],
653660
&mut Vec<(V, C2::Diff)>,
654661
&mut Vec<(V, C2::Diff)>,
@@ -728,7 +735,7 @@ mod history_replay {
728735
where
729736
F: Fn(C2::Val<'_>) -> V,
730737
L: FnMut(
731-
C1::Key<'a>,
738+
C1::Key<'a>,
732739
&[(C1::Val<'a>, C1::Diff)],
733740
&mut Vec<(V, C2::Diff)>,
734741
&mut Vec<(V, C2::Diff)>,
@@ -1020,7 +1027,7 @@ mod history_replay {
10201027
new_interesting.push(next_time.clone());
10211028
debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
10221029
}
1023-
1030+
10241031

10251032
// Update `meet` to track the meet of each source of times.
10261033
meet = None;//T::maximum();

src/trace/implementations/merge_batcher.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ where
282282
type Time = T;
283283
type Input = Vec<((K, V), T, R)>;
284284
type Chunk = Vec<((K, V), T, R)>;
285-
type Output = ((K, V), T, R);
285+
type Output = Vec<((K, V), T, R)>;
286286

287287
fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
288288
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
@@ -497,8 +497,8 @@ where
497497
}
498498
let mut builder = B::with_capacity(keys, vals, upds);
499499

500-
for datum in chain.drain(..).flatten() {
501-
builder.push(datum);
500+
for mut chunk in chain.drain(..) {
501+
builder.push(&mut chunk);
502502
}
503503

504504
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())

src/trace/implementations/merge_batcher_col.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ where
6767
type Time = T;
6868
type Input = Vec<((K, V), T, R)>;
6969
type Chunk = TimelyStack<((K, V), T, R)>;
70-
type Output = ((K, V), T, R);
70+
type Output = TimelyStack<((K, V), T, R)>;
7171

7272
fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
7373
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
@@ -290,11 +290,8 @@ where
290290
}
291291
}
292292
let mut builder = B::with_capacity(keys, vals, upds);
293-
294-
for chunk in chain.drain(..) {
295-
for datum in chunk.iter() {
296-
builder.copy(datum);
297-
}
293+
for mut chunk in chain.drain(..) {
294+
builder.push(&mut chunk);
298295
}
299296

300297
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())

0 commit comments

Comments
 (0)