Skip to content

Commit 13c841a

Browse files
committed
Separate containers crate
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent b75307c commit 13c841a

File tree

16 files changed

+452
-444
lines changed

16 files changed

+452
-444
lines changed

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
[workspace]
22
members = [
3-
"differential-dataflow",
43
# "advent_of_code_2017",
4+
"containers",
5+
"differential-dataflow",
56
"dogsdogsdogs",
7+
"doop",
68
"experiments",
79
"interactive",
810
"server",
911
"server/dataflows/degr_dist",
1012
"server/dataflows/neighborhood",
1113
"server/dataflows/random_graph",
1214
"server/dataflows/reachability",
13-
#"tpchlike",
14-
"doop"
15+
# "tpchlike",
1516
]
1617
resolver = "2"
1718

1819
[workspace.dependencies]
20+
columnar = { default-features = false, version = "0.3.0" }
1921
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.13.6" }
2022
timely = { version = "0.18", default-features = false }
2123
#timely = { path = "../timely-dataflow/timely/", default-features = false }

containers/Cargo.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[package]
2+
name = "differential-containers"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
columnation = "0.1.0"
8+
differential-dataflow = { workspace = true }
9+
serde = { version = "1.0", features = ["derive"] }
10+
timely = { workspace = true }
11+
12+
[dev-dependencies]
13+
bytemuck = { default-features = false, version = "1.21.0" }
14+
columnar = { workspace = true }

differential-dataflow/examples/columnar.rs renamed to containers/examples/columnar.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
//! Wordcount based on `columnar`.
22
3-
use {
4-
timely::container::{Container, CapacityContainerBuilder},
5-
timely::dataflow::channels::pact::ExchangeCore,
6-
timely::dataflow::InputHandleCore,
7-
timely::dataflow::ProbeHandle,
8-
};
9-
10-
11-
use differential_dataflow::trace::implementations::ord_neu::ColKeyBuilder;
12-
use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
13-
3+
use differential_containers::columnation::{ColKeyBuilder, ColKeySpine, ColMerger, TimelyStack};
144
use differential_dataflow::operators::arrange::arrangement::arrange_core;
5+
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
6+
use timely::container::{Container, CapacityContainerBuilder};
7+
use timely::dataflow::InputHandleCore;
8+
use timely::dataflow::ProbeHandle;
9+
use timely::dataflow::channels::pact::ExchangeCore;
1510

