Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

[client]: Fix for incorrectly dropped consensus messages (#11082) #11086

Merged
merged 1 commit into from
Sep 25, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

use std::cmp;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::convert::TryFrom;
use std::io::{BufRead, BufReader};
use std::str::from_utf8;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant};

use ansi_term::Colour;
Expand Down Expand Up @@ -2739,12 +2740,17 @@ fn transaction_receipt(

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
limit: usize,
/// Using a *signed* integer for counting currently queued messages since the
/// order in which the counter is incremented and decremented is not defined.
/// Using an unsigned integer can (and will) result in integer underflow,
/// incorrectly rejecting messages and returning a FullQueue error.
currently_queued: Arc<AtomicI64>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add docs why we're using signed integers here, so it won't get accidentally reverted in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added documentation to the currently_queued field to clarify why signed integers are being used here.

limit: i64,
}

impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your code, but maybe this can be a private method?

let limit = i64::try_from(limit).unwrap_or(i64::max_value());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider changing the input arg to isize rather than casting it? Seems like there are 3 call sites in total (in Client::new()) so perhaps changing the type is not too bad?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the input arg type does spread into a couple of corners in the code, especially into client settings.
It is an option of course - on the other hand it should only be configurable as a positive number, making the setting itself a signed integer might be confusing to the user.
Using a signed integer in IoChannelQueue is a local implementation strategy to ensure the queue size counter checks work correctly in a multi-threaded environment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on the other hand it should only be configurable as a positive number, making the setting itself a signed integer might be confusing to the user.

That could be handled when reading the user provided config, i.e. continue using an unsigned integer in configs and convert to signed there rather than for each message.

IoChannelQueue {
currently_queued: Default::default(),
limit,
Expand All @@ -2756,9 +2762,12 @@ impl IoChannelQueue {
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
if queue_size >= self.limit {
return Err(EthcoreError::FullQueue(self.limit))
let err_limit = usize::try_from(self.limit).unwrap_or(usize::max_value());
return Err(EthcoreError::FullQueue(err_limit))
};

let count = i64::try_from(count).unwrap_or(i64::max_value());

let currently_queued = self.currently_queued.clone();
let _ok = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
Expand Down