diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index bae14c7b3..d86fb5089 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::TraceReader; /// Reports a number of extensions to a stream of prefixes. /// @@ -23,8 +23,6 @@ where G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 8f6f4d0cf..338a994aa 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Monoid, Semigroup}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// A binary equijoin that responds to updates on only its first input. @@ -81,8 +81,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, @@ -137,8 +135,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 671e24921..029e1f942 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a stream of prefixes. /// @@ -32,8 +32,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, F: FnMut(&D, &mut Tr::Key)+Clone+'static, D: ExchangeData, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index cf7e793c8..2a5e6747a 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::TraceReader; /// Proposes extensions to a prefix stream. /// @@ -25,8 +25,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, @@ -58,8 +56,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, P: ExchangeData, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 7152f1876..41efa8df2 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::TraceReader; /// Proposes extensions to a stream of prefixes. /// @@ -24,8 +24,6 @@ where Tr: TraceReader+Clone+'static, K: Ord+Hash+Clone+Default, V: ExchangeData+Hash+Default, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, P: ExchangeData, diff --git a/examples/cursors.rs b/examples/cursors.rs index 29e9ddc00..5ab55b5e6 100644 --- a/examples/cursors.rs +++ b/examples/cursors.rs @@ -43,7 +43,7 @@ use timely::progress::frontier::AntichainRef; use differential_dataflow::input::Input; use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::*; -use differential_dataflow::trace::cursor::CursorDebug; +use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::TraceReader; type Node = u32; diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs index 2931922a8..6774f535f 100644 --- a/src/algorithms/graphs/bfs.rs +++ b/src/algorithms/graphs/bfs.rs @@ -30,8 +30,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, - Tr::Cursor: crate::trace::Cursor+'static, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs index 91e02dbde..eb803f160 100644 --- a/src/algorithms/graphs/bijkstra.rs +++ b/src/algorithms/graphs/bijkstra.rs @@ -46,8 +46,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, - Tr::Cursor: crate::trace::Cursor+'static, { forward .stream diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index c94e49814..62bd92283 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -65,8 +65,6 @@ where R: From, L: ExchangeData, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, - Tr::Cursor: crate::trace::Cursor+'static, F: Fn(&L)->u64+Clone+'static, { // Morally the code performs the following iterative computation. However, in the interest of a simplified diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 13c3b02cd..8016c12cc 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -77,7 +77,7 @@ where fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() } - fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, >::Storage)> { + fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, ::Storage)> { self.trace.borrow_mut().trace.cursor_through(frontier) } fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } @@ -92,7 +92,7 @@ where pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter) where Tr: Trace, - Tr::Batch: Batch, + Tr::Batch: Batch, { let trace = Rc::new(RefCell::new(TraceBox::new(trace))); let queues = Rc::new(RefCell::new(Vec::new())); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 09a48e248..d452c7113 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -65,8 +65,6 @@ impl Clone for Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, { fn clone(&self) -> Self { Arranged { @@ -83,8 +81,6 @@ impl Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, { /// Brings an arranged collection into a nested scope. /// @@ -405,7 +401,7 @@ where // Determine new frontier on queries that may be issued. // TODO: This code looks very suspect; explain better or fix. - let frontier = std::array::IntoIter::new([ + let frontier = IntoIterator::into_iter([ capability.as_ref().map(|c| c.time().clone()), input1.frontier().frontier().get(0).cloned(), ]).filter_map(|t| t).min(); @@ -425,8 +421,6 @@ impl<'a, G: Scope, Tr> Arranged, Tr> where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, - Tr::Cursor: Cursor, { /// Brings an arranged collection out of a nested region. /// @@ -462,8 +456,7 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, - Tr::Cursor: Cursor, + Tr::Batch: Batch, { self.arrange_named("Arrange") } @@ -479,8 +472,7 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, - Tr::Cursor: Cursor, + Tr::Batch: Batch, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) @@ -495,8 +487,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, - Tr::Cursor: Cursor, + Tr::Batch: Batch, ; } @@ -512,8 +503,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, - Tr::Cursor: Cursor, + Tr::Batch: Batch, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -547,7 +537,7 @@ where }; // Where we will deposit received updates, and from which we extract batches. - let mut batcher = >::Batcher::new(); + let mut batcher = ::Batcher::new(); // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); @@ -684,8 +674,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, - Tr::Cursor: Cursor, + Tr::Batch: Batch, { self.map(|k| (k, ())) .arrange_core(pact, name) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 978d9596f..468398ae6 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -145,8 +145,7 @@ where Tr::Key: ExchangeData+Hashable+std::hash::Hash, Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, - Tr::Cursor: Cursor, + Tr::Batch: Batch, { let mut reader: Option> = None; @@ -252,7 +251,7 @@ where // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); - let mut builder = >::Builder::new(); + let mut builder = ::Builder::new(); for (key, mut list) in to_process.drain(..) { // The prior value associated with the key. diff --git a/src/operators/arrange/writer.rs b/src/operators/arrange/writer.rs index b6c4c5710..a42924b75 100644 --- a/src/operators/arrange/writer.rs +++ b/src/operators/arrange/writer.rs @@ -23,7 +23,7 @@ pub struct TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { /// Current upper limit. upper: Antichain, @@ -37,7 +37,7 @@ impl TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { /// Creates a new `TraceWriter`. pub fn new( @@ -96,7 +96,7 @@ where pub fn seal(&mut self, upper: Antichain) { if self.upper != upper { use trace::Builder; - let builder = >::Builder::new(); + let builder = ::Builder::new(); let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum())); self.insert(batch, None); } @@ -107,7 +107,7 @@ impl Drop for TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { fn drop(&mut self) { self.seal(Antichain::new()) diff --git a/src/operators/count.rs b/src/operators/count.rs index f119d648e..47864622b 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -74,8 +74,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn count_total_core>(&self) -> Collection { diff --git a/src/operators/join.rs b/src/operators/join.rs index ea72b32f6..2db2560e1 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -190,8 +190,6 @@ where Tr::Key: Data+Hashable, Tr::Val: Data, Tr::R: Semigroup, - Tr::Batch: BatchReader+'static, - Tr::Cursor: Cursor+'static, { fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> where Tr::Key: ExchangeData, Tr::R: Multiply, >::Output: Semigroup, L: FnMut(&Tr::Key, &Tr::Val, &V2)->D+'static { @@ -258,8 +256,6 @@ pub trait JoinCore where G::Time fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, R: Multiply, @@ -311,8 +307,6 @@ pub trait JoinCore where G::Time fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, D: Data, @@ -334,8 +328,6 @@ where fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, R: Multiply, @@ -351,8 +343,6 @@ where fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, R: Semigroup, @@ -374,15 +364,11 @@ impl JoinCore for Arranged T1::Key: Ord+Debug+'static, T1::Val: Ord+Clone+Debug+'static, T1::R: Semigroup, - T1::Batch: BatchReader+'static, - T1::Cursor: Cursor+'static, { fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where Tr2::Val: Ord+Clone+Debug+'static, Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::R: Semigroup, T1::R: Multiply, >::Output: Semigroup, @@ -401,8 +387,6 @@ impl JoinCore for Arranged fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, - Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, D: Data, @@ -651,39 +635,39 @@ impl JoinCore for Arranged /// The structure wraps cursors which allow us to play out join computation at whatever rate we like. /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely /// dataflow system a chance to run operators that can consume and aggregate the data. -struct Deferred +struct Deferred where - V1: Ord+Clone, - V2: Ord+Clone, T: Timestamp+Lattice+Ord+Debug, - R1: Semigroup, - R2: Semigroup, - R3: Semigroup, - C1: Cursor, - C2: Cursor, + R: Semigroup, + C1: Cursor, + C2: Cursor, + C1::Val: Ord+Clone, + C2::Val: Ord+Clone, + C1::R: Semigroup, + C2::R: Semigroup, D: Ord+Clone+Data, { - phant: ::std::marker::PhantomData<(K, V1, V2, R1, R2)>, + phant: ::std::marker::PhantomData, trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability, done: bool, - temp: Vec<((D, T), R3)>, + temp: Vec<((D, T), R)>, } -impl Deferred +impl Deferred where K: Ord+Debug+Eq, - V1: Ord+Clone+Debug, - V2: Ord+Clone+Debug, + C1: Cursor, + C2: Cursor, + C1::Val: Ord+Clone+Debug, + C2::Val: Ord+Clone+Debug, + C1::R: Semigroup, + C2::R: Semigroup, T: Timestamp+Lattice+Ord+Debug, - R1: Semigroup, - R2: Semigroup, - R3: Semigroup, - C1: Cursor, - C2: Cursor, + R: Semigroup, D: Clone+Data, { fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { @@ -705,8 +689,8 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] - fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) - where I: IntoIterator, L: FnMut(&K, &V1, &V2, &T, &R1, &R2)->I { + fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) + where I: IntoIterator, L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I { let meet = self.capability.time(); diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 967370e85..96f0acc93 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -39,8 +39,8 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup { } } /// Loads the contents of a cursor. - fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) - where K: Eq, V: Clone, C: Cursor, L: Fn(&T)->T { + fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) + where V: Clone, C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.clear(); while cursor.val_valid(storage) { cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone())); @@ -102,22 +102,22 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH self.history.clear(); self.buffer.clear(); } - fn load(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L) - where K: Eq, C: Cursor, L: Fn(&T)->T { + fn load(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L) + where C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.edits.load(cursor, storage, logic); } /// Loads and replays a specified key. /// /// If the key is absent, the replayed history will be empty. - fn replay_key<'history, K, C, L>( + fn replay_key<'history, C, L>( &'history mut self, cursor: &mut C, storage: &'storage C::Storage, - key: &K, + key: &C::Key, logic: L ) -> HistoryReplay<'storage, 'history, V, T, R> - where K: Eq, C: Cursor, L: Fn(&T)->T + where C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.clear(); cursor.seek_key(storage, key); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index f8d9edeb1..2b5ea9aa2 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -90,8 +90,6 @@ impl Reduce for Arrang where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { @@ -179,8 +177,6 @@ impl Threshold for Arranged+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { self.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) @@ -236,8 +232,6 @@ impl Count for Arranged where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn count_core>(&self) -> Collection { self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -282,8 +276,7 @@ pub trait ReduceCore where G::Timestam T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Abelian, - T2::Batch: Batch, - T2::Cursor: Cursor, + T2::Batch: Batch, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -305,8 +298,7 @@ pub trait ReduceCore where G::Timestam T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Semigroup, - T2::Batch: Batch, - T2::Cursor: Cursor, + T2::Batch: Batch, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -324,8 +316,7 @@ where T2::Val: Data, T2::R: Semigroup, T2: Trace+TraceReader+'static, - T2::Batch: Batch, - T2::Cursor: Cursor, + T2::Batch: Batch, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -337,16 +328,13 @@ impl ReduceCore for Ar where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn reduce_core(&self, name: &str, mut logic: L) -> Arranged> where T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Semigroup, - T2::Batch: Batch, - T2::Cursor: Cursor, + T2::Batch: Batch, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; @@ -488,7 +476,7 @@ where let mut builders = Vec::new(); for i in 0 .. capabilities.len() { buffers.push((capabilities[i].time().clone(), Vec::new())); - builders.push(>::Builder::new()); + builders.push(::Builder::new()); } // cursors for navigating input and output traces. @@ -682,9 +670,9 @@ where new_interesting: &mut Vec) -> (usize, usize) where K: Eq+Clone, - C1: Cursor, - C2: Cursor, - C3: Cursor, + C1: Cursor, + C2: Cursor, + C3: Cursor, L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>); } @@ -760,9 +748,9 @@ mod history_replay { new_interesting: &mut Vec) -> (usize, usize) where K: Eq+Clone, - C1: Cursor, - C2: Cursor, - C3: Cursor, + C1: Cursor, + C2: Cursor, + C3: Cursor, L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>) { diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index 427093a04..cbe1b8d8f 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -108,8 +108,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, - T1::Cursor: Cursor, { fn threshold_semigroup(&self, mut thresh: F) -> Collection where diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index a7685c6f1..a7adf8cc1 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -7,19 +7,17 @@ use super::Cursor; /// The `CursorList` tracks the indices of cursors with the minimum key, and the the indices of cursors with /// the minimum key and minimum value. It performs no clever management of these sets otherwise. #[derive(Debug)] -pub struct CursorList> { - _phantom: ::std::marker::PhantomData<(K, V, T, R)>, +pub struct CursorList { cursors: Vec, min_key: Vec, min_val: Vec, } -impl> CursorList where K: Ord, V: Ord { +impl CursorList where C::Key: Ord, C::Val: Ord { /// Creates a new cursor list from pre-existing cursors. pub fn new(cursors: Vec, storage: &[C::Storage]) -> Self { let mut result = CursorList { - _phantom: ::std::marker::PhantomData, cursors, min_key: Vec::new(), min_val: Vec::new(), @@ -43,7 +41,7 @@ impl> CursorList where K: Ord, self.min_key.clear(); // Determine the index of the cursor with minimum key. - let mut min_key_opt: Option<&K> = None; + let mut min_key_opt = None; for (index, cursor) in self.cursors.iter().enumerate() { let key = cursor.get_key(&storage[index]); if key.is_some() { @@ -71,7 +69,7 @@ impl> CursorList where K: Ord, self.min_val.clear(); // Determine the index of the cursor with minimum value. - let mut min_val: Option<&V> = None; + let mut min_val = None; for &index in self.min_key.iter() { let val = self.cursors[index].get_val(&storage[index]); if val.is_some() { @@ -87,10 +85,15 @@ impl> CursorList where K: Ord, } } -impl> Cursor for CursorList +impl Cursor for CursorList where - K: Ord, - V: Ord { + C::Key: Ord, + C::Val: Ord, +{ + type Key = C::Key; + type Val = C::Val; + type Time = C::Time; + type R = C::R; type Storage = Vec; @@ -102,20 +105,20 @@ where // accessors #[inline] - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { debug_assert!(self.key_valid(storage)); debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]])); self.cursors[self.min_key[0]].key(&storage[self.min_key[0]]) } #[inline] - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { debug_assert!(self.key_valid(storage)); debug_assert!(self.val_valid(storage)); debug_assert!(self.cursors[self.min_val[0]].val_valid(&storage[self.min_val[0]])); self.cursors[self.min_val[0]].val(&storage[self.min_val[0]]) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { for &index in self.min_val.iter() { self.cursors[index].map_times(&storage[index], |t,d| logic(t,d)); } @@ -130,7 +133,7 @@ where self.minimize_keys(storage); } #[inline] - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { for index in 0 .. self.cursors.len() { self.cursors[index].seek_key(&storage[index], key); } @@ -146,7 +149,7 @@ where self.minimize_vals(storage); } #[inline] - fn seek_val(&mut self, storage: &Self::Storage, val: &V) { + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { for &index in self.min_key.iter() { self.cursors[index].seek_val(&storage[index], val); } diff --git a/src/trace/cursor/cursor_pair.rs b/src/trace/cursor/cursor_pair.rs index 1ae361c7f..c34f03abb 100644 --- a/src/trace/cursor/cursor_pair.rs +++ b/src/trace/cursor/cursor_pair.rs @@ -15,13 +15,18 @@ pub struct CursorPair { val_order: Ordering, // Invalid vals are `Greater` than all other vals. `Equal` implies both valid. } -impl Cursor for CursorPair +impl Cursor for CursorPair where K: Ord, V: Ord, - C1: Cursor, - C2: Cursor, + C1: Cursor, + C2: Cursor, { + type Key = K; + type Val = V; + type Time = T; + type R = R; + type Storage = (C1::Storage, C2::Storage); // validation methods diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index d76e7cab0..a71ca2dad 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -12,7 +12,16 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; /// A cursor for navigating ordered `(key, val, time, diff)` updates. -pub trait Cursor { +pub trait Cursor { + + /// Key by which updates are indexed. + type Key; + /// Values associated with keys. + type Val; + /// Timestamps associated with updates + type Time; + /// Associated update. + type R; /// Type the cursor addresses data in. type Storage; @@ -27,43 +36,46 @@ pub trait Cursor { fn val_valid(&self, storage: &Self::Storage) -> bool; /// A reference to the current key. Asserts if invalid. - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K; + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key; /// A reference to the current value. Asserts if invalid. - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V; + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val; /// Returns a reference to the current key, if valid. - fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a K> { + fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Key> { if self.key_valid(storage) { Some(self.key(storage)) } else { None } } /// Returns a reference to the current value, if valid. - fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a V> { + fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Val> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } } /// Applies `logic` to each pair of time and difference. Intended for mutation of the /// closure's scope. - fn map_times(&mut self, storage: &Self::Storage, logic: L); + fn map_times(&mut self, storage: &Self::Storage, logic: L); /// Advances the cursor to the next key. fn step_key(&mut self, storage: &Self::Storage); /// Advances the cursor to the specified key. - fn seek_key(&mut self, storage: &Self::Storage, key: &K); + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key); /// Advances the cursor to the next value. fn step_val(&mut self, storage: &Self::Storage); /// Advances the cursor to the specified value. - fn seek_val(&mut self, storage: &Self::Storage, val: &V); + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val); /// Rewinds the cursor to the first key. fn rewind_keys(&mut self, storage: &Self::Storage); /// Rewinds the cursor to the first value for current key. fn rewind_vals(&mut self, storage: &Self::Storage); -} -/// Debugging and testing utilities for Cursor. -pub trait CursorDebug : Cursor { /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((K, V), Vec<(T, R)>)> { + fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::Key, Self::Val), Vec<(Self::Time, Self::R)>)> + where + Self::Key: Clone, + Self::Val: Clone, + Self::Time: Clone, + Self::R: Clone, + { let mut out = Vec::new(); self.rewind_keys(storage); self.rewind_vals(storage); @@ -81,5 +93,3 @@ pub trait CursorDebug : Cursor CursorDebug for C where C: Cursor { } diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 14ccc0f3a..f1f2264ff 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -8,32 +8,32 @@ use lattice::Lattice; use trace::{Batch, Batcher, Builder}; /// Creates batches from unordered tuples. -pub struct MergeBatcher> { - sorter: MergeSorter<(K, V), T, R>, - lower: Antichain, - frontier: Antichain, +pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup { + sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>, + lower: Antichain, + frontier: Antichain, phantom: ::std::marker::PhantomData, } -impl Batcher for MergeBatcher +impl Batcher for MergeBatcher where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+timely::progress::Timestamp+Ord+Clone, - R: Semigroup, - B: Batch, + B: Batch, + B::Key: Ord+Clone, + B::Val: Ord+Clone, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, + B::R: Semigroup, { fn new() -> Self { MergeBatcher { sorter: MergeSorter::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(T::minimum()), + lower: Antichain::from_elem(::minimum()), phantom: ::std::marker::PhantomData, } } #[inline(never)] - fn push_batch(&mut self, batch: &mut Vec<((K,V),T,R)>) { + fn push_batch(&mut self, batch: &mut Vec<((B::Key,B::Val),B::Time,B::R)>) { self.sorter.push(batch); } @@ -42,7 +42,7 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline(never)] - fn seal(&mut self, upper: Antichain) -> B { + fn seal(&mut self, upper: Antichain) -> B { let mut builder = B::Builder::new(); @@ -92,18 +92,18 @@ where let mut buffer = Vec::new(); self.sorter.push(&mut buffer); // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,R)>() > 0 { + while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 { buffer = Vec::new(); self.sorter.push(&mut buffer); } - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index f394d2365..a7c4fcae1 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -35,16 +35,16 @@ use super::merge_batcher::MergeBatcher; use abomonation::abomonated::Abomonated; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine, Vec>>>; +pub type OrdValSpineAbom = Spine, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>; +pub type OrdKeySpine = Spine>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec>>>; +pub type OrdKeySpineAbom = Spine, Vec>>>; /// An immutable collection of update tuples, from a contiguous interval of logical times. @@ -62,7 +62,7 @@ where pub desc: Description, } -impl BatchReader for OrdValBatch +impl BatchReader for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -70,13 +70,18 @@ where R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { - type Cursor = OrdValCursor; - fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor() } } + type Key = K; + type Val = V; + type Time = T; + type R = R; + + type Cursor = OrdValCursor; + fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } } fn len(&self) -> usize { , O>, O> as Trie>::tuples(&self.layer) } fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdValBatch +impl Batch for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -84,7 +89,7 @@ where R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { - type Batcher = MergeBatcher; + type Batcher = MergeBatcher; type Builder = OrdValBuilder; type Merger = OrdValMerger; @@ -215,7 +220,7 @@ where should_compact: bool, } -impl Merger> for OrdValMerger +impl Merger> for OrdValMerger where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -306,17 +311,18 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdValCursor +pub struct OrdValCursor where V: Ord+Clone, T: Lattice+Ord+Clone, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { + phantom: std::marker::PhantomData, cursor: OrderedCursor, O>>, } -impl Cursor for OrdValCursor +impl Cursor for OrdValCursor where K: Ord+Clone, V: Ord+Clone, @@ -324,6 +330,11 @@ where R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { + type Key = K; + type Val = V; + type Time = T; + type R = R; + type Storage = OrdValBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } @@ -358,7 +369,7 @@ where builder: OrderedBuilder, O>, O>, } -impl Builder> for OrdValBuilder +impl Builder> for OrdValBuilder where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -409,17 +420,21 @@ where pub desc: Description, } -impl BatchReader for OrdKeyBatch +impl BatchReader for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { - type Cursor = OrdKeyCursor; + type Key = K; + type Val = (); + type Time = T; + type R = R; + + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { - empty: (), valid: true, cursor: self.layer.cursor(), phantom: PhantomData @@ -429,14 +444,14 @@ where fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdKeyBatch +impl Batch for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { - type Batcher = MergeBatcher; + type Batcher = MergeBatcher; type Builder = OrdKeyBuilder; type Merger = OrdKeyMerger; @@ -535,7 +550,7 @@ where should_compact: bool, } -impl Merger> for OrdKeyMerger +impl Merger> for OrdKeyMerger where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, @@ -632,25 +647,28 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor { +pub struct OrdKeyCursor { valid: bool, - empty: (), cursor: OrderedCursor>, - phantom: PhantomData + phantom: PhantomData<(K, O)> } -impl Cursor for OrdKeyCursor +impl Cursor for OrdKeyCursor where K: Ord+Clone, T: Lattice+Ord+Clone, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { + type Key = K; + type Val = (); + type Time = T; + type R = R; type Storage = OrdKeyBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { unsafe { ::std::mem::transmute(&self.empty) } } + fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { self.cursor.child.rewind(&storage.layer.vals); while self.cursor.child.valid(&storage.layer.vals) { @@ -680,7 +698,7 @@ where builder: OrderedBuilder, O>, } -impl Builder> for OrdKeyBuilder +impl Builder> for OrdKeyBuilder where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 8624247e6..ba737293b 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -87,41 +87,40 @@ use ::timely::order::PartialOrder; /// A spine maintains a small number of immutable collections of update tuples, merging the collections when /// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with /// other immutable collections. -pub struct Spine> { +pub struct Spine where B::Time: Lattice+Ord, B::R: Semigroup { operator: OperatorInfo, logger: Option, - phantom: ::std::marker::PhantomData<(K, V, R)>, - logical_frontier: Antichain, // Times after which the trace must accumulate correctly. - physical_frontier: Antichain, // Times after which the trace must be able to subset its inputs. - merging: Vec>,// Several possibly shared collections of updates. - pending: Vec, // Batches at times in advance of `frontier`. - upper: Antichain, + logical_frontier: Antichain, // Times after which the trace must accumulate correctly. + physical_frontier: Antichain, // Times after which the trace must be able to subset its inputs. + merging: Vec>, // Several possibly shared collections of updates. + pending: Vec, // Batches at times in advance of `frontier`. + upper: Antichain, effort: usize, activator: Option, } -impl TraceReader for Spine +impl TraceReader for Spine where - K: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). - V: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). - T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, - R: Semigroup, - B: Batch+Clone+'static, + B: Batch+Clone+'static, + B::Key: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). + B::Val: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, + B::R: Semigroup, { - type Key = K; - type Val = V; - type Time = T; - type R = R; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; type Batch = B; - type Cursor = CursorList>::Cursor>; + type Cursor = CursorList<::Cursor>; - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { // If `upper` is the minimum frontier, we can return an empty cursor. // This can happen with operators that are written to expect the ability to acquire cursors // for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly. - if upper.less_equal(&T::minimum()) { + if upper.less_equal(&::minimum()) { let cursors = Vec::new(); let storage = Vec::new(); return Some((CursorList::new(cursors, &storage), storage)); @@ -211,19 +210,19 @@ where Some((CursorList::new(cursors, &storage), storage)) } - fn set_logical_compaction(&mut self, frontier: AntichainRef) { + fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.logical_frontier.clear(); self.logical_frontier.extend(frontier.iter().cloned()); } - fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { + fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } + fn set_physical_compaction(&mut self, frontier: AntichainRef) { // We should never request to rewind the frontier. debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); self.physical_frontier.clear(); self.physical_frontier.extend(frontier.iter().cloned()); self.consider_merges(); } - fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } + fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } fn map_batches(&self, mut f: F) { for batch in self.merging.iter().rev() { @@ -242,13 +241,13 @@ where // A trace implementation for any key type that can be borrowed from or converted into `Key`. // TODO: Almost all this implementation seems to be generic with respect to the trace and batch types. -impl Trace for Spine +impl Trace for Spine where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, - R: Semigroup, - B: Batch+Clone+'static, + B: Batch+Clone+'static, + B::Key: Ord+Clone, + B::Val: Ord+Clone, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, + B::R: Semigroup, { fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, @@ -311,18 +310,18 @@ where if !self.upper.borrow().is_empty() { use trace::Builder; let builder = B::Builder::new(); - let batch = builder.done(self.upper.clone(), Antichain::new(), Antichain::from_elem(T::minimum())); + let batch = builder.done(self.upper.clone(), Antichain::new(), Antichain::from_elem(::minimum())); self.insert(batch); } } } // Drop implementation allows us to log batch drops, to zero out maintained totals. -impl Drop for Spine +impl Drop for Spine where - T: Lattice+Ord, - R: Semigroup, - B: Batch, + B: Batch, + B::Time: Lattice+Ord, + B::R: Semigroup, { fn drop(&mut self) { self.drop_batches(); @@ -330,11 +329,11 @@ where } -impl Spine +impl Spine where - T: Lattice+Ord, - R: Semigroup, - B: Batch, + B: Batch, + B::Time: Lattice+Ord, + B::R: Semigroup, { /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { @@ -376,13 +375,13 @@ where } } -impl Spine +impl Spine where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, - R: Semigroup, - B: Batch, + B: Batch, + B::Key: Ord+Clone, + B::Val: Ord+Clone, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, + B::R: Semigroup, { /// True iff there is at most one non-empty batch in `self.merging`. /// @@ -432,12 +431,11 @@ where Spine { operator, logger, - phantom: ::std::marker::PhantomData, - logical_frontier: Antichain::from_elem(::minimum()), - physical_frontier: Antichain::from_elem(::minimum()), + logical_frontier: Antichain::from_elem(::minimum()), + physical_frontier: Antichain::from_elem(::minimum()), merging: Vec::new(), pending: Vec::new(), - upper: Antichain::from_elem(::minimum()), + upper: Antichain::from_elem(::minimum()), effort, activator, } @@ -766,7 +764,7 @@ where /// /// A layer can be empty, contain a single batch, or contain a pair of batches /// that are in the process of merging into a batch for the next layer. -enum MergeState> { +enum MergeState { /// An empty layer, containing no updates. Vacant, /// A layer containing a single batch. @@ -775,10 +773,10 @@ enum MergeState> { /// to ensure the progress of maintenance work. Single(Option), /// A layer containing two batches, in the process of merging. - Double(MergeVariant), + Double(MergeVariant), } -impl> MergeState { +impl MergeState where B::Time: Eq { /// The number of actual updates contained in the level. fn len(&self) -> usize { @@ -859,12 +857,12 @@ impl> MergeState { /// empty batch whose upper and lower froniers are equal. This /// option exists purely for bookkeeping purposes, and no computation /// is performed to merge the two batches. - fn begin_merge(batch1: Option, batch2: Option, compaction_frontier: Option>) -> MergeState { + fn begin_merge(batch1: Option, batch2: Option, compaction_frontier: Option>) -> MergeState { let variant = match (batch1, batch2) { (Some(batch1), Some(batch2)) => { assert!(batch1.upper() == batch2.lower()); - let begin_merge = >::begin_merge(&batch1, &batch2, compaction_frontier); + let begin_merge = ::begin_merge(&batch1, &batch2, compaction_frontier); MergeVariant::InProgress(batch1, batch2, begin_merge) } (None, Some(x)) => MergeVariant::Complete(Some((x, None))), @@ -876,14 +874,14 @@ impl> MergeState { } } -enum MergeVariant> { +enum MergeVariant { /// Describes an actual in-progress merge between two non-trivial batches. - InProgress(B, B, >::Merger), + InProgress(B, B, ::Merger), /// A merge that requires no further work. May or may not represent a non-trivial batch. Complete(Option<(B, Option<(B, B)>)>), } -impl> MergeVariant { +impl MergeVariant { /// Completes and extracts the batch, unless structurally empty. /// diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 45961491d..f1775050f 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -52,13 +52,13 @@ pub trait TraceReader { type R; /// The type of an immutable collection of updates. - type Batch: BatchReader+Clone+'static; + type Batch: BatchReader+Clone+'static; /// The type used to enumerate the collections contents. - type Cursor: Cursor; + type Cursor: Cursor; /// Provides a cursor over updates contained in the trace. - fn cursor(&mut self) -> (Self::Cursor, >::Storage) { + fn cursor(&mut self) -> (Self::Cursor, ::Storage) { if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) { cursor } @@ -74,7 +74,7 @@ pub trait TraceReader { /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses. - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)>; + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)>; /// Advances the frontier that constrains logical compaction. /// @@ -197,7 +197,7 @@ pub trait TraceReader { /// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need /// to return them. pub trait Trace : TraceReader -where ::Batch: Batch { +where ::Batch: Batch { /// Allocates a new empty trace. fn new( @@ -232,12 +232,21 @@ where ::Batch: Batch +pub trait BatchReader where Self: ::std::marker::Sized, { + /// Key by which updates are indexed. + type Key; + /// Values associated with keys. + type Val; + /// Timestamps associated with updates + type Time; + /// Associated update. + type R; + /// The type used to enumerate the batch's contents. - type Cursor: Cursor; + type Cursor: Cursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. @@ -245,70 +254,70 @@ where /// True if the batch is empty. fn is_empty(&self) -> bool { self.len() == 0 } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description; + fn description(&self) -> &Description; /// All times in the batch are greater or equal to an element of `lower`. - fn lower(&self) -> &Antichain { self.description().lower() } + fn lower(&self) -> &Antichain { self.description().lower() } /// All times in the batch are not greater or equal to any element of `upper`. - fn upper(&self) -> &Antichain { self.description().upper() } + fn upper(&self) -> &Antichain { self.description().upper() } } /// An immutable collection of updates. -pub trait Batch : BatchReader where Self: ::std::marker::Sized { +pub trait Batch : BatchReader where Self: ::std::marker::Sized { /// A type used to assemble batches from disordered updates. - type Batcher: Batcher; + type Batcher: Batcher; /// A type used to assemble batches from ordered update sequences. - type Builder: Builder; + type Builder: Builder; /// A type used to progressively merge batches. - type Merger: Merger; + type Merger: Merger; /// Initiates the merging of consecutive batches. /// /// The result of this method can be exercised to eventually produce the same result /// that a call to `self.merge(other)` would produce, but it can be done in a measured /// fashion. This can help to avoid latency spikes where a large merge needs to happen. - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { Self::Merger::new(self, other, compaction_frontier) } /// Creates an empty batch with the stated bounds. - fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { + fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { ::new().done(lower, upper, since) } } /// Functionality for collecting and batching updates. -pub trait Batcher> { +pub trait Batcher { /// Allocates a new empty batcher. fn new() -> Self; /// Adds an unordered batch of elements to the batcher. - fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>); + fn push_batch(&mut self, batch: &mut Vec<((Output::Key, Output::Val), Output::Time, Output::R)>); /// Returns all updates not greater or equal to an element of `upper`. - fn seal(&mut self, upper: Antichain) -> Output; + fn seal(&mut self, upper: Antichain) -> Output; /// Returns the lower envelope of contained update times. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; } /// Functionality for building batches from ordered update sequences. -pub trait Builder> { +pub trait Builder { /// Allocates an empty builder. fn new() -> Self; /// Allocates an empty builder with some capacity. fn with_capacity(cap: usize) -> Self; /// Adds an element to the batch. - fn push(&mut self, element: (K, V, T, R)); + fn push(&mut self, element: (Output::Key, Output::Val, Output::Time, Output::R)); /// Adds an ordered sequence of elements to the batch. - fn extend>(&mut self, iter: I) { + fn extend>(&mut self, iter: I) { for item in iter { self.push(item); } } /// Completes building and returns the batch. - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Output; + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Output; } /// Represents a merge in progress. -pub trait Merger> { +pub trait Merger { /// Creates a new merger to merge the supplied batches, optionally compacting /// up to the supplied frontier. - fn new(source1: &Output, source2: &Output, compaction_frontier: Option>) -> Self; + fn new(source1: &Output, source2: &Output, compaction_frontier: Option>) -> Self; /// Perform some amount of work, decrementing `fuel`. /// /// If `fuel` is non-zero after the call, the merging is complete and @@ -331,10 +340,14 @@ pub mod rc_blanket_impls { use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - impl> BatchReader for Rc { + impl BatchReader for Rc { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; /// The type used to enumerate the batch's contents. - type Cursor = RcBatchCursor; + type Cursor = RcBatchCursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor { RcBatchCursor::new((&**self).cursor()) @@ -343,84 +356,87 @@ pub mod rc_blanket_impls { /// The number of updates in the batch. fn len(&self) -> usize { (&**self).len() } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } + fn description(&self) -> &Description { (&**self).description() } } /// Wrapper to provide cursor to nested scope. - pub struct RcBatchCursor> { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, + pub struct RcBatchCursor { cursor: B::Cursor, } - impl> RcBatchCursor { + impl RcBatchCursor { fn new(cursor: B::Cursor) -> Self { RcBatchCursor { cursor, - phantom: ::std::marker::PhantomData, } } } - impl> Cursor for RcBatchCursor { + impl Cursor for RcBatchCursor { + + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; type Storage = Rc; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &Self::Storage, logic: L) { self.cursor.map_times(storage, logic) } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } } /// An immutable collection of updates. - impl> Batch for Rc { - type Batcher = RcBatcher; - type Builder = RcBuilder; - type Merger = RcMerger; + impl Batch for Rc { + type Batcher = RcBatcher; + type Builder = RcBuilder; + type Merger = RcMerger; } /// Wrapper type for batching reference counted batches. - pub struct RcBatcher> { batcher: B::Batcher } + pub struct RcBatcher { batcher: B::Batcher } /// Functionality for collecting and batching updates. - impl> Batcher> for RcBatcher { - fn new() -> Self { RcBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } + impl Batcher> for RcBatcher { + fn new() -> Self { RcBatcher { batcher: >::new() } } + fn push_batch(&mut self, batch: &mut Vec<((B::Key, B::Val), B::Time, B::R)>) { self.batcher.push_batch(batch) } + fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } } /// Wrapper type for building reference counted batches. - pub struct RcBuilder> { builder: B::Builder } + pub struct RcBuilder { builder: B::Builder } /// Functionality for building batches from ordered update sequences. - impl> Builder> for RcBuilder { - fn new() -> Self { RcBuilder { builder: >::new() } } - fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } + impl Builder> for RcBuilder { + fn new() -> Self { RcBuilder { builder: >::new() } } + fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } + fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } /// Wrapper type for merging reference counted batches. - pub struct RcMerger> { merger: B::Merger } + pub struct RcMerger { merger: B::Merger } /// Represents a merge in progress. - impl> Merger> for RcMerger { - fn new(source1: &Rc, source2: &Rc, compaction_frontier: Option>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } + impl Merger> for RcMerger { + fn new(source1: &Rc, source2: &Rc, compaction_frontier: Option>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } fn work(&mut self, source1: &Rc, source2: &Rc, fuel: &mut isize) { self.merger.work(source1, source2, fuel) } fn done(self) -> Rc { Rc::new(self.merger.done()) } } @@ -438,10 +454,15 @@ pub mod abomonated_blanket_impls { use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - impl+Abomonation> BatchReader for Abomonated> { + impl BatchReader for Abomonated> { + + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; /// The type used to enumerate the batch's contents. - type Cursor = AbomonatedBatchCursor; + type Cursor = AbomonatedBatchCursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor { AbomonatedBatchCursor::new((&**self).cursor()) @@ -450,81 +471,84 @@ pub mod abomonated_blanket_impls { /// The number of updates in the batch. fn len(&self) -> usize { (&**self).len() } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } + fn description(&self) -> &Description { (&**self).description() } } /// Wrapper to provide cursor to nested scope. - pub struct AbomonatedBatchCursor> { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, + pub struct AbomonatedBatchCursor { cursor: B::Cursor, } - impl> AbomonatedBatchCursor { + impl AbomonatedBatchCursor { fn new(cursor: B::Cursor) -> Self { AbomonatedBatchCursor { cursor, - phantom: ::std::marker::PhantomData, } } } - impl+Abomonation> Cursor for AbomonatedBatchCursor { + impl Cursor for AbomonatedBatchCursor { + + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; type Storage = Abomonated>; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &Self::Storage, logic: L) { self.cursor.map_times(storage, logic) } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } } /// An immutable collection of updates. - impl+Abomonation> Batch for Abomonated> { - type Batcher = AbomonatedBatcher; - type Builder = AbomonatedBuilder; - type Merger = AbomonatedMerger; + impl Batch for Abomonated> { + type Batcher = AbomonatedBatcher; + type Builder = AbomonatedBuilder; + type Merger = AbomonatedMerger; } /// Wrapper type for batching reference counted batches. - pub struct AbomonatedBatcher> { batcher: B::Batcher } + pub struct AbomonatedBatcher { batcher: B::Batcher } /// Functionality for collecting and batching updates. - impl+Abomonation> Batcher>> for AbomonatedBatcher { - fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Abomonated> { + impl Batcher>> for AbomonatedBatcher { + fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } + fn push_batch(&mut self, batch: &mut Vec<((B::Key, B::Val), B::Time, B::R)>) { self.batcher.push_batch(batch) } + fn seal(&mut self, upper: Antichain) -> Abomonated> { let batch = self.batcher.seal(upper); let mut bytes = Vec::with_capacity(measure(&batch)); unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; unsafe { Abomonated::::new(bytes).unwrap() } } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } } /// Wrapper type for building reference counted batches. - pub struct AbomonatedBuilder> { builder: B::Builder } + pub struct AbomonatedBuilder { builder: B::Builder } /// Functionality for building batches from ordered update sequences. - impl+Abomonation> Builder>> for AbomonatedBuilder { - fn new() -> Self { AbomonatedBuilder { builder: >::new() } } - fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Abomonated> { + impl Builder>> for AbomonatedBuilder { + fn new() -> Self { AbomonatedBuilder { builder: >::new() } } + fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } + fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Abomonated> { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch)); unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; @@ -533,11 +557,11 @@ pub mod abomonated_blanket_impls { } /// Wrapper type for merging reference counted batches. - pub struct AbomonatedMerger> { merger: B::Merger } + pub struct AbomonatedMerger { merger: B::Merger } /// Represents a merge in progress. - impl+Abomonation> Merger>> for AbomonatedMerger { - fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: Option>) -> Self { + impl Merger>> for AbomonatedMerger { + fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: Option>) -> Self { AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } fn work(&mut self, source1: &Abomonated>, source2: &Abomonated>, fuel: &mut isize) { diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index c1d28fd92..1b4316dc4 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -47,8 +47,8 @@ where type Time = TInner; type R = Tr::R; - type Batch = BatchEnter; - type Cursor = CursorEnter; + type Batch = BatchEnter; + type Cursor = CursorEnter; fn map_batches(&self, mut f: F) { self.trace.map_batches(|batch| { @@ -86,7 +86,7 @@ where self.stash2.borrow() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { self.stash1.clear(); for time in upper.iter() { self.stash1.insert(time.clone().to_outer()); @@ -113,29 +113,24 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchEnter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchEnter { batch: B, description: Description, } -impl Clone for BatchEnter { - fn clone(&self) -> Self { - BatchEnter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - description: self.description.clone(), - } - } -} - -impl BatchReader for BatchEnter +impl BatchReader for BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { - type Cursor = BatchCursorEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { BatchCursorEnter::new(self.batch.cursor()) @@ -144,11 +139,11 @@ where fn description(&self) -> &Description { &self.description } } -impl BatchEnter +impl BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { /// Makes a new batch wrapper pub fn make_from(batch: B) -> Self { @@ -157,7 +152,6 @@ where let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect(); BatchEnter { - phantom: ::std::marker::PhantomData, batch: batch, description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)) } @@ -165,12 +159,12 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorEnter, TInner> { - phantom: ::std::marker::PhantomData<(K, V, T, R, TInner)>, +pub struct CursorEnter { + phantom: ::std::marker::PhantomData, cursor: C, } -impl, TInner> CursorEnter { +impl CursorEnter { fn new(cursor: C) -> Self { CursorEnter { phantom: ::std::marker::PhantomData, @@ -179,32 +173,37 @@ impl, TInner> CursorEnter Cursor for CursorEnter +impl Cursor for CursorEnter where - C: Cursor, - T: Timestamp, - TInner: Refines+Lattice, + C: Cursor, + C::Time: Timestamp, + TInner: Refines+Lattice, { + type Key = C::Key; + type Val = C::Val; + type Time = TInner; + type R = C::R; + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { self.cursor.map_times(storage, |time, diff| { logic(&TInner::to_inner(time.clone()), diff) }) } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } @@ -213,12 +212,12 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorEnter, TInner> { - phantom: ::std::marker::PhantomData<(K, V, R, TInner)>, +pub struct BatchCursorEnter { + phantom: ::std::marker::PhantomData, cursor: B::Cursor, } -impl, TInner> BatchCursorEnter { +impl BatchCursorEnter { fn new(cursor: B::Cursor) -> Self { BatchCursorEnter { phantom: ::std::marker::PhantomData, @@ -227,31 +226,36 @@ impl, TInner> BatchCursorEnter> Cursor for BatchCursorEnter +impl Cursor for BatchCursorEnter where - T: Timestamp, - TInner: Refines+Lattice, + B::Time: Timestamp, + TInner: Refines+Lattice, { - type Storage = BatchEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { self.cursor.map_times(&storage.batch, |time, diff| { logic(&TInner::to_inner(time.clone()), diff) }) } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index 9562fc90c..e5c5419ed 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -62,8 +62,8 @@ where type Time = TInner; type R = Tr::R; - type Batch = BatchEnter; - type Cursor = CursorEnter; + type Batch = BatchEnter; + type Cursor = CursorEnter; fn map_batches(&self, mut f: F2) { let logic = self.logic.clone(); @@ -102,7 +102,7 @@ where self.stash2.borrow() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { self.stash1.clear(); for time in upper.iter() { self.stash1.insert(time.clone().to_outer()); @@ -131,32 +131,26 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchEnter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchEnter { batch: B, description: Description, logic: F, } -impl Clone for BatchEnter { - fn clone(&self) -> Self { - BatchEnter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - description: self.description.clone(), - logic: self.logic.clone(), - } - } -} - -impl BatchReader for BatchEnter +impl BatchReader for BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, - F: FnMut(&K, &V, &T)->TInner+Clone, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, + F: FnMut(&B::Key, &B::Val, &B::Time)->TInner+Clone, { - type Cursor = BatchCursorEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { BatchCursorEnter::new(self.batch.cursor(), self.logic.clone()) @@ -165,11 +159,11 @@ where fn description(&self) -> &Description { &self.description } } -impl BatchEnter +impl BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { /// Makes a new batch wrapper pub fn make_from(batch: B, logic: F) -> Self { @@ -178,7 +172,6 @@ where let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect(); BatchEnter { - phantom: ::std::marker::PhantomData, batch, description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)), logic, @@ -187,13 +180,13 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorEnter, TInner, F> { - phantom: ::std::marker::PhantomData<(K, V, T, R, TInner)>, +pub struct CursorEnter { + phantom: ::std::marker::PhantomData, cursor: C, logic: F, } -impl, TInner, F> CursorEnter { +impl CursorEnter { fn new(cursor: C, logic: F) -> Self { CursorEnter { phantom: ::std::marker::PhantomData, @@ -203,23 +196,28 @@ impl, TInner, F> CursorEnter Cursor for CursorEnter +impl Cursor for CursorEnter where - C: Cursor, - T: Timestamp, - TInner: Refines+Lattice, - F: FnMut(&K, &V, &T)->TInner, + C: Cursor, + C::Time: Timestamp, + TInner: Refines+Lattice, + F: FnMut(&C::Key, &C::Val, &C::Time)->TInner, { + type Key = C::Key; + type Val = C::Val; + type Time = TInner; + type R = C::R; + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { let key = self.key(storage); let val = self.val(storage); let logic2 = &mut self.logic; @@ -229,10 +227,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } @@ -241,13 +239,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorEnter, TInner, F> { - phantom: ::std::marker::PhantomData<(K, V, R, TInner)>, +pub struct BatchCursorEnter { + phantom: ::std::marker::PhantomData, cursor: B::Cursor, logic: F, } -impl, TInner, F> BatchCursorEnter { +impl BatchCursorEnter { fn new(cursor: B::Cursor, logic: F) -> Self { BatchCursorEnter { phantom: ::std::marker::PhantomData, @@ -257,22 +255,27 @@ impl, TInner, F> BatchCursorEnter, F> Cursor for BatchCursorEnter +impl Cursor for BatchCursorEnter where - T: Timestamp, - TInner: Refines+Lattice, - F: FnMut(&K, &V, &T)->TInner, + B::Time: Timestamp, + TInner: Refines+Lattice, + F: FnMut(&B::Key, &B::Val, &B::Time)->TInner, { - type Storage = BatchEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { let key = self.key(storage); let val = self.val(storage); let logic2 = &mut self.logic; @@ -282,10 +285,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index 0f1a531ab..3fdf29603 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -40,8 +40,8 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFilter; - type Cursor = CursorFilter; + type Batch = BatchFilter; + type Cursor = CursorFilter; fn map_batches(&self, mut f: F2) { let logic = self.logic.clone(); @@ -55,7 +55,7 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y)) } } @@ -76,46 +76,40 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFilter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchFilter { batch: B, logic: F, } -impl Clone for BatchFilter { - fn clone(&self) -> Self { - BatchFilter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - logic: self.logic.clone(), - } - } -} - -impl BatchReader for BatchFilter +impl BatchReader for BatchFilter where - B: BatchReader, - T: Timestamp, - F: FnMut(&K, &V)->bool+Clone+'static + B: BatchReader, + B::Time: Timestamp, + F: FnMut(&B::Key, &B::Val)->bool+Clone+'static { - type Cursor = BatchCursorFilter; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFilter; fn cursor(&self) -> Self::Cursor { BatchCursorFilter::new(self.batch.cursor(), self.logic.clone()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.batch.description() } + fn description(&self) -> &Description { &self.batch.description() } } -impl BatchFilter +impl BatchFilter where - B: BatchReader, - T: Timestamp, + B: BatchReader, + B::Time: Timestamp, { /// Makes a new batch wrapper pub fn make_from(batch: B, logic: F) -> Self { BatchFilter { - phantom: ::std::marker::PhantomData, batch, logic, } @@ -123,38 +117,41 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorFilter, F> { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +pub struct CursorFilter { cursor: C, logic: F, } -impl, F> CursorFilter { +impl CursorFilter { fn new(cursor: C, logic: F) -> Self { CursorFilter { - phantom: ::std::marker::PhantomData, cursor, logic, } } } -impl Cursor for CursorFilter +impl Cursor for CursorFilter where - C: Cursor, - T: Timestamp, - F: FnMut(&K, &V)->bool+'static + C: Cursor, + C::Time: Timestamp, + F: FnMut(&C::Key, &C::Val)->bool+'static { + type Key = C::Key; + type Val = C::Val; + type Time = C::Time; + type R = C::R; + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { @@ -163,10 +160,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } @@ -175,37 +172,40 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFilter, F> { - phantom: ::std::marker::PhantomData<(K, V, R)>, +pub struct BatchCursorFilter { cursor: B::Cursor, logic: F, } -impl, F> BatchCursorFilter { +impl BatchCursorFilter { fn new(cursor: B::Cursor, logic: F) -> Self { BatchCursorFilter { - phantom: ::std::marker::PhantomData, cursor, logic, } } } -impl, F> Cursor for BatchCursorFilter +impl Cursor for BatchCursorFilter where - T: Timestamp, - F: FnMut(&K, &V)->bool+'static, + B::Time: Timestamp, + F: FnMut(&B::Key, &B::Val)->bool+'static, { - type Storage = BatchFilter; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Storage = BatchFilter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { @@ -214,10 +214,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index fa24b91d7..ed5965266 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -41,8 +41,6 @@ where T::Key: 'static, T::Val: 'static, T::R: 'static, - T::Batch: BatchReader, - T::Cursor: Cursor, F: Fn(&G::Timestamp)->Option+'static, { let func1 = Rc::new(func); @@ -93,8 +91,8 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFreeze; - type Cursor = CursorFreeze; + type Batch = BatchFreeze; + type Cursor = CursorFreeze; fn map_batches(&self, mut f: F2) { let func = &self.func; @@ -109,7 +107,7 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { let func = &self.func; self.trace.cursor_through(upper) .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage)) @@ -137,47 +135,49 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFreeze { - phantom: ::std::marker::PhantomData<(K, V, R, T)>, +pub struct BatchFreeze { batch: B, func: Rc, } -impl Clone for BatchFreeze { +impl Clone for BatchFreeze { fn clone(&self) -> Self { BatchFreeze { - phantom: ::std::marker::PhantomData, batch: self.batch.clone(), func: self.func.clone(), } } } -impl BatchReader for BatchFreeze +impl BatchReader for BatchFreeze where - B: BatchReader, - T: Clone, - F: Fn(&T)->Option, + B: BatchReader, + B::Time: Clone, + F: Fn(&B::Time)->Option, { - type Cursor = BatchCursorFreeze; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFreeze; fn cursor(&self) -> Self::Cursor { BatchCursorFreeze::new(self.batch.cursor(), self.func.clone()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { self.batch.description() } + fn description(&self) -> &Description { self.batch.description() } } -impl BatchFreeze +impl BatchFreeze where - B: BatchReader, - T: Clone, - F: Fn(&T)->Option + B: BatchReader, + B::Time: Clone, + F: Fn(&B::Time)->Option { /// Makes a new batch wrapper pub fn make_from(batch: B, func: Rc) -> Self { BatchFreeze { - phantom: ::std::marker::PhantomData, batch: batch, func: func, } @@ -185,37 +185,39 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorFreeze, F> { - phantom: ::std::marker::PhantomData<(K, V, R, T)>, +pub struct CursorFreeze { cursor: C, func: Rc, } -impl, F> CursorFreeze { +impl CursorFreeze { fn new(cursor: C, func: Rc) -> Self { CursorFreeze { - phantom: ::std::marker::PhantomData, cursor: cursor, func: func, } } } -impl Cursor for CursorFreeze +impl Cursor for CursorFreeze where - C: Cursor, - F: Fn(&T)->Option, + C: Cursor, + F: Fn(&C::Time)->Option, { + type Key = C::Key; + type Val = C::Val; + type Time = C::Time; + type R = C::R; type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } - #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(storage, |time, diff| { if let Some(time) = func(time) { @@ -225,10 +227,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } @@ -236,36 +238,38 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFreeze, F> { - phantom: ::std::marker::PhantomData<(K, V, R, T)>, +pub struct BatchCursorFreeze { cursor: B::Cursor, func: Rc, } -impl, F> BatchCursorFreeze { +impl BatchCursorFreeze { fn new(cursor: B::Cursor, func: Rc) -> Self { BatchCursorFreeze { - phantom: ::std::marker::PhantomData, cursor: cursor, func: func, } } } -impl, F> Cursor for BatchCursorFreeze +impl Cursor for BatchCursorFreeze where - F: Fn(&T)->Option, + F: Fn(&B::Time)->Option, { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; - type Storage = BatchFreeze; + type Storage = BatchFreeze; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } - #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(&storage.batch, |time, diff| { if let Some(time) = func(time) { @@ -275,10 +279,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 43ed2096b..ec321b71e 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -47,8 +47,8 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFrontier; - type Cursor = CursorFrontier; + type Batch = BatchFrontier; + type Cursor = CursorFrontier; fn map_batches(&self, mut f: F) { let frontier = self.frontier.borrow(); @@ -61,7 +61,7 @@ where fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { let frontier = self.frontier.borrow(); self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, frontier), y)) } @@ -83,45 +83,39 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFrontier { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchFrontier { batch: B, - frontier: Antichain, + frontier: Antichain, } -impl Clone for BatchFrontier { - fn clone(&self) -> Self { - BatchFrontier { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - frontier: self.frontier.to_owned(), - } - } -} - -impl BatchReader for BatchFrontier +impl BatchReader for BatchFrontier where - B: BatchReader, - T: Timestamp+Lattice, + B: BatchReader, + B::Time: Timestamp+Lattice, { - type Cursor = BatchCursorFrontier; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFrontier; fn cursor(&self) -> Self::Cursor { BatchCursorFrontier::new(self.batch.cursor(), self.frontier.borrow()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.batch.description() } + fn description(&self) -> &Description { &self.batch.description() } } -impl BatchFrontier +impl BatchFrontier where - B: BatchReader, - T: Timestamp+Lattice, + B: BatchReader, + B::Time: Timestamp+Lattice, { /// Makes a new batch wrapper - pub fn make_from(batch: B, frontier: AntichainRef) -> Self { + pub fn make_from(batch: B, frontier: AntichainRef) -> Self { BatchFrontier { - phantom: ::std::marker::PhantomData, batch, frontier: frontier.to_owned(), } @@ -129,39 +123,42 @@ where } /// Wrapper to provide cursor to nested scope. -pub struct CursorFrontier> { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +pub struct CursorFrontier { cursor: C, - frontier: Antichain, + frontier: Antichain, } -impl> CursorFrontier { - fn new(cursor: C, frontier: AntichainRef) -> Self { +impl CursorFrontier where C::Time: Clone { + fn new(cursor: C, frontier: AntichainRef) -> Self { CursorFrontier { - phantom: ::std::marker::PhantomData, cursor, frontier: frontier.to_owned(), } } } -impl Cursor for CursorFrontier +impl Cursor for CursorFrontier where - C: Cursor, - T: Timestamp+Lattice, + C: Cursor, + C::Time: Timestamp+Lattice, { + type Key = C::Key; + type Val = C::Val; + type Time = C::Time; + type R = C::R; + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { let frontier = self.frontier.borrow(); - let mut temp: T = ::minimum(); + let mut temp: C::Time = ::minimum(); self.cursor.map_times(storage, |time, diff| { temp.clone_from(time); temp.advance_by(frontier); @@ -170,10 +167,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } @@ -182,38 +179,41 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFrontier> { - phantom: ::std::marker::PhantomData<(K, V, R)>, +pub struct BatchCursorFrontier { cursor: B::Cursor, - frontier: Antichain, + frontier: Antichain, } -impl> BatchCursorFrontier { - fn new(cursor: B::Cursor, frontier: AntichainRef) -> Self { +impl BatchCursorFrontier where B::Time: Clone { + fn new(cursor: B::Cursor, frontier: AntichainRef) -> Self { BatchCursorFrontier { - phantom: ::std::marker::PhantomData, cursor, frontier: frontier.to_owned(), } } } -impl> Cursor for BatchCursorFrontier +impl Cursor for BatchCursorFrontier where - T: Timestamp+Lattice, + B::Time: Timestamp+Lattice, { - type Storage = BatchFrontier; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Storage = BatchFrontier; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(&storage.batch) } + #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(&storage.batch) } + #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { let frontier = self.frontier.borrow(); - let mut temp: T = ::minimum(); + let mut temp: B::Time = ::minimum(); self.cursor.map_times(&storage.batch, |time, diff| { temp.clone_from(time); temp.advance_by(frontier); @@ -222,10 +222,10 @@ where } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(&storage.batch, key) } + #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(&storage.batch, key) } #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(&storage.batch, val) } + #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(&storage.batch, val) } #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index 17796d12b..b31f8779e 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -120,7 +120,7 @@ where } fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() } /// Creates a new cursor over the wrapped trace. - fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, >::Storage)> { + fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, ::Storage)> { ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier) } diff --git a/tests/trace.rs b/tests/trace.rs index 1c22f4665..f87ffa33e 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -8,18 +8,18 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use differential_dataflow::trace::implementations::ord::OrdValBatch; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; -use differential_dataflow::trace::cursor::CursorDebug; +use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::implementations::spine_fueled::Spine; -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>; type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine>> { +fn get_trace() -> Spine>> { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); { - let mut batcher = <::Batch as Batch>::Batcher::new(); + let mut batcher = <::Batch as Batch>::Batcher::new(); batcher.push_batch(&mut vec![ ((1, 2), 0, 1),