Skip to content

Commit 5fd95bf

Browse files
authored
Merge pull request #60 from apollographql/feature/new-graphql
WIP: Support for new `graphql-js` subscriptions support
2 parents f0e5bcf + 7fec9d4 commit 5fd95bf

File tree

10 files changed

+437
-238
lines changed

10 files changed

+437
-238
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ dist
44
coverage
55
typings
66
npm-debug.log
7+
yarn.lock

package.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
"url": "https://github.com/apollostack/graphql-subscriptions.git"
99
},
1010
"dependencies": {
11-
"es6-promise": "^4.0.5"
11+
"@types/graphql": "^0.9.1",
12+
"es6-promise": "^4.0.5",
13+
"graphql": "^0.9.6",
14+
"iterall": "^1.1.1"
1215
},
1316
"peerDependencies": {
1417
"graphql": "^0.7.0 || ^0.8.0 || ^0.9.0"
@@ -25,19 +28,17 @@
2528
"postcoverage": "remap-istanbul --input coverage/coverage.raw.json --type lcovonly --output coverage/lcov.info"
2629
},
2730
"devDependencies": {
28-
"@types/graphql": "^0.9.0",
2931
"@types/mocha": "^2.2.39",
3032
"@types/node": "^7.0.5",
3133
"chai": "^3.5.0",
3234
"chai-as-promised": "^6.0.0",
33-
"graphql": "^0.9.3",
3435
"istanbul": "^1.0.0-alpha.2",
3536
"mocha": "^3.3.0",
3637
"remap-istanbul": "^0.9.1",
3738
"sinon": "^2.1.0",
3839
"sinon-chai": "^2.9.0",
39-
"tslint": "^5.1.0",
40-
"typescript": "^2.3.0"
40+
"tslint": "^5.2.0",
41+
"typescript": "^2.3.2"
4142
},
4243
"typescript": {
4344
"definition": "dist/index.d.ts"
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { $$asyncIterator } from 'iterall';
2+
import { EventEmitter } from 'events';
3+
4+
export function eventEmitterAsyncIterator<T>(eventEmitter: EventEmitter,
5+
eventsNames: string | string[]): AsyncIterator<T> {
6+
const pullQueue = [];
7+
const pushQueue = [];
8+
const eventsArray = typeof eventsNames === 'string' ? [eventsNames] : eventsNames;
9+
let listening = true;
10+
11+
const pushValue = event => {
12+
if (pullQueue.length !== 0) {
13+
pullQueue.shift()({ value: event, done: false });
14+
} else {
15+
pushQueue.push(event);
16+
}
17+
};
18+
19+
const pullValue = () => {
20+
return new Promise(resolve => {
21+
if (pushQueue.length !== 0) {
22+
resolve({ value: pushQueue.shift(), done: false });
23+
} else {
24+
pullQueue.push(resolve);
25+
}
26+
});
27+
};
28+
29+
const emptyQueue = () => {
30+
if (listening) {
31+
listening = false;
32+
removeEventListeners();
33+
pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
34+
pullQueue.length = 0;
35+
pushQueue.length = 0;
36+
}
37+
};
38+
39+
const addEventListeners = () => {
40+
for (const eventName of eventsArray) {
41+
eventEmitter.addListener(eventName, pushValue);
42+
}
43+
};
44+
45+
const removeEventListeners = () => {
46+
for (const eventName of eventsArray) {
47+
eventEmitter.removeListener(eventName, pushValue);
48+
}
49+
};
50+
51+
addEventListeners();
52+
53+
return {
54+
next() {
55+
return listening ? pullValue() : this.return();
56+
},
57+
return() {
58+
emptyQueue();
59+
60+
return Promise.resolve({ value: undefined, done: true });
61+
},
62+
throw(error) {
63+
emptyQueue();
64+
65+
return Promise.reject(error);
66+
},
67+
[$$asyncIterator]() {
68+
return this;
69+
},
70+
};
71+
}

src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
1-
export { PubSub, SubscriptionManager } from './pubsub';
1+
export { PubSubEngine } from './pubsub-engine';
2+
export { PubSub } from './pubsub';
3+
export { withFilter, ResolverFn, FilterFn } from './with-filter';
4+
5+
export * from './subscriptions-manager';

src/pubsub-engine.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export interface PubSubEngine {
2+
publish(triggerName: string, payload: any): boolean;
3+
subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>;
4+
unsubscribe(subId: number);
5+
asyncIterator<T>(triggers: string | string[]): AsyncIterator<T>;
6+
}

src/pubsub.ts

Lines changed: 35 additions & 217 deletions
Original file line numberDiff line numberDiff line change
@@ -1,221 +1,39 @@
1-
//
2-
// This is basically just event emitters wrapped with a function that filters messages.
3-
//
41
import { EventEmitter } from 'events';
5-
import {
6-
GraphQLSchema,
7-
GraphQLError,
8-
validate,
9-
execute,
10-
parse,
11-
specifiedRules,
12-
OperationDefinitionNode,
13-
FieldNode,
14-
} from 'graphql';
15-
import { getArgumentValues } from 'graphql/execution/values';
16-
17-
import {
18-
subscriptionHasSingleRootField,
19-
} from './validation';
20-
21-
export interface PubSubEngine {
22-
publish(triggerName: string, payload: any): boolean;
23-
subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>;
24-
unsubscribe(subId: number);
25-
}
2+
import { PubSubEngine } from './pubsub-engine';
3+
import { eventEmitterAsyncIterator } from './event-emitter-to-async-iterator';
264

275
export class PubSub implements PubSubEngine {
28-
private ee: EventEmitter;
29-
private subscriptions: {[key: string]: [string, Function]};
30-
private subIdCounter: number;
31-
32-
constructor() {
33-
this.ee = new EventEmitter(); // max listeners = 10.
34-
this.subscriptions = {};
35-
this.subIdCounter = 0;
36-
}
37-
38-
public publish(triggerName: string, payload: any): boolean {
39-
this.ee.emit(triggerName, payload);
40-
// Not using the value returned from emit method because it gives
41-
// irrelevant false when there are no listeners.
42-
return true;
43-
}
44-
45-
public subscribe(triggerName: string, onMessage: Function): Promise<number> {
46-
this.ee.addListener(triggerName, onMessage);
47-
this.subIdCounter = this.subIdCounter + 1;
48-
this.subscriptions[this.subIdCounter] = [triggerName, onMessage];
49-
return Promise.resolve(this.subIdCounter);
50-
}
51-
52-
public unsubscribe(subId: number) {
53-
const [triggerName, onMessage] = this.subscriptions[subId];
54-
delete this.subscriptions[subId];
55-
this.ee.removeListener(triggerName, onMessage);
56-
}
57-
}
58-
59-
export class ValidationError extends Error {
60-
errors: Array<GraphQLError>;
61-
message: string;
62-
63-
constructor(errors){
64-
super();
65-
this.errors = errors;
66-
this.message = 'Subscription query has validation errors';
67-
}
68-
}
69-
70-
export interface SubscriptionOptions {
71-
query: string;
72-
operationName: string;
73-
callback: Function;
74-
variables?: { [key: string]: any };
75-
context?: any;
76-
formatError?: Function;
77-
formatResponse?: Function;
78-
};
79-
80-
export interface TriggerConfig {
81-
channelOptions?: Object;
82-
filter?: Function;
83-
}
84-
85-
export interface TriggerMap {
86-
[triggerName: string]: TriggerConfig;
87-
}
88-
89-
export interface SetupFunction {
90-
(options: SubscriptionOptions, args: {[key: string]: any}, subscriptionName: string): TriggerMap;
91-
}
92-
93-
export interface SetupFunctions {
94-
[subscriptionName: string]: SetupFunction;
95-
}
96-
97-
// This manages actual GraphQL subscriptions.
98-
export class SubscriptionManager {
99-
private pubsub: PubSubEngine;
100-
private schema: GraphQLSchema;
101-
private setupFunctions: SetupFunctions;
102-
private subscriptions: { [externalId: number]: Array<number>};
103-
private maxSubscriptionId: number;
104-
105-
constructor(options: { schema: GraphQLSchema,
106-
setupFunctions: SetupFunctions,
107-
pubsub: PubSubEngine }){
108-
this.pubsub = options.pubsub;
109-
this.schema = options.schema;
110-
this.setupFunctions = options.setupFunctions || {};
111-
this.subscriptions = {};
112-
this.maxSubscriptionId = 0;
113-
}
114-
115-
public publish(triggerName: string, payload: any) {
116-
this.pubsub.publish(triggerName, payload);
117-
}
118-
119-
public subscribe(options: SubscriptionOptions): Promise<number> {
120-
121-
// 1. validate the query, operationName and variables
122-
const parsedQuery = parse(options.query);
123-
const errors = validate(
124-
this.schema,
125-
parsedQuery,
126-
[...specifiedRules, subscriptionHasSingleRootField]
127-
);
128-
129-
// TODO: validate that all variables have been passed (and are of correct type)?
130-
if (errors.length){
131-
// this error kills the subscription, so we throw it.
132-
return Promise.reject<number>(new ValidationError(errors));
133-
}
134-
135-
let args = {};
136-
137-
// operationName is the name of the only root field in the subscription document
138-
let subscriptionName = '';
139-
parsedQuery.definitions.forEach( definition => {
140-
if (definition.kind === 'OperationDefinition'){
141-
// only one root field is allowed on subscription. No fragments for now.
142-
const rootField = (definition as OperationDefinitionNode).selectionSet.selections[0] as FieldNode;
143-
subscriptionName = rootField.name.value;
144-
145-
const fields = this.schema.getSubscriptionType().getFields();
146-
args = getArgumentValues(fields[subscriptionName], rootField, options.variables);
147-
}
148-
});
149-
150-
let triggerMap: TriggerMap;
151-
152-
if (this.setupFunctions[subscriptionName]) {
153-
triggerMap = this.setupFunctions[subscriptionName](options, args, subscriptionName);
154-
} else {
155-
// if not provided, the triggerName will be the subscriptionName, The trigger will not have any
156-
// options and rely on defaults that are set later.
157-
triggerMap = {[subscriptionName]: {}};
158-
}
159-
160-
const externalSubscriptionId = this.maxSubscriptionId++;
161-
this.subscriptions[externalSubscriptionId] = [];
162-
const subscriptionPromises = [];
163-
Object.keys(triggerMap).forEach( triggerName => {
164-
// Deconstruct the trigger options and set any defaults
165-
const {
166-
channelOptions = {},
167-
filter = () => true, // Let all messages through by default.
168-
} = triggerMap[triggerName];
169-
170-
// 2. generate the handler function
171-
//
172-
// rootValue is the payload sent by the event emitter / trigger by
173-
// convention this is the value returned from the mutation
174-
// resolver
175-
const onMessage = (rootValue) => {
176-
return Promise.resolve().then(() => {
177-
if (typeof options.context === 'function') {
178-
return options.context();
179-
}
180-
return options.context;
181-
}).then((context) => {
182-
return Promise.all([
183-
context,
184-
filter(rootValue, context),
185-
]);
186-
}).then(([context, doExecute]) => {
187-
if (!doExecute) {
188-
return;
189-
}
190-
execute(
191-
this.schema,
192-
parsedQuery,
193-
rootValue,
194-
context,
195-
options.variables,
196-
options.operationName
197-
).then( data => options.callback(null, data) );
198-
}).catch((error) => {
199-
options.callback(error);
200-
});
201-
}
202-
203-
// 3. subscribe and keep the subscription id
204-
subscriptionPromises.push(
205-
this.pubsub.subscribe(triggerName, onMessage, channelOptions)
206-
.then(id => this.subscriptions[externalSubscriptionId].push(id))
207-
);
208-
});
209-
210-
// Resolve the promise with external sub id only after all subscriptions completed
211-
return Promise.all(subscriptionPromises).then(() => externalSubscriptionId);
212-
}
213-
214-
public unsubscribe(subId){
215-
// pass the subId right through to pubsub. Do nothing else.
216-
this.subscriptions[subId].forEach( internalId => {
217-
this.pubsub.unsubscribe(internalId);
218-
});
219-
delete this.subscriptions[subId];
220-
}
6+
protected ee: EventEmitter;
7+
private subscriptions: { [key: string]: [string, Function] };
8+
private subIdCounter: number;
9+
10+
constructor() {
11+
this.ee = new EventEmitter();
12+
this.subscriptions = {};
13+
this.subIdCounter = 0;
14+
}
15+
16+
public publish(triggerName: string, payload: any): boolean {
17+
this.ee.emit(triggerName, payload);
18+
19+
return true;
20+
}
21+
22+
public subscribe(triggerName: string, onMessage: Function): Promise<number> {
23+
this.ee.addListener(triggerName, onMessage);
24+
this.subIdCounter = this.subIdCounter + 1;
25+
this.subscriptions[this.subIdCounter] = [triggerName, onMessage];
26+
27+
return Promise.resolve(this.subIdCounter);
28+
}
29+
30+
public unsubscribe(subId: number) {
31+
const [triggerName, onMessage] = this.subscriptions[subId];
32+
delete this.subscriptions[subId];
33+
this.ee.removeListener(triggerName, onMessage);
34+
}
35+
36+
public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
37+
return eventEmitterAsyncIterator<T>(this.ee, triggers);
38+
}
22139
}

0 commit comments

Comments
 (0)