Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 482244e

Browse files
committed
WIP Replace NetworkStatusSinks with Arch<NetworkService>
1 parent bc43ad4 commit 482244e

File tree

7 files changed

+70
-70
lines changed

7 files changed

+70
-70
lines changed

bin/node/cli/src/service.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ pub struct NewFullBase {
204204
pub task_manager: TaskManager,
205205
pub client: Arc<FullClient>,
206206
pub network: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
207-
pub network_status_sinks: sc_service::NetworkStatusSinks<Block>,
208207
pub transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient>>,
209208
}
210209

@@ -242,7 +241,7 @@ pub fn new_full_base(
242241
)
243242
);
244243

245-
let (network, network_status_sinks, system_rpc_tx, network_starter) =
244+
let (network, system_rpc_tx, network_starter) =
246245
sc_service::build_network(sc_service::BuildNetworkParams {
247246
config: &config,
248247
client: client.clone(),
@@ -279,7 +278,6 @@ pub fn new_full_base(
279278
task_manager: &mut task_manager,
280279
on_demand: None,
281280
remote_blockchain: None,
282-
network_status_sinks: network_status_sinks.clone(),
283281
system_rpc_tx,
284282
telemetry: telemetry.as_mut(),
285283
},
@@ -415,7 +413,6 @@ pub fn new_full_base(
415413
task_manager,
416414
client,
417415
network,
418-
network_status_sinks,
419416
transaction_pool,
420417
})
421418
}
@@ -519,7 +516,7 @@ pub fn new_light_base(
519516
telemetry.as_ref().map(|x| x.handle()),
520517
)?;
521518

