Skip to content

Commit 972daeb

Browse files
committed
compiles
1 parent c3ca293 commit 972daeb

File tree

17 files changed

+313
-276
lines changed

17 files changed

+313
-276
lines changed

examples/cursors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ where
139139
Tr::Val: Debug + Clone,
140140
Tr::Time: Debug + Clone,
141141
Tr::Diff: Debug + Clone,
142+
<Tr::Cursor as Cursor>::ValOwned: Debug,
142143
{
143144
let (mut cursor, storage) = trace.cursor();
144145
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {

src/operators/arrange/arrangement.rs

Lines changed: 48 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use trace::implementations::{KeySpine, ValSpine};
3535
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
3636
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
3737
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38-
use trace::wrappers::filter::{TraceFilter, BatchFilter};
38+
// use trace::wrappers::filter::{TraceFilter, BatchFilter};
3939

4040
use super::TraceAgent;
4141

@@ -143,53 +143,53 @@ where
143143
}
144144
}
145145

146-
/// Filters an arranged collection.
147-
///
148-
/// This method produces a new arrangement backed by the same shared
149-
/// arrangement as `self`, paired with user-specified logic that can
150-
/// filter by key and value. The resulting collection is restricted
151-
/// to the keys and values that return true under the user predicate.
152-
///
153-
/// # Examples
154-
///
155-
/// ```
156-
/// extern crate timely;
157-
/// extern crate differential_dataflow;
158-
///
159-
/// use differential_dataflow::input::Input;
160-
/// use differential_dataflow::operators::arrange::ArrangeByKey;
161-
///
162-
/// fn main() {
163-
/// ::timely::example(|scope| {
164-
///
165-
/// let arranged =
166-
/// scope.new_collection_from(0 .. 10).1
167-
/// .map(|x| (x, x+1))
168-
/// .arrange_by_key();
169-
///
170-
/// arranged
171-
/// .filter(|k,v| k == v)
172-
/// .as_collection(|k,v| (*k,*v))
173-
/// .assert_empty();
174-
/// });
175-
/// }
176-
/// ```
177-
pub fn filter<F>(&self, logic: F)
178-
-> Arranged<G, TraceFilter<Tr, F>>
179-
where
180-
Tr::Key: 'static,
181-
Tr::Val: 'static,
182-
Tr::Diff: 'static,
183-
G::Timestamp: Clone+'static,
184-
F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static,
185-
{
186-
let logic1 = logic.clone();
187-
let logic2 = logic.clone();
188-
Arranged {
189-
trace: TraceFilter::make_from(self.trace.clone(), logic1),
190-
stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
191-
}
192-
}
146+
// /// Filters an arranged collection.
147+
// ///
148+
// /// This method produces a new arrangement backed by the same shared
149+
// /// arrangement as `self`, paired with user-specified logic that can
150+
// /// filter by key and value. The resulting collection is restricted
151+
// /// to the keys and values that return true under the user predicate.
152+
// ///
153+
// /// # Examples
154+
// ///
155+
// /// ```
156+
// /// extern crate timely;
157+
// /// extern crate differential_dataflow;
158+
// ///
159+
// /// use differential_dataflow::input::Input;
160+
// /// use differential_dataflow::operators::arrange::ArrangeByKey;
161+
// ///
162+
// /// fn main() {
163+
// /// ::timely::example(|scope| {
164+
// ///
165+
// /// let arranged =
166+
// /// scope.new_collection_from(0 .. 10).1
167+
// /// .map(|x| (x, x+1))
168+
// /// .arrange_by_key();
169+
// ///
170+
// /// arranged
171+
// /// .filter(|k,v| k == v)
172+
// /// .as_collection(|k,v| (*k,*v))
173+
// /// .assert_empty();
174+
// /// });
175+
// /// }
176+
// /// ```
177+
// pub fn filter<F>(&self, logic: F)
178+
// -> Arranged<G, TraceFilter<Tr, F>>
179+
// where
180+
// Tr::Key: 'static,
181+
// Tr::Val: 'static,
182+
// Tr::Diff: 'static,
183+
// G::Timestamp: Clone+'static,
184+
// F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static,
185+
// {
186+
// let logic1 = logic.clone();
187+
// let logic2 = logic.clone();
188+
// Arranged {
189+
// trace: TraceFilter::make_from(self.trace.clone(), logic1),
190+
// stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
191+
// }
192+
// }
193193
/// Flattens the stream into a `Collection`.
194194
///
195195
/// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,

