From 4df8c61148b3b9a1ba176014015ee6c9a0942503 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Tue, 3 Mar 2020 16:55:58 -0300 Subject: [PATCH 1/2] graphql: Spawn ws connections as non-blocking --- graph/src/data/query/error.rs | 6 +- graph/src/data/subscription/result.rs | 5 +- graph/src/lib.rs | 4 +- graph/src/task_spawn.rs | 6 ++ graphql/src/execution/execution.rs | 135 ++++++++++---------------- graphql/src/execution/resolver.rs | 4 +- graphql/src/introspection/resolver.rs | 4 +- graphql/src/query/mod.rs | 2 +- graphql/src/store/prefetch.rs | 67 +++++-------- graphql/src/store/resolver.rs | 4 +- graphql/src/subscription/mod.rs | 111 ++++++++++----------- graphql/tests/introspection.rs | 4 +- graphql/tests/query.rs | 4 +- server/index-node/src/resolver.rs | 4 +- server/websocket/src/connection.rs | 4 +- server/websocket/src/server.rs | 3 +- 16 files changed, 165 insertions(+), 202 deletions(-) diff --git a/graph/src/data/query/error.rs b/graph/src/data/query/error.rs index a0c8a962c50..b6e316f58d9 100644 --- a/graph/src/data/query/error.rs +++ b/graph/src/data/query/error.rs @@ -57,6 +57,8 @@ pub enum QueryExecutionError { UndefinedFragment(String), // Using slow and prefetch query resolution yield different results IncorrectPrefetchResult { slow: q::Value, prefetch: q::Value }, + Panic(String), + EventStreamError, } impl Error for QueryExecutionError { @@ -200,7 +202,9 @@ impl fmt::Display for QueryExecutionError { IncorrectPrefetchResult{ .. } => write!(f, "Running query with prefetch \ and slow query resolution yielded different results. \ This is a bug. Please open an issue at \ - https://github.com/graphprotocol/graph-node") + https://github.com/graphprotocol/graph-node"), + Panic(msg) => write!(f, "panic processing query: {}", msg), + EventStreamError => write!(f, "error in the subscription event stream") } } } diff --git a/graph/src/data/subscription/result.rs b/graph/src/data/subscription/result.rs index 093fd480569..60b53abf315 100644 --- a/graph/src/data/subscription/result.rs +++ b/graph/src/data/subscription/result.rs @@ -1,9 +1,8 @@ -use futures::prelude::*; - use crate::prelude::QueryResult; +use std::marker::Unpin; /// A stream of query results for a subscription. -pub type QueryResultStream = Box + Send>; +pub type QueryResultStream = Box + Send + Unpin>; /// The result of running a subscription, if successful. pub type SubscriptionResult = QueryResultStream; diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 88c047515bc..1069019c066 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -21,7 +21,9 @@ pub mod mock { /// Wrapper for spawning tasks that abort on panic, which is our default. mod task_spawn; -pub use task_spawn::{block_on_allow_panic, spawn, spawn_blocking, spawn_blocking_allow_panic}; +pub use task_spawn::{ + block_on_allow_panic, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic, +}; /// A prelude that makes all system component traits and data types available. /// diff --git a/graph/src/task_spawn.rs b/graph/src/task_spawn.rs index 003d635cba8..cdfdca2c16b 100644 --- a/graph/src/task_spawn.rs +++ b/graph/src/task_spawn.rs @@ -19,6 +19,12 @@ pub fn spawn(f: impl Future03 + Send + 'static) - tokio::spawn(abort_on_panic(f)) } +pub fn spawn_allow_panic( + f: impl Future03 + Send + 'static, +) -> JoinHandle { + tokio::spawn(f) +} + /// Aborts on panic. pub fn spawn_blocking( f: impl Future03 + Send + 'static, diff --git a/graphql/src/execution/execution.rs b/graphql/src/execution/execution.rs index dd099a326ee..36bca271875 100644 --- a/graphql/src/execution/execution.rs +++ b/graphql/src/execution/execution.rs @@ -29,7 +29,7 @@ pub enum ExecutionMode { /// Contextual information passed around during query execution. #[derive(Clone)] -pub struct ExecutionContext<'a, R> +pub struct ExecutionContext where R: Resolver, { @@ -40,13 +40,13 @@ where pub schema: Arc, /// The query to execute. - pub document: &'a q::Document, + pub document: q::Document, /// The resolver to use. pub resolver: Arc, /// The current field stack (e.g. allUsers > friends > name). - pub fields: Vec<&'a q::Field>, + pub fields: Vec, /// Variable values. pub variable_values: Arc>, @@ -89,18 +89,18 @@ fn get_field<'a>(object_type: impl Into>, name: &Name) -> } } -impl<'a, R> ExecutionContext<'a, R> +impl ExecutionContext where R: Resolver, { /// Creates a derived context for a new field (added to the top of the field stack). - pub fn for_field( + pub fn for_field<'a>( &self, - field: &'a q::Field, + field: &q::Field, object_type: impl Into>, ) -> Result { let mut ctx = self.clone(); - ctx.fields.push(field); + ctx.fields.push(field.clone()); if let Some(bc) = field.block_constraint(object_type)? { ctx.block = self.resolver.locate_block(&bc)?; } @@ -116,7 +116,7 @@ where logger: self.logger.clone(), resolver: Arc::new(introspection_resolver), schema: Arc::new(introspection_schema), - document: &self.document, + document: self.document.clone(), fields: vec![], variable_values: self.variable_values.clone(), deadline: self.deadline, @@ -327,13 +327,10 @@ where } /// Executes the root selection set of a query. -pub fn execute_root_selection_set<'a, R>( - ctx: &ExecutionContext<'a, R>, - selection_set: &'a q::SelectionSet, -) -> Result> -where - R: Resolver, -{ +pub fn execute_root_selection_set( + ctx: &ExecutionContext, + selection_set: &q::SelectionSet, +) -> Result> { // Obtain the root Query type and fail if there isn't one let query_type = match sast::get_root_query_type(&ctx.schema.document) { Some(t) => t, @@ -405,15 +402,12 @@ where /// Executes a selection set, requiring the result to be of the given object type. /// /// Allows passing in a parent value during recursive processing of objects and their fields. -pub fn execute_selection_set<'a, R>( - ctx: &ExecutionContext<'a, R>, - selection_set: &'a q::SelectionSet, +pub fn execute_selection_set( + ctx: &ExecutionContext, + selection_set: &q::SelectionSet, object_type: &s::ObjectType, object_value: &Option, -) -> Result> -where - R: Resolver, -{ +) -> Result> { Ok(q::Value::Object(execute_selection_set_to_map( ctx, selection_set, @@ -422,15 +416,12 @@ where )?)) } -fn execute_selection_set_to_map<'a, R>( - ctx: &ExecutionContext<'a, R>, - selection_set: &'a q::SelectionSet, +fn execute_selection_set_to_map( + ctx: &ExecutionContext, + selection_set: &q::SelectionSet, object_type: &s::ObjectType, object_value: &Option, -) -> Result, Vec> -where - R: Resolver, -{ +) -> Result, Vec> { let mut errors: Vec = Vec::new(); let mut result_map: BTreeMap = BTreeMap::new(); @@ -486,15 +477,12 @@ where } /// Collects fields of a selection set. -pub fn collect_fields<'a, R>( - ctx: &ExecutionContext<'a, R>, +pub fn collect_fields<'a>( + ctx: &'a ExecutionContext, object_type: &s::ObjectType, selection_set: &'a q::SelectionSet, visited_fragments: Option>, -) -> IndexMap<&'a String, Vec<&'a q::Field>> -where - R: Resolver, -{ +) -> IndexMap<&'a String, Vec<&'a q::Field>> { let mut visited_fragments = visited_fragments.unwrap_or_default(); let mut grouped_fields: IndexMap<_, Vec<_>> = IndexMap::new(); @@ -592,7 +580,7 @@ where /// Determines whether a fragment is applicable to the given object type. fn does_fragment_type_apply( - ctx: ExecutionContext<'_, impl Resolver>, + ctx: ExecutionContext, object_type: &s::ObjectType, fragment_type: &q::TypeCondition, ) -> bool { @@ -622,17 +610,14 @@ fn does_fragment_type_apply( } /// Executes a field. -fn execute_field<'a, R>( - ctx: &ExecutionContext<'a, R>, +fn execute_field( + ctx: &ExecutionContext, object_type: &s::ObjectType, object_value: &Option, - field: &'a q::Field, + field: &q::Field, field_definition: &s::Field, - fields: Vec<&'a q::Field>, -) -> Result> -where - R: Resolver, -{ + fields: Vec<&q::Field>, +) -> Result> { coerce_argument_values(ctx, object_type, field) .and_then(|argument_values| { resolve_field_value( @@ -649,18 +634,15 @@ where } /// Resolves the value of a field. -fn resolve_field_value<'a, R>( - ctx: &ExecutionContext<'a, R>, +fn resolve_field_value( + ctx: &ExecutionContext, object_type: &s::ObjectType, object_value: &Option, field: &q::Field, field_definition: &s::Field, field_type: &s::Type, argument_values: &HashMap<&q::Name, q::Value>, -) -> Result> -where - R: Resolver, -{ +) -> Result> { match field_type { s::Type::NonNullType(inner_type) => resolve_field_value( ctx, @@ -695,18 +677,15 @@ where } /// Resolves the value of a field that corresponds to a named type. -fn resolve_field_value_for_named_type<'a, R>( - ctx: &ExecutionContext<'a, R>, +fn resolve_field_value_for_named_type( + ctx: &ExecutionContext, object_type: &s::ObjectType, object_value: &Option, field: &q::Field, field_definition: &s::Field, type_name: &s::Name, argument_values: &HashMap<&q::Name, q::Value>, -) -> Result> -where - R: Resolver, -{ +) -> Result> { // Try to resolve the type name into the actual type let named_type = sast::get_named_type(&ctx.schema.document, type_name) .ok_or_else(|| QueryExecutionError::NamedTypeError(type_name.to_string()))?; @@ -762,18 +741,15 @@ where } /// Resolves the value of a field that corresponds to a list type. -fn resolve_field_value_for_list_type<'a, R>( - ctx: &ExecutionContext<'a, R>, +fn resolve_field_value_for_list_type( + ctx: &ExecutionContext, object_type: &s::ObjectType, object_value: &Option, field: &q::Field, field_definition: &s::Field, inner_type: &s::Type, argument_values: &HashMap<&q::Name, q::Value>, -) -> Result> -where - R: Resolver, -{ +) -> Result> { match inner_type { s::Type::NonNullType(inner_type) => resolve_field_value_for_list_type( ctx, @@ -858,16 +834,13 @@ where } /// Ensures that a value matches the expected return type. -fn complete_value<'a, R>( - ctx: &ExecutionContext<'a, R>, - field: &'a q::Field, - field_type: &'a s::Type, - fields: Vec<&'a q::Field>, +fn complete_value( + ctx: &ExecutionContext, + field: &q::Field, + field_type: &s::Type, + fields: Vec<&q::Field>, resolved_value: q::Value, -) -> Result> -where - R: Resolver, -{ +) -> Result> { match field_type { // Fail if the field type is non-null but the value is null s::Type::NonNullType(inner_type) => { @@ -987,14 +960,11 @@ where } /// Resolves an abstract type (interface, union) into an object type based on the given value. -fn resolve_abstract_type<'a, R>( - ctx: &'a ExecutionContext<'a, R>, - abstract_type: &'a s::TypeDefinition, +fn resolve_abstract_type<'a>( + ctx: &'a ExecutionContext, + abstract_type: &s::TypeDefinition, object_value: &q::Value, -) -> Result<&'a s::ObjectType, Vec> -where - R: Resolver, -{ +) -> Result<&'a s::ObjectType, Vec> { // Let the resolver handle the type resolution, return an error if the resolution // yields nothing ctx.resolver @@ -1035,14 +1005,11 @@ pub fn merge_selection_sets(fields: Vec<&q::Field>) -> q::SelectionSet { } /// Coerces argument values into GraphQL values. -pub fn coerce_argument_values<'a, R>( - ctx: &ExecutionContext<'_, R>, +pub fn coerce_argument_values<'a>( + ctx: &ExecutionContext, object_type: &'a s::ObjectType, field: &q::Field, -) -> Result, Vec> -where - R: Resolver, -{ +) -> Result, Vec> { let mut coerced_values = HashMap::new(); let mut errors = vec![]; diff --git a/graphql/src/execution/resolver.rs b/graphql/src/execution/resolver.rs index 44fcd221cb2..f0f0a3d8fcc 100644 --- a/graphql/src/execution/resolver.rs +++ b/graphql/src/execution/resolver.rs @@ -64,9 +64,9 @@ impl<'a> ObjectOrInterface<'a> { /// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions. pub trait Resolver: Clone + Send + Sync { /// Prepare for executing a query by prefetching as much data as possible - fn prefetch<'a>( + fn prefetch( &self, - ctx: &ExecutionContext<'a, Self>, + ctx: &ExecutionContext, selection_set: &q::SelectionSet, ) -> Result, Vec>; diff --git a/graphql/src/introspection/resolver.rs b/graphql/src/introspection/resolver.rs index 5f98c896171..1047ce7cb73 100644 --- a/graphql/src/introspection/resolver.rs +++ b/graphql/src/introspection/resolver.rs @@ -456,9 +456,9 @@ impl<'a> IntrospectionResolver<'a> { /// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions. impl<'a> Resolver for IntrospectionResolver<'a> { - fn prefetch<'r>( + fn prefetch( &self, - _: &ExecutionContext<'r, Self>, + _: &ExecutionContext, _: &q::SelectionSet, ) -> Result, Vec> { Ok(None) diff --git a/graphql/src/query/mod.rs b/graphql/src/query/mod.rs index 6ca6c3fe61a..65ac871ed4d 100644 --- a/graphql/src/query/mod.rs +++ b/graphql/src/query/mod.rs @@ -76,7 +76,7 @@ where logger: query_logger.clone(), resolver: Arc::new(options.resolver), schema: query.schema.clone(), - document: &query.document, + document: query.document.clone(), fields: vec![], variable_values: Arc::new(coerced_variable_values), deadline: options.deadline, diff --git a/graphql/src/store/prefetch.rs b/graphql/src/store/prefetch.rs index a71a37d73f9..f9ea182ca18 100644 --- a/graphql/src/store/prefetch.rs +++ b/graphql/src/store/prefetch.rs @@ -457,15 +457,11 @@ impl<'a> Join<'a> { /// cases where the store contains data that violates the data model by having /// multiple values for what should be a relationship to a single object in /// @derivedFrom fields -pub fn run<'a, R, S>( - ctx: &ExecutionContext<'a, R>, +pub fn run( + ctx: &ExecutionContext, selection_set: &q::SelectionSet, - store: Arc, -) -> Result> -where - R: Resolver, - S: Store, -{ + store: Arc, +) -> Result> { execute_root_selection_set(ctx, store.as_ref(), selection_set).map(|nodes| { let mut map = BTreeMap::default(); map.insert(PREFETCH_KEY.to_owned(), q::Value::Boolean(true)); @@ -480,15 +476,11 @@ where } /// Executes the root selection set of a query. -fn execute_root_selection_set<'a, R, S>( - ctx: &ExecutionContext<'a, R>, - store: &S, - selection_set: &'a q::SelectionSet, -) -> Result, Vec> -where - R: Resolver, - S: Store, -{ +fn execute_root_selection_set( + ctx: &ExecutionContext, + store: &impl Store, + selection_set: &q::SelectionSet, +) -> Result, Vec> { // Obtain the root Query type and fail if there isn't one let query_type = match sast::get_root_query_type(&ctx.schema.document) { Some(t) => t, @@ -546,17 +538,13 @@ fn object_or_interface_by_name<'a>( } } -fn execute_selection_set<'a, R, S>( - ctx: &ExecutionContext<'a, R>, - store: &S, +fn execute_selection_set( + ctx: &ExecutionContext, + store: &impl Store, mut parents: Vec, - selection_set: &'a q::SelectionSet, + selection_set: &q::SelectionSet, object_type: &ObjectOrInterface, -) -> Result, Vec> -where - R: Resolver, - S: Store, -{ +) -> Result, Vec> { let mut errors: Vec = Vec::new(); // Group fields with the same response key, so we can execute them together @@ -657,15 +645,12 @@ where /// Collects fields of a selection set. The resulting map indicates for each /// response key from which types to fetch what fields to express the effect /// of fragment spreads -fn collect_fields<'a, R>( - ctx: &ExecutionContext<'a, R>, +fn collect_fields<'a>( + ctx: &'a ExecutionContext, object_type: &ObjectOrInterface, selection_set: &'a q::SelectionSet, visited_fragments: Option>, -) -> HashMap<&'a String, HashMap>> -where - R: Resolver, -{ +) -> HashMap<&'a String, HashMap>> { let mut visited_fragments = visited_fragments.unwrap_or_default(); let mut grouped_fields: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new(); @@ -785,19 +770,15 @@ where } /// Executes a field. -fn execute_field<'a, R, S>( - ctx: &ExecutionContext<'a, R>, - store: &S, +fn execute_field( + ctx: &ExecutionContext, + store: &impl Store, object_type: &ObjectOrInterface<'_>, parents: &Vec, - join: &Join<'a>, - field: &'a q::Field, - field_definition: &'a s::Field, -) -> Result, Vec> -where - R: Resolver, - S: Store, -{ + join: &Join<'_>, + field: &q::Field, + field_definition: &s::Field, +) -> Result, Vec> { let mut argument_values = match object_type { ObjectOrInterface::Object(object_type) => { crate::execution::coerce_argument_values(ctx, object_type, field) diff --git a/graphql/src/store/resolver.rs b/graphql/src/store/resolver.rs index 0edf63f12c6..aaddf23ddf7 100644 --- a/graphql/src/store/resolver.rs +++ b/graphql/src/store/resolver.rs @@ -239,9 +239,9 @@ impl Resolver for StoreResolver where S: Store, { - fn prefetch<'r>( + fn prefetch( &self, - ctx: &ExecutionContext<'r, Self>, + ctx: &ExecutionContext, selection_set: &q::SelectionSet, ) -> Result, Vec> { super::prefetch::run(ctx, selection_set, self.store.clone()).map(|value| Some(value)) diff --git a/graphql/src/subscription/mod.rs b/graphql/src/subscription/mod.rs index 98657fddc44..36c7d7ac572 100644 --- a/graphql/src/subscription/mod.rs +++ b/graphql/src/subscription/mod.rs @@ -58,7 +58,7 @@ where logger: options.logger, resolver: Arc::new(options.resolver), schema: subscription.query.schema.clone(), - document: &subscription.query.document, + document: subscription.query.document.clone(), fields: vec![], variable_values: Arc::new(coerced_variable_values), deadline: None, @@ -99,7 +99,7 @@ where selection_set, source_stream, options.timeout, - )?; + ); Ok(response_stream) } } @@ -112,13 +112,10 @@ where } } -fn create_source_event_stream<'a, R>( - ctx: &'a ExecutionContext<'a, R>, +fn create_source_event_stream( + ctx: &ExecutionContext, selection_set: &q::SelectionSet, -) -> Result -where - R: Resolver, -{ +) -> Result { let subscription_type = sast::get_root_subscription_type(&ctx.schema.document) .ok_or(QueryExecutionError::NoRootSubscriptionObjectType)?; @@ -139,29 +136,23 @@ where resolve_field_stream(ctx, subscription_type, field, argument_values) } -fn resolve_field_stream<'a, R>( - ctx: &'a ExecutionContext<'a, R>, - object_type: &'a s::ObjectType, - field: &'a q::Field, +fn resolve_field_stream( + ctx: &ExecutionContext, + object_type: &s::ObjectType, + field: &q::Field, _argument_values: HashMap<&q::Name, q::Value>, -) -> Result -where - R: Resolver, -{ +) -> Result { ctx.resolver .resolve_field_stream(&ctx.schema.document, object_type, field) .map_err(SubscriptionError::from) } -fn map_source_to_response_stream<'a, R>( - ctx: &ExecutionContext<'a, R>, - selection_set: &'a q::SelectionSet, +fn map_source_to_response_stream( + ctx: &ExecutionContext, + selection_set: &q::SelectionSet, source_stream: StoreEventStreamBox, timeout: Option, -) -> Result -where - R: Resolver + 'static, -{ +) -> QueryResultStream { let logger = ctx.logger.clone(); let resolver = ctx.resolver.clone(); let schema = ctx.schema.clone(); @@ -175,50 +166,53 @@ where // at least once. This satisfies the GraphQL over Websocket protocol // requirement of "respond[ing] with at least one GQL_DATA message", see // https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_data - let trigger_stream = stream::iter_ok(vec![StoreEvent { + let trigger_stream = futures03::stream::iter(vec![Ok(StoreEvent { tag: 0, changes: Default::default(), - }]); - - Ok(Box::new(trigger_stream.chain(source_stream).map( - move |event| { - execute_subscription_event( - logger.clone(), - resolver.clone(), - schema.clone(), - document.clone(), - &selection_set, - variable_values.clone(), - event, - timeout.clone(), - max_first, - ) - }, - ))) + })]); + + Box::new( + trigger_stream + .chain(source_stream.compat()) + .then(move |res| match res { + Err(()) => { + futures03::future::ready(QueryExecutionError::EventStreamError.into()).boxed() + } + Ok(event) => execute_subscription_event( + logger.clone(), + resolver.clone(), + schema.clone(), + document.clone(), + selection_set.clone(), + variable_values.clone(), + event, + timeout.clone(), + max_first, + ) + .boxed(), + }), + ) } -fn execute_subscription_event( +async fn execute_subscription_event( logger: Logger, - resolver: Arc, + resolver: Arc, schema: Arc, document: q::Document, - selection_set: &q::SelectionSet, + selection_set: q::SelectionSet, variable_values: Arc>, event: StoreEvent, timeout: Option, max_first: u32, -) -> QueryResult -where - R1: Resolver + 'static, -{ +) -> QueryResult { debug!(logger, "Execute subscription event"; "event" => format!("{:?}", event)); // Create a fresh execution context with deadline. let ctx = ExecutionContext { - logger: logger, - resolver: resolver, - schema: schema, - document: &document, + logger, + resolver, + schema, + document, fields: vec![], variable_values, deadline: timeout.map(|t| Instant::now() + t), @@ -228,9 +222,16 @@ where }; // We have established that this exists earlier in the subscription execution - let subscription_type = sast::get_root_subscription_type(&ctx.schema.document).unwrap(); - - let result = execute_selection_set(&ctx, selection_set, subscription_type, &None); + let subscription_type = sast::get_root_subscription_type(&ctx.schema.document) + .unwrap() + .clone(); + + let result = graph::spawn_blocking_allow_panic(async move { + execute_selection_set(&ctx, &selection_set, &subscription_type, &None) + }) + .await + .map_err(|e| vec![QueryExecutionError::Panic(e.to_string())]) + .and_then(|x| x); match result { Ok(value) => QueryResult::new(Some(value)), diff --git a/graphql/tests/introspection.rs b/graphql/tests/introspection.rs index b3907d1d5e7..b1fe03459be 100644 --- a/graphql/tests/introspection.rs +++ b/graphql/tests/introspection.rs @@ -12,9 +12,9 @@ use graph_graphql::prelude::*; pub struct MockResolver; impl Resolver for MockResolver { - fn prefetch<'r>( + fn prefetch( &self, - _: &ExecutionContext<'r, Self>, + _: &ExecutionContext, _: &q::SelectionSet, ) -> Result, Vec> { Ok(None) diff --git a/graphql/tests/query.rs b/graphql/tests/query.rs index e88a76535dc..0502139d78a 100644 --- a/graphql/tests/query.rs +++ b/graphql/tests/query.rs @@ -1183,9 +1183,11 @@ async fn subscription_gets_result_even_without_events() { // Execute the subscription and expect at least one result to be // available in the result stream let stream = execute_subscription(&Subscription { query }, options).unwrap(); - let results = stream + let results: Vec<_> = stream .take(1) .collect() + .map(Result::<_, ()>::Ok) + .compat() .timeout(Duration::from_secs(3)) .await .unwrap() diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 51ce83393ad..73c2156e4c9 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -502,9 +502,9 @@ where R: GraphQlRunner, S: Store + SubgraphDeploymentStore, { - fn prefetch<'r>( + fn prefetch( &self, - _: &ExecutionContext<'r, Self>, + _: &ExecutionContext, _: &q::SelectionSet, ) -> Result, Vec> { Ok(None) diff --git a/server/websocket/src/connection.rs b/server/websocket/src/connection.rs index 1c2552cab40..06f2201ba46 100644 --- a/server/websocket/src/connection.rs +++ b/server/websocket/src/connection.rs @@ -336,6 +336,8 @@ where OutgoingMessage::from_query_result(result_id.clone(), result) }) .map(WsMessage::from) + .map(Ok) + .compat() .forward(result_sink.sink_map_err(|_| ())) .map(|_| ()) }); @@ -352,7 +354,7 @@ where }); operations.insert(id, guard); - graph::spawn(run_subscription.compat()); + graph::spawn_allow_panic(run_subscription.compat()); Ok(()) } }? diff --git a/server/websocket/src/server.rs b/server/websocket/src/server.rs index 0f58bfdfd42..3d43242ce58 100644 --- a/server/websocket/src/server.rs +++ b/server/websocket/src/server.rs @@ -165,8 +165,7 @@ where graphql_runner.clone(), ); - // Blocking due to store interactions. Won't be blocking after #905. - graph::spawn_blocking_allow_panic(service.into_future().compat()); + graph::spawn_allow_panic(service.into_future().compat()); } Err(e) => { // We gracefully skip over failed connection attempts rather From cf733c3da8dc740ed2e4f229b1f3a501669dba05 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Wed, 4 Mar 2020 15:44:01 -0300 Subject: [PATCH 2/2] subscription: Limit concurrent subcription queries --- graphql/src/subscription/mod.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/graphql/src/subscription/mod.rs b/graphql/src/subscription/mod.rs index 36c7d7ac572..a2c79a0e9c9 100644 --- a/graphql/src/subscription/mod.rs +++ b/graphql/src/subscription/mod.rs @@ -2,6 +2,7 @@ use graphql_parser::{query as q, schema as s, Style}; use std::collections::HashMap; use std::result::Result; use std::time::{Duration, Instant}; +use tokio::sync::Semaphore; use graph::prelude::*; @@ -9,6 +10,23 @@ use crate::execution::*; use crate::query::ast as qast; use crate::schema::ast as sast; +use lazy_static::lazy_static; + +lazy_static! { + static ref SUBSCRIPTION_QUERY_SEMAPHORE: Semaphore = { + // This is duplicating the logic in main.rs to get the connection pool size, which is + // unfourtunate. But because this module has no share state otherwise, it's not simple to + // refactor so that the semaphore isn't a global. + let db_conn_pool_size = std::env::var("STORE_CONNECTION_POOL_SIZE") + .unwrap_or("10".into()) + .parse::() + .expect("invalid STORE_CONNECTION_POOL_SIZE"); + + // Limit the amount of connections that can be taken up by subscription queries. + Semaphore::new((0.7 * db_conn_pool_size as f64).ceil() as usize) + }; +} + /// Options available for subscription execution. pub struct SubscriptionExecutionOptions where @@ -226,6 +244,9 @@ async fn execute_subscription_event( .unwrap() .clone(); + // Use a semaphore to prevent subscription queries, which can be numerous and might query all at + // once, from flooding the blocking thread pool and the DB connection pool. + let _permit = SUBSCRIPTION_QUERY_SEMAPHORE.acquire(); let result = graph::spawn_blocking_allow_panic(async move { execute_selection_set(&ctx, &selection_set, &subscription_type, &None) })