Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/iter/cloned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = &'a T>
{
self.base = self.base.consume_iter(iter.into_iter().cloned());
self
}

fn complete(self) -> F::Result {
self.base.complete()
}
Expand Down
4 changes: 4 additions & 0 deletions src/iter/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ where
}
}

// This cannot easily specialize `consume_iter` to be better than
// the default, because that requires checking `self.base.full()`
// during a call to `self.base.consume_iter()`. (#632)

fn complete(self) -> Self::Result {
self.base.complete()
}
Expand Down
4 changes: 4 additions & 0 deletions src/iter/filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ where
}
}

// This cannot easily specialize `consume_iter` to be better than
// the default, because that requires checking `self.base.full()`
// during a call to `self.base.consume_iter()`. (#632)

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
14 changes: 14 additions & 0 deletions src/iter/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ where
self
}

fn consume_iter<I>(mut self, iter: I) -> Self
where I: IntoIterator<Item = T>
{
self.item = iter
.into_iter()
// stop iterating if another thread has found something
.take_while(|_| !self.full())
.find(self.find_op);
if self.item.is_some() {
self.found.store(true, Ordering::Relaxed)
}
self
}

fn complete(self) -> Self::Result {
self.item
}
Expand Down
14 changes: 14 additions & 0 deletions src/iter/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ where
}
}

fn consume_iter<I>(self, iter: I) -> Self
where
I: IntoIterator<Item = T>
{
let base = self.base;
let item = iter
.into_iter()
// stop iterating if another thread has finished
.take_while(|_| !base.full())
.fold(self.item, self.fold_op);

FoldFolder { base: base, item: item, fold_op: self.fold_op }
}

fn complete(self) -> C::Result {
self.base.consume(self.item).complete()
}
Expand Down
8 changes: 8 additions & 0 deletions src/iter/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,14 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>
{
self.base = self.base.consume_iter(iter.into_iter().inspect(self.inspect_op));
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
26 changes: 25 additions & 1 deletion src/iter/intersperse.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::plumbing::*;
use super::*;
use std::cell::Cell;
use std::iter::Fuse;
use std::iter::{self, Fuse};

/// `Intersperse` is an iterator that inserts a particular item between each
/// item of the adapted iterator. This struct is created by the
Expand Down Expand Up @@ -382,6 +382,30 @@ where
self
}

fn consume_iter<I>(self, iter: I) -> Self
where I: IntoIterator<Item = T>
{
let mut clone_first = self.clone_first;
let between_item = self.item;
let base = self.base.consume_iter(
iter.into_iter()
.flat_map(|item| {
let first = if clone_first {
Some(between_item.clone())
} else {
clone_first = true;
None
};
first.into_iter().chain(iter::once(item))
})
);
IntersperseFolder {
base: base,
item: between_item,
clone_first: clone_first,
}
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
8 changes: 8 additions & 0 deletions src/iter/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>,
{
self.base = self.base.consume_iter(iter.into_iter().map(self.map_op));
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
14 changes: 14 additions & 0 deletions src/iter/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>
{
let update_op = self.update_op;
self.base = self.base.consume_iter(
iter.into_iter().map(|mut item| {
update_op(&mut item);
item
})
);
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
20 changes: 20 additions & 0 deletions src/iter/while_some.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,26 @@ where
self
}

fn consume_iter<I>(mut self, iter: I) -> Self
where I: IntoIterator<Item = Option<T>>
{
let full = self.full;
self.base = self.base.consume_iter(
iter.into_iter()
.take_while(|x| {
match *x {
Some(_) => !full.load(Ordering::Relaxed),
None => {
full.store(true, Ordering::Relaxed);
false
}
}
})
.map(|x| x.unwrap())
);
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
52 changes: 41 additions & 11 deletions tests/octillion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,57 @@ fn find_first_octillion_flat() {
assert_eq!(x, Some(0));
}

#[test]
fn find_last_octillion() {
fn two_threads<F: Send + FnOnce() -> R, R: Send>(f: F) -> R {
// FIXME: If we don't use at least two threads, then we end up walking
// through the entire iterator sequentially, without the benefit of any
// short-circuiting. We probably don't want testing to wait that long. ;)
// It would be nice if `find_last` could prioritize the later splits,
// basically flipping the `join` args, without needing indexed `rev`.
// (or could we have an unindexed `rev`?)
let builder = rayon::ThreadPoolBuilder::new().num_threads(2);
let pool = builder.build().unwrap();

let x = pool.install(|| octillion().find_last(|_| true));
pool.install(f)
}

#[test]
fn find_last_octillion() {
// It would be nice if `find_last` could prioritize the later splits,
// basically flipping the `join` args, without needing indexed `rev`.
// (or could we have an unindexed `rev`?)
let x = two_threads(|| octillion().find_last(|_| true));
assert_eq!(x, Some(OCTILLION - 1));
}

#[test]
fn find_last_octillion_flat() {
// FIXME: Ditto, need two threads.
let builder = rayon::ThreadPoolBuilder::new().num_threads(2);
let pool = builder.build().unwrap();

let x = pool.install(|| octillion_flat().find_last(|_| true));
let x = two_threads(|| octillion_flat().find_last(|_| true));
assert_eq!(x, Some(OCTILLION - 1));
}

#[test]
fn find_any_octillion() {
let x = two_threads(|| octillion().find_any(|x| *x > OCTILLION / 2));
assert!(x.is_some());
}

#[test]
fn find_any_octillion_flat() {
let x = two_threads(|| octillion_flat().find_any(|x| *x > OCTILLION / 2));
assert!(x.is_some());
}

#[test]
fn filter_find_any_octillion() {
let x = two_threads(|| octillion().filter(|x| *x > OCTILLION / 2).find_any(|_| true));
assert!(x.is_some());
}

#[test]
fn filter_find_any_octillion_flat() {
let x = two_threads(|| octillion_flat().filter(|x| *x > OCTILLION / 2).find_any(|_| true));
assert!(x.is_some());
}

#[test]
fn fold_find_any_octillion_flat() {
let x = two_threads(|| octillion_flat().fold(|| (), |_, _| ()).find_any(|_| true));
assert!(x.is_some());
}