Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/iter/cloned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = &'a T>,
{
self.base = self.base.consume_iter(iter.into_iter().cloned());
self
}

fn complete(self) -> F::Result {
self.base.complete()
}
Expand Down
4 changes: 4 additions & 0 deletions src/iter/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ where
}
}

// This cannot easily specialize `consume_iter` to be better than
// the default, because that requires checking `self.base.full()`
// during a call to `self.base.consume_iter()`. (#632)

fn complete(self) -> Self::Result {
self.base.complete()
}
Expand Down
4 changes: 4 additions & 0 deletions src/iter/filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ where
}
}

// This cannot easily specialize `consume_iter` to be better than
// the default, because that requires checking `self.base.full()`
// during a call to `self.base.consume_iter()`. (#632)

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
15 changes: 15 additions & 0 deletions src/iter/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ where
self
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>,
{
self.item = iter
.into_iter()
// stop iterating if another thread has found something
.take_while(|_| !self.full())
.find(self.find_op);
if self.item.is_some() {
self.found.store(true, Ordering::Relaxed)
}
self
}

fn complete(self) -> Self::Result {
self.item
}
Expand Down
18 changes: 18 additions & 0 deletions src/iter/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ where
}
}

fn consume_iter<I>(self, iter: I) -> Self
where
I: IntoIterator<Item = T>,
{
let base = self.base;
let item = iter
.into_iter()
// stop iterating if another thread has finished
.take_while(|_| !base.full())
.fold(self.item, self.fold_op);

FoldFolder {
base: base,
item: item,
fold_op: self.fold_op,
}
}

fn complete(self) -> C::Result {
self.base.consume(self.item).complete()
}
Expand Down
10 changes: 10 additions & 0 deletions src/iter/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>,
{
self.base = self
.base
.consume_iter(iter.into_iter().inspect(self.inspect_op));
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
24 changes: 23 additions & 1 deletion src/iter/intersperse.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::plumbing::*;
use super::*;
use std::cell::Cell;
use std::iter::Fuse;
use std::iter::{self, Fuse};

/// `Intersperse` is an iterator that inserts a particular item between each
/// item of the adapted iterator. This struct is created by the
Expand Down Expand Up @@ -382,6 +382,28 @@ where
self
}

fn consume_iter<I>(self, iter: I) -> Self
where
I: IntoIterator<Item = T>,
{
let mut clone_first = self.clone_first;
let between_item = self.item;
let base = self.base.consume_iter(iter.into_iter().flat_map(|item| {
let first = if clone_first {
Some(between_item.clone())
} else {
clone_first = true;
None
};
first.into_iter().chain(iter::once(item))
}));
IntersperseFolder {
base: base,
item: between_item,
clone_first: clone_first,
}
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
8 changes: 8 additions & 0 deletions src/iter/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>,
{
self.base = self.base.consume_iter(iter.into_iter().map(self.map_op));
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
12 changes: 12 additions & 0 deletions src/iter/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@ where
}
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = T>,
{
let update_op = self.update_op;
self.base = self.base.consume_iter(iter.into_iter().map(|mut item| {
update_op(&mut item);
item
}));
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
19 changes: 19 additions & 0 deletions src/iter/while_some.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,25 @@ where
self
}

fn consume_iter<I>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = Option<T>>,
{
let full = self.full;
self.base = self.base.consume_iter(
iter.into_iter()
.take_while(|x| match *x {
Some(_) => !full.load(Ordering::Relaxed),
None => {
full.store(true, Ordering::Relaxed);
false
}
})
.map(|x| x.unwrap()),
);
self
}

fn complete(self) -> C::Result {
self.base.complete()
}
Expand Down
60 changes: 49 additions & 11 deletions tests/octillion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,65 @@ fn find_first_octillion_flat() {
assert_eq!(x, Some(0));
}

#[test]
fn find_last_octillion() {
fn two_threads<F: Send + FnOnce() -> R, R: Send>(f: F) -> R {
// FIXME: If we don't use at least two threads, then we end up walking
// through the entire iterator sequentially, without the benefit of any
// short-circuiting. We probably don't want testing to wait that long. ;)
// It would be nice if `find_last` could prioritize the later splits,
// basically flipping the `join` args, without needing indexed `rev`.
// (or could we have an unindexed `rev`?)
let builder = rayon::ThreadPoolBuilder::new().num_threads(2);
let pool = builder.build().unwrap();

let x = pool.install(|| octillion().find_last(|_| true));
pool.install(f)
}

#[test]
fn find_last_octillion() {
// It would be nice if `find_last` could prioritize the later splits,
// basically flipping the `join` args, without needing indexed `rev`.
// (or could we have an unindexed `rev`?)
let x = two_threads(|| octillion().find_last(|_| true));
assert_eq!(x, Some(OCTILLION - 1));
}

#[test]
fn find_last_octillion_flat() {
// FIXME: Ditto, need two threads.
let builder = rayon::ThreadPoolBuilder::new().num_threads(2);
let pool = builder.build().unwrap();

let x = pool.install(|| octillion_flat().find_last(|_| true));
let x = two_threads(|| octillion_flat().find_last(|_| true));
assert_eq!(x, Some(OCTILLION - 1));
}

#[test]
fn find_any_octillion() {
let x = two_threads(|| octillion().find_any(|x| *x > OCTILLION / 2));
assert!(x.is_some());
}

#[test]
fn find_any_octillion_flat() {
let x = two_threads(|| octillion_flat().find_any(|x| *x > OCTILLION / 2));
assert!(x.is_some());
}

#[test]
fn filter_find_any_octillion() {
let x = two_threads(|| {
octillion()
.filter(|x| *x > OCTILLION / 2)
.find_any(|_| true)
});
assert!(x.is_some());
}

#[test]
fn filter_find_any_octillion_flat() {
let x = two_threads(|| {
octillion_flat()
.filter(|x| *x > OCTILLION / 2)
.find_any(|_| true)
});
assert!(x.is_some());
}

#[test]
fn fold_find_any_octillion_flat() {
let x = two_threads(|| octillion_flat().fold(|| (), |_, _| ()).find_any(|_| true));
assert!(x.is_some());
}