diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs
index aa76139fb47..c4982a243e6 100644
--- a/core/src/subgraph/instance_manager.rs
+++ b/core/src/subgraph/instance_manager.rs
@@ -206,8 +206,7 @@ where
     ) {
         let logger = self.logger_factory.subgraph_logger(&id);
 
-        // Blocking due to store interactions. Won't be blocking after #905.
-        match graph::spawn_blocking(Self::start_subgraph_inner(
+        match Self::start_subgraph_inner(
             logger.clone(),
             self.instances.clone(),
             self.host_builder.clone(),
@@ -219,10 +218,8 @@ where
             manifest,
             self.metrics_registry.cheap_clone(),
             self.link_resolver.cheap_clone(),
-        ))
+        )
         .await
-        .map_err(Error::from)
-        .and_then(|e| e)
         {
             Ok(()) => self.manager_metrics.subgraph_count.inc(),
             Err(err) => error!(
@@ -355,7 +352,21 @@ where
                 &network,
                 &required_capabilities, e))?.clone();
 
-        store.start_subgraph_deployment(&logger, &manifest.id)?;
+        {
+            let store = store.clone();
+            let logger = logger.clone();
+            let id = manifest.id.clone();
+
+            // `start_subgraph_deployment` is blocking.
+            tokio::task::spawn_blocking(move || {
+                store
+                    .start_subgraph_deployment(&logger, &id)
+                    .map_err(Error::from)
+            })
+            .await
+            .map_err(Error::from)
+            .and_then(|x| x)?;
+        }
 
         // Clone the deployment ID for later
         let deployment_id = manifest.id.clone();
@@ -414,7 +425,7 @@ where
                 templates,
             },
             state: IndexingState {
-                logger,
+                logger: logger.cheap_clone(),
                 instance,
                 instances,
                 log_filter,
@@ -435,17 +446,22 @@ where
         // forward; this is easier than updating the existing block stream.
         //
         // This task has many calls to the store, so mark it as `blocking`.
-        graph::spawn_blocking(async move {
-            let res = run_subgraph(ctx).await;
+        graph::spawn_thread(deployment_id.to_string(), move || {
+            if let Err(e) = graph::block_on(run_subgraph(ctx)) {
+                error!(
+                    &logger,
+                    "Subgraph instance failed to run: {}",
+                    format!("{:#}", e)
+                );
+            }
             subgraph_metrics_unregister.unregister(registry);
-            res
         });
 
         Ok(())
     }
 }
 
-async fn run_subgraph<B, T, S>(mut ctx: IndexingContext<B, T, S>) -> Result<(), ()>
+async fn run_subgraph<B, T, S>(mut ctx: IndexingContext<B, T, S>) -> Result<(), Error>
 where
     B: BlockStreamBuilder,
     T: RuntimeHostBuilder,
@@ -609,10 +625,7 @@ where
         if first_run {
            first_run = false;
 
-            ctx.inputs
-                .store
-                .unfail(&ctx.inputs.deployment_id)
-                .map_err(|_| ())?;
+            ctx.inputs.store.unfail(&ctx.inputs.deployment_id)?;
         }
 
         if needs_restart {
@@ -633,18 +646,13 @@ where
                    "Subgraph block stream shut down cleanly";
                    "id" => id_for_err.to_string(),
                );
-                return Err(());
+                return Ok(());
            }
 
            // Handle unexpected stream errors by marking the subgraph as failed.
            Err(e) => {
                let message = format!("{:#}", e).replace("\n", "\t");
-                error!(
-                    &logger,
-                    "Subgraph instance failed to run: {}", message;
-                    "id" => id_for_err.to_string(),
-                    "code" => LogCode::SubgraphSyncingFailure
-                );
+                let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
 
                let error = SubgraphError {
                    subgraph_id: id_for_err.clone(),
@@ -654,15 +662,12 @@ where
                    deterministic: e.is_deterministic(),
                };
 
-                if let Err(e) = store_for_err.fail_subgraph(id_for_err.clone(), error).await {
-                    error!(
-                        &logger,
-                        "Failed to set subgraph status to Failed: {}", e;
-                        "id" => id_for_err.to_string(),
-                        "code" => LogCode::SubgraphSyncingFailureNotRecorded
-                    );
-                }
-                return Err(());
+                store_for_err
+                    .fail_subgraph(id_for_err.clone(), error)
+                    .await
+                    .context("Failed to set subgraph status to `failed`")?;
+
+                return Err(err);
            }
        }
    }
@@ -774,7 +779,7 @@ where
            Err(MappingError::PossibleReorg(e)) => {
                info!(ctx.state.logger,
                    "Possible reorg detected, retrying";
-                    "error" => format!("{:?}", e.to_string()),
+                    "error" => format!("{:#}", e),
                    "id" => ctx.inputs.deployment_id.to_string(),
                );
 
diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs
index 169a506c282..17a06f84cae 100644
--- a/core/src/subgraph/registrar.rs
+++ b/core/src/subgraph/registrar.rs
@@ -242,8 +242,7 @@ where
                let sender = sender.clone();
                let logger = logger.clone();
 
-                // Blocking due to store interactions. Won't be blocking after #905.
-                graph::spawn_blocking(
+                graph::spawn(
                    start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)),
                );
            }
diff --git a/graph/src/lib.rs b/graph/src/lib.rs
index 3f7e3cac1d0..45ef0885668 100644
--- a/graph/src/lib.rs
+++ b/graph/src/lib.rs
@@ -27,7 +27,7 @@ pub mod mock {
 /// Wrapper for spawning tasks that abort on panic, which is our default.
 mod task_spawn;
 pub use task_spawn::{
-    block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic,
+    block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, spawn_thread,
 };
 
 pub use bytes;
diff --git a/graph/src/task_spawn.rs b/graph/src/task_spawn.rs
index 913641b569a..f886beaff22 100644
--- a/graph/src/task_spawn.rs
+++ b/graph/src/task_spawn.rs
@@ -54,3 +54,14 @@ pub fn spawn_blocking_allow_panic(
 pub fn block_on<T>(f: impl Future03<Output = T>) -> T {
    tokio::runtime::Handle::current().block_on(f)
 }
+
+/// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned.
+pub fn spawn_thread(name: String, f: impl 'static + FnOnce() + Send) {
+    let conf = std::thread::Builder::new().name(name);
+    let runtime = tokio::runtime::Handle::current();
+    conf.spawn(move || {
+        let _runtime_guard = runtime.enter();
+        f()
+    })
+    .unwrap();
+}
diff --git a/node/src/main.rs b/node/src/main.rs
index 12483952d6e..0dc4dc63331 100644
--- a/node/src/main.rs
+++ b/node/src/main.rs
@@ -93,10 +93,7 @@ fn read_expensive_queries() -> Result<Vec<Arc<q::Document>>, std::io::Error> {
    Ok(queries)
 }
 
-// Saturating the blocking threads can cause all sorts of issues, so set a large maximum.
-// Ideally we'd use semaphores to not use more blocking threads than DB connections,
-// but for now this is necessary.
-#[tokio::main(worker_threads = 2000)]
+#[tokio::main]
 async fn main() {
    env_logger::init();
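// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): a self-contained example
// of the dedicated-thread pattern this change introduces. The helper below is
// a hypothetical stand-in for `graph::spawn_thread` + `graph::block_on`,
// written directly against tokio (assumes the `rt-multi-thread`, `macros`, and
// `time` features) so it can compile on its own. Names such as
// `spawn_dedicated_thread` and "subgraph-example" are illustrative only.
// ---------------------------------------------------------------------------
use std::future::Future;
use std::time::Duration;

/// Hypothetical helper mirroring the patched `spawn_thread`: spawn a named OS
/// thread, enter the runtime context on it, and drive one future to completion
/// there instead of occupying a worker from tokio's blocking pool.
fn spawn_dedicated_thread<F>(name: String, fut: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    // Capture a handle to the ambient runtime; like `Handle::current()` in the
    // patch, this must be called from within a tokio runtime context.
    let runtime = tokio::runtime::Handle::current();
    std::thread::Builder::new()
        .name(name)
        .spawn(move || {
            // Entering the runtime makes `Handle::current()` usable inside `fut`.
            let _guard = runtime.enter();
            // Equivalent of `graph::block_on`: block this dedicated OS thread on
            // the future rather than a runtime worker thread.
            tokio::runtime::Handle::current().block_on(fut);
        })
        .expect("failed to spawn dedicated thread");
}

#[tokio::main]
async fn main() {
    spawn_dedicated_thread("subgraph-example".to_string(), async {
        // Long-running, store-heavy indexing work would live here.
        println!("running on a dedicated thread");
    });

    // Give the detached thread a moment to run before the runtime shuts down.
    tokio::time::sleep(Duration::from_millis(100)).await;
}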