src/operators/join.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -666,8 +666,6 @@ where
666666
R: Semigroup,
667667
C1: Cursor<Time=T>,
668668
C2: Cursor<Key=C1::Key, Time=T>,
669-
C1::Val: Ord,
670-
C2::Val: Ord,
671669
C1::Diff: Semigroup,
672670
C2::Diff: Semigroup,
673671
D: Ord+Clone+Data,
@@ -686,8 +684,6 @@ where
686684
C1::Key: Ord+Eq,
687685
C1: Cursor<Time=T>,
688686
C2: Cursor<Key=C1::Key, Time=T>,
689-
C1::Val: Ord,
690-
C2::Val: Ord,
691687
C1::Diff: Semigroup,
692688
C2::Diff: Semigroup,
693689
T: Timestamp+Lattice+Ord,
@@ -713,7 +709,10 @@ where
713709
/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
714710
#[inline(never)]
715711
fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>, mut logic: L, fuel: &mut usize)
716-
where I: IntoIterator<Item=(D, T, R)>, L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::Diff, &C2::Diff)->I {
712+
where
713+
I: IntoIterator<Item=(D, T, R)>,
714+
L: for<'a> FnMut(&C1::Key, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I,
715+
{
717716

718717
let meet = self.capability.time();
719718

@@ -781,8 +780,6 @@ struct JoinThinker<'a, C1, C2>
781780
where
782781
C1: Cursor,
783782
C2: Cursor<Time = C1::Time>,
784-
C1::Val: Ord,
785-
C2::Val: Ord,
786783
C1::Time: Lattice+Ord+Clone,
787784
C1::Diff: Semigroup,
788785
C2::Diff: Semigroup,
@@ -795,8 +792,6 @@ impl<'a, C1, C2> JoinThinker<'a, C1, C2>
795792
where
796793
C1: Cursor,
797794
C2: Cursor<Time = C1::Time>,
798-
C1::Val: Ord,
799-
C2::Val: Ord,
800795
C1::Time: Lattice+Ord+Clone,
801796
C1::Diff: Semigroup,
802797
C2::Diff: Semigroup,
@@ -808,7 +803,7 @@ where
808803
}
809804
}
810805

