Skip to content

Subscription execution #495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Aug 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
bd370d7
added subscribe support
branden-blackline Jul 14, 2019
e96c7c7
removing ordered fields code to keep PRs separate
branden-blackline Jul 14, 2019
a8d0d00
adding waitgroups to handle race
branden-blackline Jul 15, 2019
9cf0da7
Updating ResultIterator api and adding doneFunc per handler
branden-blackline Jul 16, 2019
4a374e3
ensure subscribe and resolve use the same cancellable context
branden-blackline Aug 18, 2019
ec949b3
switching from mutex to waitgroups and adding a subscriber type with …
branden-blackline Sep 11, 2019
08de84d
Merge branch 'master' into subscription-execution
bhoriuchi Dec 2, 2019
d6ffa49
Merge branch 'master' into subscription-execution
bhoriuchi Mar 9, 2020
f21d0c7
removing ResultIterator in favor of a result channel
branden-blackline May 17, 2020
a55996a
Merge branch 'master' into subscription-execution
bhoriuchi May 17, 2020
c991585
rewritten function signature and more tests
remorses May 18, 2020
ccd052d
initial tests pass
remorses May 18, 2020
7e815e2
added test for subscriptions parse errors
remorses May 18, 2020
2b63a00
handle errors in ExecuteSubscribe
remorses May 18, 2020
6baca7e
subscription: more tests cases
remorses May 18, 2020
a673078
subscription_test: refactored tests
remorses May 18, 2020
d55e8cb
subscription: removed some todos
remorses May 18, 2020
4e255bf
removed todos and prints
remorses May 18, 2020
1fc61f7
subscription: added more comments
remorses May 18, 2020
183515e
subscription: SubscriptableSchema conforms to ws interface
remorses May 18, 2020
77ab10b
subscription: added loop in subscriptabel schema
remorses May 18, 2020
ab87417
subscription: removed a print
remorses May 19, 2020
b63505c
subscription_test: added more tests for resolver behaviour
remorses May 19, 2020
3b13576
subscription_test: test for `panic inside subscribe is recovered`
remorses May 19, 2020
44a2828
subscription: added a comment
remorses May 19, 2020
eb0fdae
subscription: removed SubscriptableSchema
remorses May 19, 2020
d0a91ba
subscription: removed a print
remorses May 19, 2020
fd2a748
Merge pull request #1 from remorses/subscription-execution
bhoriuchi May 19, 2020
d54fb02
adding context cancel back to pass CI tests
branden-blackline May 19, 2020
7858706
Merge branch 'master' into subscription-execution
bhoriuchi Jul 1, 2020
7fe5fa1
Merge branch 'master' into subscription-execution
bhoriuchi Aug 20, 2020
5758832
Merge branch 'master' into subscription-execution
bhoriuchi Oct 14, 2020
88c3051
Merge branch 'master' into subscription-execution
bhoriuchi Nov 23, 2020
fe5921f
Merge branch 'master' into subscription-execution
bhoriuchi Jan 4, 2021
0770b96
Merge branch 'master' into subscription-execution
chris-ramon Jan 31, 2021
4ce4c9a
Merge branch 'master' into subscription-execution
bhoriuchi Feb 24, 2021
f51a3a2
Merge branch 'master' into subscription-execution
bhoriuchi Apr 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func defineFieldMap(ttype Named, fieldMap Fields) (FieldDefinitionMap, error) {
Description: field.Description,
Type: field.Type,
Resolve: field.Resolve,
Subscribe: field.Subscribe,
DeprecationReason: field.DeprecationReason,
}

