diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 7995b4397..9856e38e0 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -444,7 +444,7 @@ where T2::ValOwned: Data, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -462,7 +462,7 @@ where T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time>+'static, T2::ValOwned: Data, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index e9d1744ff..4ca9dc9b3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -136,7 +136,7 @@ where Tr::ValOwned: ExchangeData, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder, { let mut reader: Option> = None; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 52fb8919f..b67c1c4b8 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -13,7 +13,7 @@ use crate::difference::Semigroup; use crate::Data; use crate::lattice::Lattice; -use crate::trace::{Batcher, Builder}; +use crate::trace::Batcher; /// Methods which require data be arrangeable. impl Collection @@ -53,8 +53,7 @@ where where Tr: crate::trace::Trace+'static, Tr::Batch: crate::trace::Batch, - Tr::Batcher: Batcher, Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>, - Tr::Builder: Builder, + Tr::Batcher: Batcher>, { use crate::operators::arrange::arrangement::Arrange; use crate::trace::cursor::MyTrait; diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 7f322c00e..37d98d8ca 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -251,7 +251,7 @@ pub trait ReduceCore, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -273,7 +273,7 @@ pub trait ReduceCore Trace=&'a K, Time=G::Timestamp>+'static, T2::ValOwned: Data, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, ; } @@ -292,7 +292,7 @@ where T2::ValOwned: Data, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -310,7 +310,7 @@ where T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time> + 'static, T2::ValOwned: Data, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, { let mut result_trace = None; diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 673fedce6..2cf27ed1b 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -25,8 +25,8 @@ where T: Timestamp, D: Semigroup, { - type Input = Vec; - type Item = ((K,V),T,D); + type Input = Vec<((K,V),T,D)>; + type Output = ((K,V),T,D); type Time = T; fn new(logger: Option>, operator_id: usize) -> Self { @@ -38,7 +38,7 @@ where } #[inline(never)] - fn push_batch(&mut self, batch: RefOrMut>) { + fn push_batch(&mut self, batch: RefOrMut) { // `batch` is either a shared reference or an owned allocations. match batch { RefOrMut::Ref(reference) => { @@ -59,7 +59,7 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline(never)] - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal>(&mut self, upper: Antichain) -> B::Output { let mut merged = Vec::new(); self.sorter.finish_into(&mut merged); diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 7989eece8..21416170d 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -31,8 +31,8 @@ where T: Columnation + Timestamp + 'static, D: Columnation + Semigroup + 'static, { - type Input = Vec; - type Item = ((K,V),T,D); + type Input = Vec<((K,V),T,D)>; + type Output = ((K,V),T,D); type Time = T; fn new(logger: Option>, operator_id: usize) -> Self { @@ -44,7 +44,7 @@ where } #[inline] - fn push_batch(&mut self, batch: RefOrMut>) { + fn push_batch(&mut self, batch: RefOrMut) { // `batch` is either a shared reference or an owned allocations. match batch { RefOrMut::Ref(reference) => { @@ -63,7 +63,7 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline] - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal>(&mut self, upper: Antichain) -> B::Output { let mut merged = Default::default(); self.sorter.finish_into(&mut merged); diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index bf4dd9544..ce0c6532a 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -540,7 +540,7 @@ mod val_batch { impl Builder for OrdValBuilder { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Input = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -560,7 +560,7 @@ mod val_batch { } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Item) { + fn push(&mut self, ((key, val), time, diff): Self::Input) { // Perhaps this is a continuation of an already received key. if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { @@ -586,7 +586,7 @@ mod val_batch { } #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Item) { + fn copy(&mut self, ((key, val), time, diff): &Self::Input) { // Perhaps this is a continuation of an already received key. if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { @@ -1006,7 +1006,7 @@ mod key_batch { impl Builder for OrdKeyBuilder { - type Item = ((::Key, ()), ::Time, ::Diff); + type Input = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; type Output = OrdKeyBatch; @@ -1024,7 +1024,7 @@ mod key_batch { } #[inline] - fn push(&mut self, ((key, ()), time, diff): Self::Item) { + fn push(&mut self, ((key, ()), time, diff): Self::Input) { // Perhaps this is a continuation of an already received key. if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { @@ -1040,7 +1040,7 @@ mod key_batch { } #[inline] - fn copy(&mut self, ((key, ()), time, diff): &Self::Item) { + fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { // Perhaps this is a continuation of an already received key. if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 94ed3b95b..f29d39a8b 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -731,7 +731,7 @@ mod val_batch { ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Input = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = RhhValBatch; @@ -763,7 +763,7 @@ mod val_batch { } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Item) { + fn push(&mut self, ((key, val), time, diff): Self::Input) { // Perhaps this is a continuation of an already received key. if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { @@ -790,7 +790,7 @@ mod val_batch { } #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Item) { + fn copy(&mut self, ((key, val), time, diff): &Self::Input) { // Perhaps this is a continuation of an already received key. if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index fd3af3b76..c616e2b7d 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -255,7 +255,7 @@ impl Trace for Spine where B: Batch+Clone+'static, BA: Batcher