811-
fn think<F: FnMut(&C1::Val,&C2::Val,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {
806+
fn think<'b, F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&'b mut self, mut results: F) {
812807

813808
// for reasonably sized edits, do the dead-simple thing.
814809
if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
@@ -833,15 +828,15 @@ where
833828

834829
if replay1.time().unwrap().cmp(&replay2.time().unwrap()) == ::std::cmp::Ordering::Less {
835830
replay2.advance_buffer_by(replay1.meet().unwrap());
836-
for &((ref val2, ref time2), ref diff2) in replay2.buffer().iter() {
831+
for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
837832
let (val1, time1, diff1) = replay1.edit().unwrap();
838833
results(val1, val2, time1.join(time2), diff1, diff2);
839834
}
840835
replay1.step();
841836
}
842837
else {
843838
replay1.advance_buffer_by(replay2.meet().unwrap());
844-
for &((ref val1, ref time1), ref diff1) in replay1.buffer().iter() {
839+
for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
845840
let (val2, time2, diff2) = replay2.edit().unwrap();
846841
results(val1, val2, time1.join(time2), diff1, diff2);
847842
}
@@ -851,15 +846,15 @@ where
851846

852847
while !replay1.is_done() {
853848
replay2.advance_buffer_by(replay1.meet().unwrap());
854-
for &((ref val2, ref time2), ref diff2) in replay2.buffer().iter() {
849+
for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
855850
let (val1, time1, diff1) = replay1.edit().unwrap();
856851
results(val1, val2, time1.join(time2), diff1, diff2);
857852
}
858853
replay1.step();
859854
}
860855
while !replay2.is_done() {
861856
replay1.advance_buffer_by(replay2.meet().unwrap());
862-
for &((ref val1, ref time1), ref diff1) in replay1.buffer().iter() {
857+
for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
863858
let (val2, time2, diff2) = replay2.edit().unwrap();
864859
results(val1, val2, time1.join(time2), diff1, diff2);
865860
}

src/operators/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use trace::Cursor;
2424

2525
/// An accumulation of (value, time, diff) updates.
2626
struct EditList<'a, C: Cursor> where C::Time: Sized, C::Diff: Sized {
27-
values: Vec<(&'a C::Val, usize)>,
27+
values: Vec<(C::Val<'a>, usize)>,
2828
edits: Vec<(C::Time, C::Diff)>,
2929
}
3030

@@ -64,19 +64,19 @@ impl<'a, C: Cursor> EditList<'a, C> where C::Time: Ord+Clone, C::Diff: Semigroup
6464
}
6565
/// Associates all edits pushed since the previous `seal_value` call with `value`.
6666
#[inline]
67-
fn seal(&mut self, value: &'a C::Val) {
67+
fn seal(&mut self, value: C::Val<'a>) {
6868
let prev = self.values.last().map(|x| x.1).unwrap_or(0);
6969
crate::consolidation::consolidate_from(&mut self.edits, prev);
7070
if self.edits.len() > prev {
7171
self.values.push((value, self.edits.len()));
7272
}
7373
}
74-
fn map<F: FnMut(&C::Val, &C::Time, &C::Diff)>(&self, mut logic: F) {
74+
fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
7575
for index in 0 .. self.values.len() {
7676
let lower = if index == 0 { 0 } else { self.values[index-1].1 };
7777
let upper = self.values[index].1;
7878
for edit in lower .. upper {
79-
logic(&self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
79+
logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
8080
}
8181
}
8282
}
@@ -86,12 +86,12 @@ struct ValueHistory<'storage, C: Cursor> where C::Time: Sized, C::Diff: Sized {
8686

8787
edits: EditList<'storage, C>,
8888
history: Vec<(C::Time, C::Time, usize, usize)>, // (time, meet, value_index, edit_offset)
89-
buffer: Vec<((&'storage C::Val, C::Time), C::Diff)>, // where we accumulate / collapse updates.
89+
buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, // where we accumulate / collapse updates.
9090
}
9191

9292
impl<'storage, C: Cursor> ValueHistory<'storage, C>
9393
where
94-
C::Val: Ord+'storage,
94+
// C::Val: Ord+'storage,
9595
C::Time: Lattice+Ord+Clone,
9696
C::Diff: Semigroup,
9797
{
@@ -137,7 +137,7 @@ where
137137
}
138138

139139
/// Organizes history based on current contents of edits.
140-
fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
140+
fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> where 'storage: 'history {
141141

142142
self.buffer.clear();
143143
self.history.clear();
@@ -165,7 +165,7 @@ struct HistoryReplay<'storage, 'history, C>
165165
where
166166
'storage: 'history,
167167
C: Cursor,
168-
C::Val: Ord+'storage,
168+
// C::Val: Ord+'storage,
169169
C::Time: Lattice+Ord+Clone+'history,
170170
C::Diff: Semigroup+'history,
171171
{
@@ -176,17 +176,17 @@ impl<'storage, 'history, C> HistoryReplay<'storage, 'history, C>
176176
where
177177
'storage: 'history,
178178
C: Cursor,
179-
C::Val: Ord+'storage,
179+
// C::Val: Ord+'storage,
180180
C::Time: Lattice+Ord+Clone+'history,
181181
C::Diff: Semigroup+'history,
182182
{
183183
fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
184184
fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
185-
fn edit(&self) -> Option<(&C::Val, &C::Time, &C::Diff)> {
185+
fn edit<'s>(&'s self) -> Option<(C::Val<'storage>, &'s C::Time, &'s C::Diff)> {
186186
self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
187187
}
188188

189-
fn buffer(&self) -> &[((&'storage C::Val, C::Time), C::Diff)] {
189+
fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
190190
&self.replay.buffer[..]
191191
}
192192

0 commit comments

Comments
 (0)