Skip to content

Commit 67d742d

Browse files
committed
*: Fix hanging when starting over 500 subgraphs
1 parent b5bdf9a commit 67d742d

File tree

5 files changed

+51
-39
lines changed

5 files changed

+51
-39
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 37 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -206,8 +206,7 @@ where
206206
) {
207207
let logger = self.logger_factory.subgraph_logger(&id);
208208

209-
// Blocking due to store interactions. Won't be blocking after #905.
210-
match graph::spawn_blocking(Self::start_subgraph_inner(
209+
match Self::start_subgraph_inner(
211210
logger.clone(),
212211
self.instances.clone(),
213212
self.host_builder.clone(),
@@ -219,10 +218,8 @@ where
219218
manifest,
220219
self.metrics_registry.cheap_clone(),
221220
self.link_resolver.cheap_clone(),
222-
))
221+
)
223222
.await
224-
.map_err(Error::from)
225-
.and_then(|e| e)
226223
{
227224
Ok(()) => self.manager_metrics.subgraph_count.inc(),
228225
Err(err) => error!(
@@ -355,7 +352,21 @@ where
355352
&network,
356353
&required_capabilities, e))?.clone();
357354

358-
store.start_subgraph_deployment(&logger, &manifest.id)?;
355+
{
356+
let store = store.clone();
357+
let logger = logger.clone();
358+
let id = manifest.id.clone();
359+
360+
// `start_subgraph_deployment` is blocking.
361+
tokio::task::spawn_blocking(move || {
362+
store
363+
.start_subgraph_deployment(&logger, &id)
364+
.map_err(Error::from)
365+
})
366+
.await
367+
.map_err(Error::from)
368+
.and_then(|x| x)?;
369+
}
359370

360371
// Clone the deployment ID for later
361372
let deployment_id = manifest.id.clone();
@@ -414,7 +425,7 @@ where
414425
templates,
415426
},
416427
state: IndexingState {
417-
logger,
428+
logger: logger.cheap_clone(),
418429
instance,
419430
instances,
420431
log_filter,
@@ -435,17 +446,22 @@ where
435446
// forward; this is easier than updating the existing block stream.
436447
//
437448
// This task has many calls to the store, so mark it as `blocking`.
438-
graph::spawn_blocking(async move {
439-
let res = run_subgraph(ctx).await;
449+
graph::spawn_thread(deployment_id.to_string(), move || {
450+
if let Err(e) = graph::block_on(run_subgraph(ctx)) {
451+
error!(
452+
&logger,
453+
"Subgraph instance failed to run: {}",
454+
format!("{:#}", e)
455+
);
456+
}
440457
subgraph_metrics_unregister.unregister(registry);
441-
res
442458
});
443459

444460
Ok(())
445461
}
446462
}
447463

448-
async fn run_subgraph<B, T, S, C>(mut ctx: IndexingContext<B, T, S, C>) -> Result<(), ()>
464+
async fn run_subgraph<B, T, S, C>(mut ctx: IndexingContext<B, T, S, C>) -> Result<(), Error>
449465
where
450466
B: BlockStreamBuilder,
451467
T: RuntimeHostBuilder,
@@ -609,10 +625,7 @@ where
609625
if first_run {
610626
first_run = false;
611627

612-
ctx.inputs
613-
.store
614-
.unfail(&ctx.inputs.deployment_id)
615-
.map_err(|_| ())?;
628+
ctx.inputs.store.unfail(&ctx.inputs.deployment_id)?;
616629
}
617630

618631
if needs_restart {
@@ -633,18 +646,13 @@ where
633646
"Subgraph block stream shut down cleanly";
634647
"id" => id_for_err.to_string(),
635648
);
636-
return Err(());
649+
return Ok(());
637650
}
638651

639652
// Handle unexpected stream errors by marking the subgraph as failed.
640653
Err(e) => {
641654
let message = format!("{:#}", e).replace("\n", "\t");
642-
error!(
643-
&logger,
644-
"Subgraph instance failed to run: {}", message;
645-
"id" => id_for_err.to_string(),
646-
"code" => LogCode::SubgraphSyncingFailure
647-
);
655+
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
648656

649657
let error = SubgraphError {
650658
subgraph_id: id_for_err.clone(),
@@ -654,15 +662,12 @@ where
654662
deterministic: e.is_deterministic(),
655663
};
656664

657-
if let Err(e) = store_for_err.fail_subgraph(id_for_err.clone(), error).await {
658-
error!(
659-
&logger,
660-
"Failed to set subgraph status to Failed: {}", e;
661-
"id" => id_for_err.to_string(),
662-
"code" => LogCode::SubgraphSyncingFailureNotRecorded
663-
);
664-
}
665-
return Err(());
665+
store_for_err
666+
.fail_subgraph(id_for_err.clone(), error)
667+
.await
668+
.context("Failed to set subgraph status to `failed`")?;
669+
670+
return Err(err);
666671
}
667672
}
668673
}
@@ -774,7 +779,7 @@ where
774779
Err(MappingError::PossibleReorg(e)) => {
775780
info!(ctx.state.logger,
776781
"Possible reorg detected, retrying";
777-
"error" => format!("{:?}", e.to_string()),
782+
"error" => format!("{:#}", e),
778783
"id" => ctx.inputs.deployment_id.to_string(),
779784
);
780785

core/src/subgraph/registrar.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -242,8 +242,7 @@ where
242242
let sender = sender.clone();
243243
let logger = logger.clone();
244244

245-
// Blocking due to store interactions. Won't be blocking after #905.
246-
graph::spawn_blocking(
245+
graph::spawn(
247246
start_subgraph(id, provider.clone(), logger).map(move |()| drop(sender)),
248247
);
249248
}

graph/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ pub mod mock {
2727
/// Wrapper for spawning tasks that abort on panic, which is our default.
2828
mod task_spawn;
2929
pub use task_spawn::{
30-
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic,
30+
block_on, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, spawn_thread,
3131
};
3232

3333
pub use bytes;

graph/src/task_spawn.rs

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,3 +54,14 @@ pub fn spawn_blocking_allow_panic<R: 'static + Send>(
5454
pub fn block_on<T>(f: impl Future03<Output = T>) -> T {
5555
tokio::runtime::Handle::current().block_on(f)
5656
}
57+
58+
/// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned.
59+
pub fn spawn_thread(name: String, f: impl 'static + FnOnce() + Send) {
60+
let conf = std::thread::Builder::new().name(name);
61+
let runtime = tokio::runtime::Handle::current();
62+
conf.spawn(move || {
63+
let _runtime_guard = runtime.enter();
64+
f()
65+
})
66+
.unwrap();
67+
}

node/src/main.rs

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -93,10 +93,7 @@ fn read_expensive_queries() -> Result<Vec<Arc<q::Document>>, std::io::Error> {
9393
Ok(queries)
9494
}
9595

96-
// Saturating the blocking threads can cause all sorts of issues, so set a large maximum.
97-
// Ideally we'd use semaphores to not use more blocking threads than DB connections,
98-
// but for now this is necessary.
99-
#[tokio::main(worker_threads = 2000)]
96+
#[tokio::main]
10097
async fn main() {
10198
env_logger::init();
10299

0 commit comments

Comments
 (0)