Skip to content

Commit 17af6f7

Browse files
committed
auto merge of #10688 : bjz/rust/recv_iter, r=brson
I've noticed I use this pattern quite a bit: ~~~rust do spawn { loop { match port.try_recv() { Some(x) => ..., None => ..., } } } ~~~ The `RecvIterator`, returned from a default `recv_iter` method on the `GenericPort` trait, allows you to reduce this down to: ~~~rust do spawn { for x in port.recv_iter() { ... } } ~~~ As demonstrated in the tests, you can also access the port from within the `for` block for further `recv`ing and `peek`ing with no borrow errors, which is quite nice.
2 parents faf4c93 + 31da6b7 commit 17af6f7

File tree

1 file changed

+84
-3
lines changed

1 file changed

+84
-3
lines changed

src/libstd/comm.rs

+84-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Message passing
1515
#[allow(missing_doc)];
1616

1717
use clone::Clone;
18+
use iter::Iterator;
1819
use kinds::Send;
1920
use option::Option;
2021
use rtcomm = rt::comm;
@@ -43,10 +44,35 @@ pub trait GenericPort<T> {
4344
/// Receives a message, or fails if the connection closes.
4445
fn recv(&self) -> T;
4546

46-
/** Receives a message, or returns `none` if
47-
the connection is closed or closes.
48-
*/
47+
/// Receives a message, or returns `none` if
48+
/// the connection is closed or closes.
4949
fn try_recv(&self) -> Option<T>;
50+
51+
/// Returns an iterator that breaks once the connection closes.
52+
///
53+
/// # Example
54+
///
55+
/// ~~~rust
56+
/// do spawn {
57+
/// for x in port.recv_iter() {
58+
/// if pred(x) { break; }
59+
/// println!("{}", x);
60+
/// }
61+
/// }
62+
/// ~~~
63+
fn recv_iter<'a>(&'a self) -> RecvIterator<'a, Self> {
64+
RecvIterator { port: self }
65+
}
66+
}
67+
68+
pub struct RecvIterator<'a, P> {
69+
priv port: &'a P,
70+
}
71+
72+
impl<'a, T, P: GenericPort<T>> Iterator<T> for RecvIterator<'a, P> {
73+
fn next(&mut self) -> Option<T> {
74+
self.port.try_recv()
75+
}
5076
}
5177

5278
/// Ports that can `peek`
@@ -227,3 +253,58 @@ impl<T: Send> Clone for SharedPort<T> {
227253
SharedPort { x: p.clone() }
228254
}
229255
}
256+
257+
#[cfg(test)]
258+
mod tests {
259+
use comm::*;
260+
use prelude::*;
261+
262+
#[test]
263+
fn test_nested_recv_iter() {
264+
let (port, chan) = stream::<int>();
265+
let (total_port, total_chan) = oneshot::<int>();
266+
267+
do spawn {
268+
let mut acc = 0;
269+
for x in port.recv_iter() {
270+
acc += x;
271+
for x in port.recv_iter() {
272+
acc += x;
273+
for x in port.try_recv().move_iter() {
274+
acc += x;
275+
total_chan.send(acc);
276+
}
277+
}
278+
}
279+
}
280+
281+
chan.send(3);
282+
chan.send(1);
283+
chan.send(2);
284+
assert_eq!(total_port.recv(), 6);
285+
}
286+
287+
#[test]
288+
fn test_recv_iter_break() {
289+
let (port, chan) = stream::<int>();
290+
let (count_port, count_chan) = oneshot::<int>();
291+
292+
do spawn {
293+
let mut count = 0;
294+
for x in port.recv_iter() {
295+
if count >= 3 {
296+
count_chan.send(count);
297+
break;
298+
} else {
299+
count += x;
300+
}
301+
}
302+
}
303+
304+
chan.send(2);
305+
chan.send(2);
306+
chan.send(2);
307+
chan.send(2);
308+
assert_eq!(count_port.recv(), 4);
309+
}
310+
}

0 commit comments

Comments
 (0)