522-
let (network, network_status_sinks, system_rpc_tx, network_starter) =
519+
let (network, system_rpc_tx, network_starter) =
523520
sc_service::build_network(sc_service::BuildNetworkParams {
524521
config: &config,
525522
client: client.clone(),
@@ -554,7 +551,7 @@ pub fn new_light_base(
554551
client: client.clone(),
555552
transaction_pool: transaction_pool.clone(),
556553
keystore: keystore_container.sync_keystore(),
557-
config, backend, network_status_sinks, system_rpc_tx,
554+
config, backend, system_rpc_tx,
558555
network: network.clone(),
559556
task_manager: &mut task_manager,
560557
telemetry: telemetry.as_mut(),

client/informant/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ use futures::prelude::*;
2323
use log::{info, trace, warn};
2424
use parity_util_mem::MallocSizeOf;
2525
use sc_client_api::{BlockchainEvents, UsageProvider};
26-
use sc_network::NetworkStatus;
26+
use sc_network::NetworkService;
2727
use sp_blockchain::HeaderMetadata;
2828
use sp_runtime::traits::{Block as BlockT, Header};
2929
use sp_transaction_pool::TransactionPool;
30-
use sp_utils::{status_sinks, mpsc::tracing_unbounded};
30+
use sp_utils::mpsc::tracing_unbounded;
3131
use std::{fmt::Display, sync::Arc, time::Duration, collections::VecDeque};
3232

3333
mod display;
@@ -66,7 +66,7 @@ impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for
6666
/// Builds the informant and returns a `Future` that drives the informant.
6767
pub fn build<B: BlockT, C>(
6868
client: Arc<C>,
69-
network_status_sinks: Arc<status_sinks::StatusSinks<NetworkStatus<B>>>,
69+
network: Arc<NetworkService<B, <B as BlockT>::Hash>>,
7070
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
7171
format: OutputFormat,
7272
) -> impl futures::Future<Output = ()>
@@ -78,7 +78,7 @@ where
7878

7979
let client_1 = client.clone();
8080
let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status");
81-
network_status_sinks.push(Duration::from_millis(5000), network_status_sink);
81+
network.status_sinks.status.push(Duration::from_millis(5000), network_status_sink);
8282

8383
let display_notifications = network_status_stream
8484
.for_each(move |net_status| {

client/network/src/lib.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,10 @@ pub use service::{
276276

277277
pub use sc_peerset::ReputationChange;
278278
use sp_runtime::traits::{Block as BlockT, NumberFor};
279+
use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver}};
280+
use crate::network_state::NetworkState;
281+
use std::sync::Arc;
282+
use std::time::Duration;
279283

280284
/// The maximum allowed number of established connections per peer.
281285
///
@@ -322,3 +326,41 @@ pub struct NetworkStatus<B: BlockT> {
322326
/// The total number of bytes sent.
323327
pub total_bytes_outbound: u64,
324328
}
329+
330+
/// Sinks to propagate network status updates.
331+
/// For each element, every time the `Interval` fires we push an element on the sender.
332+
#[derive(Clone)]
333+
pub struct NetworkStatusSinks<Block: BlockT> {
334+
/// TODO
335+
pub status: Arc<status_sinks::StatusSinks<NetworkStatus<Block>>>,
336+
/// TODO
337+
pub state: Arc<status_sinks::StatusSinks<NetworkState>>,
338+
}
339+
340+
impl<Block: BlockT> NetworkStatusSinks<Block> {
341+
fn new() -> Self {
342+
Self {
343+
status: Arc::new(status_sinks::StatusSinks::new()),
344+
state: Arc::new(status_sinks::StatusSinks::new()),
345+
}
346+
}
347+
348+
/// Returns a receiver that periodically yields a [`NetworkStatus`].
349+
pub fn status_stream(&self, interval: Duration)
350+
-> TracingUnboundedReceiver<NetworkStatus<Block>>
351+
{
352+
let (sink, stream) = tracing_unbounded("mpsc_network_status");
353+
self.status.push(interval, sink);
354+
stream
355+
}
356+
357+
/// Returns a receiver that periodically yields a [`NetworkState`].
358+
pub fn state_stream(&self, interval: Duration)
359+
-> TracingUnboundedReceiver<NetworkState>
360+
{
361+
let (sink, stream) = tracing_unbounded("mpsc_network_state");
362+
self.state.push(interval, sink);
363+
stream
364+
}
365+
366+
}

client/network/src/service.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
//! which is then processed by [`NetworkWorker::poll`].
2929
3030
use crate::{
31-
ExHashT, NetworkStateInfo, NetworkStatus,
31+
ExHashT, NetworkStateInfo, NetworkStatus, NetworkStatusSinks,
3232
behaviour::{self, Behaviour, BehaviourOut},
3333
config::{parse_str_addr, Params, TransportConfig},
3434
DhtEvent,
@@ -123,6 +123,8 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
123123
local_peer_id: PeerId,
124124
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
125125
bandwidth: Arc<transport::BandwidthSinks>,
126+
/// Sinks to propagate network status updates.
127+
pub status_sinks: NetworkStatusSinks<B>,
126128
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
127129
/// nodes it should be connected to or not.
128130
peerset: PeersetHandle,
@@ -400,9 +402,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
400402

401403
let external_addresses = Arc::new(Mutex::new(Vec::new()));
402404
let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
405+
let status_sinks = NetworkStatusSinks::new();
403406

404407
let service = Arc::new(NetworkService {
405408
bandwidth,
409+
status_sinks,
406410
external_addresses: external_addresses.clone(),
407411
num_connected: num_connected.clone(),
408412
is_major_syncing: is_major_syncing.clone(),

client/service/src/builder.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

1919
use crate::{
20-
error::Error, MallocSizeOfWasm, RpcHandlers, NetworkStatusSinks,
20+
error::Error, MallocSizeOfWasm, RpcHandlers,
2121
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
2222
metrics::MetricsService,
2323
client::{light, Client, ClientConfig},
@@ -504,8 +504,6 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
504504
pub remote_blockchain: Option<Arc<dyn RemoteBlockchain<TBl>>>,
505505
/// A shared network instance.
506506
pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
507-
/// Sinks to propagate network status updates.
508-
pub network_status_sinks: NetworkStatusSinks<TBl>,
509507
/// A Sender for RPC requests.
510508
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
511509
/// Telemetry instance for this node.
@@ -575,7 +573,6 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
575573
rpc_extensions_builder,
576574
remote_blockchain,
577575
network,
578-
network_status_sinks,
579576
system_rpc_tx,
580577
telemetry,
581578
} = params;
@@ -639,7 +636,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
639636
metrics_service.run(
640637
client.clone(),
641638
transaction_pool.clone(),
642-
network_status_sinks.clone()
639+
network.clone()
643640
)
644641
);
645642

@@ -664,7 +661,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
664661
// Spawn informant task
665662
spawn_handle.spawn("informant", sc_informant::build(
666663
client.clone(),
667-
network_status_sinks.status.clone(),
664+
network.clone(),
668665
transaction_pool.clone(),
669666
config.informant_output_format,
670667
));
@@ -850,7 +847,6 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
850847
) -> Result<
851848
(
852849
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
853-
NetworkStatusSinks<TBl>,
854850
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
855851
NetworkStarter,
856852
),
@@ -944,15 +940,13 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
944940
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
945941
let network_mut = sc_network::NetworkWorker::new(network_params)?;
946942
let network = network_mut.service().clone();
947-
let network_status_sinks = NetworkStatusSinks::new();
948943

949944
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
950945

951946
let future = build_network_future(
952947
config.role.clone(),
953948
network_mut,
954949
client,
955-
network_status_sinks.clone(),
956950
system_rpc_rx,
957951
has_bootnodes,
958952
config.announce_block,
@@ -995,7 +989,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
995989
future.await
996990
});
997991

998-
Ok((network, network_status_sinks, system_rpc_tx, NetworkStarter(network_start_tx)))
992+
Ok((network, system_rpc_tx, NetworkStarter(network_start_tx)))
999993
}
1000994

1001995
/// Object used to start the network.

client/service/src/lib.rs

Lines changed: 7 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ mod task_manager;
3737
use std::{io, pin::Pin};
3838
use std::net::SocketAddr;
3939
use std::collections::HashMap;
40-
use std::time::Duration;
4140
use std::task::Poll;
4241

4342
use futures::{Future, FutureExt, Stream, StreamExt, stream, compat::*};
@@ -47,7 +46,7 @@ use codec::{Encode, Decode};
4746
use sp_runtime::generic::BlockId;
4847
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
4948
use parity_util_mem::MallocSizeOf;
50-
use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver}};
49+
use sp_utils::mpsc::TracingUnboundedReceiver;
5150

5251
pub use self::error::Error;
5352
pub use self::builder::{
@@ -124,42 +123,6 @@ impl RpcHandlers {
124123
}
125124
}
126125

127-
/// Sinks to propagate network status updates.
128-
/// For each element, every time the `Interval` fires we push an element on the sender.
129-
#[derive(Clone)]
130-
pub struct NetworkStatusSinks<Block: BlockT> {
131-
status: Arc<status_sinks::StatusSinks<NetworkStatus<Block>>>,
132-
state: Arc<status_sinks::StatusSinks<NetworkState>>,
133-
}
134-
135-
impl<Block: BlockT> NetworkStatusSinks<Block> {
136-
fn new() -> Self {
137-
Self {
138-
status: Arc::new(status_sinks::StatusSinks::new()),
139-
state: Arc::new(status_sinks::StatusSinks::new()),
140-
}
141-
}
142-
143-
/// Returns a receiver that periodically yields a [`NetworkStatus`].
144-
pub fn status_stream(&self, interval: Duration)
145-
-> TracingUnboundedReceiver<NetworkStatus<Block>>
146-
{
147-
let (sink, stream) = tracing_unbounded("mpsc_network_status");
148-
self.status.push(interval, sink);
149-
stream
150-
}
151-
152-
/// Returns a receiver that periodically yields a [`NetworkState`].
153-
pub fn state_stream(&self, interval: Duration)
154-
-> TracingUnboundedReceiver<NetworkState>
155-
{
156-
let (sink, stream) = tracing_unbounded("mpsc_network_state");
157-
self.state.push(interval, sink);
158-
stream
159-
}
160-
161-
}
162-
163126
/// An incomplete set of chain components, but enough to run the chain ops subcommands.
164127
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
165128
/// A shared client instance.
@@ -191,7 +154,6 @@ async fn build_network_future<
191154
role: Role,
192155
mut network: sc_network::NetworkWorker<B, H>,
193156
client: Arc<C>,
194-
status_sinks: NetworkStatusSinks<B>,
195157
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
196158
should_have_peers: bool,
197159
announce_imported_blocks: bool,
@@ -340,13 +302,13 @@ async fn build_network_future<
340302
// detailed state information of the network on what are called
341303
// "status sinks".
342304

343-
status_sink = status_sinks.status.next().fuse() => {
344-
status_sink.send(network.status());
345-
}
305+
// FIXME status_sink = network.service().status_sinks.status.next().fuse() => {
306+
// status_sink.send(network.status());
307+
//}
346308

347-
state_sink = status_sinks.state.next().fuse() => {
348-
state_sink.send(network.network_state());
349-
}
309+
// FIXME state_sink = network.service().status_sinks.state.next().fuse() => {
310+
// state_sink.send(network.network_state());
311+
//}
350312
}
351313
}
352314
}

