Skip to content

Commit 699dcc0

Browse files
committed
fix: port futures-rs fix on channel
1 parent 18068e5 commit 699dcc0

File tree

6 files changed

+214
-32
lines changed

6 files changed

+214
-32
lines changed

tentacle/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ socket2 = { version = "0.3.15" }
5454
[dev-dependencies]
5555
env_logger = "0.6.0"
5656
crossbeam-channel = "0.3.6"
57-
systemstat = "0.1.3"
57+
#systemstat = "0.1.3"
5858
futures-test = "0.3.5"
5959

6060
[target.'cfg(unix)'.dev-dependencies]

tentacle/src/channel/bound.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,12 @@ impl<T> Receiver<T> {
583583
}
584584
None => {
585585
let state = decode_state(inner.state.load(SeqCst));
586-
if state.is_open || state.num_messages != 0 {
586+
if state.is_closed() {
587+
// If closed flag is set AND there are no pending messages
588+
// it means end of stream
589+
self.inner = None;
590+
Poll::Ready(None)
591+
} else {
587592
// If queue is open, we need to return Pending
588593
// to be woken up when new messages arrive.
589594
// If queue is closed but num_messages is non-zero,
@@ -592,11 +597,6 @@ impl<T> Receiver<T> {
592597
// so we need to park until sender unparks the task
593598
// after queueing the message.
594599
Poll::Pending
595-
} else {
596-
// If closed flag is set AND there are no pending messages
597-
// it means end of stream
598-
self.inner = None;
599-
Poll::Ready(None)
600600
}
601601
}
602602
}
@@ -661,8 +661,26 @@ impl<T> Drop for Receiver<T> {
661661
// Drain the channel of all pending messages
662662
self.close();
663663
if self.inner.is_some() {
664-
while let Poll::Ready(Some(..)) = self.next_message() {
665-
// ...
664+
loop {
665+
match self.next_message() {
666+
Poll::Ready(Some(_)) => {}
667+
Poll::Ready(None) => break,
668+
Poll::Pending => {
669+
let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
670+
671+
// If the channel is closed, then there is no need to park.
672+
if state.is_closed() {
673+
break;
674+
}
675+
676+
// TODO: Spinning isn't ideal, it might be worth
677+
// investigating using a condvar or some other strategy
678+
// here. That said, if this case is hit, then another thread
679+
// is about to push the value into the queue and this isn't
680+
// the only spinlock in the impl right now.
681+
::std::thread::yield_now();
682+
}
683+
}
666684
}
667685
}
668686
}

tentacle/src/channel/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ struct State {
4444
num_messages: usize,
4545
}
4646

47+
impl State {
48+
fn is_closed(&self) -> bool {
49+
!self.is_open && self.num_messages == 0
50+
}
51+
}
52+
4753
fn decode_state(num: usize) -> State {
4854
State {
4955
is_open: num & OPEN_MASK == OPEN_MASK,

tentacle/src/channel/tests/mpsc_close.rs

Lines changed: 139 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@ use futures::executor::block_on;
33
use futures::future::FutureExt;
44
use futures::sink::SinkExt;
55
use futures::stream::StreamExt;
6-
use std::sync::Arc;
6+
use std::future::Future;
7+
use std::pin::Pin;
8+
use std::sync::{Arc, Weak};
9+
use std::task::{Context, Poll};
710
use std::thread;
11+
use std::time::{Duration, Instant};
812

913
#[test]
1014
fn smoke() {
@@ -141,3 +145,137 @@ fn single_receiver_drop_closes_channel_and_drains() {
141145
assert!(sender.is_closed());
142146
}
143147
}
148+
149+
// Stress test that `try_send()`s occurring concurrently with receiver
150+
// close/drops don't appear as successful sends.
151+
#[test]
152+
fn stress_try_send_as_receiver_closes() {
153+
const AMT: usize = 10000;
154+
// To provide variable timing characteristics (in the hopes of
155+
// reproducing the collision that leads to a race), we busy-re-poll
156+
// the test MPSC receiver a variable number of times before actually
157+
// stopping. We vary this countdown between 1 and the following
158+
// value.
159+
const MAX_COUNTDOWN: usize = 20;
160+
// When we detect that a successfully sent item is still in the
161+
// queue after a disconnect, we spin for up to 100ms to confirm that
162+
// it is a persistent condition and not a concurrency illusion.
163+
const SPIN_TIMEOUT_S: u64 = 10;
164+
const SPIN_SLEEP_MS: u64 = 10;
165+
struct TestRx {
166+
rx: mpsc::Receiver<Arc<()>>,
167+
// The number of times to query `rx` before dropping it.
168+
poll_count: usize,
169+
}
170+
struct TestTask {
171+
command_rx: mpsc::Receiver<TestRx>,
172+
test_rx: Option<mpsc::Receiver<Arc<()>>>,
173+
countdown: usize,
174+
}
175+
impl TestTask {
176+
/// Create a new TestTask
177+
fn new() -> (TestTask, mpsc::Sender<TestRx>) {
178+
let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
179+
(
180+
TestTask {
181+
command_rx,
182+
test_rx: None,
183+
countdown: 0, // 0 means no countdown is in progress.
184+
},
185+
command_tx,
186+
)
187+
}
188+
}
189+
impl Future for TestTask {
190+
type Output = ();
191+
192+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193+
// Poll the test channel, if one is present.
194+
if let Some(rx) = &mut self.test_rx {
195+
if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
196+
let _ = v.expect("test finished unexpectedly!");
197+
}
198+
self.countdown -= 1;
199+
// Busy-poll until the countdown is finished.
200+
cx.waker().wake_by_ref();
201+
}
202+
// Accept any newly submitted MPSC channels for testing.
203+
match self
204+
.command_rx
205+
.poll_next_unpin(cx)
206+
.map(|item| item.map(|i| i.1))
207+
{
208+
Poll::Ready(Some(TestRx { rx, poll_count })) => {
209+
self.test_rx = Some(rx);
210+
self.countdown = poll_count;
211+
cx.waker().wake_by_ref();
212+
}
213+
Poll::Ready(None) => return Poll::Ready(()),
214+
Poll::Pending => {}
215+
}
216+
if self.countdown == 0 {
217+
// Countdown complete -- drop the Receiver.
218+
self.test_rx = None;
219+
}
220+
Poll::Pending
221+
}
222+
}
223+
let (f, cmd_tx) = TestTask::new();
224+
let bg = thread::spawn(move || block_on(f));
225+
for i in 0..AMT {
226+
let (test_tx, rx) = mpsc::channel(0);
227+
let poll_count = i % MAX_COUNTDOWN;
228+
cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
229+
let mut prev_weak: Option<Weak<()>> = None;
230+
let mut attempted_sends = 0;
231+
let mut successful_sends = 0;
232+
loop {
233+
// Create a test item.
234+
let item = Arc::new(());
235+
let weak = Arc::downgrade(&item);
236+
match test_tx.try_send(item) {
237+
Ok(_) => {
238+
prev_weak = Some(weak);
239+
successful_sends += 1;
240+
}
241+
Err(ref e) if e.is_full() => {}
242+
Err(ref e) if e.is_disconnected() => {
243+
// Test for evidence of the race condition.
244+
if let Some(prev_weak) = prev_weak {
245+
if prev_weak.upgrade().is_some() {
246+
// The previously sent item is still allocated.
247+
// However, there appears to be some aspect of the
248+
// concurrency that can legitimately cause the Arc
249+
// to be momentarily valid. Spin for up to 100ms
250+
// waiting for the previously sent item to be
251+
// dropped.
252+
let t0 = Instant::now();
253+
let mut spins = 0;
254+
loop {
255+
if prev_weak.upgrade().is_none() {
256+
break;
257+
}
258+
assert!(
259+
t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
260+
"item not dropped on iteration {} after \
261+
{} sends ({} successful). spin=({})",
262+
i,
263+
attempted_sends,
264+
successful_sends,
265+
spins
266+
);
267+
spins += 1;
268+
thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
269+
}
270+
}
271+
}
272+
break;
273+
}
274+
Err(ref e) => panic!("unexpected error: {}", e),
275+
}
276+
attempted_sends += 1;
277+
}
278+
}
279+
drop(cmd_tx);
280+
bg.join().expect("background thread join");
281+
}

tentacle/src/channel/unbound.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,12 @@ impl<T> UnboundedReceiver<T> {
400400
}
401401
None => {
402402
let state = decode_state(inner.state.load(SeqCst));
403-
if state.is_open || state.num_messages != 0 {
403+
if state.is_closed() {
404+
// If closed flag is set AND there are no pending messages
405+
// it means end of stream
406+
self.inner = None;
407+
Poll::Ready(None)
408+
} else {
404409
// If queue is open, we need to return Pending
405410
// to be woken up when new messages arrive.
406411
// If queue is closed but num_messages is non-zero,
@@ -409,11 +414,6 @@ impl<T> UnboundedReceiver<T> {
409414
// so we need to park until sender unparks the task
410415
// after queueing the message.
411416
Poll::Pending
412-
} else {
413-
// If closed flag is set AND there are no pending messages
414-
// it means end of stream
415-
self.inner = None;
416-
Poll::Ready(None)
417417
}
418418
}
419419
}
@@ -466,8 +466,26 @@ impl<T> Drop for UnboundedReceiver<T> {
466466
// Drain the channel of all pending messages
467467
self.close();
468468
if self.inner.is_some() {
469-
while let Poll::Ready(Some(..)) = self.next_message() {
470-
// ...
469+
loop {
470+
match self.next_message() {
471+
Poll::Ready(Some(_)) => {}
472+
Poll::Ready(None) => break,
473+
Poll::Pending => {
474+
let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
475+
476+
// If the channel is closed, then there is no need to park.
477+
if state.is_closed() {
478+
break;
479+
}
480+
481+
// TODO: Spinning isn't ideal, it might be worth
482+
// investigating using a condvar or some other strategy
483+
// here. That said, if this case is hit, then another thread
484+
// is about to push the value into the queue and this isn't
485+
// the only spinlock in the impl right now.
486+
::std::thread::yield_now();
487+
}
488+
}
471489
}
472490
}
473491
}

tentacle/tests/test_kill.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use nix::{
66
unistd::{fork, ForkResult},
77
};
88
use std::{thread, time::Duration};
9-
use systemstat::{Platform, System};
9+
// use systemstat::{Platform, System};
1010
use tentacle::{
1111
builder::{MetaBuilder, ServiceBuilder},
1212
context::{ProtocolContext, ProtocolContextMutRef},
@@ -19,23 +19,25 @@ use tentacle::{
1919

2020
/// Get current used memory(bytes)
2121
fn current_used_memory() -> Option<f64> {
22-
let sys = System::new();
23-
match sys.memory() {
24-
Ok(mem) => Some((mem.total.as_u64() - mem.free.as_u64()) as f64),
25-
Err(_) => None,
26-
}
22+
// let sys = System::new();
23+
// match sys.memory() {
24+
// Ok(mem) => Some((mem.total.as_u64() - mem.free.as_u64()) as f64),
25+
// Err(_) => None,
26+
// }
27+
Some(1.0)
2728
}
2829

2930
/// Get current used cpu(all cores) average usage ratio
3031
fn current_used_cpu() -> Option<f32> {
31-
let sys = System::new();
32-
match sys.cpu_load_aggregate() {
33-
Ok(cpu) => {
34-
thread::sleep(Duration::from_secs(1));
35-
cpu.done().ok().map(|cpu| cpu.user)
36-
}
37-
Err(_) => None,
38-
}
32+
// let sys = System::new();
33+
// match sys.cpu_load_aggregate() {
34+
// Ok(cpu) => {
35+
// thread::sleep(Duration::from_secs(1));
36+
// cpu.done().ok().map(|cpu| cpu.user)
37+
// }
38+
// Err(_) => None,
39+
// }
40+
Some(1.0)
3941
}
4042

4143
pub fn create<F>(secio: bool, meta: ProtocolMeta, shandle: F) -> Service<F>

0 commit comments

Comments
 (0)