Skip to content

Commit af912ad

Browse files
authored
Merge pull request #3 from winstonewert/master
Improved error handling #2
2 parents 2895a57 + 85dfd37 commit af912ad

File tree

5 files changed

+119
-31
lines changed

5 files changed

+119
-31
lines changed

src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,10 @@ extern crate crossbeam;
22

33
pub mod read;
44
pub mod write;
5+
6+
fn unwrap_or_resume_unwind<V>(value: Result<V, Box<dyn std::any::Any + Send>>) -> V {
7+
match value {
8+
Ok(value) => value,
9+
Err(error) => std::panic::resume_unwind(error),
10+
}
11+
}

src/read.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,6 @@ impl Reader {
9494
self.empty_send.send(None).ok();
9595
}
9696

97-
/// return errors that may still be in the queue. This is still possible even if the
98-
/// sender was dropped, as they are in the queue buffer (see docs for std::mpsc::Receiver::recv).
99-
#[inline]
100-
fn get_errors(&self) -> io::Result<()> {
101-
for res in &self.full_recv {
102-
res?;
103-
}
104-
Ok(())
105-
}
106-
10797
// assumes that self.buffer is not None. Returns a tuple of the read result
10898
// and a flag indicating if a new buffer should be received (cannot be done
10999
// here due to borrow checker)
@@ -203,7 +193,7 @@ pub fn reader<R, F, O, E>(bufsize: usize, queuelen: usize, reader: R, func: F) -
203193
where
204194
F: FnOnce(&mut Reader) -> Result<O, E>,
205195
R: io::Read + Send,
206-
E: Send + From<io::Error>,
196+
E: Send,
207197
{
208198
reader_init(bufsize, queuelen, || Ok(reader), func)
209199
}
@@ -237,7 +227,7 @@ where
237227
I: Send + FnOnce() -> Result<R, E>,
238228
F: FnOnce(&mut Reader) -> Result<O, E>,
239229
R: io::Read,
240-
E: Send + From<io::Error>,
230+
E: Send,
241231
{
242232
assert!(queuelen >= 1);
243233
assert!(bufsize > 0);
@@ -255,15 +245,17 @@ where
255245
Ok::<_, E>(())
256246
});
257247

258-
let out = func(&mut reader)?;
248+
let out = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(&mut reader)));
259249

260250
reader.done();
261251

262-
handle.join().unwrap()?;
263-
264-
reader.get_errors()?;
265-
266-
Ok(out)
252+
// We deliberately ensure that errors from the background reading thread are given priority.
253+
// This does NOT include errors returned from the actual I/O which are returned via the channels
254+
// To the reader. It includes errors returned by init_reader() and panics that occured while reading.
255+
// Either of those cases will have cause the reader to be in an unworkable state. Consequently, we want to
256+
// surface the error that caused this.
257+
crate::unwrap_or_resume_unwind(handle.join())?;
258+
crate::unwrap_or_resume_unwind(out)
267259
})
268260
.unwrap()
269261
}

src/write.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@ impl Writer {
4848
if self.full_send.send(Message::Buffer(full)).is_err() {
4949
self.get_errors()?;
5050
}
51+
Ok(())
5152
} else {
5253
self.get_errors()?;
54+
// If we reach this point, we couldn't communicate with the background writer
55+
// but there were no errors recorded in the queue. BrokenPipe seems to closest error to return.
56+
Err(io::Error::from(io::ErrorKind::BrokenPipe))
5357
}
54-
Ok(())
5558
}
5659

5760
#[inline]
@@ -293,12 +296,21 @@ where
293296
Ok(None)
294297
});
295298

296-
let out = func(&mut writer)?;
299+
let out = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(&mut writer)));
300+
301+
let writer_result = writer.done();
297302

298-
writer.done()?;
303+
let handle = handle.join();
299304

300-
let of = handle.join().unwrap()?;
305+
// Prefer errors from the background thread. This doesn't include actual I/O errors from the writing
306+
// because those are sent via the channel to the main thread. Instead, it returns errors from init_writer
307+
// or panics from the writing thread. If either of those happen, writing in the main thread will fail
308+
// but we want to return the underlying reason.
309+
let of = crate::unwrap_or_resume_unwind(handle)?;
310+
let out = crate::unwrap_or_resume_unwind(out)?;
301311

312+
// Report write errors that happened after the main thread stopped writing.
313+
writer_result?;
302314
writer.get_errors()?;
303315

304316
Ok((out, of.unwrap()))

tests/read.rs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,28 @@ struct Reader<'a> {
1010
// used to test what happens to errors that are
1111
// stuck in the queue
1212
fails_after: usize,
13+
panic: bool,
1314
}
1415

