Skip to content

Commit 4df8c61

Browse files
committed
graphql: Spawn ws connections as non-blocking
1 parent 634e239 commit 4df8c61

File tree

16 files changed

+165
-202
lines changed

16 files changed

+165
-202
lines changed

graph/src/data/query/error.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ pub enum QueryExecutionError {
5757
UndefinedFragment(String),
5858
// Using slow and prefetch query resolution yield different results
5959
IncorrectPrefetchResult { slow: q::Value, prefetch: q::Value },
60+
Panic(String),
61+
EventStreamError,
6062
}
6163

6264
impl Error for QueryExecutionError {
@@ -200,7 +202,9 @@ impl fmt::Display for QueryExecutionError {
200202
IncorrectPrefetchResult{ .. } => write!(f, "Running query with prefetch \
201203
and slow query resolution yielded different results. \
202204
This is a bug. Please open an issue at \
203-
https://github.com/graphprotocol/graph-node")
205+
https://github.com/graphprotocol/graph-node"),
206+
Panic(msg) => write!(f, "panic processing query: {}", msg),
207+
EventStreamError => write!(f, "error in the subscription event stream")
204208
}
205209
}
206210
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
use futures::prelude::*;
2-
31
use crate::prelude::QueryResult;
2+
use std::marker::Unpin;
43

54
/// A stream of query results for a subscription.
6-
pub type QueryResultStream = Box<dyn Stream<Item = QueryResult, Error = ()> + Send>;
5+
pub type QueryResultStream = Box<dyn futures03::stream::Stream<Item = QueryResult> + Send + Unpin>;
76

87
/// The result of running a subscription, if successful.
98
pub type SubscriptionResult = QueryResultStream;

graph/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ pub mod mock {
2121

2222
/// Wrapper for spawning tasks that abort on panic, which is our default.
2323
mod task_spawn;
24-
pub use task_spawn::{block_on_allow_panic, spawn, spawn_blocking, spawn_blocking_allow_panic};
24+
pub use task_spawn::{
25+
block_on_allow_panic, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic,
26+
};
2527

2628
/// A prelude that makes all system component traits and data types available.
2729
///

graph/src/task_spawn.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ pub fn spawn<T: Send + 'static>(f: impl Future03<Output = T> + Send + 'static) -
1919
tokio::spawn(abort_on_panic(f))
2020
}
2121

22+
pub fn spawn_allow_panic<T: Send + 'static>(
23+
f: impl Future03<Output = T> + Send + 'static,
24+
) -> JoinHandle<T> {
25+
tokio::spawn(f)
26+
}
27+
2228
/// Aborts on panic.
2329
pub fn spawn_blocking<T: Send + 'static>(
2430
f: impl Future03<Output = T> + Send + 'static,

graphql/src/execution/execution.rs

Lines changed: 51 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub enum ExecutionMode {
2929

3030
/// Contextual information passed around during query execution.
3131
#[derive(Clone)]
32-
pub struct ExecutionContext<'a, R>
32+
pub struct ExecutionContext<R>
3333
where
3434
R: Resolver,
3535
{
@@ -40,13 +40,13 @@ where
4040
pub schema: Arc<Schema>,
4141

4242
/// The query to execute.
43-
pub document: &'a q::Document,
43+
pub document: q::Document,
4444

4545
/// The resolver to use.
4646
pub resolver: Arc<R>,
4747

4848
/// The current field stack (e.g. allUsers > friends > name).
49-
pub fields: Vec<&'a q::Field>,
49+
pub fields: Vec<q::Field>,
5050

5151
/// Variable values.
5252
pub variable_values: Arc<HashMap<q::Name, q::Value>>,
@@ -89,18 +89,18 @@ fn get_field<'a>(object_type: impl Into<ObjectOrInterface<'a>>, name: &Name) ->
8989
}
9090
}
9191

