Skip to content

Commit 92c8ca8

Browse files
committed
Separate Time/Diff containers, allow IntoOwned vs refs
1 parent 075a15a commit 92c8ca8

File tree

5 files changed

+161
-91
lines changed

5 files changed

+161
-91
lines changed

src/operators/reduce.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ where
170170
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
171171
{
172172
fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
173-
self.reduce_abelian::<_,K,(),KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
173+
self.reduce_abelian::<_,K,(),KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
174174
.as_collection(|k,_| k.clone())
175175
}
176176
}
@@ -221,7 +221,7 @@ where
221221
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
222222
{
223223
fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
224-
self.reduce_abelian::<_,K,R,ValSpine<_,R,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
224+
self.reduce_abelian::<_,K,R,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
225225
.as_collection(|k,c| (k.clone(), c.clone()))
226226
}
227227
}

src/trace/implementations/huffman_container.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
5050
}
5151

5252
impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
53+
type Owned = Vec<B>;
5354
type ReadItem<'a> = Wrapped<'a, B>;
5455

5556
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

src/trace/implementations/mod.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ pub trait Layout {
9393
type KeyContainer: BatchContainer + PushInto<<Self::Target as Update>::Key>;
9494
/// Container for update vals.
9595
type ValContainer: BatchContainer;
96-
/// Container for update vals.
97-
type UpdContainer:
98-
PushInto<(<Self::Target as Update>::Time, <Self::Target as Update>::Diff)> +
99-
for<'a> BatchContainer<ReadItem<'a> = &'a (<Self::Target as Update>::Time, <Self::Target as Update>::Diff)>;
96+
/// Container for times.
97+
type TimeContainer: BatchContainer<Owned = <Self::Target as Update>::Time> + PushInto<<Self::Target as Update>::Time>;
98+
/// Container for diffs.
99+
type DiffContainer: BatchContainer<Owned = <Self::Target as Update>::Diff> + PushInto<<Self::Target as Update>::Diff>;
100100
/// Container for offsets.
101101
type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
102102
}
@@ -113,7 +113,8 @@ where
113113
type Target = U;
114114
type KeyContainer = Vec<U::Key>;
115115
type ValContainer = Vec<U::Val>;
116-
type UpdContainer = Vec<(U::Time, U::Diff)>;
116+
type TimeContainer = Vec<U::Time>;
117+
type DiffContainer = Vec<U::Diff>;
117118
type OffsetContainer = OffsetList;
118119
}
119120

@@ -132,7 +133,8 @@ where
132133
type Target = U;
133134
type KeyContainer = TimelyStack<U::Key>;
134135
type ValContainer = TimelyStack<U::Val>;
135-
type UpdContainer = TimelyStack<(U::Time, U::Diff)>;
136+
type TimeContainer = TimelyStack<U::Time>;
137+
type DiffContainer = TimelyStack<U::Diff>;
136138
type OffsetContainer = OffsetList;
137139
}
138140

@@ -184,7 +186,8 @@ where
184186
type Target = Preferred<K, V, T, D>;
185187
type KeyContainer = K::Container;
186188
type ValContainer = V::Container;
187-
type UpdContainer = Vec<(T, D)>;
189+
type TimeContainer = Vec<T>;
190+
type DiffContainer = Vec<D>;
188191
type OffsetContainer = OffsetList;
189192
}
190193

@@ -304,6 +307,7 @@ impl<'a> IntoOwned<'a> for usize {
304307
}
305308

306309
impl BatchContainer for OffsetList {
310+
type Owned = usize;
307311
type ReadItem<'a> = usize;
308312

309313
fn copy(&mut self, item: Self::ReadItem<'_>) {
@@ -442,11 +446,15 @@ pub mod containers {
442446

443447
use timely::container::columnation::{Columnation, TimelyStack};
444448
use timely::container::PushInto;
449+
use crate::trace::IntoOwned;
445450

446451
/// A general-purpose container resembling `Vec<T>`.
447452
pub trait BatchContainer: 'static {
453+
/// An owned instance of `Self::ReadItem<'_>`.
454+
type Owned;
455+
448456
/// The type that can be read back out of the container.
449-
type ReadItem<'a>: Copy + Ord;
457+
type ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::Owned>;
450458

451459
/// Push an item into this container
452460
fn push<D>(&mut self, item: D) where Self: PushInto<D> {
@@ -532,6 +540,7 @@ pub mod containers {
532540
// All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
533541
// struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
534542
impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
543+
type Owned = T;
535544
type ReadItem<'a> = &'a T;
536545

537546
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
@@ -558,7 +567,8 @@ pub mod containers {
558567

559568
// The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
560569
// be presented with the actual contained type, rather than a type that borrows into it.
561-
impl<T: Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
570+
impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
571+
type Owned = T;
562572
type ReadItem<'a> = &'a T;
563573

564574
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
@@ -625,6 +635,7 @@ pub mod containers {
625635
where
626636
B: Ord + Clone + Sized + 'static,
627637
{
638+
type Owned = Vec<B>;
628639
type ReadItem<'a> = &'a [B];
629640

630641
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

0 commit comments

Comments
 (0)