client/service/src/metrics.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use std::{convert::TryFrom, time::SystemTime};
2020

21-
use crate::{NetworkStatus, NetworkState, NetworkStatusSinks, config::Configuration};
21+
use crate::{NetworkStatus, NetworkState, config::Configuration};
2222
use futures_timer::Delay;
2323
use prometheus_endpoint::{register, Gauge, U64, Registry, PrometheusError, Opts, GaugeVec};
2424
use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO};
@@ -29,6 +29,7 @@ use sp_utils::metrics::register_globals;
2929
use sp_utils::mpsc::TracingUnboundedReceiver;
3030
use sc_client_api::{ClientInfo, UsageProvider};
3131
use sc_network::config::Role;
32+
use sc_network::NetworkService;
3233
use std::sync::Arc;
3334
use std::time::Duration;
3435
use wasm_timer::Instant;
@@ -163,7 +164,7 @@ impl MetricsService {
163164
mut self,
164165
client: Arc<TCl>,
165166
transactions: Arc<TExPool>,
166-
network: NetworkStatusSinks<TBl>,
167+
network: Arc<NetworkService<TBl, <TBl as Block>::Hash>>,
167168
) where
168169
TBl: Block,
169170
TCl: ProvideRuntimeApi<TBl> + UsageProvider<TBl>,
@@ -177,8 +178,8 @@ impl MetricsService {
177178
let net_state_interval = Duration::from_secs(30);
178179

179180
// Source of network information.
180-
let mut net_status_rx = Some(network.status_stream(net_status_interval));
181-
let mut net_state_rx = Some(network.state_stream(net_state_interval));
181+
let mut net_status_rx = Some(network.status_sinks.status_stream(net_status_interval));
182+
let mut net_state_rx = Some(network.status_sinks.state_stream(net_state_interval));
182183

183184
loop {
184185
// Wait for the next tick of the timer.

0 commit comments

Comments
 (0)