Expand Down Expand Up @@ -606,6 +607,7 @@ type Field struct {
Type Output `json:"type"`
Args FieldConfigArgument `json:"args"`
Resolve FieldResolveFn `json:"-"`
Subscribe FieldResolveFn `json:"-"`
DeprecationReason string `json:"deprecationReason"`
Description string `json:"description"`
}
Expand All @@ -625,6 +627,7 @@ type FieldDefinition struct {
Type Output `json:"type"`
Args []*Argument `json:"args"`
Resolve FieldResolveFn `json:"-"`
Subscribe FieldResolveFn `json:"-"`
DeprecationReason string `json:"deprecationReason"`
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
module github.com/graphql-go/graphql

go 1.13
228 changes: 228 additions & 0 deletions subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package graphql

import (
"context"
"fmt"

"github.com/graphql-go/graphql/gqlerrors"
"github.com/graphql-go/graphql/language/parser"
"github.com/graphql-go/graphql/language/source"
)

// SubscribeParams parameters for subscribing
type SubscribeParams struct {
Schema Schema
RequestString string
RootValue interface{}
// ContextValue context.Context
VariableValues map[string]interface{}
OperationName string
FieldResolver FieldResolveFn
FieldSubscriber FieldResolveFn
}

// Subscribe performs a subscribe operation on the given query and schema
// To finish a subscription you can simply close the channel from inside the `Subscribe` function
// currently does not support extensions hooks
func Subscribe(p Params) chan *Result {

source := source.NewSource(&source.Source{
Body: []byte(p.RequestString),
Name: "GraphQL request",
})

// TODO run extensions hooks

// parse the source
AST, err := parser.Parse(parser.ParseParams{Source: source})
if err != nil {

// merge the errors from extensions and the original error from parser
return sendOneResultAndClose(&Result{
Errors: gqlerrors.FormatErrors(err),
})
}

// validate document
validationResult := ValidateDocument(&p.Schema, AST, nil)

if !validationResult.IsValid {
// run validation finish functions for extensions
return sendOneResultAndClose(&Result{
Errors: validationResult.Errors,
})

}
return ExecuteSubscription(ExecuteParams{
Schema: p.Schema,
Root: p.RootObject,
AST: AST,
OperationName: p.OperationName,
Args: p.VariableValues,
Context: p.Context,
})
}

func sendOneResultAndClose(res *Result) chan *Result {
resultChannel := make(chan *Result, 1)
resultChannel <- res
close(resultChannel)
return resultChannel
}

// ExecuteSubscription is similar to graphql.Execute but returns a channel instead of a Result
// currently does not support extensions
func ExecuteSubscription(p ExecuteParams) chan *Result {

if p.Context == nil {
p.Context = context.Background()
}

var mapSourceToResponse = func(payload interface{}) *Result {
return Execute(ExecuteParams{
Schema: p.Schema,
Root: payload,
AST: p.AST,
OperationName: p.OperationName,
Args: p.Args,
Context: p.Context,
})
}
var resultChannel = make(chan *Result)
go func() {
defer close(resultChannel)
defer func() {
if err := recover(); err != nil {
e, ok := err.(error)
if !ok {
return
}
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(e),
}
}
return
}()

exeContext, err := buildExecutionContext(buildExecutionCtxParams{
Schema: p.Schema,
Root: p.Root,
AST: p.AST,
OperationName: p.OperationName,
Args: p.Args,
Context: p.Context,
})

if err != nil {
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(err),
}

return
}

operationType, err := getOperationRootType(p.Schema, exeContext.Operation)
if err != nil {
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(err),
}

return
}

fields := collectFields(collectFieldsParams{
ExeContext: exeContext,
RuntimeType: operationType,
SelectionSet: exeContext.Operation.GetSelectionSet(),
})

responseNames := []string{}
for name := range fields {
responseNames = append(responseNames, name)
}
responseName := responseNames[0]
fieldNodes := fields[responseName]
fieldNode := fieldNodes[0]
fieldName := fieldNode.Name.Value
fieldDef := getFieldDef(p.Schema, operationType, fieldName)

if fieldDef == nil {
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription field %q is not defined", fieldName)),
}

return
}

resolveFn := fieldDef.Subscribe

if resolveFn == nil {
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)),
}
return
}
fieldPath := &ResponsePath{
Key: responseName,
}

args := getArgumentValues(fieldDef.Args, fieldNode.Arguments, exeContext.VariableValues)
info := ResolveInfo{
FieldName: fieldName,
FieldASTs: fieldNodes,
Path: fieldPath,
ReturnType: fieldDef.Type,
ParentType: operationType,
Schema: p.Schema,
Fragments: exeContext.Fragments,
RootValue: exeContext.Root,
Operation: exeContext.Operation,
VariableValues: exeContext.VariableValues,
}

fieldResult, err := resolveFn(ResolveParams{
Source: p.Root,
Args: args,
Info: info,
Context: p.Context,
})
if err != nil {
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(err),
}

return
}

if fieldResult == nil {
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(fmt.Errorf("no field result")),
}

return
}

switch fieldResult.(type) {
case chan interface{}:
sub := fieldResult.(chan interface{})
for {
select {
case <-p.Context.Done():
return

case res, more := <-sub:
if !more {
return
}
resultChannel <- mapSourceToResponse(res)
}
}
default:
resultChannel <- mapSourceToResponse(fieldResult)
return
}
}()

// return a result channel
return resultChannel
}
Loading