Skip to content

Commit f02a1c9

Browse files
authored
Merge pull request #495 from bhoriuchi/subscription-execution
Subscription execution
2 parents 8a92e97 + f51a3a2 commit f02a1c9

File tree

5 files changed

+669
-0
lines changed

5 files changed

+669
-0
lines changed

definition.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ func defineFieldMap(ttype Named, fieldMap Fields) (FieldDefinitionMap, error) {
534534
Description: field.Description,
535535
Type: field.Type,
536536
Resolve: field.Resolve,
537+
Subscribe: field.Subscribe,
537538
DeprecationReason: field.DeprecationReason,
538539
}
539540

@@ -606,6 +607,7 @@ type Field struct {
606607
Type Output `json:"type"`
607608
Args FieldConfigArgument `json:"args"`
608609
Resolve FieldResolveFn `json:"-"`
610+
Subscribe FieldResolveFn `json:"-"`
609611
DeprecationReason string `json:"deprecationReason"`
610612
Description string `json:"description"`
611613
}
@@ -625,6 +627,7 @@ type FieldDefinition struct {
625627
Type Output `json:"type"`
626628
Args []*Argument `json:"args"`
627629
Resolve FieldResolveFn `json:"-"`
630+
Subscribe FieldResolveFn `json:"-"`
628631
DeprecationReason string `json:"deprecationReason"`
629632
}
630633

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
module github.com/graphql-go/graphql
2+
3+
go 1.13