92-
impl<'a, R> ExecutionContext<'a, R>
92+
impl<R> ExecutionContext<R>
9393
where
9494
R: Resolver,
9595
{
9696
/// Creates a derived context for a new field (added to the top of the field stack).
97-
pub fn for_field(
97+
pub fn for_field<'a>(
9898
&self,
99-
field: &'a q::Field,
99+
field: &q::Field,
100100
object_type: impl Into<ObjectOrInterface<'a>>,
101101
) -> Result<Self, QueryExecutionError> {
102102
let mut ctx = self.clone();
103-
ctx.fields.push(field);
103+
ctx.fields.push(field.clone());
104104
if let Some(bc) = field.block_constraint(object_type)? {
105105
ctx.block = self.resolver.locate_block(&bc)?;
106106
}
@@ -116,7 +116,7 @@ where
116116
logger: self.logger.clone(),
117117
resolver: Arc::new(introspection_resolver),
118118
schema: Arc::new(introspection_schema),
119-
document: &self.document,
119+
document: self.document.clone(),
120120
fields: vec![],
121121
variable_values: self.variable_values.clone(),
122122
deadline: self.deadline,
@@ -327,13 +327,10 @@ where
327327
}
328328

329329
/// Executes the root selection set of a query.
330-
pub fn execute_root_selection_set<'a, R>(
331-
ctx: &ExecutionContext<'a, R>,
332-
selection_set: &'a q::SelectionSet,
333-
) -> Result<q::Value, Vec<QueryExecutionError>>
334-
where
335-
R: Resolver,
336-
{
330+
pub fn execute_root_selection_set(
331+
ctx: &ExecutionContext<impl Resolver>,
332+
selection_set: &q::SelectionSet,
333+
) -> Result<q::Value, Vec<QueryExecutionError>> {
337334
// Obtain the root Query type and fail if there isn't one
338335
let query_type = match sast::get_root_query_type(&ctx.schema.document) {
339336
Some(t) => t,
@@ -405,15 +402,12 @@ where
405402
/// Executes a selection set, requiring the result to be of the given object type.
406403
///
407404
/// Allows passing in a parent value during recursive processing of objects and their fields.
408-
pub fn execute_selection_set<'a, R>(
409-
ctx: &ExecutionContext<'a, R>,
410-
selection_set: &'a q::SelectionSet,
405+
pub fn execute_selection_set(
406+
ctx: &ExecutionContext<impl Resolver>,
407+
selection_set: &q::SelectionSet,
411408
object_type: &s::ObjectType,
412409
object_value: &Option<q::Value>,
413-
) -> Result<q::Value, Vec<QueryExecutionError>>
414-
where
415-
R: Resolver,
416-
{
410+
) -> Result<q::Value, Vec<QueryExecutionError>> {
417411
Ok(q::Value::Object(execute_selection_set_to_map(
418412
ctx,
419413
selection_set,
@@ -422,15 +416,12 @@ where
422416
)?))
423417
}
424418

425-
fn execute_selection_set_to_map<'a, R>(
426-
ctx: &ExecutionContext<'a, R>,
427-
selection_set: &'a q::SelectionSet,
419+
fn execute_selection_set_to_map(
420+
ctx: &ExecutionContext<impl Resolver>,
421+
selection_set: &q::SelectionSet,
428422
object_type: &s::ObjectType,
429423
object_value: &Option<q::Value>,
430-
) -> Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>>
431-
where
432-
R: Resolver,
433-
{
424+
) -> Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>> {
434425
let mut errors: Vec<QueryExecutionError> = Vec::new();
435426
let mut result_map: BTreeMap<String, q::Value> = BTreeMap::new();
436427

@@ -486,15 +477,12 @@ where
486477
}
487478

488479
/// Collects fields of a selection set.
489-
pub fn collect_fields<'a, R>(
490-
ctx: &ExecutionContext<'a, R>,
480+
pub fn collect_fields<'a>(
481+
ctx: &'a ExecutionContext<impl Resolver>,
491482
object_type: &s::ObjectType,
492483
selection_set: &'a q::SelectionSet,
493484
visited_fragments: Option<HashSet<&'a q::Name>>,
494-
) -> IndexMap<&'a String, Vec<&'a q::Field>>
495-
where
496-
R: Resolver,
497-
{
485+
) -> IndexMap<&'a String, Vec<&'a q::Field>> {
498486
let mut visited_fragments = visited_fragments.unwrap_or_default();
499487
let mut grouped_fields: IndexMap<_, Vec<_>> = IndexMap::new();
500488

@@ -592,7 +580,7 @@ where
592580

593581
/// Determines whether a fragment is applicable to the given object type.
594582
fn does_fragment_type_apply(
595-
ctx: ExecutionContext<'_, impl Resolver>,
583+
ctx: ExecutionContext<impl Resolver>,
596584
object_type: &s::ObjectType,
597585
fragment_type: &q::TypeCondition,
598586
) -> bool {
@@ -622,17 +610,14 @@ fn does_fragment_type_apply(
622610
}
623611

624612
/// Executes a field.
625-
fn execute_field<'a, R>(
626-
ctx: &ExecutionContext<'a, R>,
613+
fn execute_field(
614+
ctx: &ExecutionContext<impl Resolver>,
627615
object_type: &s::ObjectType,
628616
object_value: &Option<q::Value>,
629-
field: &'a q::Field,
617+
field: &q::Field,
630618
field_definition: &s::Field,
631-
fields: Vec<&'a q::Field>,
632-
) -> Result<q::Value, Vec<QueryExecutionError>>
633-
where
634-
R: Resolver,
635-
{
619+
fields: Vec<&q::Field>,
620+
) -> Result<q::Value, Vec<QueryExecutionError>> {
636621
coerce_argument_values(ctx, object_type, field)
637622
.and_then(|argument_values| {
638623
resolve_field_value(
@@ -649,18 +634,15 @@ where
649634
}
650635

651636
/// Resolves the value of a field.
652-
fn resolve_field_value<'a, R>(
653-
ctx: &ExecutionContext<'a, R>,
637+
fn resolve_field_value(
638+
ctx: &ExecutionContext<impl Resolver>,
654639
object_type: &s::ObjectType,
655640
object_value: &Option<q::Value>,
656641
field: &q::Field,
657642
field_definition: &s::Field,
658643
field_type: &s::Type,
659644
argument_values: &HashMap<&q::Name, q::Value>,
660-
) -> Result<q::Value, Vec<QueryExecutionError>>
661-
where
662-
R: Resolver,
663-
{
645+
) -> Result<q::Value, Vec<QueryExecutionError>> {
664646
match field_type {
665647
s::Type::NonNullType(inner_type) => resolve_field_value(
666648
ctx,
@@ -695,18 +677,15 @@ where
695677
}
696678

697679
/// Resolves the value of a field that corresponds to a named type.
698-
fn resolve_field_value_for_named_type<'a, R>(
699-
ctx: &ExecutionContext<'a, R>,
680+
fn resolve_field_value_for_named_type(
681+
ctx: &ExecutionContext<impl Resolver>,
700682
object_type: &s::ObjectType,
701683
object_value: &Option<q::Value>,
702684
field: &q::Field,
703685
field_definition: &s::Field,
704686
type_name: &s::Name,
705687
argument_values: &HashMap<&q::Name, q::Value>,
706-
) -> Result<q::Value, Vec<QueryExecutionError>>
707-
where
708-
R: Resolver,
709-
{
688+
) -> Result<q::Value, Vec<QueryExecutionError>> {
710689
// Try to resolve the type name into the actual type
711690
let named_type = sast::get_named_type(&ctx.schema.document, type_name)
712691
.ok_or_else(|| QueryExecutionError::NamedTypeError(type_name.to_string()))?;
@@ -762,18 +741,15 @@ where
762741
}
763742

764743
/// Resolves the value of a field that corresponds to a list type.
765-
fn resolve_field_value_for_list_type<'a, R>(
766-
ctx: &ExecutionContext<'a, R>,
744+
fn resolve_field_value_for_list_type(
745+
ctx: &ExecutionContext<impl Resolver>,
767746
object_type: &s::ObjectType,
768747
object_value: &Option<q::Value>,
769748
field: &q::Field,
770749
field_definition: &s::Field,
771750
inner_type: &s::Type,
772751
argument_values: &HashMap<&q::Name, q::Value>,
773-
) -> Result<q::Value, Vec<QueryExecutionError>>
774-
where
775-
R: Resolver,
776-
{
752+
) -> Result<q::Value, Vec<QueryExecutionError>> {
777753
match inner_type {
778754
s::Type::NonNullType(inner_type) => resolve_field_value_for_list_type(
779755
ctx,
@@ -858,16 +834,13 @@ where
858834
}
859835

860836
/// Ensures that a value matches the expected return type.
861-
fn complete_value<'a, R>(
862-
ctx: &ExecutionContext<'a, R>,
863-
field: &'a q::Field,
864-
field_type: &'a s::Type,
865-
fields: Vec<&'a q::Field>,
837+
fn complete_value(
838+
ctx: &ExecutionContext<impl Resolver>,
839+
field: &q::Field,
840+
field_type: &s::Type,
841+
fields: Vec<&q::Field>,
866842
resolved_value: q::Value,
867-
) -> Result<q::Value, Vec<QueryExecutionError>>
868-
where
869-
R: Resolver,
870-
{
843+
) -> Result<q::Value, Vec<QueryExecutionError>> {
871844
match field_type {
872845
// Fail if the field type is non-null but the value is null
873846
s::Type::NonNullType(inner_type) => {
@@ -987,14 +960,11 @@ where
987960
}
988961

989962
/// Resolves an abstract type (interface, union) into an object type based on the given value.
990-
fn resolve_abstract_type<'a, R>(
991-
ctx: &'a ExecutionContext<'a, R>,
992-
abstract_type: &'a s::TypeDefinition,
963+
fn resolve_abstract_type<'a>(
964+
ctx: &'a ExecutionContext<impl Resolver>,
965+
abstract_type: &s::TypeDefinition,
993966
object_value: &q::Value,
994-
) -> Result<&'a s::ObjectType, Vec<QueryExecutionError>>
995-
where
996-
R: Resolver,
997-
{
967+
) -> Result<&'a s::ObjectType, Vec<QueryExecutionError>> {
998968
// Let the resolver handle the type resolution, return an error if the resolution
999969
// yields nothing
1000970
ctx.resolver
@@ -1035,14 +1005,11 @@ pub fn merge_selection_sets(fields: Vec<&q::Field>) -> q::SelectionSet {
10351005
}
10361006

10371007
/// Coerces argument values into GraphQL values.
1038-
pub fn coerce_argument_values<'a, R>(
1039-
ctx: &ExecutionContext<'_, R>,
1008+
pub fn coerce_argument_values<'a>(
1009+
ctx: &ExecutionContext<impl Resolver>,
10401010
object_type: &'a s::ObjectType,
10411011
field: &q::Field,
1042-
) -> Result<HashMap<&'a q::Name, q::Value>, Vec<QueryExecutionError>>
1043-
where
1044-
R: Resolver,
1045-
{
1012+
) -> Result<HashMap<&'a q::Name, q::Value>, Vec<QueryExecutionError>> {
10461013
let mut coerced_values = HashMap::new();
10471014
let mut errors = vec![];
10481015

graphql/src/execution/resolver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ impl<'a> ObjectOrInterface<'a> {
6464
/// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions.
6565
pub trait Resolver: Clone + Send + Sync {
6666
/// Prepare for executing a query by prefetching as much data as possible
67-
fn prefetch<'a>(
67+
fn prefetch(
6868
&self,
69-
ctx: &ExecutionContext<'a, Self>,
69+
ctx: &ExecutionContext<Self>,
7070
selection_set: &q::SelectionSet,
7171
) -> Result<Option<q::Value>, Vec<QueryExecutionError>>;
7272

graphql/src/introspection/resolver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,9 +456,9 @@ impl<'a> IntrospectionResolver<'a> {
456456

457457
/// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions.
458458
impl<'a> Resolver for IntrospectionResolver<'a> {
459-
fn prefetch<'r>(
459+
fn prefetch(
460460
&self,
461-
_: &ExecutionContext<'r, Self>,
461+
_: &ExecutionContext<Self>,
462462
_: &q::SelectionSet,
463463
) -> Result<Option<q::Value>, Vec<QueryExecutionError>> {
464464
Ok(None)

graphql/src/query/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ where
7676
logger: query_logger.clone(),
7777
resolver: Arc::new(options.resolver),
7878
schema: query.schema.clone(),
79-
document: &query.document,
79+
document: query.document.clone(),
8080
fields: vec![],
8181
variable_values: Arc::new(coerced_variable_values),
8282
deadline: options.deadline,

0 commit comments

Comments
 (0)