Skip to content

Go back to running subgraphs with spawn_blocking #2357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,10 @@ where
// forward; this is easier than updating the existing block stream.
//
// This task has many calls to the store, so mark it as `blocking`.
graph::spawn_thread(deployment_id.to_string(), move || {
if let Err(e) = graph::block_on(run_subgraph(ctx)) {
// This call is the reason why the size of the blocking thread pool
// size must always be well above the number of deployed subgraphs.
graph::spawn_blocking(async move {
if let Err(e) = run_subgraph(ctx).await {
error!(
&logger,
"Subgraph instance failed to run: {}",
Expand Down
5 changes: 5 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,8 @@ those.
given the other load management configuration settings, but never
actually decline to run a query, instead log about load management
decisions. Set to `true` to turn simulation on, defaults to `false`
- `GRAPH_MAX_BLOCKING_THREADS`: Maximum number of blocking threads in the tokio blocking thread
pool. In an index node this should always be well above the number of subgraphs deployed to it,
because each subgraph permanently takes up one thread. Graphql queries are currently also run on
the blocking thread pool, but the DB connection pool size is usually the limiting factor for
queries. Defaults to 2000.
2 changes: 1 addition & 1 deletion graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod mock {
/// Wrapper for spawning tasks that abort on panic, which is our default.
mod task_spawn;
pub use task_spawn::{
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, spawn_thread,
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic,
};

pub use bytes;
Expand Down
11 changes: 0 additions & 11 deletions graph/src/task_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,3 @@ pub fn spawn_blocking_allow_panic<R: 'static + Send>(
pub fn block_on<T>(f: impl Future03<Output = T>) -> T {
tokio::runtime::Handle::current().block_on(f)
}

/// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned.
pub fn spawn_thread(name: String, f: impl 'static + FnOnce() + Send) {
let conf = std::thread::Builder::new().name(name);
let runtime = tokio::runtime::Handle::current();
conf.spawn(move || {
let _runtime_guard = runtime.enter();
f()
})
.unwrap();
}
21 changes: 19 additions & 2 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ lazy_static! {
.map(|s| BlockNumber::from_str(&s)
.unwrap_or_else(|_| panic!("failed to parse env var ETHEREUM_ANCESTOR_COUNT")))
.unwrap_or(50);

// Maximum number of blocking threads in the tokio blocking thread pool. This should always be
// well above the number of subgraphs deployed to an index node, because each subgraph takes up
// one thread. Defaults to 2000.
static ref MAX_BLOCKING_THREADS: usize = env::var("GRAPH_MAX_BLOCKING_THREADS")
.ok()
.map(|s| usize::from_str(&s)
.unwrap_or_else(|_| panic!("failed to parse env var ETHEREUM_ANCESTOR_COUNT")))
.unwrap_or(2000);
}

/// How long we will hold up node startup to get the net version and genesis
Expand Down Expand Up @@ -93,8 +102,16 @@ fn read_expensive_queries() -> Result<Vec<Arc<q::Document>>, std::io::Error> {
Ok(queries)
}

#[tokio::main]
async fn main() {
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.max_blocking_threads(*MAX_BLOCKING_THREADS)
.build()
.unwrap()
.block_on(async { async_main().await })
}

async fn async_main() {
env_logger::init();

// Allow configuring fail points on debug builds. Used for integration tests.
Expand Down