1516
impl<'a> Reader<'a> {
16-
fn new(data: &'a [u8], block_size: usize, fails_after: usize) -> Reader {
17+
fn new(data: &'a [u8], block_size: usize, fails_after: usize, panic: bool) -> Reader {
1718
Reader {
1819
data: data,
1920
block_size: block_size,
2021
fails_after: fails_after,
22+
panic: panic,
2123
}
2224
}
2325
}
2426

2527
impl<'a> Read for Reader<'a> {
2628
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
2729
if self.fails_after == 0 {
28-
return Err(io::Error::new(io::ErrorKind::Other, "read err"));
30+
if self.panic {
31+
panic!("read err");
32+
} else {
33+
return Err(io::Error::new(io::ErrorKind::Other, "read err"));
34+
}
2935
}
3036
self.fails_after -= 1;
3137
let amt = min(self.data.len(), min(buf.len(), self.block_size));
@@ -61,11 +67,11 @@ fn read() {
6167
for out_bufsize in 1..len {
6268
for queuelen in 1..len {
6369
// test the mock reader itself
64-
let mut rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX);
70+
let rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX, false);
6571
assert_eq!(read_chunks(rdr, out_bufsize).unwrap().as_slice(), &text[..]);
6672

6773
// test threaded reader
68-
let mut rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX);
74+
let rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX, false);
6975
let out = reader(channel_bufsize, queuelen, rdr, |r| {
7076
read_chunks(r, out_bufsize)
7177
})
@@ -91,7 +97,7 @@ fn read_fail() {
9197
for channel_bufsize in 1..len {
9298
for queuelen in 1..len {
9399
let mut out = vec![0];
94-
let mut rdr = Reader::new(text, channel_bufsize, len / channel_bufsize);
100+
let rdr = Reader::new(text, channel_bufsize, len / channel_bufsize, false);
95101
let res: io::Result<_> = reader(channel_bufsize, queuelen, rdr, |r| {
96102
while r.read(&mut out)? > 0 {}
97103
Ok(())
@@ -109,10 +115,52 @@ fn read_fail() {
109115
}
110116
}
111117

118+
#[test]
119+
#[should_panic(expected = "read err")]
120+
fn read_panic() {
121+
let text = b"The quick brown fox";
122+
let rdr = Reader::new(text, 1, 1, true);
123+
let _res: io::Result<_> = reader(1, 1, rdr, |r| {
124+
r.read_to_end(&mut Vec::new())?;
125+
Ok(())
126+
});
127+
}
128+
129+
#[test]
130+
fn read_fail_processing() {
131+
let text = b"The quick brown fox";
132+
133+
let rdr = Reader::new(text, 1, 1, false);
134+
let res: Result<(), &'static str> = reader(1, 1, rdr, |_r| Err("gave up"));
135+
136+
if let Err(e) = res {
137+
assert_eq!(&format!("{}", e), "gave up");
138+
} else {
139+
panic!("read should fail");
140+
}
141+
}
142+
143+
#[test]
144+
#[should_panic(expected = "gave up")]
145+
fn read_panic_processing() {
146+
let text = b"The quick brown fox";
147+
148+
let rdr = Reader::new(text, 1, 1, false);
149+
let _res: Result<(), &'static str> = reader(1, 1, rdr, |_r| panic!("gave up"));
150+
}
151+
112152
#[test]
113153
fn reader_init_fail() {
114154
let e = io::Error::new(io::ErrorKind::Other, "init err");
115-
let res = reader_init(5, 2, || Err::<&[u8], _>(e), |_| Ok(()));
155+
let res = reader_init(
156+
5,
157+
2,
158+
|| Err::<&[u8], _>(e),
159+
|reader| {
160+
reader.read_to_end(&mut Vec::new())?;
161+
Ok(())
162+
},
163+
);
116164
if let Err(e) = res {
117165
assert_eq!(&format!("{}", e), "init err");
118166
} else {

tests/write.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ fn write_thread() {
5858
for writer_bufsize in 1..len {
5959
for queuelen in 1..len {
6060
// Test the writer: write without flushing, which should result in empty output
61-
let mut w = writer_init_finish(
61+
let w = writer_init_finish(
6262
channel_bufsize,
6363
queuelen,
6464
|| Ok(Writer::new(false, false, writer_bufsize)),
@@ -106,7 +106,15 @@ fn write_thread() {
106106
#[test]
107107
fn writer_init_fail() {
108108
let e = io::Error::new(io::ErrorKind::Other, "init err");
109-
let res = writer_init(5, 2, || Err::<&mut [u8], _>(e), |_| Ok(()));
109+
let res = writer_init(
110+
5,
111+
2,
112+
|| Err::<&mut [u8], _>(e),
113+
|writer| {
114+
writer.write(b"let the cows come home")?;
115+
Ok(())
116+
},
117+
);
110118
if let Err(e) = res {
111119
assert_eq!(&format!("{}", e), "init err");
112120
} else {
@@ -123,7 +131,7 @@ fn write_fail() {
123131
for writer_bufsize in 1..len {
124132
for queuelen in 1..len {
125133
let w = Writer::new(true, false, writer_bufsize);
126-
let res = writer(channel_bufsize, queuelen, w, |w| w.write(text));
134+
let res = writer(channel_bufsize, queuelen, w, |w| w.write_all(text));
127135
if let Err(e) = res {
128136
assert_eq!(&format!("{}", e), "write err");
129137
} else {
@@ -141,3 +149,24 @@ fn write_fail() {
141149
}
142150
}
143151
}
152+
153+
#[test]
154+
fn write_source_fail() {
155+
let w = Writer::new(true, false, 1);
156+
let res: std::io::Result<()> = writer(1, 1, w, |_w| {
157+
Err(std::io::Error::from(std::io::ErrorKind::AddrInUse))
158+
});
159+
160+
if let Err(e) = res {
161+
assert_eq!(e.kind(), std::io::ErrorKind::AddrInUse);
162+
} else {
163+
panic!("expected error")
164+
}
165+
}
166+
167+
#[test]
168+
#[should_panic(expected = "all out of bubblegum")]
169+
fn write_source_panic() {
170+
let w = Writer::new(true, false, 1);
171+
let _res: std::io::Result<()> = writer(1, 1, w, |_w| panic!("all out of bubblegum"));
172+
}

0 commit comments

Comments
 (0)