subscription.go

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package graphql
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/graphql-go/graphql/gqlerrors"
8+
"github.com/graphql-go/graphql/language/parser"
9+
"github.com/graphql-go/graphql/language/source"
10+
)
11+
12+
// SubscribeParams parameters for subscribing
13+
type SubscribeParams struct {
14+
Schema Schema
15+
RequestString string
16+
RootValue interface{}
17+
// ContextValue context.Context
18+
VariableValues map[string]interface{}
19+
OperationName string
20+
FieldResolver FieldResolveFn
21+
FieldSubscriber FieldResolveFn
22+
}
23+
24+
// Subscribe performs a subscribe operation on the given query and schema
25+
// To finish a subscription you can simply close the channel from inside the `Subscribe` function
26+
// currently does not support extensions hooks
27+
func Subscribe(p Params) chan *Result {
28+
29+
source := source.NewSource(&source.Source{
30+
Body: []byte(p.RequestString),
31+
Name: "GraphQL request",
32+
})
33+
34+
// TODO run extensions hooks
35+
36+
// parse the source
37+
AST, err := parser.Parse(parser.ParseParams{Source: source})
38+
if err != nil {
39+
40+
// merge the errors from extensions and the original error from parser
41+
return sendOneResultAndClose(&Result{
42+
Errors: gqlerrors.FormatErrors(err),
43+
})
44+
}
45+
46+
// validate document
47+
validationResult := ValidateDocument(&p.Schema, AST, nil)
48+
49+
if !validationResult.IsValid {
50+
// run validation finish functions for extensions
51+
return sendOneResultAndClose(&Result{
52+
Errors: validationResult.Errors,
53+
})
54+
55+
}
56+
return ExecuteSubscription(ExecuteParams{
57+
Schema: p.Schema,
58+
Root: p.RootObject,
59+
AST: AST,
60+
OperationName: p.OperationName,
61+
Args: p.VariableValues,
62+
Context: p.Context,
63+
})
64+
}
65+
66+
func sendOneResultAndClose(res *Result) chan *Result {
67+
resultChannel := make(chan *Result, 1)
68+
resultChannel <- res
69+
close(resultChannel)
70+
return resultChannel
71+
}
72+
73+
// ExecuteSubscription is similar to graphql.Execute but returns a channel instead of a Result
74+
// currently does not support extensions
75+
func ExecuteSubscription(p ExecuteParams) chan *Result {
76+
77+
if p.Context == nil {
78+
p.Context = context.Background()
79+
}
80+
81+
var mapSourceToResponse = func(payload interface{}) *Result {
82+
return Execute(ExecuteParams{
83+
Schema: p.Schema,
84+
Root: payload,
85+
AST: p.AST,
86+
OperationName: p.OperationName,
87+
Args: p.Args,
88+
Context: p.Context,
89+
})
90+
}
91+
var resultChannel = make(chan *Result)
92+
go func() {
93+
defer close(resultChannel)
94+
defer func() {
95+
if err := recover(); err != nil {
96+
e, ok := err.(error)
97+
if !ok {
98+
return
99+
}
100+
resultChannel <- &Result{
101+
Errors: gqlerrors.FormatErrors(e),
102+
}
103+
}
104+
return
105+
}()
106+
107+
exeContext, err := buildExecutionContext(buildExecutionCtxParams{
108+
Schema: p.Schema,
109+
Root: p.Root,
110+
AST: p.AST,
111+
OperationName: p.OperationName,
112+
Args: p.Args,
113+
Context: p.Context,
114+
})
115+
116+
if err != nil {
117+
resultChannel <- &Result{
118+
Errors: gqlerrors.FormatErrors(err),
119+
}
120+
121+
return
122+
}
123+
124+
operationType, err := getOperationRootType(p.Schema, exeContext.Operation)
125+
if err != nil {
126+
resultChannel <- &Result{
127+
Errors: gqlerrors.FormatErrors(err),
128+
}
129+
130+
return
131+
}
132+
133+
fields := collectFields(collectFieldsParams{
134+
ExeContext: exeContext,
135+
RuntimeType: operationType,
136+
SelectionSet: exeContext.Operation.GetSelectionSet(),
137+
})
138+
139+
responseNames := []string{}
140+
for name := range fields {
141+
responseNames = append(responseNames, name)
142+
}
143+
responseName := responseNames[0]
144+
fieldNodes := fields[responseName]
145+
fieldNode := fieldNodes[0]
146+
fieldName := fieldNode.Name.Value
147+
fieldDef := getFieldDef(p.Schema, operationType, fieldName)
148+
149+
if fieldDef == nil {
150+
resultChannel <- &Result{
151+
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription field %q is not defined", fieldName)),
152+
}
153+
154+
return
155+
}
156+
157+
resolveFn := fieldDef.Subscribe
158+
159+
if resolveFn == nil {
160+
resultChannel <- &Result{
161+
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)),
162+
}
163+
return
164+
}
165+
fieldPath := &ResponsePath{
166+
Key: responseName,
167+
}
168+
169+
args := getArgumentValues(fieldDef.Args, fieldNode.Arguments, exeContext.VariableValues)
170+
info := ResolveInfo{
171+
FieldName: fieldName,
172+
FieldASTs: fieldNodes,
173+
Path: fieldPath,
174+
ReturnType: fieldDef.Type,
175+
ParentType: operationType,
176+
Schema: p.Schema,
177+
Fragments: exeContext.Fragments,
178+
RootValue: exeContext.Root,
179+
Operation: exeContext.Operation,
180+
VariableValues: exeContext.VariableValues,
181+
}
182+
183+
fieldResult, err := resolveFn(ResolveParams{
184+
Source: p.Root,
185+
Args: args,
186+
Info: info,
187+
Context: p.Context,
188+
})
189+
if err != nil {
190+
resultChannel <- &Result{
191+
Errors: gqlerrors.FormatErrors(err),
192+
}
193+
194+
return
195+
}
196+
197+
if fieldResult == nil {
198+
resultChannel <- &Result{
199+
Errors: gqlerrors.FormatErrors(fmt.Errorf("no field result")),
200+
}
201+
202+
return
203+
}
204+
205+
switch fieldResult.(type) {
206+
case chan interface{}:
207+
sub := fieldResult.(chan interface{})
208+
for {
209+
select {
210+
case <-p.Context.Done():
211+
return
212+
213+
case res, more := <-sub:
214+
if !more {
215+
return
216+
}
217+
resultChannel <- mapSourceToResponse(res)
218+
}
219+
}
220+
default:
221+
resultChannel <- mapSourceToResponse(fieldResult)
222+
return
223+
}
224+
}()
225+
226+
// return a result channel
227+
return resultChannel
228+
}

0 commit comments

Comments
 (0)