Skip to content

Concurrently resolve fields #132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
09574d6
Ensure rw-safety of `ExecutionContext.errors` which will be read/writ…
sogko May 10, 2016
ab3b083
Experimental implementation of concurrently resolving fields
sogko May 10, 2016
2dcd77e
Merge branch 'sogko/0.5.0' into sogko/experiment-parallel-resolve
sogko May 31, 2016
453e0b5
Combine invariants into AST-aware error.
sogko Jun 1, 2016
66e6c34
Remove unused fragment in queries in unit tests
sogko Jun 1, 2016
f1164a6
Deepen introspection query
sogko Jun 1, 2016
237fab4
Improve validation error message when field names conflict
sogko Jun 1, 2016
46c5850
Rename `type_comparator` test to mark as internal test.
sogko Jun 1, 2016
91a3aa2
Include possible field, argument, type names when validation fails (#…
sogko Jun 1, 2016
599c577
Minor clean up
sogko Jun 3, 2016
96fc0ad
Minor refactoring of error messages for unknown fields
sogko Jun 6, 2016
2b3b103
Unit test quotedOrList (suggestion quoting utility)
sogko Jun 6, 2016
b71c906
Bug: printer can print non-parsable value
sogko Jun 6, 2016
1670401
documentation of schema constructor
sogko Jun 6, 2016
73f83b4
isTypeSubTypeOf documentation
sogko Jun 6, 2016
8c1f318
Improved `KnownArgumentNames` tests for coverage
sogko Jun 7, 2016
a19ac6c
Export introspection in public API
sogko Jun 7, 2016
e324096
Update `NewSchema()` to use new exported introspective types
sogko Jun 7, 2016
9cbbd65
RFC: Schema Language Directives (#376)
sogko Jun 7, 2016
988ab2e
RFC: Directive location: schema definition (#382)
sogko Jun 7, 2016
47b81e3
Deprecated directive (#384)
sogko Jun 7, 2016
0dcbbd2
Revert back directives to use `var`; `const` not applicable here
sogko Jun 7, 2016
1225ab0
Deprecated directive (#384)
sogko Jun 7, 2016
c0a6554
Merge branch 'sogko/0.6.0' of https://github.com/sogko/graphql into s…
sogko Jun 7, 2016
d923568
Factor out closure functions to normal functions
sogko Jun 7, 2016
6b9ac5e
Factor out more closure functions
sogko Jun 7, 2016
bb27894
Validation: context.getFragmentSpreads now accepts selectionSet rathe…
sogko Jun 8, 2016
6fecefe
Validation: improving overlapping fields quality (#386)
sogko Jun 9, 2016
fdd0e5b
Evaluate list elements concurrently
andreas Jun 9, 2016
99486b5
Simplify concurrent evaluation of fields
andreas Jun 9, 2016
6414283
Merge pull request #20 from andreas/experiment-parallel-resolve
sogko Jun 11, 2016
14fac79
Merge pull request #21 from andreas/simplify-concurrent-field-evaluation
sogko Jun 11, 2016
e13376c
Merge branch 'master' into sogko/0.6.0
sogko Jan 27, 2017
0ba9e8f
Merge branch 'sogko/0.6.0' into sogko/experiment-parallel-resolve
sogko Jan 27, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ var _ Output = (*NonNull)(nil)
// Composite interface for types that may describe the parent context of a selection set.
type Composite interface {
Name() string
Description() string
String() string
Error() error
}

var _ Composite = (*Object)(nil)
Expand Down
47 changes: 45 additions & 2 deletions directives.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,39 @@
package graphql

const (
// Operations
DirectiveLocationQuery = "QUERY"
DirectiveLocationMutation = "MUTATION"
DirectiveLocationSubscription = "SUBSCRIPTION"
DirectiveLocationField = "FIELD"
DirectiveLocationFragmentDefinition = "FRAGMENT_DEFINITION"
DirectiveLocationFragmentSpread = "FRAGMENT_SPREAD"
DirectiveLocationInlineFragment = "INLINE_FRAGMENT"

// Schema Definitions
DirectiveLocationSchema = "SCHEMA"
DirectiveLocationScalar = "SCALAR"
DirectiveLocationObject = "OBJECT"
DirectiveLocationFieldDefinition = "FIELD_DEFINITION"
DirectiveLocationArgumentDefinition = "ARGUMENT_DEFINITION"
DirectiveLocationInterface = "INTERFACE"
DirectiveLocationUnion = "UNION"
DirectiveLocationEnum = "ENUM"
DirectiveLocationEnumValue = "ENUM_VALUE"
DirectiveLocationInputObject = "INPUT_OBJECT"
DirectiveLocationInputFieldDefinition = "INPUT_FIELD_DEFINITION"
)

// DefaultDeprecationReason Constant string used for default reason for a deprecation.
const DefaultDeprecationReason = "No longer supported"

// SpecifiedRules The full list of specified directives.
var SpecifiedDirectives = []*Directive{
IncludeDirective,
SkipDirective,
DeprecatedDirective,
}

// Directive structs are used by the GraphQL runtime as a way of modifying execution
// behavior. Type system creators will usually not create these directly.
type Directive struct {
Expand Down Expand Up @@ -76,7 +100,7 @@ func NewDirective(config DirectiveConfig) *Directive {
return dir
}

// IncludeDirective is used to conditionally include fields or fragments
// IncludeDirective is used to conditionally include fields or fragments.
var IncludeDirective = NewDirective(DirectiveConfig{
Name: "include",
Description: "Directs the executor to include this field or fragment only when " +
Expand All @@ -94,7 +118,7 @@ var IncludeDirective = NewDirective(DirectiveConfig{
},
})

// SkipDirective Used to conditionally skip (exclude) fields or fragments
// SkipDirective Used to conditionally skip (exclude) fields or fragments.
var SkipDirective = NewDirective(DirectiveConfig{
Name: "skip",
Description: "Directs the executor to skip this field or fragment when the `if` " +
Expand All @@ -111,3 +135,22 @@ var SkipDirective = NewDirective(DirectiveConfig{
DirectiveLocationInlineFragment,
},
})

// DeprecatedDirective Used to declare element of a GraphQL schema as deprecated.
var DeprecatedDirective = NewDirective(DirectiveConfig{
Name: "deprecated",
Description: "Marks an element of a GraphQL schema as no longer supported.",
Args: FieldConfigArgument{
"reason": &ArgumentConfig{
Type: String,
Description: "Explains why this element was deprecated, usually also including a " +
"suggestion for how to access supported similar data. Formatted" +
"in [Markdown](https://daringfireball.net/projects/markdown/).",
DefaultValue: DefaultDeprecationReason,
},
},
Locations: []string{
DirectiveLocationFieldDefinition,
DirectiveLocationEnumValue,
},
})
12 changes: 0 additions & 12 deletions directives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,6 @@ func TestDirectivesWorksOnInlineFragmentIfFalseOmitsInlineFragment(t *testing.T)
b
}
}
fragment Frag on TestType {
b
}
`
expected := &graphql.Result{
Data: map[string]interface{}{
Expand All @@ -368,9 +365,6 @@ func TestDirectivesWorksOnInlineFragmentIfTrueIncludesInlineFragment(t *testing.
b
}
}
fragment Frag on TestType {
b
}
`
expected := &graphql.Result{
Data: map[string]interface{}{
Expand All @@ -395,9 +389,6 @@ func TestDirectivesWorksOnInlineFragmentUnlessFalseIncludesInlineFragment(t *tes
b
}
}
fragment Frag on TestType {
b
}
`
expected := &graphql.Result{
Data: map[string]interface{}{
Expand All @@ -422,9 +413,6 @@ func TestDirectivesWorksOnInlineFragmentUnlessTrueIncludesInlineFragment(t *test
b
}
}
fragment Frag on TestType {
b
}
`
expected := &graphql.Result{
Data: map[string]interface{}{
Expand Down
120 changes: 98 additions & 22 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"

"github.com/graphql-go/graphql/gqlerrors"
"github.com/graphql-go/graphql/language/ast"
Expand Down Expand Up @@ -48,8 +49,8 @@ func Execute(p ExecuteParams) (result *Result) {
if r, ok := r.(error); ok {
err = gqlerrors.FormatError(r)
}
exeContext.Errors = append(exeContext.Errors, gqlerrors.FormatError(err))
result.Errors = exeContext.Errors
exeContext.AppendError(err)
result.Errors = exeContext.Errors()
}
}()

Expand All @@ -76,12 +77,39 @@ type ExecutionContext struct {
Root interface{}
Operation ast.Definition
VariableValues map[string]interface{}
Errors []gqlerrors.FormattedError
Context context.Context

errLock sync.RWMutex
errors []gqlerrors.FormattedError
}

func (eCtx *ExecutionContext) AppendError(errs ...error) {
formattedErrors := []gqlerrors.FormattedError{}
for _, err := range errs {
formattedErrors = append(formattedErrors, gqlerrors.FormatError(err))
}
eCtx.errLock.Lock()
eCtx.errors = append(eCtx.errors, formattedErrors...)
eCtx.errLock.Unlock()
}

func (eCtx *ExecutionContext) Errors() (res []gqlerrors.FormattedError) {
eCtx.errLock.RLock()
res = eCtx.errors
eCtx.errLock.RUnlock()
return res
}

func (eCtx *ExecutionContext) SetErrors(errors []gqlerrors.FormattedError) {
eCtx.errLock.Lock()
eCtx.errors = errors
eCtx.errLock.Unlock()
}

func buildExecutionContext(p BuildExecutionCtxParams) (*ExecutionContext, error) {
eCtx := &ExecutionContext{}
eCtx := &ExecutionContext{
errLock: sync.RWMutex{},
}
var operation *ast.OperationDefinition
fragments := map[string]ast.Definition{}

Expand Down Expand Up @@ -122,7 +150,7 @@ func buildExecutionContext(p BuildExecutionCtxParams) (*ExecutionContext, error)
eCtx.Root = p.Root
eCtx.Operation = operation
eCtx.VariableValues = variableValues
eCtx.Errors = p.Errors
eCtx.SetErrors(p.Errors)
eCtx.Context = p.Context
return eCtx, nil
}
Expand Down Expand Up @@ -233,7 +261,7 @@ func executeFieldsSerially(p ExecuteFieldsParams) *Result {

return &Result{
Data: finalResults,
Errors: p.ExecutionContext.Errors,
Errors: p.ExecutionContext.Errors(),
}
}

Expand All @@ -246,18 +274,42 @@ func executeFields(p ExecuteFieldsParams) *Result {
p.Fields = map[string][]*ast.Field{}
}

finalResults := map[string]interface{}{}
// concurrently resolve fields
wg := sync.WaitGroup{}
mtx := sync.Mutex{}
finalResults := make(map[string]interface{}, len(p.Fields))
panics := make(chan interface{}, len(p.Fields))
for responseName, fieldASTs := range p.Fields {
resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs)
if state.hasNoFieldDefs {
continue
}
finalResults[responseName] = resolved
wg.Add(1)
go func(responseName string, fieldASTs []*ast.Field) {
defer func() {
if r := recover(); r != nil {
panics <- r
}
wg.Done()
}()
resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs)
if state.hasNoFieldDefs {
return
}
mtx.Lock()
finalResults[responseName] = resolved
mtx.Unlock()
}(responseName, fieldASTs)
}

// wait for all routines to complete and then perform clean up
wg.Wait()
close(panics)

// re-panic if a goroutine panicked
for p := range panics {
panic(p)
}

return &Result{
Data: finalResults,
Errors: p.ExecutionContext.Errors,
Errors: p.ExecutionContext.Errors(),
}
}

Expand All @@ -266,6 +318,7 @@ type CollectFieldsParams struct {
RuntimeType *Object // previously known as OperationType
SelectionSet *ast.SelectionSet
Fields map[string][]*ast.Field
FieldOrder []string
VisitedFragmentNames map[string]bool
}

Expand Down Expand Up @@ -480,7 +533,6 @@ func resolveField(eCtx *ExecutionContext, parentType *Object, source interface{}
var returnType Output
defer func() (interface{}, resolveFieldResultState) {
if r := recover(); r != nil {

var err error
if r, ok := r.(string); ok {
err = NewLocatedError(
Expand All @@ -495,7 +547,7 @@ func resolveField(eCtx *ExecutionContext, parentType *Object, source interface{}
if _, ok := returnType.(*NonNull); ok {
panic(gqlerrors.FormatError(err))
}
eCtx.Errors = append(eCtx.Errors, gqlerrors.FormatError(err))
eCtx.AppendError(err)
return result, resultState
}
return result, resultState
Expand Down Expand Up @@ -545,7 +597,8 @@ func resolveField(eCtx *ExecutionContext, parentType *Object, source interface{}
})

if resolveFnError != nil {
panic(gqlerrors.FormatError(resolveFnError))
eCtx.AppendError(resolveFnError)
return nil, resultState
}

completed := completeValueCatchingError(eCtx, returnType, fieldASTs, info, result)
Expand All @@ -561,7 +614,7 @@ func completeValueCatchingError(eCtx *ExecutionContext, returnType Type, fieldAS
panic(r)
}
if err, ok := r.(gqlerrors.FormattedError); ok {
eCtx.Errors = append(eCtx.Errors, err)
eCtx.AppendError(err)
}
return completed
}
Expand Down Expand Up @@ -664,7 +717,9 @@ func completeAbstractValue(eCtx *ExecutionContext, returnType Abstract, fieldAST
}

err := invariant(runtimeType != nil,
fmt.Sprintf(`Could not determine runtime type of value "%v" for field %v.%v.`, result, info.ParentType, info.FieldName),
fmt.Sprintf(`Abstract type %v must resolve to an Object type at runtime `+
`for field %v.%v with value "%v", received "%v".`,
returnType, info.ParentType, info.FieldName, result, runtimeType),
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -755,13 +810,34 @@ func completeListValue(eCtx *ExecutionContext, returnType *List, fieldASTs []*as
panic(gqlerrors.FormatError(err))
}

// concurrently resolve list elements
itemType := returnType.OfType
completedResults := []interface{}{}
wg := sync.WaitGroup{}
completedResults := make([]interface{}, resultVal.Len())
panics := make(chan interface{}, resultVal.Len())
for i := 0; i < resultVal.Len(); i++ {
val := resultVal.Index(i).Interface()
completedItem := completeValueCatchingError(eCtx, itemType, fieldASTs, info, val)
completedResults = append(completedResults, completedItem)
wg.Add(1)
go func(j int) {
defer func() {
if r := recover(); r != nil {
panics <- r
}
wg.Done()
}()
val := resultVal.Index(j).Interface()
completedResults[j] = completeValueCatchingError(eCtx, itemType, fieldASTs, info, val)
}(i)
}

// wait for all routines to complete and then perform clean up
wg.Wait()
close(panics)

// re-panic if a goroutine panicked
for p := range panics {
panic(p)
}

return completedResults
}

Expand Down
Loading