Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 7d88b03

Browse files
dmitry-markingpestana
authored andcommitted
Replace request-response incoming requests queue with async-channel (#14199)
1 parent da4d931 commit 7d88b03

File tree

13 files changed

+53
-53
lines changed

13 files changed

+53
-53
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/consensus/beefy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ homepage = "https://substrate.io"
1010

1111
[dependencies]
1212
array-bytes = "4.1"
13+
async-channel = "1.8.0"
1314
async-trait = "0.1.57"
1415
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
1516
fnv = "1.0.6"

client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
//! Helper for handling (i.e. answering) BEEFY justifications requests from a remote peer.
1818
1919
use codec::Decode;
20-
use futures::{
21-
channel::{mpsc, oneshot},
22-
StreamExt,
23-
};
20+
use futures::{channel::oneshot, StreamExt};
2421
use log::{debug, trace};
2522
use sc_client_api::BlockBackend;
2623
use sc_network::{
@@ -102,11 +99,11 @@ impl<B: Block> IncomingRequest<B> {
10299
///
103100
/// Takes care of decoding and handling of invalid encoded requests.
104101
pub(crate) struct IncomingRequestReceiver {
105-
raw: mpsc::Receiver<netconfig::IncomingRequest>,
102+
raw: async_channel::Receiver<netconfig::IncomingRequest>,
106103
}
107104

108105
impl IncomingRequestReceiver {
109-
pub fn new(inner: mpsc::Receiver<netconfig::IncomingRequest>) -> Self {
106+
pub fn new(inner: async_channel::Receiver<netconfig::IncomingRequest>) -> Self {
110107
Self { raw: inner }
111108
}
112109

client/consensus/beefy/src/communication/request_response/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ pub(crate) mod outgoing_requests_engine;
2323

2424
pub use incoming_requests_handler::BeefyJustifsRequestHandler;
2525

26-
use futures::channel::mpsc;
2726
use std::time::Duration;
2827

2928
use codec::{Decode, Encode, Error as CodecError};
@@ -54,7 +53,7 @@ pub(crate) fn on_demand_justifications_protocol_config<Hash: AsRef<[u8]>>(
5453
) -> (IncomingRequestReceiver, RequestResponseConfig) {
5554
let name = justifications_protocol_name(genesis_hash, fork_id);
5655
let fallback_names = vec![];
57-
let (tx, rx) = mpsc::channel(JUSTIF_CHANNEL_SIZE);
56+
let (tx, rx) = async_channel::bounded(JUSTIF_CHANNEL_SIZE);
5857
let rx = IncomingRequestReceiver::new(rx);
5958
let cfg = RequestResponseConfig {
6059
name,

client/network/bitswap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
1616
prost-build = "0.11"
1717

1818
[dependencies]
19+
async-channel = "1.8.0"
1920
cid = "0.8.6"
2021
futures = "0.3.21"
2122
libp2p-identity = { version = "0.1.2", features = ["peerid"] }

client/network/bitswap/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
//! CID is expected to reference 256-bit Blake2b transaction hash.
2222
2323
use cid::{self, Version};
24-
use futures::{channel::mpsc, StreamExt};
24+
use futures::StreamExt;
2525
use libp2p_identity::PeerId;
2626
use log::{debug, error, trace};
2727
use prost::Message;
@@ -93,13 +93,13 @@ impl Prefix {
9393
/// Bitswap request handler
9494
pub struct BitswapRequestHandler<B> {
9595
client: Arc<dyn BlockBackend<B> + Send + Sync>,
96-
request_receiver: mpsc::Receiver<IncomingRequest>,
96+
request_receiver: async_channel::Receiver<IncomingRequest>,
9797
}
9898

9999
impl<B: BlockT> BitswapRequestHandler<B> {
100100
/// Create a new [`BitswapRequestHandler`].
101101
pub fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
102-
let (tx, request_receiver) = mpsc::channel(MAX_REQUEST_QUEUE);
102+
let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
103103

104104
let config = ProtocolConfig {
105105
name: ProtocolName::from(PROTOCOL_NAME),
@@ -289,7 +289,7 @@ pub enum BitswapError {
289289
#[cfg(test)]
290290
mod tests {
291291
use super::*;
292-
use futures::{channel::oneshot, SinkExt};
292+
use futures::channel::oneshot;
293293
use sc_block_builder::BlockBuilderProvider;
294294
use schema::bitswap::{
295295
message::{wantlist::Entry, Wantlist},

client/network/light/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
1616
prost-build = "0.11"
1717

1818
[dependencies]
19+
async-channel = "1.8.0"
1920
array-bytes = "4.1"
2021
codec = { package = "parity-scale-codec", version = "3.2.2", features = [
2122
"derive",

client/network/light/src/light_client_requests/handler.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
2525
use crate::schema;
2626
use codec::{self, Decode, Encode};
27-
use futures::{channel::mpsc, prelude::*};
27+
use futures::prelude::*;
2828
use libp2p_identity::PeerId;
2929
use log::{debug, trace};
3030
use prost::Message;
@@ -43,9 +43,13 @@ use std::{marker::PhantomData, sync::Arc};
4343

4444
const LOG_TARGET: &str = "light-client-request-handler";
4545

46+
/// Incoming requests bounded queue size. For now due to lack of data on light client request
47+
/// handling in production systems, this value is chosen to match the block request limit.
48+
const MAX_LIGHT_REQUEST_QUEUE: usize = 20;
49+
4650
/// Handler for incoming light client requests from a remote peer.
4751
pub struct LightClientRequestHandler<B, Client> {
48-
request_receiver: mpsc::Receiver<IncomingRequest>,
52+
request_receiver: async_channel::Receiver<IncomingRequest>,
4953
/// Blockchain client.
5054
client: Arc<Client>,
5155
_block: PhantomData<B>,
@@ -62,9 +66,7 @@ where
6266
fork_id: Option<&str>,
6367
client: Arc<Client>,
6468
) -> (Self, ProtocolConfig) {
65-
// For now due to lack of data on light client request handling in production systems, this
66-
// value is chosen to match the block request limit.
67-
let (tx, request_receiver) = mpsc::channel(20);
69+
let (tx, request_receiver) = async_channel::bounded(MAX_LIGHT_REQUEST_QUEUE);
6870

6971
let mut protocol_config = super::generate_protocol_config(
7072
protocol_id,

client/network/src/request_responses.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@
3636
3737
use crate::{types::ProtocolName, ReputationChange};
3838

39-
use futures::{
40-
channel::{mpsc, oneshot},
41-
prelude::*,
42-
};
39+
use futures::{channel::oneshot, prelude::*};
4340
use libp2p::{
4441
core::{Endpoint, Multiaddr},
4542
request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
@@ -126,7 +123,7 @@ pub struct ProtocolConfig {
126123
/// other peers. If this is `Some` but the channel is closed, then the local node will
127124
/// advertise support for this protocol, but any incoming request will lead to an error being
128125
/// sent back.
129-
pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
126+
pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
130127
}
131128

132129
/// A single request received by a peer on a request-response protocol.
@@ -259,8 +256,10 @@ pub struct RequestResponsesBehaviour {
259256
///
260257
/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
261258
/// "response builder" used to build responses for incoming requests.
262-
protocols:
263-
HashMap<ProtocolName, (Behaviour<GenericCodec>, Option<mpsc::Sender<IncomingRequest>>)>,
259+
protocols: HashMap<
260+
ProtocolName,
261+
(Behaviour<GenericCodec>, Option<async_channel::Sender<IncomingRequest>>),
262+
>,
264263

265264
/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
266265
pending_requests:
@@ -295,7 +294,10 @@ struct MessageRequest {
295294
request: Vec<u8>,
296295
channel: ResponseChannel<Result<Vec<u8>, ()>>,
297296
protocol: ProtocolName,
298-
resp_builder: Option<futures::channel::mpsc::Sender<IncomingRequest>>,
297+
// A builder used for building responses for incoming requests. Note that we use
298+
// `async_channel` and not `mpsc` on purpose, because `mpsc::channel` allocates an extra
299+
// message slot for every cloned `Sender` and this breaks a back-pressure mechanism.
300+
resp_builder: Option<async_channel::Sender<IncomingRequest>>,
299301
// Once we get incoming request we save all params, create an async call to Peerset
300302
// to get the reputation of the peer.
301303
get_peer_reputation: Pin<Box<dyn Future<Output = Result<i32, ()>> + Send>>,
@@ -618,10 +620,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
618620

619621
// Submit the request to the "response builder" passed by the user at
620622
// initialization.
621-
if let Some(mut resp_builder) = resp_builder {
623+
if let Some(resp_builder) = resp_builder {
622624
// If the response builder is too busy, silently drop `tx`. This
623625
// will be reported by the corresponding request-response [`Behaviour`]
624626
// through an `InboundFailure::Omission` event.
627+
// Note that we use `async_channel::bounded` and not `mpsc::channel`
628+
// because the latter allocates an extra slot for every cloned sender.
625629
let _ = resp_builder.try_send(IncomingRequest {
626630
peer,
627631
payload: request,
@@ -1036,11 +1040,7 @@ impl Codec for GenericCodec {
10361040
mod tests {
10371041
use super::*;
10381042

1039-
use futures::{
1040-
channel::{mpsc, oneshot},
1041-
executor::LocalPool,
1042-
task::Spawn,
1043-
};
1043+
use futures::{channel::oneshot, executor::LocalPool, task::Spawn};
10441044
use libp2p::{
10451045
core::{
10461046
transport::{MemoryTransport, Transport},
@@ -1112,7 +1112,7 @@ mod tests {
11121112
// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
11131113
let mut swarms = (0..2)
11141114
.map(|_| {
1115-
let (tx, mut rx) = mpsc::channel::<IncomingRequest>(64);
1115+
let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
11161116

11171117
pool.spawner()
11181118
.spawn_obj(
@@ -1215,7 +1215,7 @@ mod tests {
12151215
// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
12161216
let mut swarms = (0..2)
12171217
.map(|_| {
1218-
let (tx, mut rx) = mpsc::channel::<IncomingRequest>(64);
1218+
let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
12191219

12201220
pool.spawner()
12211221
.spawn_obj(
@@ -1353,8 +1353,8 @@ mod tests {
13531353
};
13541354

13551355
let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2, peerset) = {
1356-
let (tx_1, rx_1) = mpsc::channel(64);
1357-
let (tx_2, rx_2) = mpsc::channel(64);
1356+
let (tx_1, rx_1) = async_channel::bounded(64);
1357+
let (tx_2, rx_2) = async_channel::bounded(64);
13581358

13591359
let protocol_configs = vec![
13601360
ProtocolConfig {

client/network/sync/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ prost-build = "0.11"
1717

1818
[dependencies]
1919
array-bytes = "4.1"
20+
async-channel = "1.8.0"
2021
async-trait = "0.1.58"
2122
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
2223
futures = "0.3.21"

0 commit comments

Comments
 (0)