Skip to content

Commit a8096d8

Browse files
Merge pull request #170 from TimelyDataflow/collection_concatenate
Add collection::concatenate
2 parents 5228166 + c6b08f4 commit a8096d8

File tree

2 files changed

+75
-2
lines changed

2 files changed

+75
-2
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ serde_derive = "1.0"
2828
abomonation = "0.7"
2929
abomonation_derive = "0.3"
3030
timely_sort="0.1.6"
31-
timely = "0.9"
32-
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
31+
#timely = "0.9"
32+
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
3333
#timely = { path = "../timely-dataflow/" }
3434
fnv="1.0.2"
3535

src/collection.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,41 @@ impl<G: Scope, D: Data, R: Monoid> Collection<G, D, R> where G::Timestamp: Data
199199
.concat(&other.inner)
200200
.as_collection()
201201
}
202+
/// Creates a new collection accumulating the contents of the two collections.
203+
///
204+
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
205+
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
206+
/// two collections.
207+
///
208+
/// # Examples
209+
///
210+
/// ```
211+
/// extern crate timely;
212+
/// extern crate differential_dataflow;
213+
///
214+
/// use differential_dataflow::input::Input;
215+
///
216+
/// fn main() {
217+
/// ::timely::example(|scope| {
218+
///
219+
/// let data = scope.new_collection_from(1 .. 10).1;
220+
///
221+
/// let odds = data.filter(|x| x % 2 == 1);
222+
/// let evens = data.filter(|x| x % 2 == 0);
223+
///
224+
/// odds.concatenate(Some(evens))
225+
/// .assert_eq(&data);
226+
/// });
227+
/// }
228+
/// ```
229+
pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
230+
where
231+
I: IntoIterator<Item=Collection<G, D, R>>
232+
{
233+
self.inner
234+
.concatenate(sources.into_iter().map(|x| x.inner))
235+
.as_collection()
236+
}
202237
/// Replaces each record with another, with a new difference type.
203238
///
204239
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
@@ -582,4 +617,42 @@ impl<G: Scope, D: Data, R: Monoid> AsCollection<G, D, R> for Stream<G, (D, G::Ti
582617
fn as_collection(&self) -> Collection<G, D, R> {
583618
Collection::new(self.clone())
584619
}
620+
}
621+
622+
/// Concatenates multiple collections.
623+
///
624+
/// This method has the effect of a sequence of calls to `concat`, but it does
625+
/// so in one operator rather than a chain of many operators.
626+
///
627+
/// # Examples
628+
///
629+
/// ```
630+
/// extern crate timely;
631+
/// extern crate differential_dataflow;
632+
///
633+
/// use differential_dataflow::input::Input;
634+
///
635+
/// fn main() {
636+
/// ::timely::example(|scope| {
637+
///
638+
/// let data = scope.new_collection_from(1 .. 10).1;
639+
///
640+
/// let odds = data.filter(|x| x % 2 == 1);
641+
/// let evens = data.filter(|x| x % 2 == 0);
642+
///
643+
/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
644+
/// .assert_eq(&data);
645+
/// });
646+
/// }
647+
/// ```
648+
pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
649+
where
650+
G: Scope,
651+
D: Data,
652+
R: Monoid,
653+
I: IntoIterator<Item=Collection<G, D, R>>,
654+
{
655+
scope
656+
.concatenate(iterator.into_iter().map(|x| x.inner))
657+
.as_collection()
585658
}

0 commit comments

Comments
 (0)