Skip to content

Commit 1eb052c

Browse files
authored
join_traces uses timely's container builder (#478)
join_traces can produce arbitrary containers. --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent b281e50 commit 1eb052c

File tree

8 files changed

+193
-53
lines changed

8 files changed

+193
-53
lines changed

examples/spines.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ fn main() {
5555
let data =
5656
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
5757
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
58-
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1)));
58+
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));
5959
let keys =
6060
keys.map(|x| (x.clone().into_bytes(), 7))
6161
.arrange::<PreferredSpine<[u8],u8,_,_>>()
62-
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1)));
62+
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));
6363

6464
keys.join_core(&data, |k,_v1,_v2| {
6565
println!("{:?}", k.text);

server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ path="../"
1010
[dependencies]
1111
rand="0.3.13"
1212
libloading="*"
13-
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
13+
timely = { workspace = true }
1414

1515
#[workspace]
1616
#members = [

src/capture.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ pub mod source {
399399
for message in source.by_ref() {
400400
match message {
401401
Message::Updates(mut updates) => {
402-
updates_session.give_vec(&mut updates);
402+
updates_session.give_container(&mut updates);
403403
}
404404
Message::Progress(progress) => {
405405
// We must send a copy of each progress message to all workers,

src/consolidation.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
1111
//! specific behavior you require.
1212
13+
use std::collections::VecDeque;
14+
use timely::container::{ContainerBuilder, PushContainer, PushInto};
15+
use crate::Data;
1316
use crate::difference::Semigroup;
1417

1518
/// Sorts and consolidates `vec`.
@@ -145,6 +148,84 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,
145148
offset
146149
}
147150

151+
/// A container builder that consolidates data in-places into fixed-sized containers. Does not
152+
/// maintain FIFO ordering.
153+
#[derive(Default)]
154+
pub struct ConsolidatingContainerBuilder<C>{
155+
current: C,
156+
empty: Vec<C>,
157+
outbound: VecDeque<C>,
158+
}
159+
160+
impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
161+
where
162+
D: Data,
163+
T: Data,
164+
R: Semigroup,
165+
{
166+
/// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements.
167+
// TODO: Can we replace `multiple` by a bool?
168+
fn consolidate_and_flush_through(&mut self, multiple: usize) {
169+
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
170+
consolidate_updates(&mut self.current);
171+
let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
172+
while drain.peek().is_some() {
173+
let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
174+
container.extend((&mut drain).take(preferred_capacity));
175+
self.outbound.push_back(container);
176+
}
177+
}
178+
}
179+
180+
impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
181+
where
182+
D: Data,
183+
T: Data,
184+
R: Semigroup,
185+
{
186+
type Container = Vec<(D,T,R)>;
187+
188+
/// Push an element.
189+
///
190+
/// Precondition: `current` is not allocated or has space for at least one element.
191+
#[inline]
192+
fn push<P: PushInto<Self::Container>>(&mut self, item: P) {
193+
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
194+
if self.current.capacity() < preferred_capacity * 2 {
195+
self.current.reserve(preferred_capacity * 2 - self.current.capacity());
196+
}
197+
item.push_into(&mut self.current);
198+
if self.current.len() == self.current.capacity() {
199+
// Flush complete containers.
200+
self.consolidate_and_flush_through(preferred_capacity);
201+
}
202+
}
203+
204+
fn push_container(&mut self, container: &mut Self::Container) {
205+
for item in container.drain(..) {
206+
self.push(item);
207+
}
208+
}
209+
210+
fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
211+
if let Some(container) = self.outbound.pop_front() {
212+
self.empty.push(container);
213+
self.empty.last_mut()
214+
} else {
215+
None
216+
}
217+
}
218+
219+
fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
220+
// Flush all
221+
self.consolidate_and_flush_through(1);
222+
// Remove all but two elements from the stash of empty to avoid memory leaks. We retain
223+
// two to match `current` capacity.
224+
self.empty.truncate(2);
225+
self.extract()
226+
}
227+
}
228+
148229
#[cfg(test)]
149230
mod tests {
150231
use super::*;
@@ -211,4 +292,46 @@ mod tests {
211292
assert_eq!(input, output);
212293
}
213294
}
295+
296+
#[test]
297+
fn test_consolidating_container_builder() {
298+
let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
299+
for _ in 0..1024 {
300+
ccb.push((0, 0, 0));
301+
}
302+
assert_eq!(ccb.extract(), None);
303+
assert_eq!(ccb.finish(), None);
304+
305+
for i in 0..1024 {
306+
ccb.push((i, 0, 1));
307+
}
308+
309+
let mut collected = Vec::default();
310+
while let Some(container) = ccb.finish() {
311+
collected.append(container);
312+
}
313+
// The output happens to be sorted, but it's not guaranteed.
314+
collected.sort();
315+
for i in 0..1024 {
316+
assert_eq!((i, 0, 1), collected[i]);
317+
}
318+
319+
ccb = Default::default();
320+
ccb.push_container(&mut Vec::default());
321+
assert_eq!(ccb.extract(), None);
322+
assert_eq!(ccb.finish(), None);
323+
324+
ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1))));
325+
ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1))));
326+
collected.clear();
327+
while let Some(container) = ccb.finish() {
328+
collected.append(container);
329+
}
330+
// The output happens to be sorted, but it's not guaranteed.
331+
consolidate_updates(&mut collected);
332+
for i in 0..1024 {
333+
assert_eq!((i, 0, 2), collected[i]);
334+
}
335+
336+
}
214337
}

src/dynamic/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ where
6060
vec.truncate(level - 1);
6161
time.inner = PointStamp::new(vec);
6262
}
63-
output.session(&new_cap).give_vec(&mut vector);
63+
output.session(&new_cap).give_container(&mut vector);
6464
});
6565
});
6666

src/operators/arrange/arrangement.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ where
255255
self.join_core_internal_unsafe(other, result)
256256
}
257257
/// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
258-
pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, result: L) -> Collection<G,D,ROut>
258+
pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,D,ROut>
259259
where
260260
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
261261
D: Data,
@@ -264,7 +264,15 @@ where
264264
L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
265265
{
266266
use crate::operators::join::join_traces;
267-
join_traces(self, other, result)
267+
join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
268+
self,
269+
other,
270+
move |k, v1, v2, t, d1, d2, c| {
271+
for datum in result(k, v1, v2, t, d1, d2) {
272+
c.give(datum);
273+
}
274+
}
275+
)
268276
.as_collection()
269277
}
270278
}

src/operators/consolidate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ where
9999
input.for_each(|time, data| {
100100
data.swap(&mut vector);
101101
crate::consolidation::consolidate_updates(&mut vector);
102-
output.session(&time).give_vec(&mut vector);
102+
output.session(&time).give_container(&mut vector);
103103
})
104104
}
105105
})

0 commit comments

Comments
 (0)