1611
fn main() {
1712

@@ -346,10 +341,6 @@ mod builder {
346341
}
347342

348343

349-
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
350-
use differential_dataflow::trace::implementations::merge_batcher::ColMerger;
351-
use differential_dataflow::containers::TimelyStack;
352-
353344
/// A batcher for columnar storage.
354345
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<Column<((K,V),T,R)>, batcher::Chunker<TimelyStack<((K,V),T,R)>>, ColMerger<(K,V),T,R>>;
355346
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;

differential-dataflow/examples/spines.rs renamed to containers/examples/spines.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ fn main() {
2828

2929
match mode.as_str() {
3030
"new" => {
31-
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
31+
use differential_containers::columnation::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
3232
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
3333
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
3434
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
@@ -42,15 +42,15 @@ fn main() {
4242
.probe_with(&mut probe);
4343
},
4444
"rhh" => {
45-
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
45+
use differential_containers::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
4646
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
4747
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
4848
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
4949
.probe_with(&mut probe);
5050
},
5151
"slc" => {
5252

53-
use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine};
53+
use differential_containers::{PreferredBatcher, PreferredBuilder, PreferredSpine};
5454

5555
let data =
5656
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))

differential-dataflow/src/containers.rs renamed to containers/src/columnation.rs

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,83 @@
11
//! A columnar container based on the columnation library.
22
33
use std::iter::FromIterator;
4-
5-
pub use columnation::*;
4+
use std::rc::Rc;
5+
6+
use columnation::{Columnation, Region};
7+
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
8+
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder};
9+
use differential_dataflow::trace::implementations::spine_fueled::Spine;
10+
use differential_dataflow::trace::implementations::{BatchContainer, Layout, OffsetList, Update};
11+
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
612
use timely::container::PushInto;
713

14+
mod merge_batcher;
15+
mod chunker;
16+
17+
pub use merge_batcher::ColMerger;
18+
pub use chunker::ColumnationChunker;
19+
20+
/// A layout whose key, value, time, and diff containers are all `TimelyStack`s.
21+
pub struct TStack<U: Update> {
22+
phantom: std::marker::PhantomData<U>, // marker only: ties the layout to its update type `U` without storing one
23+
}
24+
25+
// Binds every data container of the layout (keys, values, times, diffs) to `TimelyStack`
// and the offsets to `OffsetList`. All four update components must implement `Columnation`;
// the difference type must additionally be `Ord`.
impl<U: Update> Layout for TStack<U>
26+
where
27+
U::Key: Columnation,
28+
U::Val: Columnation,
29+
U::Time: Columnation,
30+
U::Diff: Columnation + Ord,
31+
{
32+
type Target = U;
33+
type KeyContainer = TimelyStack<U::Key>;
34+
type ValContainer = TimelyStack<U::Val>;
35+
type TimeContainer = TimelyStack<U::Time>;
36+
type DiffContainer = TimelyStack<U::Diff>;
37+
type OffsetContainer = OffsetList;
38+
}
39+
40+
41+
/// A trace over `(key, value)` updates: a spine of `Rc`-shared `OrdValBatch`es using the `TStack` layout.
42+
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
43+
/// A batcher for `(key, value)` updates, combining `ColumnationChunker` with `ColMerger`.
44+
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
45+
/// A builder for `(key, value)` batches that reads `TimelyStack` chunks and produces `Rc`-wrapped batches.
46+
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
47+
48+
49+
/// A key-only trace (the value type is `()`): a spine of `Rc`-shared `OrdKeyBatch`es using the `TStack` layout.
50+
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
51+
/// A batcher for key-only updates (the value type is `()`), combining `ColumnationChunker` with `ColMerger`.
52+
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
53+
/// A builder for key-only batches that reads `TimelyStack` chunks and produces `Rc`-wrapped batches.
54+
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
55+
56+
// The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
57+
// be presented with the actual contained type, rather than a type that borrows into it.
58+
impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
59+
type Owned = T;
60+
type ReadItem<'a> = &'a T;
61+
62+
fn with_capacity(size: usize) -> Self {
63+
Self::with_capacity(size)
64+
}
65+
66+
fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
67+
let mut new = Self::default();
68+
new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
69+
new
70+
}
71+
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
72+
fn index(&self, index: usize) -> Self::ReadItem<'_> {
73+
&self[index]
74+
}
75+
fn len(&self) -> usize {
76+
self[..].len()
77+
}
78+
}
79+
80+
881
/// An append-only vector that stores records as columns.
982
///
1083
/// This container maintains elements that might conventionally own
@@ -274,7 +347,7 @@ mod container {
274347
use timely::Container;
275348
use timely::container::SizableContainer;
276349

277-
use crate::containers::TimelyStack;
350+
use crate::columnation::TimelyStack;
278351

279352
impl<T: Columnation> Container for TimelyStack<T> {
280353
type ItemRef<'a> = &'a T where Self: 'a;

containers/src/columnation/chunker.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use std::collections::VecDeque;
2+
use columnation::Columnation;
3+
use timely::container::{ContainerBuilder, PushInto};
4+
use differential_dataflow::consolidation::consolidate_updates;
5+
use differential_dataflow::difference::Semigroup;
6+
7+
use crate::columnation::TimelyStack;
8+
9+
/// Chunks a stream of vectors into a sequence of `TimelyStack` chunks.
10+
pub struct ColumnationChunker<T: Columnation> {
11+
pending: Vec<T>, // updates received but not yet packed into a chunk
12+
ready: VecDeque<TimelyStack<T>>, // completed chunks, handed out front-first
13+
empty: Option<TimelyStack<T>>, // holds the most recently handed-out chunk so a `&mut` to it can be returned
14+
}
15+
16+
17+
impl<T: Columnation> Default for ColumnationChunker<T> {
18+
fn default() -> Self {
19+
Self {
20+
pending: Vec::default(),
21+
ready: VecDeque::default(),
22+
empty: None,
23+
}
24+
}
25+
}
26+
27+
impl<D,T,R> ColumnationChunker<(D, T, R)>
28+
where
29+
D: Columnation + Ord,
30+
T: Columnation + Ord,
31+
R: Columnation + Semigroup,
32+
{
33+
const BUFFER_SIZE_BYTES: usize = 64 << 10;
34+
fn chunk_capacity() -> usize {
35+
let size = ::std::mem::size_of::<(D, T, R)>();
36+
if size == 0 {
37+
Self::BUFFER_SIZE_BYTES
38+
} else if size <= Self::BUFFER_SIZE_BYTES {
39+
Self::BUFFER_SIZE_BYTES / size
40+
} else {
41+
1
42+
}
43+
}
44+
45+
/// Form chunks out of pending data, if needed. This function is meant to be applied to
46+
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
47+
/// half full when the function returns.
48+
///
49+
/// `form_chunk` does the following:
50+
/// * If pending is full, consolidate.
51+
/// * If after consolidation it's more than half full, peel off chunks,
52+
/// leaving behind any partial chunk in pending.
53+
fn form_chunk(&mut self) {
54+
consolidate_updates(&mut self.pending);
55+
if self.pending.len() >= Self::chunk_capacity() {
56+
while self.pending.len() > Self::chunk_capacity() {
57+
let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
58+
for item in self.pending.drain(..chunk.capacity()) {
59+
chunk.copy(&item);
60+
}
61+
self.ready.push_back(chunk);
62+
}
63+
}
64+
}
65+
}
66+
67+
impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
68+
where
69+
D: Columnation + Ord + Clone,
70+
T: Columnation + Ord + Clone,
71+
R: Columnation + Semigroup + Clone,
72+
{
73+
fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
74+
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
75+
// because we don't write more than capacity elements into the buffer.
76+
if self.pending.capacity() < Self::chunk_capacity() * 2 {
77+
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
78+
}
79+
80+
let mut drain = container.drain(..).peekable();
81+
while drain.peek().is_some() {
82+
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
83+
if self.pending.len() == self.pending.capacity() {
84+
self.form_chunk();
85+
}
86+
}
87+
}
88+
}
89+
90+
impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
91+
where
92+
D: Columnation + Ord + Clone + 'static,
93+
T: Columnation + Ord + Clone + 'static,
94+
R: Columnation + Semigroup + Clone + 'static,
95+
{
96+
type Container = TimelyStack<(D,T,R)>;
97+
98+
fn extract(&mut self) -> Option<&mut Self::Container> {
99+
if let Some(ready) = self.ready.pop_front() {
100+
self.empty = Some(ready);
101+
self.empty.as_mut()
102+
} else {
103+
None
104+
}
105+
}
106+
107+
fn finish(&mut self) -> Option<&mut Self::Container> {
108+
consolidate_updates(&mut self.pending);
109+
while !self.pending.is_empty() {
110+
let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
111+
for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
112+
chunk.copy(&item);
113+
}
114+
self.ready.push_back(chunk);
115+
}
116+
self.empty = self.ready.pop_front();
117+
self.empty.as_mut()
118+
}
119+
}
120+

0 commit comments

Comments
 (0)