Skip to content

Fix hanging when starting over 512 subgraphs #2354

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
This pull request merged 1 commit into the base branch on
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ where
) {
let logger = self.logger_factory.subgraph_logger(&id);

// Blocking due to store interactions. Won't be blocking after #905.
match graph::spawn_blocking(Self::start_subgraph_inner(
match Self::start_subgraph_inner(
logger.clone(),
self.instances.clone(),
self.host_builder.clone(),
Expand All @@ -219,10 +218,8 @@ where
manifest,
self.metrics_registry.cheap_clone(),
self.link_resolver.cheap_clone(),
))
)
.await
.map_err(Error::from)
.and_then(|e| e)
{
Ok(()) => self.manager_metrics.subgraph_count.inc(),
Err(err) => error!(
Expand Down Expand Up @@ -355,7 +352,21 @@ where
&network,
&required_capabilities, e))?.clone();

store.start_subgraph_deployment(&logger, &manifest.id)?;
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok. In this commit I had to move start_subgraph_deployment to the same thread that does run_subgraph since, with copying, start_subgraph_deployment can take hours and would otherwise block the instance manager. (There's nothing to do for this PR, just an FYI)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good thing it is in a blocking task then! Ideally it would use something like with_conn but it didn't seem so straightforward to refactor it, so that's left for another day.

let store = store.clone();
let logger = logger.clone();
let id = manifest.id.clone();

// `start_subgraph_deployment` is blocking.
tokio::task::spawn_blocking(move || {
store
.start_subgraph_deployment(&logger, &id)
.map_err(Error::from)
})
.await
.map_err(Error::from)
.and_then(|x| x)?;
}

// Clone the deployment ID for later
let deployment_id = manifest.id.clone();
Expand Down Expand Up @@ -414,7 +425,7 @@ where
templates,
},
state: IndexingState {
logger,
logger: logger.cheap_clone(),
instance,
instances,
log_filter,
Expand All @@ -435,17 +446,22 @@ where
// forward; this is easier than updating the existing block stream.
//
// This task has many calls to the store, so mark it as `blocking`.
graph::spawn_blocking(async move {
let res = run_subgraph(ctx).await;
graph::spawn_thread(deployment_id.to_string(), move || {
if let Err(e) = graph::block_on(run_subgraph(ctx)) {
error!(
&logger,
"Subgraph instance failed to run: {}",
format!("{:#}", e)
);
}
subgraph_metrics_unregister.unregister(registry);
res
});

Ok(())
}
}

async fn run_subgraph<B, T, S, C>(mut ctx: IndexingContext<B, T, S, C>) -> Result<(), ()>
async fn run_subgraph<B, T, S, C>(mut ctx: IndexingContext<B, T, S, C>) -> Result<(), Error>
where
B: BlockStreamBuilder,
T: RuntimeHostBuilder,
Expand Down Expand Up @@ -609,10 +625,7 @@ where
if first_run {
first_run = false;

ctx.inputs
.store
.unfail(&ctx.inputs.deployment_id)
.map_err(|_| ())?;
ctx.inputs.store.unfail(&ctx.inputs.deployment_id)?;
}

if needs_restart {
Expand All @@ -633,18 +646,13 @@ where
"Subgraph block stream shut down cleanly";
"id" => id_for_err.to_string(),
);
return Err(());
return Ok(());
}

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
let message = format!("{:#}", e).replace("\n", "\t");
error!(
&logger,
"Subgraph instance failed to run: {}", message;
"id" => id_for_err.to_string(),
"code" => LogCode::SubgraphSyncingFailure
);
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);

let error = SubgraphError {
subgraph_id: id_for_err.clone(),
Expand All @@ -654,15 +662,12 @@ where
deterministic: e.is_deterministic(),
};

if let Err(e) = store_for_err.fail_subgraph(id_for_err.clone(), error).await {
error!(
&logger,
"Failed to set subgraph status to Failed: {}", e;
"id" => id_for_err.to_string(),
"code" => LogCode::SubgraphSyncingFailureNotRecorded
);
}
return Err(());
store_for_err
.fail_subgraph(id_for_err.clone(), error)
.await
.context("Failed to set subgraph status to `failed`")?;

return Err(err);
}
}
}
Expand Down Expand Up @@ -774,7 +779,7 @@ where
Err(MappingError::PossibleReorg(e)) => {
info!(ctx.state.logger,
"Possible reorg detected, retrying";
"error" => format!("{:?}", e.to_string()),
"error" => format!("{:#}", e),
"id" => ctx.inputs.deployment_id.to_string(),
);

Expand Down
3 changes: 1 addition & 2 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,7 @@ where
let sender = sender.clone();
let logger = logger.clone();

// Blocking due to store interactions. Won't be blocking after #905.
graph::spawn_blocking(
graph::spawn(
start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)),
);
}
Expand Down
2 changes: 1 addition & 1 deletion graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod mock {
/// Wrapper for spawning tasks that abort on panic, which is our default.
mod task_spawn;
pub use task_spawn::{
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic,
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, spawn_thread,
};

pub use bytes;
Expand Down
11 changes: 11 additions & 0 deletions graph/src/task_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,14 @@ pub fn spawn_blocking_allow_panic<R: 'static + Send>(
/// Runs the given future to completion on the current thread, blocking
/// until it resolves, using the ambient tokio runtime.
///
/// NOTE(review): `Handle::current()` panics when called outside of a
/// tokio runtime context — callers must ensure a runtime is entered
/// (e.g. via `spawn_thread`, which enters the handle before running `f`).
pub fn block_on<T>(f: impl Future03<Output = T>) -> T {
    tokio::runtime::Handle::current().block_on(f)
}

/// Spawns a named OS thread in which the current tokio runtime is
/// entered, so `f` may use runtime facilities (e.g. `block_on`).
/// Panics if the thread cannot be spawned.
pub fn spawn_thread(name: String, f: impl 'static + FnOnce() + Send) {
    // Capture the ambient runtime handle before leaving this thread.
    let handle = tokio::runtime::Handle::current();
    std::thread::Builder::new()
        .name(name)
        .spawn(move || {
            // Enter the runtime for the duration of `f` so tokio APIs work
            // from inside the new thread.
            let _guard = handle.enter();
            f()
        })
        .unwrap();
}
5 changes: 1 addition & 4 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ fn read_expensive_queries() -> Result<Vec<Arc<q::Document>>, std::io::Error> {
Ok(queries)
}

// Saturating the blocking threads can cause all sorts of issues, so set a large maximum.
// Ideally we'd use semaphores to not use more blocking threads than DB connections,
// but for now this is necessary.
#[tokio::main(worker_threads = 2000)]
#[tokio::main]
async fn main() {
env_logger::init();

Expand Down