From 18826e10142e3c33fbbf9ec52dd6bf3351174754 Mon Sep 17 00:00:00 2001 From: Lee Byron Date: Mon, 24 Apr 2017 20:40:25 -0700 Subject: [PATCH 01/11] Rough take on subscriptions --- src/execution/execute.js | 17 +- .../eventEmitterAsyncIterator-test.js | 62 +++++ .../__tests__/eventEmitterAsyncIterator.js | 74 ++++++ .../__tests__/mapAsyncIterator-test.js | 193 ++++++++++++++++ src/subscription/__tests__/subscribe-test.js | 215 ++++++++++++++++++ src/subscription/mapAsyncIterator.js | 51 +++++ src/subscription/subscribe.js | 184 +++++++++++++++ 7 files changed, 789 insertions(+), 7 deletions(-) create mode 100644 src/subscription/__tests__/eventEmitterAsyncIterator-test.js create mode 100644 src/subscription/__tests__/eventEmitterAsyncIterator.js create mode 100644 src/subscription/__tests__/mapAsyncIterator-test.js create mode 100644 src/subscription/__tests__/subscribe-test.js create mode 100644 src/subscription/mapAsyncIterator.js create mode 100644 src/subscription/subscribe.js diff --git a/src/execution/execute.js b/src/execution/execute.js index a6da446f7f..d100d96886 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -80,7 +80,7 @@ import type { * Namely, schema of the type system that is currently executing, * and the fragments defined in the query document */ -type ExecutionContext = { +export type ExecutionContext = { schema: GraphQLSchema; fragments: {[key: string]: FragmentDefinitionNode}; rootValue: mixed; @@ -183,8 +183,11 @@ export function responsePathAsArray( return flattened.reverse(); } - -function addPath(prev: ResponsePath, key: string | number) { +/** + * Given a ResponsePath and a key, return a new ResponsePath containing the + * new key. + */ +export function addPath(prev: ResponsePath, key: string | number) { return { prev, key }; } @@ -194,7 +197,7 @@ function addPath(prev: ResponsePath, key: string | number) { * * Throws a GraphQLError if a valid execution context cannot be created. */ -function buildExecutionContext( +export function buildExecutionContext( schema: GraphQLSchema, document: DocumentNode, rootValue: mixed, @@ -280,7 +283,7 @@ function executeOperation( /** * Extracts the root type of the operation from the schema. */ -function getOperationRootType( +export function getOperationRootType( schema: GraphQLSchema, operation: OperationDefinitionNode ): GraphQLObjectType { @@ -408,7 +411,7 @@ function executeFields( * returns an Interface or Union type, the "runtime type" will be the actual * Object type returned by that field. */ -function collectFields( +export function collectFields( exeContext: ExecutionContext, runtimeType: GraphQLObjectType, selectionSet: SelectionSetNode, @@ -1178,7 +1181,7 @@ function getPromise(value: Promise | mixed): Promise | void { * added to the query type, but that would require mutating type * definitions, which would cause issues. */ -function getFieldDef( +export function getFieldDef( schema: GraphQLSchema, parentType: GraphQLObjectType, fieldName: string diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js new file mode 100644 index 0000000000..9c3b6f1ed4 --- /dev/null +++ b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import EventEmitter from 'events'; +import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; + +describe('eventEmitterAsyncIterator', () => { + + it('subscribe async-iterator mock', async () => { + // Create an AsyncIterator from an EventEmitter + const emitter = new EventEmitter(); + const iterator = eventEmitterAsyncIterator(emitter, 'publish'); + + // Queue up publishes + expect(emitter.emit('publish', 'Apple')).to.equal(true); + expect(emitter.emit('publish', 'Banana')).to.equal(true); + + // Read payloads + expect(await iterator.next()).to.deep.equal( + { done: false, value: 'Apple' } + ); + expect(await iterator.next()).to.deep.equal( + { done: false, value: 'Banana' } + ); + + // Read ahead + const i3 = iterator.next().then(x => x); + const i4 = iterator.next().then(x => x); + + // Publish + expect(emitter.emit('publish', 'Coconut')).to.equal(true); + expect(emitter.emit('publish', 'Durian')).to.equal(true); + + // Await out of order to get correct results + expect(await i4).to.deep.equal({ done: false, value: 'Durian'}); + expect(await i3).to.deep.equal({ done: false, value: 'Coconut'}); + + // Read ahead + const i5 = iterator.next().then(x => x); + + // Terminate emitter + await iterator.return(); + + // Publish is not caught after terminate + expect(emitter.emit('publish', 'Fig')).to.equal(false); + + // Find that cancelled read-ahead got a "done" result + expect(await i5).to.deep.equal({ done: true, value: undefined }); + + // And next returns empty completion value + expect(await iterator.next()).to.deep.equal({ done: true, value: undefined }); + }); +}); diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator.js b/src/subscription/__tests__/eventEmitterAsyncIterator.js new file mode 100644 index 0000000000..4662aa1a04 --- /dev/null +++ b/src/subscription/__tests__/eventEmitterAsyncIterator.js @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +import type EventEmitter from 'events'; + +const ASYNC_ITERATOR_SYMBOL = + typeof Symbol === 'function' && Symbol.asyncIterator || '@@asyncIterator'; + +/** + * Create an AsyncIterator from an EventEmitter. Useful for mocking a + * PubSub system for tests. + */ +export default function eventEmitterAsyncIterator( + eventEmitter: EventEmitter, + eventName: string +): AsyncIterator { + let pullQueue = []; + let pushQueue = []; + let listening = true; + eventEmitter.addListener(eventName, pushValue); + + function pushValue(event) { + if (pullQueue.length !== 0) { + pullQueue.shift()({ value: event, done: false }); + } else { + pushQueue.push(event); + } + } + + function pullValue() { + return new Promise(resolve => { + if (pushQueue.length !== 0) { + resolve({ value: pushQueue.shift(), done: false }); + } else { + pullQueue.push(resolve); + } + }); + } + + function emptyQueue() { + if (listening) { + listening = false; + eventEmitter.removeListener(eventName, pushValue); + pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); + pullQueue.length = 0; + pushQueue.length = 0; + } + } + + return { + next() { + return listening ? pullValue() : this.return(); + }, + return() { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error) { + emptyQueue(); + return Promise.reject(error); + }, + [ASYNC_ITERATOR_SYMBOL]() { + return this; + }, + }; +} diff --git a/src/subscription/__tests__/mapAsyncIterator-test.js b/src/subscription/__tests__/mapAsyncIterator-test.js new file mode 100644 index 0000000000..1498e33924 --- /dev/null +++ b/src/subscription/__tests__/mapAsyncIterator-test.js @@ -0,0 +1,193 @@ +/** + * Copyright (c) 2015, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +import { expect } from 'chai'; +import { describe, it } from 'mocha'; +import mapAsyncIterator from '../mapAsyncIterator'; + +describe('mapAsyncIterator', () => { + + it('maps over async values', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 6, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('maps over async values with async function', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), async (x) => await x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 6, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('allows returning early from async values', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Early return + expect( + await doubles.return() + ).to.deep.equal({ value: undefined, done: true }); + + // Subsequent nexts + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('passes through early return from async values', async () => { + async function* source() { + try { + yield 1; + yield 2; + yield 3; + } finally { + yield 'done'; + yield 'last'; + } + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Early return + expect( + await doubles.return() + ).to.deep.equal({ value: 'donedone', done: false }); + + // Subsequent nexts may yield from finally block + expect( + await doubles.next() + ).to.deep.equal({ value: 'lastlast', done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('allows throwing errors through async generators', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Throw error + let caughtError; + try { + await doubles.throw('ouch') + } catch (e) { + caughtError = e; + } + expect(caughtError).to.equal('ouch'); + + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + + it('passes through caught errors through async generators', async () => { + async function* source() { + try { + yield 1; + yield 2; + yield 3; + } catch (e) { + yield e; + } + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 2, done: false }); + expect( + await doubles.next() + ).to.deep.equal({ value: 4, done: false }); + + // Throw error + expect( + await doubles.throw('ouch') + ).to.deep.equal({ value: 'ouchouch', done: false }); + + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + +}); diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js new file mode 100644 index 0000000000..f950a0de57 --- /dev/null +++ b/src/subscription/__tests__/subscribe-test.js @@ -0,0 +1,215 @@ +/** + * Copyright (c) 2015, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +import { expect } from 'chai'; +import { describe, it } from 'mocha'; +import EventEmitter from 'events'; +import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; +import {subscribe} from '../subscribe'; +import { parse } from '../../language'; +import { + GraphQLSchema, + GraphQLObjectType, + GraphQLList, + GraphQLBoolean, + GraphQLInt, + GraphQLString, + GraphQLNonNull, +} from '../../type'; + + +describe('Subscribe', () => { + + const EmailType = new GraphQLObjectType({ + name: 'Email', + fields: { + from: { type: GraphQLString }, + subject: { type: GraphQLString }, + message: { type: GraphQLString }, + unread: { type: GraphQLBoolean }, + } + }); + + const InboxType = new GraphQLObjectType({ + name: 'Inbox', + fields: { + total: { + type: GraphQLInt, + resolve: inbox => inbox.emails.length, + }, + unread: { + type: GraphQLInt, + resolve: inbox => inbox.emails.filter(email => email.unread).length, + }, + emails: { type: new GraphQLList(EmailType) }, + } + }); + + const QueryType = new GraphQLObjectType({ + name: 'Query', + fields: { + inbox: { type: InboxType }, + } + }); + + const EmailEventType = new GraphQLObjectType({ + name: 'EmailEvent', + fields: { + email: { type: EmailType }, + inbox: { type: InboxType }, + } + }); + + const SubscriptionType = new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { type: EmailEventType }, + } + }); + + const emailSchema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionType + }); + + it('produces a payload per subscription event', async () => { + + const pubsub = new EventEmitter(); + + const data = { + inbox: { + emails: [ + { + from: 'joe@graphql.org', + subject: 'Hello', + message: 'Hello World', + unread: false, + }, + ], + }, + importantEmail() { + return eventEmitterAsyncIterator(pubsub, 'importantEmail'); + } + }; + + function sendImportantEmail(newEmail) { + data.inbox.emails.push(newEmail); + // Returns true if the event was consumed by a subscriber. + return pubsub.emit('importantEmail', { + importantEmail: { + email: newEmail, + inbox: data.inbox, + } + }); + } + + const ast = parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject + } + inbox { + unread + total + } + } + } + `); + + // GraphQL `subscribe` has the same call signature as `execute`, but returns + // AsyncIterator instead of Promise. + const subscription = subscribe( + emailSchema, + ast, + data, + null, // context + { priority: 1 } + ); + + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + // The previously waited on payload now has a value. + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + // Another new email arrives, before subscription.next() is called. + expect(sendImportantEmail({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + })).to.equal(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'hyo@graphql.org', + subject: 'Tools', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + + // The client decides to disconnect. + expect(await subscription.return()).to.deep.equal({ + done: true, + value: undefined, + }); + + // Which may result in disconnecting upstream services as well. + expect(sendImportantEmail({ + from: 'adam@graphql.org', + subject: 'Important', + message: 'Read me please', + unread: true, + })).to.equal(false); // No more listeners. + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + +}); diff --git a/src/subscription/mapAsyncIterator.js b/src/subscription/mapAsyncIterator.js new file mode 100644 index 0000000000..ec7c0a1a96 --- /dev/null +++ b/src/subscription/mapAsyncIterator.js @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +const ASYNC_ITERATOR_SYMBOL = + typeof Symbol === 'function' && Symbol.asyncIterator || '@@asyncIterator'; + +/** + * Given an AsyncIterator and a callback function, return a new AsyncIterator + * which produces values mapped via calling the callback function. + */ +export default function mapAsyncIterator( + iterator: AsyncIterator, + callback: (value: T) => U +): AsyncIterator { + function mapResult(result) { + return result.done ? + result : + Promise.resolve(callback(result.value)).then( + mapped => ({ value: mapped, done: false }) + ); + } + + return { + next() { + return iterator.next().then(mapResult); + }, + return() { + if (typeof iterator.return === 'function') { + return iterator.return().then(mapResult); + } + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error) { + if (typeof iterator.throw === 'function') { + return iterator.throw(error).then(mapResult); + } + return Promise.reject(error); + }, + [ASYNC_ITERATOR_SYMBOL]() { + return this; + }, + }; +} diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js new file mode 100644 index 0000000000..e4c2bb77fe --- /dev/null +++ b/src/subscription/subscribe.js @@ -0,0 +1,184 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +import { + addPath, + buildExecutionContext, + collectFields, + defaultFieldResolver, + execute, + getFieldDef, + getOperationRootType, +} from '../execution/execute'; +import { getArgumentValues } from '../execution/values'; +import { GraphQLSchema } from '../type/schema'; +import invariant from '../jsutils/invariant'; +import mapAsyncIterator from './mapAsyncIterator'; + +import type { + ExecutionContext, + ExecutionResult, +} from '../execution/execute'; +import type { + DocumentNode, + OperationDefinitionNode, +} from '../language/ast'; +import type { GraphQLResolveInfo } from '../type/definition'; + + +/** + * Implements the "Subscribing to request" section of the GraphQL specification. + * + * Returns an AsyncIterator + * + * If the arguments to this function do not result in a legal execution context, + * a GraphQLError will be thrown immediately explaining the invalid input. + */ +export function subscribe( + schema: GraphQLSchema, + document: DocumentNode, + rootValue?: mixed, + contextValue?: mixed, + variableValues?: ?{[key: string]: mixed}, + operationName?: ?string, +): AsyncIterator { + // Note: these invariants are identical to execute.js + invariant(schema, 'Must provide schema'); + invariant(document, 'Must provide document'); + invariant( + schema instanceof GraphQLSchema, + 'Schema must be an instance of GraphQLSchema. Also ensure that there are ' + + 'not multiple versions of GraphQL installed in your node_modules directory.' + ); + + // Variables, if provided, must be an object. + invariant( + !variableValues || typeof variableValues === 'object', + 'Variables must be provided as an Object where each property is a ' + + 'variable value. Perhaps look to see if an unparsed JSON string ' + + 'was provided.' + ); + + // If a valid context cannot be created due to incorrect arguments, + // this will throw an error. + const exeContext = buildExecutionContext( + schema, + document, + rootValue, + contextValue, + variableValues, + operationName + ); + + // Call the `subscribe()` resolver or the default resolver to produce an + // AsyncIterable yielding raw payloads. + const subscription = resolveSubscription( + exeContext, + exeContext.operation, + rootValue + ); + + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + return mapAsyncIterator( + subscription, + payload => execute( + schema, + document, + payload, + contextValue, + variableValues, + operationName + ) + ); +} + +function resolveSubscription( + exeContext: ExecutionContext, + operation: OperationDefinitionNode, + rootValue: mixed +): AsyncIterator { + // Note: this function is almost the same as executeOperation() and + // resolveField() with only a few minor differences. + + const type = getOperationRootType(exeContext.schema, exeContext.operation); + const fields = collectFields( + exeContext, + type, + exeContext.operation.selectionSet, + Object.create(null), + Object.create(null) + ); + + const responseNames = Object.keys(fields); + invariant( + responseNames.length === 1, + 'A subscription must contain exactly one field.' + ); + const responseName = responseNames[0]; + const fieldNodes = fields[responseName]; + const fieldPath = addPath(undefined, responseName); + + const fieldNode = fieldNodes[0]; + const fieldName = fieldNode.name.value; + const fieldDef = getFieldDef(exeContext.schema, type, fieldName); + invariant( + fieldDef, + 'This subscription is not defined by the schema.' + ); + + // TODO: make GraphQLSubscription flow type special to support defining these? + const resolveFn = (fieldDef: any).subscribe || defaultFieldResolver; + + // The resolve function's optional third argument is a context value that + // is provided to every resolve function within an execution. It is commonly + // used to represent an authenticated user, or request-specific caches. + const context = exeContext.contextValue; + + // The resolve function's optional fourth argument is a collection of + // information about the current execution state. + const info: GraphQLResolveInfo = { + fieldName, + fieldNodes, + returnType: fieldDef.type, + parentType: type, + path: fieldPath, + schema: exeContext.schema, + fragments: exeContext.fragments, + rootValue: exeContext.rootValue, + operation: exeContext.operation, + variableValues: exeContext.variableValues, + }; + + // Build a JS object of arguments from the field.arguments AST, using the + // variables scope to fulfill any variable references. + const args = getArgumentValues( + fieldDef, + fieldNode, + exeContext.variableValues + ); + + // TODO: resolveFn could throw! + const subscription = resolveFn(rootValue, args, context, info); + + invariant( + isIterable(subscription), + 'Subscription must return async-iterator.' + ); + + return subscription; +} + +function isIterable(value) { + return typeof value === 'object' && + value !== null && + typeof value.next === 'function'; +} From b8a3c74f1ef26fef1325e08a0cc5a85faa3f7a71 Mon Sep 17 00:00:00 2001 From: Lee Byron Date: Tue, 25 Apr 2017 22:06:16 -0700 Subject: [PATCH 02/11] Minor refactoring of execute.js to make it more clear that most of subscribe.js is calling existing functions. --- .babelrc | 1 + .eslintrc | 2 +- package.json | 3 +- src/execution/__tests__/lists-test.js | 2 +- src/execution/execute.js | 101 ++++++++++-------- src/execution/values.js | 2 +- .../eventEmitterAsyncIterator-test.js | 4 +- .../__tests__/eventEmitterAsyncIterator.js | 10 +- .../__tests__/mapAsyncIterator-test.js | 4 +- src/subscription/__tests__/subscribe-test.js | 1 - src/subscription/mapAsyncIterator.js | 13 ++- src/subscription/subscribe.js | 84 ++++----------- 12 files changed, 101 insertions(+), 126 deletions(-) diff --git a/.babelrc b/.babelrc index 051ddecefc..39d11092ab 100644 --- a/.babelrc +++ b/.babelrc @@ -1,6 +1,7 @@ { "plugins": [ "syntax-async-functions", + "syntax-async-generators", "transform-class-properties", "transform-flow-strip-types", "transform-object-rest-spread", diff --git a/.eslintrc b/.eslintrc index dc76829be0..3421ddbd43 100644 --- a/.eslintrc +++ b/.eslintrc @@ -70,7 +70,7 @@ "eqeqeq": ["error", "smart"], "func-names": 0, "func-style": 0, - "generator-star-spacing": [2, {"before": true, "after": false}], + "generator-star-spacing": [2, {"before": false, "after": true}], "guard-for-in": 2, "handle-callback-err": [2, "error"], "id-length": 0, diff --git a/package.json b/package.json index 356721e6d4..a611e52357 100644 --- a/package.json +++ b/package.json @@ -36,13 +36,14 @@ "prepublish": ". ./resources/prepublish.sh" }, "dependencies": { - "iterall": "^1.0.0" + "iterall": "1.1.0" }, "devDependencies": { "babel-cli": "6.24.1", "babel-eslint": "7.2.3", "babel-plugin-check-es2015-constants": "6.22.0", "babel-plugin-syntax-async-functions": "6.13.0", + "babel-plugin-syntax-async-generators": "6.13.0", "babel-plugin-transform-class-properties": "6.24.1", "babel-plugin-transform-es2015-arrow-functions": "6.22.0", "babel-plugin-transform-es2015-block-scoped-functions": "6.22.0", diff --git a/src/execution/__tests__/lists-test.js b/src/execution/__tests__/lists-test.js index 6985015f94..ad621e8801 100644 --- a/src/execution/__tests__/lists-test.js +++ b/src/execution/__tests__/lists-test.js @@ -76,7 +76,7 @@ describe('Execute: Accepts any iterable as list value', () => { { data: { nest: { test: [ 'apple', 'banana', 'coconut' ] } } } )); - function *yieldItems() { + function* yieldItems() { yield 'one'; yield 2; yield true; diff --git a/src/execution/execute.js b/src/execution/execute.js index d100d96886..112baf7138 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -117,22 +117,6 @@ export function execute( variableValues?: ?{[key: string]: mixed}, operationName?: ?string ): Promise { - invariant(schema, 'Must provide schema'); - invariant(document, 'Must provide document'); - invariant( - schema instanceof GraphQLSchema, - 'Schema must be an instance of GraphQLSchema. Also ensure that there are ' + - 'not multiple versions of GraphQL installed in your node_modules directory.' - ); - - // Variables, if provided, must be an object. - invariant( - !variableValues || typeof variableValues === 'object', - 'Variables must be provided as an Object where each property is a ' + - 'variable value. Perhaps look to see if an unparsed JSON string ' + - 'was provided.' - ); - // If a valid context cannot be created due to incorrect arguments, // this will throw an error. const context = buildExecutionContext( @@ -205,6 +189,22 @@ export function buildExecutionContext( rawVariableValues: ?{[key: string]: mixed}, operationName: ?string ): ExecutionContext { + invariant(schema, 'Must provide schema'); + invariant(document, 'Must provide document'); + invariant( + schema instanceof GraphQLSchema, + 'Schema must be an instance of GraphQLSchema. Also ensure that there are ' + + 'not multiple versions of GraphQL installed in your node_modules directory.' + ); + + // Variables, if provided, must be an object. + invariant( + !rawVariableValues || typeof rawVariableValues === 'object', + 'Variables must be provided as an Object where each property is a ' + + 'variable value. Perhaps look to see if an unparsed JSON string ' + + 'was provided.' + ); + const errors: Array = []; let operation: ?OperationDefinitionNode; const fragments: {[name: string]: FragmentDefinitionNode} = @@ -580,44 +580,30 @@ function resolveField( return; } - const returnType = fieldDef.type; const resolveFn = fieldDef.resolve || defaultFieldResolver; - // The resolve function's optional third argument is a context value that - // is provided to every resolve function within an execution. It is commonly - // used to represent an authenticated user, or request-specific caches. - const context = exeContext.contextValue; - - // The resolve function's optional fourth argument is a collection of - // information about the current execution state. - const info: GraphQLResolveInfo = { - fieldName, + const info = buildResolveInfo( + exeContext, + fieldDef, fieldNodes, - returnType, parentType, - path, - schema: exeContext.schema, - fragments: exeContext.fragments, - rootValue: exeContext.rootValue, - operation: exeContext.operation, - variableValues: exeContext.variableValues, - }; + path + ); // Get the resolve function, regardless of if its result is normal // or abrupt (error). - const result = resolveOrError( + const result = resolveFieldValueOrError( exeContext, fieldDef, - fieldNode, + fieldNodes, resolveFn, source, - context, info ); return completeValueCatchingError( exeContext, - returnType, + fieldDef.type, fieldNodes, info, path, @@ -625,15 +611,37 @@ function resolveField( ); } +export function buildResolveInfo( + exeContext: ExecutionContext, + fieldDef: GraphQLField<*, *>, + fieldNodes: Array, + parentType: GraphQLObjectType, + path: ResponsePath +): GraphQLResolveInfo { + // The resolve function's optional fourth argument is a collection of + // information about the current execution state. + return { + fieldName: fieldNodes[0].name.value, + fieldNodes, + returnType: fieldDef.type, + parentType, + path, + schema: exeContext.schema, + fragments: exeContext.fragments, + rootValue: exeContext.rootValue, + operation: exeContext.operation, + variableValues: exeContext.variableValues, + }; +} + // Isolates the "ReturnOrAbrupt" behavior to not de-opt the `resolveField` // function. Returns the result of resolveFn or the abrupt-return Error object. -function resolveOrError( +export function resolveFieldValueOrError( exeContext: ExecutionContext, - fieldDef: GraphQLField, - fieldNode: FieldNode, - resolveFn: GraphQLFieldResolver, + fieldDef: GraphQLField, + fieldNodes: Array, + resolveFn: GraphQLFieldResolver, source: TSource, - context: TContext, info: GraphQLResolveInfo ): Error | mixed { try { @@ -642,10 +650,15 @@ function resolveOrError( // TODO: find a way to memoize, in case this field is within a List type. const args = getArgumentValues( fieldDef, - fieldNode, + fieldNodes[0], exeContext.variableValues ); + // The resolve function's optional third argument is a context value that + // is provided to every resolve function within an execution. It is commonly + // used to represent an authenticated user, or request-specific caches. + const context = exeContext.contextValue; + return resolveFn(source, args, context, info); } catch (error) { // Sometimes a non-error is thrown, wrap it as an Error for a diff --git a/src/execution/values.js b/src/execution/values.js index 8ce519c0f4..35f794bc19 100644 --- a/src/execution/values.js +++ b/src/execution/values.js @@ -191,7 +191,7 @@ function coerceValue(type: GraphQLInputType, value: mixed): mixed { const itemType = type.ofType; if (isCollection(_value)) { const coercedValues = []; - const valueIter = createIterator(_value); + const valueIter = createIterator((_value: any)); if (!valueIter) { return; // Intentionally return no value. } diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js index 9c3b6f1ed4..683e96142d 100644 --- a/src/subscription/__tests__/eventEmitterAsyncIterator-test.js +++ b/src/subscription/__tests__/eventEmitterAsyncIterator-test.js @@ -57,6 +57,8 @@ describe('eventEmitterAsyncIterator', () => { expect(await i5).to.deep.equal({ done: true, value: undefined }); // And next returns empty completion value - expect(await iterator.next()).to.deep.equal({ done: true, value: undefined }); + expect(await iterator.next()).to.deep.equal( + { done: true, value: undefined } + ); }); }); diff --git a/src/subscription/__tests__/eventEmitterAsyncIterator.js b/src/subscription/__tests__/eventEmitterAsyncIterator.js index 4662aa1a04..8fe45a4ec3 100644 --- a/src/subscription/__tests__/eventEmitterAsyncIterator.js +++ b/src/subscription/__tests__/eventEmitterAsyncIterator.js @@ -10,9 +10,7 @@ */ import type EventEmitter from 'events'; - -const ASYNC_ITERATOR_SYMBOL = - typeof Symbol === 'function' && Symbol.asyncIterator || '@@asyncIterator'; +import { $$asyncIterator } from 'iterall'; /** * Create an AsyncIterator from an EventEmitter. Useful for mocking a @@ -22,8 +20,8 @@ export default function eventEmitterAsyncIterator( eventEmitter: EventEmitter, eventName: string ): AsyncIterator { - let pullQueue = []; - let pushQueue = []; + const pullQueue = []; + const pushQueue = []; let listening = true; eventEmitter.addListener(eventName, pushValue); @@ -67,7 +65,7 @@ export default function eventEmitterAsyncIterator( emptyQueue(); return Promise.reject(error); }, - [ASYNC_ITERATOR_SYMBOL]() { + [$$asyncIterator]() { return this; }, }; diff --git a/src/subscription/__tests__/mapAsyncIterator-test.js b/src/subscription/__tests__/mapAsyncIterator-test.js index 1498e33924..5f5946a358 100644 --- a/src/subscription/__tests__/mapAsyncIterator-test.js +++ b/src/subscription/__tests__/mapAsyncIterator-test.js @@ -43,7 +43,7 @@ describe('mapAsyncIterator', () => { yield 3; } - const doubles = mapAsyncIterator(source(), async (x) => await x + x); + const doubles = mapAsyncIterator(source(), async x => await x + x); expect( await doubles.next() @@ -143,7 +143,7 @@ describe('mapAsyncIterator', () => { // Throw error let caughtError; try { - await doubles.throw('ouch') + await doubles.throw('ouch'); } catch (e) { caughtError = e; } diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index f950a0de57..a20d8987a0 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -20,7 +20,6 @@ import { GraphQLBoolean, GraphQLInt, GraphQLString, - GraphQLNonNull, } from '../../type'; diff --git a/src/subscription/mapAsyncIterator.js b/src/subscription/mapAsyncIterator.js index ec7c0a1a96..5a35dd196a 100644 --- a/src/subscription/mapAsyncIterator.js +++ b/src/subscription/mapAsyncIterator.js @@ -9,17 +9,20 @@ * @flow */ -const ASYNC_ITERATOR_SYMBOL = - typeof Symbol === 'function' && Symbol.asyncIterator || '@@asyncIterator'; +import { $$asyncIterator, getAsyncIterator } from 'iterall'; /** - * Given an AsyncIterator and a callback function, return a new AsyncIterator + * Given an AsyncIterable and a callback function, return an AsyncIterator * which produces values mapped via calling the callback function. */ export default function mapAsyncIterator( - iterator: AsyncIterator, + iterable: AsyncIterable, callback: (value: T) => U ): AsyncIterator { + // Fixes a temporary issue with Regenerator/Babel + // https://github.com/facebook/regenerator/pull/290 + const iterator = iterable.next ? (iterable: any) : getAsyncIterator(iterable); + function mapResult(result) { return result.done ? result : @@ -44,7 +47,7 @@ export default function mapAsyncIterator( } return Promise.reject(error); }, - [ASYNC_ITERATOR_SYMBOL]() { + [$$asyncIterator]() { return this; }, }; diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index e4c2bb77fe..96d16f79c9 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -9,6 +9,7 @@ * @flow */ +import { isAsyncIterable } from 'iterall'; import { addPath, buildExecutionContext, @@ -17,8 +18,9 @@ import { execute, getFieldDef, getOperationRootType, + buildResolveInfo, + resolveFieldValueOrError, } from '../execution/execute'; -import { getArgumentValues } from '../execution/values'; import { GraphQLSchema } from '../type/schema'; import invariant from '../jsutils/invariant'; import mapAsyncIterator from './mapAsyncIterator'; @@ -31,7 +33,6 @@ import type { DocumentNode, OperationDefinitionNode, } from '../language/ast'; -import type { GraphQLResolveInfo } from '../type/definition'; /** @@ -50,23 +51,6 @@ export function subscribe( variableValues?: ?{[key: string]: mixed}, operationName?: ?string, ): AsyncIterator { - // Note: these invariants are identical to execute.js - invariant(schema, 'Must provide schema'); - invariant(document, 'Must provide document'); - invariant( - schema instanceof GraphQLSchema, - 'Schema must be an instance of GraphQLSchema. Also ensure that there are ' + - 'not multiple versions of GraphQL installed in your node_modules directory.' - ); - - // Variables, if provided, must be an object. - invariant( - !variableValues || typeof variableValues === 'object', - 'Variables must be provided as an Object where each property is a ' + - 'variable value. Perhaps look to see if an unparsed JSON string ' + - 'was provided.' - ); - // If a valid context cannot be created due to incorrect arguments, // this will throw an error. const exeContext = buildExecutionContext( @@ -105,10 +89,7 @@ function resolveSubscription( exeContext: ExecutionContext, operation: OperationDefinitionNode, rootValue: mixed -): AsyncIterator { - // Note: this function is almost the same as executeOperation() and - // resolveField() with only a few minor differences. - +): AsyncIterable { const type = getOperationRootType(exeContext.schema, exeContext.operation); const fields = collectFields( exeContext, @@ -117,7 +98,6 @@ function resolveSubscription( Object.create(null), Object.create(null) ); - const responseNames = Object.keys(fields); invariant( responseNames.length === 1, @@ -125,11 +105,8 @@ function resolveSubscription( ); const responseName = responseNames[0]; const fieldNodes = fields[responseName]; - const fieldPath = addPath(undefined, responseName); - const fieldNode = fieldNodes[0]; - const fieldName = fieldNode.name.value; - const fieldDef = getFieldDef(exeContext.schema, type, fieldName); + const fieldDef = getFieldDef(exeContext.schema, type, fieldNode.name.value); invariant( fieldDef, 'This subscription is not defined by the schema.' @@ -138,47 +115,28 @@ function resolveSubscription( // TODO: make GraphQLSubscription flow type special to support defining these? const resolveFn = (fieldDef: any).subscribe || defaultFieldResolver; - // The resolve function's optional third argument is a context value that - // is provided to every resolve function within an execution. It is commonly - // used to represent an authenticated user, or request-specific caches. - const context = exeContext.contextValue; - - // The resolve function's optional fourth argument is a collection of - // information about the current execution state. - const info: GraphQLResolveInfo = { - fieldName, + const info = buildResolveInfo( + exeContext, + fieldDef, fieldNodes, - returnType: fieldDef.type, - parentType: type, - path: fieldPath, - schema: exeContext.schema, - fragments: exeContext.fragments, - rootValue: exeContext.rootValue, - operation: exeContext.operation, - variableValues: exeContext.variableValues, - }; + type, + addPath(undefined, responseName) + ); - // Build a JS object of arguments from the field.arguments AST, using the - // variables scope to fulfill any variable references. - const args = getArgumentValues( + // TODO: handle the error + const subscription = resolveFieldValueOrError( + exeContext, fieldDef, - fieldNode, - exeContext.variableValues + fieldNodes, + resolveFn, + rootValue, + info ); - // TODO: resolveFn could throw! - const subscription = resolveFn(rootValue, args, context, info); - invariant( - isIterable(subscription), - 'Subscription must return async-iterator.' + isAsyncIterable(subscription), + 'Subscription must return Async Iterable.' ); - return subscription; -} - -function isIterable(value) { - return typeof value === 'object' && - value !== null && - typeof value.next === 'function'; + return (subscription: any); } From 74a7dc6f20b832575c15a7f188a4d2e5333a2a92 Mon Sep 17 00:00:00 2001 From: Urigo Date: Wed, 3 May 2017 22:03:35 +0300 Subject: [PATCH 03/11] feat(subscriptions): expose `subscribe` in the bundle --- src/index.js | 1 + src/subscription/index.js | 1 + 2 files changed, 2 insertions(+) create mode 100644 src/subscription/index.js diff --git a/src/index.js b/src/index.js index 64952bbb0d..5e1c7b76d9 100644 --- a/src/index.js +++ b/src/index.js @@ -243,6 +243,7 @@ export type { ExecutionResult, } from './execution'; +export { subscribe } from './subscription'; // Validate GraphQL queries. export { diff --git a/src/subscription/index.js b/src/subscription/index.js new file mode 100644 index 0000000000..e9527da080 --- /dev/null +++ b/src/subscription/index.js @@ -0,0 +1 @@ +export { subscribe } from './subscribe'; From 1242665edab298d79c32ee9f5a94a39240370339 Mon Sep 17 00:00:00 2001 From: Urigo Date: Thu, 4 May 2017 19:22:14 +0300 Subject: [PATCH 04/11] feat(subscriptions): added unit tests --- src/subscription/__tests__/subscribe-test.js | 304 ++++++++++++++++++- 1 file changed, 292 insertions(+), 12 deletions(-) diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index a20d8987a0..972d1c2464 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -11,7 +11,7 @@ import { expect } from 'chai'; import { describe, it } from 'mocha'; import EventEmitter from 'events'; import eventEmitterAsyncIterator from './eventEmitterAsyncIterator'; -import {subscribe} from '../subscribe'; +import { subscribe } from '../subscribe'; import { parse } from '../../language'; import { GraphQLSchema, @@ -77,10 +77,7 @@ describe('Subscribe', () => { subscription: SubscriptionType }); - it('produces a payload per subscription event', async () => { - - const pubsub = new EventEmitter(); - + const createSubscription = pubsub => { const data = { inbox: { emails: [ @@ -125,13 +122,21 @@ describe('Subscribe', () => { // GraphQL `subscribe` has the same call signature as `execute`, but returns // AsyncIterator instead of Promise. - const subscription = subscribe( - emailSchema, - ast, - data, - null, // context - { priority: 1 } - ); + return { + subscription: subscribe( + emailSchema, + ast, + data, + null, // context + { priority: 1 } + ), + sendImportantEmail, + }; + }; + + it('produces a payload per subscription event', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); // Wait for the next subscription payload. const payload = subscription.next(); @@ -211,4 +216,279 @@ describe('Subscribe', () => { }); }); + it('produces a payload when there are multiple events', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + let payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright 2', + message: 'Tests are good 2', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright 2', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + }); + + it('should not trigger when subscription is already done', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + let payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + payload = subscription.next(); + subscription.return(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright 2', + message: 'Tests are good 2', + unread: true, + })).to.equal(false); + + expect(await payload).to.deep.equal({ + done: true, + value: undefined, + }); + }); + + it('events order is correct when multiple triggered together', async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + let payload = subscription.next(); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Message', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + // A new email arrives! + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Message 2', + message: 'Tests are good 2', + unread: true, + })).to.equal(true); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Message', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + + payload = subscription.next(); + + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Message 2', + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + }); + + it('invalid query should result in error', async () => { + const invalidAST = parse(` + subscription { + invalidField + } + `); + + expect(() => { + subscribe( + emailSchema, + invalidAST, + null, + null, // context + { priority: 1 }); + }).to.throw('This subscription is not defined by the schema.'); + }); + + it('throws when subscription definition doesnt return iterator', () => { + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + resolve: () => 'test', + }, + } + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + expect(() => { + subscribe( + invalidEmailSchema, + ast, + null, + null, // context + { priority: 1 }); + }).to.throw('Subscription must return Async Iterable.'); + }); + + it('expects to have subscribe on type definition with iterator', () => { + const pubsub = new EventEmitter(); + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => eventEmitterAsyncIterator(pubsub, 'importantEmail') + }, + } + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + expect(() => { + subscribe( + invalidEmailSchema, + ast, + null, + null, // context + { priority: 1 }); + }).not.to.throw(); + }); + + it('throws when subscribe does not return a valid iterator', () => { + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => 'test' + }, + } + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + expect(() => { + subscribe( + invalidEmailSchema, + ast, + null, + null, // context + { priority: 1 }); + }).to.throw('Subscription must return Async Iterable.'); + }); }); From 0dd3ce3e656b76dfc3886a12c459aaa396ed4a27 Mon Sep 17 00:00:00 2001 From: Urigo Date: Sun, 14 May 2017 16:18:45 +0300 Subject: [PATCH 05/11] feat(subscriptions): added error handling and fixed tests --- src/subscription/__tests__/subscribe-test.js | 33 +++++++++----------- src/subscription/subscribe.js | 5 ++- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 972d1c2464..a33b344cc4 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -77,7 +77,7 @@ describe('Subscribe', () => { subscription: SubscriptionType }); - const createSubscription = pubsub => { + function createSubscription(pubsub) { const data = { inbox: { emails: [ @@ -123,16 +123,15 @@ describe('Subscribe', () => { // GraphQL `subscribe` has the same call signature as `execute`, but returns // AsyncIterator instead of Promise. return { + sendImportantEmail, subscription: subscribe( emailSchema, ast, data, - null, // context - { priority: 1 } + null ), - sendImportantEmail, }; - }; + } it('produces a payload per subscription event', async () => { const pubsub = new EventEmitter(); @@ -396,8 +395,7 @@ describe('Subscribe', () => { emailSchema, invalidAST, null, - null, // context - { priority: 1 }); + null); }).to.throw('This subscription is not defined by the schema.'); }); @@ -409,7 +407,7 @@ describe('Subscribe', () => { fields: { importantEmail: { type: GraphQLString, - resolve: () => 'test', + subscribe: () => 'test', }, } }) @@ -426,8 +424,7 @@ describe('Subscribe', () => { invalidEmailSchema, ast, null, - null, // context - { priority: 1 }); + null); }).to.throw('Subscription must return Async Iterable.'); }); @@ -457,12 +454,11 @@ describe('Subscribe', () => { invalidEmailSchema, ast, null, - null, // context - { priority: 1 }); + null); }).not.to.throw(); }); - it('throws when subscribe does not return a valid iterator', () => { + it('should handle error thrown by subscribe method', () => { const invalidEmailSchema = new GraphQLSchema({ query: QueryType, subscription: new GraphQLObjectType({ @@ -470,9 +466,11 @@ describe('Subscribe', () => { fields: { importantEmail: { type: GraphQLString, - subscribe: () => 'test' + subscribe: () => { + throw new Error('test error'); + }, }, - } + }, }) }); @@ -487,8 +485,7 @@ describe('Subscribe', () => { invalidEmailSchema, ast, null, - null, // context - { priority: 1 }); - }).to.throw('Subscription must return Async Iterable.'); + null); + }).to.throw('test error'); }); }); diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index 96d16f79c9..067572fc98 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -123,7 +123,6 @@ function resolveSubscription( addPath(undefined, responseName) ); - // TODO: handle the error const subscription = resolveFieldValueOrError( exeContext, fieldDef, @@ -133,6 +132,10 @@ function resolveSubscription( info ); + if (subscription instanceof Error) { + throw subscription; + } + invariant( isAsyncIterable(subscription), 'Subscription must return Async Iterable.' From ba0f60e52610fbc1e781c22eda41a7022af9dcc6 Mon Sep 17 00:00:00 2001 From: Urigo Date: Sun, 14 May 2017 16:52:43 +0300 Subject: [PATCH 06/11] feat(subscriptions): added test cases for querying multiple subscription fields and multiple schema subscriptions --- src/subscription/__tests__/subscribe-test.js | 62 +++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index a33b344cc4..3635d0cf6b 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -77,7 +77,7 @@ describe('Subscribe', () => { subscription: SubscriptionType }); - function createSubscription(pubsub) { + function createSubscription(pubsub, schema = emailSchema) { const data = { inbox: { emails: [ @@ -125,7 +125,7 @@ describe('Subscribe', () => { return { sendImportantEmail, subscription: subscribe( - emailSchema, + schema, ast, data, null @@ -133,6 +133,64 @@ describe('Subscribe', () => { }; } + it('multiple subscription fields defined in schema', async () => { + const pubsub = new EventEmitter(); + const SubscriptionTypeMultiple = new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { type: EmailEventType }, + nonImportantEmail: { type: EmailEventType }, + } + }); + + const testSchema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionTypeMultiple + }); + + expect(() => { + const { sendImportantEmail } = + createSubscription(pubsub, testSchema); + + sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }); + }).not.to.throw(); + }); + + it('should throw when querying for multiple fields', async () => { + const SubscriptionTypeMultiple = new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { type: EmailEventType }, + nonImportantEmail: { type: EmailEventType }, + } + }); + + const testSchema = new GraphQLSchema({ + query: QueryType, + subscription: SubscriptionTypeMultiple + }); + + const ast = parse(` + subscription { + importantEmail + nonImportantEmail + } + `); + + expect(() => { + subscribe( + testSchema, + ast, + null, + null); + }).to.throw('A subscription must contain exactly one field.'); + }); + it('produces a payload per subscription event', async () => { const pubsub = new EventEmitter(); const { sendImportantEmail, subscription } = createSubscription(pubsub); From e989ae5a32aae6e7ef0f6ab0eb2caa20d5314aa1 Mon Sep 17 00:00:00 2001 From: Urigo Date: Sun, 14 May 2017 17:14:04 +0300 Subject: [PATCH 07/11] feat(subscriptions): split subscribe function --- src/subscription/subscribe.js | 40 ++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index 067572fc98..07563c894b 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -34,23 +34,14 @@ import type { OperationDefinitionNode, } from '../language/ast'; - -/** - * Implements the "Subscribing to request" section of the GraphQL specification. - * - * Returns an AsyncIterator - * - * If the arguments to this function do not result in a legal execution context, - * a GraphQLError will be thrown immediately explaining the invalid input. - */ -export function subscribe( +export function getSubscriptionEventSource( schema: GraphQLSchema, document: DocumentNode, rootValue?: mixed, contextValue?: mixed, variableValues?: ?{[key: string]: mixed}, operationName?: ?string, -): AsyncIterator { +): AsyncIterable { // If a valid context cannot be created due to incorrect arguments, // this will throw an error. const exeContext = buildExecutionContext( @@ -64,11 +55,36 @@ export function subscribe( // Call the `subscribe()` resolver or the default resolver to produce an // AsyncIterable yielding raw payloads. - const subscription = resolveSubscription( + return resolveSubscription( exeContext, exeContext.operation, rootValue ); +} + +/** + * Implements the "Subscribing to request" section of the GraphQL specification. + * + * Returns an AsyncIterator + * + * If the arguments to this function do not result in a legal execution context, + * a GraphQLError will be thrown immediately explaining the invalid input. + */ +export function subscribe( + schema: GraphQLSchema, + document: DocumentNode, + rootValue?: mixed, + contextValue?: mixed, + variableValues?: ?{[key: string]: mixed}, + operationName?: ?string, +): AsyncIterator { + const subscription = getSubscriptionEventSource( + schema, + document, + rootValue, + contextValue, + variableValues, + operationName); // For each payload yielded from a subscription, map it over the normal // GraphQL `execute` function, with `payload` as the rootValue. From 0d8380fe5e2cad2240029fc12ffd4344894549ea Mon Sep 17 00:00:00 2001 From: Urigo Date: Sun, 14 May 2017 17:16:55 +0300 Subject: [PATCH 08/11] feat(subscriptions): export getSubscriptionEventSource --- src/index.js | 2 +- src/subscription/index.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/index.js b/src/index.js index 5e1c7b76d9..1b6fbfac65 100644 --- a/src/index.js +++ b/src/index.js @@ -243,7 +243,7 @@ export type { ExecutionResult, } from './execution'; -export { subscribe } from './subscription'; +export { subscribe, getSubscriptionEventSource } from './subscription'; // Validate GraphQL queries. export { diff --git a/src/subscription/index.js b/src/subscription/index.js index e9527da080..3a6686f49a 100644 --- a/src/subscription/index.js +++ b/src/subscription/index.js @@ -1 +1 @@ -export { subscribe } from './subscribe'; +export { subscribe, getSubscriptionEventSource } from './subscribe'; From e3f695c87048543c564975a41dd605ec0c0fc659 Mon Sep 17 00:00:00 2001 From: Urigo Date: Tue, 16 May 2017 15:34:38 +0300 Subject: [PATCH 09/11] feat(subscriptions): added `subscribe` to field flow type --- src/subscription/subscribe.js | 3 +-- src/type/definition.js | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index 07563c894b..3cd0729a01 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -128,8 +128,7 @@ function resolveSubscription( 'This subscription is not defined by the schema.' ); - // TODO: make GraphQLSubscription flow type special to support defining these? - const resolveFn = (fieldDef: any).subscribe || defaultFieldResolver; + const resolveFn = fieldDef.subscribe || defaultFieldResolver; const info = buildResolveInfo( exeContext, diff --git a/src/type/definition.js b/src/type/definition.js index 1e79aeeced..4311b08ff9 100644 --- a/src/type/definition.js +++ b/src/type/definition.js @@ -614,6 +614,7 @@ export type GraphQLFieldConfig = { type: GraphQLOutputType; args?: GraphQLFieldConfigArgumentMap; resolve?: GraphQLFieldResolver; + subscribe?: GraphQLFieldResolver; deprecationReason?: ?string; description?: ?string; }; @@ -638,6 +639,7 @@ export type GraphQLField = { type: GraphQLOutputType; args: Array; resolve?: GraphQLFieldResolver; + subscribe?: GraphQLFieldResolver; isDeprecated?: boolean; deprecationReason?: ?string; }; From 54ca9889e1d7095c90593d9a76048791384c8a05 Mon Sep 17 00:00:00 2001 From: Urigo Date: Wed, 17 May 2017 22:36:17 +0300 Subject: [PATCH 10/11] feat(subscriptions): minor fixes --- src/execution/values.js | 2 +- src/index.js | 2 +- src/subscription/__tests__/subscribe-test.js | 3 ++- src/subscription/index.js | 2 +- src/subscription/subscribe.js | 6 +++--- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/execution/values.js b/src/execution/values.js index 35f794bc19..8ce519c0f4 100644 --- a/src/execution/values.js +++ b/src/execution/values.js @@ -191,7 +191,7 @@ function coerceValue(type: GraphQLInputType, value: mixed): mixed { const itemType = type.ofType; if (isCollection(_value)) { const coercedValues = []; - const valueIter = createIterator((_value: any)); + const valueIter = createIterator(_value); if (!valueIter) { return; // Intentionally return no value. } diff --git a/src/index.js b/src/index.js index 1b6fbfac65..a031932503 100644 --- a/src/index.js +++ b/src/index.js @@ -243,7 +243,7 @@ export type { ExecutionResult, } from './execution'; -export { subscribe, getSubscriptionEventSource } from './subscription'; +export { subscribe, createSubscriptionSourceEventStream } from './subscription'; // Validate GraphQL queries. export { diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 3635d0cf6b..45a096036c 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -188,7 +188,8 @@ describe('Subscribe', () => { ast, null, null); - }).to.throw('A subscription must contain exactly one field.'); + }).to.throw( + 'A subscription operation must contain exactly one root field.'); }); it('produces a payload per subscription event', async () => { diff --git a/src/subscription/index.js b/src/subscription/index.js index 3a6686f49a..a5c82511f3 100644 --- a/src/subscription/index.js +++ b/src/subscription/index.js @@ -1 +1 @@ -export { subscribe, getSubscriptionEventSource } from './subscribe'; +export { subscribe, createSubscriptionSourceEventStream } from './subscribe'; diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index 3cd0729a01..d44264bd21 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -34,7 +34,7 @@ import type { OperationDefinitionNode, } from '../language/ast'; -export function getSubscriptionEventSource( +export function createSubscriptionSourceEventStream( schema: GraphQLSchema, document: DocumentNode, rootValue?: mixed, @@ -78,7 +78,7 @@ export function subscribe( variableValues?: ?{[key: string]: mixed}, operationName?: ?string, ): AsyncIterator { - const subscription = getSubscriptionEventSource( + const subscription = createSubscriptionSourceEventStream( schema, document, rootValue, @@ -117,7 +117,7 @@ function resolveSubscription( const responseNames = Object.keys(fields); invariant( responseNames.length === 1, - 'A subscription must contain exactly one field.' + 'A subscription operation must contain exactly one root field.' ); const responseName = responseNames[0]; const fieldNodes = fields[responseName]; From 5f25f53bfedee361e66ee2af1f789ba6c0f6f776 Mon Sep 17 00:00:00 2001 From: Urigo Date: Wed, 17 May 2017 22:47:24 +0300 Subject: [PATCH 11/11] feat(subscriptions): added test for multiple subscribers for the same subscription --- src/subscription/__tests__/subscribe-test.js | 38 ++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 45a096036c..a17c6fc530 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -192,6 +192,44 @@ describe('Subscribe', () => { 'A subscription operation must contain exactly one root field.'); }); + it('produces payload for multiple subscribe in same subscription', + async () => { + const pubsub = new EventEmitter(); + const { sendImportantEmail, subscription } = createSubscription(pubsub); + const second = createSubscription(pubsub); + + const payload1 = subscription.next(); + const payload2 = second.subscription.next(); + + expect(sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + })).to.equal(true); + + const expectedPayload = { + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }; + + expect(await payload1).to.deep.equal(expectedPayload); + expect(await payload2).to.deep.equal(expectedPayload); + }); + it('produces a payload per subscription event', async () => { const pubsub = new EventEmitter(); const { sendImportantEmail, subscription } = createSubscription(pubsub);