Skip to content

Spawn websocket connections in the core pool #1522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion graph/src/data/query/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub enum QueryExecutionError {
UndefinedFragment(String),
// Using slow and prefetch query resolution yield different results
IncorrectPrefetchResult { slow: q::Value, prefetch: q::Value },
Panic(String),
EventStreamError,
Comment on lines +60 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if these aren't too much of an implementation detail; then again... why not! They might help us debug things in the future.

}

impl Error for QueryExecutionError {
Expand Down Expand Up @@ -200,7 +202,9 @@ impl fmt::Display for QueryExecutionError {
IncorrectPrefetchResult{ .. } => write!(f, "Running query with prefetch \
and slow query resolution yielded different results. \
This is a bug. Please open an issue at \
https://github.com/graphprotocol/graph-node")
https://github.com/graphprotocol/graph-node"),
Panic(msg) => write!(f, "panic processing query: {}", msg),
EventStreamError => write!(f, "error in the subscription event stream")
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions graph/src/data/subscription/result.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use futures::prelude::*;

use crate::prelude::QueryResult;
use std::marker::Unpin;

/// A stream of query results for a subscription.
pub type QueryResultStream = Box<dyn Stream<Item = QueryResult, Error = ()> + Send>;
pub type QueryResultStream = Box<dyn futures03::stream::Stream<Item = QueryResult> + Send + Unpin>;

/// The result of running a subscription, if successful.
pub type SubscriptionResult = QueryResultStream;
4 changes: 3 additions & 1 deletion graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ pub mod mock {

/// Wrapper for spawning tasks that abort on panic, which is our default.
mod task_spawn;
pub use task_spawn::{block_on_allow_panic, spawn, spawn_blocking, spawn_blocking_allow_panic};
pub use task_spawn::{
block_on_allow_panic, spawn, spawn_allow_panic, spawn_blocking, spawn_blocking_allow_panic,
};

/// A prelude that makes all system component traits and data types available.
///
Expand Down
6 changes: 6 additions & 0 deletions graph/src/task_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub fn spawn<T: Send + 'static>(f: impl Future03<Output = T> + Send + 'static) -
tokio::spawn(abort_on_panic(f))
}

pub fn spawn_allow_panic<T: Send + 'static>(
f: impl Future03<Output = T> + Send + 'static,
) -> JoinHandle<T> {
tokio::spawn(f)
}

/// Aborts on panic.
pub fn spawn_blocking<T: Send + 'static>(
f: impl Future03<Output = T> + Send + 'static,
Expand Down
135 changes: 51 additions & 84 deletions graphql/src/execution/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub enum ExecutionMode {

/// Contextual information passed around during query execution.
#[derive(Clone)]
pub struct ExecutionContext<'a, R>
pub struct ExecutionContext<R>
where
R: Resolver,
{
Expand All @@ -40,13 +40,13 @@ where
pub schema: Arc<Schema>,

/// The query to execute.
pub document: &'a q::Document,
pub document: q::Document,

/// The resolver to use.
pub resolver: Arc<R>,

/// The current field stack (e.g. allUsers > friends > name).
pub fields: Vec<&'a q::Field>,
pub fields: Vec<q::Field>,

/// Variable values.
pub variable_values: Arc<HashMap<q::Name, q::Value>>,
Expand Down Expand Up @@ -89,18 +89,18 @@ fn get_field<'a>(object_type: impl Into<ObjectOrInterface<'a>>, name: &Name) ->
}
}

