Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 33e6029

Browse files
authored
Ensure that we inform all tasks to stop before starting the 60 seconds shutdown (#12897)
* Ensure that we inform all tasks to stop before starting the 60 seconds shutdown The change of waiting at most 60 seconds for the node to shut down actually introduced a bug. We were actually always waiting the full 60 seconds, as we didn't inform our tasks to shut down. The solution to this problem is to drop the task manager, as this will then inform all tasks to end. It also adds tests to ensure that the behaviors work as expected. (This should already have been done in the first PR! :() * ".git/.scripts/fmt.sh" 1 Co-authored-by: command-bot <>
1 parent 15cfd9c commit 33e6029

File tree

3 files changed

+213
-0
lines changed

3 files changed

+213
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ sp-version = { version = "5.0.0", path = "../../primitives/version" }
4949

5050
[dev-dependencies]
5151
tempfile = "3.1.0"
52+
futures-timer = "3.0.1"
5253

5354
[features]
5455
default = ["rocksdb"]

client/cli/src/runner.rs

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,14 @@ impl<C: SubstrateCli> Runner<C> {
145145
E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
146146
{
147147
self.print_node_infos();
148+
148149
let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
149150
let res = self.tokio_runtime.block_on(main(task_manager.future().fuse()));
151+
// We need to drop the task manager here to inform all tasks that they should shut down.
152+
//
153+
// It is important to do this before we instruct the tokio runtime to shut down. Otherwise
154+
// the tokio runtime will wait the full 60 seconds for all tasks to stop.
155+
drop(task_manager);
150156

151157
// Give all futures 60 seconds to shut down before tokio "leaks" them.
152158
self.tokio_runtime.shutdown_timeout(Duration::from_secs(60));
@@ -208,3 +214,208 @@ pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
208214
);
209215
info!("⛓ Native runtime: {}", C::native_runtime_version(&config.chain_spec));
210216
}
217+
218+
#[cfg(test)]
219+
mod tests {
220+
use std::{
221+
path::PathBuf,
222+
sync::atomic::{AtomicU64, Ordering},
223+
};
224+
225+
use sc_network::config::NetworkConfiguration;
226+
use sc_service::{Arc, ChainType, GenericChainSpec, NoExtension};
227+
use sp_runtime::create_runtime_str;
228+
use sp_version::create_apis_vec;
229+
230+
use super::*;
231+
232+
struct Cli;
233+
234+
impl SubstrateCli for Cli {
235+
fn author() -> String {
236+
"test".into()
237+
}
238+
239+
fn impl_name() -> String {
240+
"yep".into()
241+
}
242+
243+
fn impl_version() -> String {
244+
"version".into()
245+
}
246+
247+
fn description() -> String {
248+
"desc".into()
249+
}
250+
251+
fn support_url() -> String {
252+
"no.pe".into()
253+
}
254+
255+
fn copyright_start_year() -> i32 {
256+
2042
257+
}
258+
259+
fn load_spec(
260+
&self,
261+
_: &str,
262+
) -> std::result::Result<Box<dyn sc_service::ChainSpec>, String> {
263+
Err("nope".into())
264+
}
265+
266+
fn native_runtime_version(
267+
_: &Box<dyn sc_service::ChainSpec>,
268+
) -> &'static sp_version::RuntimeVersion {
269+
const VERSION: sp_version::RuntimeVersion = sp_version::RuntimeVersion {
270+
spec_name: create_runtime_str!("spec"),
271+
impl_name: create_runtime_str!("name"),
272+
authoring_version: 0,
273+
spec_version: 0,
274+
impl_version: 0,
275+
apis: create_apis_vec!([]),
276+
transaction_version: 2,
277+
state_version: 0,
278+
};
279+
280+
&VERSION
281+
}
282+
}
283+
284+
fn create_runner() -> Runner<Cli> {
285+
let runtime = build_runtime().unwrap();
286+
287+
let runner = Runner::new(
288+
Configuration {
289+
impl_name: "spec".into(),
290+
impl_version: "3".into(),
291+
role: sc_service::Role::Authority,
292+
tokio_handle: runtime.handle().clone(),
293+
transaction_pool: Default::default(),
294+
network: NetworkConfiguration::new_memory(),
295+
keystore: sc_service::config::KeystoreConfig::InMemory,
296+
keystore_remote: None,
297+
database: sc_client_db::DatabaseSource::ParityDb { path: PathBuf::from("db") },
298+
trie_cache_maximum_size: None,
299+
state_pruning: None,
300+
blocks_pruning: sc_client_db::BlocksPruning::KeepAll,
301+
chain_spec: Box::new(GenericChainSpec::from_genesis(
302+
"test",
303+
"test_id",
304+
ChainType::Development,
305+
|| unimplemented!("Not required in tests"),
306+
Vec::new(),
307+
None,
308+
None,
309+
None,
310+
None,
311+
NoExtension::None,
312+
)),
313+
wasm_method: Default::default(),
314+
wasm_runtime_overrides: None,
315+
execution_strategies: Default::default(),
316+
rpc_http: None,
317+
rpc_ws: None,
318+
rpc_ipc: None,
319+
rpc_ws_max_connections: None,
320+
rpc_cors: None,
321+
rpc_methods: Default::default(),
322+
rpc_max_payload: None,
323+
rpc_max_request_size: None,
324+
rpc_max_response_size: None,
325+
rpc_id_provider: None,
326+
rpc_max_subs_per_conn: None,
327+
ws_max_out_buffer_capacity: None,
328+
prometheus_config: None,
329+
telemetry_endpoints: None,
330+
default_heap_pages: None,
331+
offchain_worker: Default::default(),
332+
force_authoring: false,
333+
disable_grandpa: false,
334+
dev_key_seed: None,
335+
tracing_targets: None,
336+
tracing_receiver: Default::default(),
337+
max_runtime_instances: 8,
338+
announce_block: true,
339+
base_path: None,
340+
informant_output_format: Default::default(),
341+
runtime_cache_size: 2,
342+
},
343+
runtime,
344+
)
345+
.unwrap();
346+
347+
runner
348+
}
349+
350+
#[test]
351+
fn ensure_run_until_exit_informs_tasks_to_end() {
352+
let runner = create_runner();
353+
354+
let counter = Arc::new(AtomicU64::new(0));
355+
let counter2 = counter.clone();
356+
357+
runner
358+
.run_node_until_exit(move |cfg| async move {
359+
let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
360+
let (sender, receiver) = futures::channel::oneshot::channel();
361+
362+
// We need to use `spawn_blocking` here so that we get a dedicated thread for our
363+
// future. This is important for this test, as otherwise tokio can just "drop" the
364+
// future.
365+
task_manager.spawn_handle().spawn_blocking("test", None, async move {
366+
let _ = sender.send(());
367+
loop {
368+
counter2.fetch_add(1, Ordering::Relaxed);
369+
futures_timer::Delay::new(Duration::from_millis(50)).await;
370+
}
371+
});
372+
373+
task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
374+
// Let's stop this essential task as soon as our other task has started.
375+
// It will signal that the task manager should end.
376+
let _ = receiver.await;
377+
});
378+
379+
Ok::<_, sc_service::Error>(task_manager)
380+
})
381+
.unwrap_err();
382+
383+
let count = counter.load(Ordering::Relaxed);
384+
385+
// Ensure that our counting task was running for less than 30 seconds.
386+
// It should be directly killed, but for CI and whatever we are being a little bit more
387+
// "relaxed".
388+
assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
389+
}
390+
391+
/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
392+
/// seconds, i.e. it doesn't wait until they are finished (which may never happen).
393+
#[test]
394+
fn ensure_run_until_exit_is_not_blocking_indefinitely() {
395+
let runner = create_runner();
396+
397+
runner
398+
.run_node_until_exit(move |cfg| async move {
399+
let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
400+
let (sender, receiver) = futures::channel::oneshot::channel();
401+
402+
// We need to use `spawn_blocking` here so that we get a dedicated thread for our
403+
// future. This future runs blocking code that will never end.
404+
task_manager.spawn_handle().spawn_blocking("test", None, async move {
405+
let _ = sender.send(());
406+
loop {
407+
std::thread::sleep(Duration::from_secs(30));
408+
}
409+
});
410+
411+
task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
412+
// Let's stop this essential task as soon as our other task has started.
413+
// It will signal that the task manager should end.
414+
let _ = receiver.await;
415+
});
416+
417+
Ok::<_, sc_service::Error>(task_manager)
418+
})
419+
.unwrap_err();
420+
}
421+
}

0 commit comments

Comments
 (0)