diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index c4982a243e6..ab0cdbce3fe 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -446,8 +446,10 @@ where // forward; this is easier than updating the existing block stream. // // This task has many calls to the store, so mark it as `blocking`. - graph::spawn_thread(deployment_id.to_string(), move || { - if let Err(e) = graph::block_on(run_subgraph(ctx)) { + // This call is the reason why the size of the blocking thread pool + // must always be well above the number of deployed subgraphs. + graph::spawn_blocking(async move { + if let Err(e) = run_subgraph(ctx).await { error!( &logger, "Subgraph instance failed to run: {}", diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 49e188f163b..45626bb1bba 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -154,3 +154,8 @@ those. given the other load management configuration settings, but never actually decline to run a query, instead log about load management decisions. Set to `true` to turn simulation on, defaults to `false` +- `GRAPH_MAX_BLOCKING_THREADS`: Maximum number of blocking threads in the tokio blocking thread + pool. In an index node this should always be well above the number of subgraphs deployed to it, + because each subgraph permanently takes up one thread. GraphQL queries are currently also run on + the blocking thread pool, but the DB connection pool size is usually the limiting factor for + queries. Defaults to 2000. \ No newline at end of file diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 45ef0885668..3f7e3cac1d0 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -27,7 +27,7 @@ pub mod mock { /// Wrapper for spawning tasks that abort on panic, which is our default. 
mod task_spawn; pub use task_spawn::{ - block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, spawn_thread, + block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, }; pub use bytes; diff --git a/graph/src/task_spawn.rs b/graph/src/task_spawn.rs index f886beaff22..913641b569a 100644 --- a/graph/src/task_spawn.rs +++ b/graph/src/task_spawn.rs @@ -54,14 +54,3 @@ pub fn spawn_blocking_allow_panic( pub fn block_on(f: impl Future03) -> T { tokio::runtime::Handle::current().block_on(f) } - -/// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned. -pub fn spawn_thread(name: String, f: impl 'static + FnOnce() + Send) { - let conf = std::thread::Builder::new().name(name); - let runtime = tokio::runtime::Handle::current(); - conf.spawn(move || { - let _runtime_guard = runtime.enter(); - f() - }) - .unwrap(); -} diff --git a/node/src/main.rs b/node/src/main.rs index c97c6dd5e46..8bdd7f20d21 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -55,6 +55,15 @@ lazy_static! { .map(|s| BlockNumber::from_str(&s) .unwrap_or_else(|_| panic!("failed to parse env var ETHEREUM_ANCESTOR_COUNT"))) .unwrap_or(50); + + // Maximum number of blocking threads in the tokio blocking thread pool. This should always be + // well above the number of subgraphs deployed to an index node, because each subgraph takes up + // one thread. Defaults to 2000. 
+ static ref MAX_BLOCKING_THREADS: usize = env::var("GRAPH_MAX_BLOCKING_THREADS") + .ok() + .map(|s| usize::from_str(&s) + .unwrap_or_else(|_| panic!("failed to parse env var GRAPH_MAX_BLOCKING_THREADS"))) + .unwrap_or(2000); } /// How long we will hold up node startup to get the net version and genesis @@ -93,8 +102,16 @@ fn read_expensive_queries() -> Result>, std::io::Error> { Ok(queries) } -#[tokio::main] -async fn main() { +fn main() { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .max_blocking_threads(*MAX_BLOCKING_THREADS) + .build() + .unwrap() + .block_on(async { async_main().await }) +} + +async fn async_main() { env_logger::init(); // Allow configuring fail points on debug builds. Used for integration tests.