impl<'a, R> ExecutionContext<'a, R>
impl<R> ExecutionContext<R>
where
R: Resolver,
{
/// Creates a derived context for a new field (added to the top of the field stack).
pub fn for_field(
pub fn for_field<'a>(
&self,
field: &'a q::Field,
field: &q::Field,
object_type: impl Into<ObjectOrInterface<'a>>,
) -> Result<Self, QueryExecutionError> {
let mut ctx = self.clone();
ctx.fields.push(field);
ctx.fields.push(field.clone());
if let Some(bc) = field.block_constraint(object_type)? {
ctx.block = self.resolver.locate_block(&bc)?;
}
Expand All @@ -116,7 +116,7 @@ where
logger: self.logger.clone(),
resolver: Arc::new(introspection_resolver),
schema: Arc::new(introspection_schema),
document: &self.document,
document: self.document.clone(),
fields: vec![],
variable_values: self.variable_values.clone(),
deadline: self.deadline,
Expand Down Expand Up @@ -327,13 +327,10 @@ where
}

/// Executes the root selection set of a query.
pub fn execute_root_selection_set<'a, R>(
ctx: &ExecutionContext<'a, R>,
selection_set: &'a q::SelectionSet,
) -> Result<q::Value, Vec<QueryExecutionError>>
where
R: Resolver,
{
pub fn execute_root_selection_set(
ctx: &ExecutionContext<impl Resolver>,
selection_set: &q::SelectionSet,
) -> Result<q::Value, Vec<QueryExecutionError>> {
// Obtain the root Query type and fail if there isn't one
let query_type = match sast::get_root_query_type(&ctx.schema.document) {
Some(t) => t,
Expand Down Expand Up @@ -405,15 +402,12 @@ where
/// Executes a selection set, requiring the result to be of the given object type.
///
/// Allows passing in a parent value during recursive processing of objects and their fields.
pub fn execute_selection_set<'a, R>(
ctx: &ExecutionContext<'a, R>,
selection_set: &'a q::SelectionSet,
pub fn execute_selection_set(
ctx: &ExecutionContext<impl Resolver>,
selection_set: &q::SelectionSet,
object_type: &s::ObjectType,
object_value: &Option<q::Value>,
) -> Result<q::Value, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<q::Value, Vec<QueryExecutionError>> {
Ok(q::Value::Object(execute_selection_set_to_map(
ctx,
selection_set,
Expand All @@ -422,15 +416,12 @@ where
)?))
}

fn execute_selection_set_to_map<'a, R>(
ctx: &ExecutionContext<'a, R>,
selection_set: &'a q::SelectionSet,
fn execute_selection_set_to_map(
ctx: &ExecutionContext<impl Resolver>,
selection_set: &q::SelectionSet,
object_type: &s::ObjectType,
object_value: &Option<q::Value>,
) -> Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>> {
let mut errors: Vec<QueryExecutionError> = Vec::new();
let mut result_map: BTreeMap<String, q::Value> = BTreeMap::new();

Expand Down Expand Up @@ -486,15 +477,12 @@ where
}

/// Collects fields of a selection set.
pub fn collect_fields<'a, R>(
ctx: &ExecutionContext<'a, R>,
pub fn collect_fields<'a>(
ctx: &'a ExecutionContext<impl Resolver>,
object_type: &s::ObjectType,
selection_set: &'a q::SelectionSet,
visited_fragments: Option<HashSet<&'a q::Name>>,
) -> IndexMap<&'a String, Vec<&'a q::Field>>
where
R: Resolver,
{
) -> IndexMap<&'a String, Vec<&'a q::Field>> {
let mut visited_fragments = visited_fragments.unwrap_or_default();
let mut grouped_fields: IndexMap<_, Vec<_>> = IndexMap::new();

Expand Down Expand Up @@ -592,7 +580,7 @@ where

/// Determines whether a fragment is applicable to the given object type.
fn does_fragment_type_apply(
ctx: ExecutionContext<'_, impl Resolver>,
ctx: ExecutionContext<impl Resolver>,
object_type: &s::ObjectType,
fragment_type: &q::TypeCondition,
) -> bool {
Expand Down Expand Up @@ -622,17 +610,14 @@ fn does_fragment_type_apply(
}

/// Executes a field.
fn execute_field<'a, R>(
ctx: &ExecutionContext<'a, R>,
fn execute_field(
ctx: &ExecutionContext<impl Resolver>,
object_type: &s::ObjectType,
object_value: &Option<q::Value>,
field: &'a q::Field,
field: &q::Field,
field_definition: &s::Field,
fields: Vec<&'a q::Field>,
) -> Result<q::Value, Vec<QueryExecutionError>>
where
R: Resolver,
{
fields: Vec<&q::Field>,
) -> Result<q::Value, Vec<QueryExecutionError>> {
coerce_argument_values(ctx, object_type, field)
.and_then(|argument_values| {
resolve_field_value(
Expand All @@ -649,18 +634,15 @@ where
}

/// Resolves the value of a field.
fn resolve_field_value<'a, R>(
ctx: &ExecutionContext<'a, R>,
fn resolve_field_value(
ctx: &ExecutionContext<impl Resolver>,
object_type: &s::ObjectType,
object_value: &Option<q::Value>,
field: &q::Field,
field_definition: &s::Field,
field_type: &s::Type,
argument_values: &HashMap<&q::Name, q::Value>,
) -> Result<q::Value, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<q::Value, Vec<QueryExecutionError>> {
match field_type {
s::Type::NonNullType(inner_type) => resolve_field_value(
ctx,
Expand Down Expand Up @@ -695,18 +677,15 @@ where
}

/// Resolves the value of a field that corresponds to a named type.
fn resolve_field_value_for_named_type<'a, R>(
ctx: &ExecutionContext<'a, R>,
fn resolve_field_value_for_named_type(
ctx: &ExecutionContext<impl Resolver>,
object_type: &s::ObjectType,
object_value: &Option<q::Value>,
field: &q::Field,
field_definition: &s::Field,
type_name: &s::Name,
argument_values: &HashMap<&q::Name, q::Value>,
) -> Result<q::Value, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<q::Value, Vec<QueryExecutionError>> {
// Try to resolve the type name into the actual type
let named_type = sast::get_named_type(&ctx.schema.document, type_name)
.ok_or_else(|| QueryExecutionError::NamedTypeError(type_name.to_string()))?;
Expand Down Expand Up @@ -762,18 +741,15 @@ where
}

/// Resolves the value of a field that corresponds to a list type.
fn resolve_field_value_for_list_type<'a, R>(
ctx: &ExecutionContext<'a, R>,
fn resolve_field_value_for_list_type(
ctx: &ExecutionContext<impl Resolver>,
object_type: &s::ObjectType,
object_value: &Option<q::Value>,
field: &q::Field,
field_definition: &s::Field,
inner_type: &s::Type,
argument_values: &HashMap<&q::Name, q::Value>,
) -> Result<q::Value, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<q::Value, Vec<QueryExecutionError>> {
match inner_type {
s::Type::NonNullType(inner_type) => resolve_field_value_for_list_type(
ctx,
Expand Down Expand Up @@ -858,16 +834,13 @@ where
}

/// Ensures that a value matches the expected return type.
fn complete_value<'a, R>(
ctx: &ExecutionContext<'a, R>,
field: &'a q::Field,
field_type: &'a s::Type,
fields: Vec<&'a q::Field>,
fn complete_value(
ctx: &ExecutionContext<impl Resolver>,
field: &q::Field,
field_type: &s::Type,
fields: Vec<&q::Field>,
resolved_value: q::Value,
) -> Result<q::Value, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<q::Value, Vec<QueryExecutionError>> {
match field_type {
// Fail if the field type is non-null but the value is null
s::Type::NonNullType(inner_type) => {
Expand Down Expand Up @@ -987,14 +960,11 @@ where
}

/// Resolves an abstract type (interface, union) into an object type based on the given value.
fn resolve_abstract_type<'a, R>(
ctx: &'a ExecutionContext<'a, R>,
abstract_type: &'a s::TypeDefinition,
fn resolve_abstract_type<'a>(
ctx: &'a ExecutionContext<impl Resolver>,
abstract_type: &s::TypeDefinition,
object_value: &q::Value,
) -> Result<&'a s::ObjectType, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<&'a s::ObjectType, Vec<QueryExecutionError>> {
// Let the resolver handle the type resolution, return an error if the resolution
// yields nothing
ctx.resolver
Expand Down Expand Up @@ -1035,14 +1005,11 @@ pub fn merge_selection_sets(fields: Vec<&q::Field>) -> q::SelectionSet {
}

/// Coerces argument values into GraphQL values.
pub fn coerce_argument_values<'a, R>(
ctx: &ExecutionContext<'_, R>,
pub fn coerce_argument_values<'a>(
ctx: &ExecutionContext<impl Resolver>,
object_type: &'a s::ObjectType,
field: &q::Field,
) -> Result<HashMap<&'a q::Name, q::Value>, Vec<QueryExecutionError>>
where
R: Resolver,
{
) -> Result<HashMap<&'a q::Name, q::Value>, Vec<QueryExecutionError>> {
let mut coerced_values = HashMap::new();
let mut errors = vec![];

Expand Down
4 changes: 2 additions & 2 deletions graphql/src/execution/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ impl<'a> ObjectOrInterface<'a> {
/// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions.
pub trait Resolver: Clone + Send + Sync {
/// Prepare for executing a query by prefetching as much data as possible
fn prefetch<'a>(
fn prefetch(
&self,
ctx: &ExecutionContext<'a, Self>,
ctx: &ExecutionContext<Self>,
selection_set: &q::SelectionSet,
) -> Result<Option<q::Value>, Vec<QueryExecutionError>>;

Expand Down
4 changes: 2 additions & 2 deletions graphql/src/introspection/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,9 @@ impl<'a> IntrospectionResolver<'a> {

/// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions.
impl<'a> Resolver for IntrospectionResolver<'a> {
fn prefetch<'r>(
fn prefetch(
&self,
_: &ExecutionContext<'r, Self>,
_: &ExecutionContext<Self>,
_: &q::SelectionSet,
) -> Result<Option<q::Value>, Vec<QueryExecutionError>> {
Ok(None)
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
logger: query_logger.clone(),
resolver: Arc::new(options.resolver),
schema: query.schema.clone(),
document: &query.document,
document: query.document.clone(),
fields: vec![],
variable_values: Arc::new(coerced_variable_values),
deadline: options.deadline,
Expand Down
Loading