Skip to content

Commit 9557f16

Browse files
committed
websockets support
1 parent d29e631 commit 9557f16

File tree

6 files changed

+178
-22
lines changed

6 files changed

+178
-22
lines changed

README.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,32 @@ client.on('log', (level, loggerName, message, furtherInfo) => {
242242
The `level` being passed to the listener can be `verbose`, `info`, `warning` or `error`. Visit the [logging
243243
documentation][doc-logging] for more information.
244244

245+
## WebSockets
246+
247+
You can use websocket as transport. But Cassandra doesn't support this protocol
248+
so some proxy should be deployed in front of Cassandra, which can handle this transport protocol.
249+
250+
```javascript
251+
const client = new cassandra.Client({
252+
transport: 'WebSocket',
253+
contactPoints: [
254+
// some proxies that support websocket transport
255+
'127.0.0.1:9043',
256+
'localhost:9044'
257+
],
258+
webSocketOptions: {
259+
// some client websocket options
260+
protocolVersion: 13,
261+
...
262+
}
263+
});
264+
```
265+
266+
You can configure your websocket client with `webSocketOptions`.
267+
To properly configure it follow [websocket/ws doc][ws-doc].
268+
269+
You also can use websockets over SSL by passing `transport: 'SecureWebSocket'`.
270+
245271
## Compatibility
246272

247273
- Apache Cassandra versions 2.1 and above.
@@ -291,4 +317,5 @@ Unless required by applicable law or agreed to in writing, software distributed
291317
[streams2]: https://nodejs.org/api/stream.html#stream_class_stream_readable
292318
[cql-udt]: https://cassandra.apache.org/doc/latest/cql/types.html#udts
293319
[dse]: https://www.datastax.com/products/datastax-enterprise
294-
[astra]: https://www.datastax.com/products/datastax-astra
320+
[astra]: https://www.datastax.com/products/datastax-astra
321+
[ws-doc]: https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options

index.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { metrics } from './lib/metrics';
2424
import { tracker } from './lib/tracker';
2525
import { metadata } from './lib/metadata';
2626
import { datastax } from './lib/datastax/';
27+
import { ClientRequestArgs } from 'http';
2728
import Long = types.Long;
2829
import Uuid = types.Uuid;
2930
import graph = datastax.graph;
@@ -191,7 +192,11 @@ export interface ExecutionOptions {
191192
setHints(hints: string[]): void;
192193
}
193194

195+
export type WebSocketClientOptions = (ClientOptions | ClientRequestArgs)
196+
& {protocols?: string | string[] | undefined};
197+
194198
export interface ClientOptions {
199+
transport?: 'SecureWebSocket' | 'WebSocket' | undefined
195200
contactPoints?: string[];
196201
localDataCenter?: string;
197202
keyspace?: string;
@@ -253,6 +258,7 @@ export interface ClientOptions {
253258
tcpNoDelay?: boolean;
254259
};
255260
sslOptions?: tls.ConnectionOptions;
261+
webSocketOptions?: WebSocketClientOptions;
256262
}
257263

258264
export interface QueryOptions {

lib/connection.js

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const StreamIdStack = require('./stream-id-stack');
3232
const OperationState = require('./operation-state');
3333
const promiseUtils = require('./promise-utils');
3434
const { ExecutionOptions } = require('./execution-options');
35+
const { WebSocketWrapper } = require('./websocket');
3536

3637
/**
3738
* Represents a connection to a Cassandra node
@@ -171,30 +172,70 @@ class Connection extends events.EventEmitter {
171172
const self = this;
172173
this.log('info', `Connecting to ${this.endpointFriendlyName}`);
173174

174-
if (!this.options.sslOptions) {
175-
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });
176-
this.netClient.connect(this.port, this.address, function connectCallback() {
177-
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
178-
self.bindSocketListeners();
179-
self.startup(callback);
180-
});
181-
}
182-
else {
183-
// Use TLS
184-
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);
175+
if (this.options.transport) {
176+
if (this.options.transport.toLowerCase() === 'securewebsocket') {
177+
// Use secure WebSocket
178+
const options = utils.extend({ rejectUnauthorized: false, transport: this.options.transport },
179+
this.options.webSocketOptions);
180+
181+
if (!options.protocols) {
182+
options.protocols = ['cql'];
183+
}
184+
185+
this.netClient = new WebSocketWrapper(options);
186+
187+
this.netClient.connect(this.port, this.address, function connectCallback() {
188+
self.log('verbose', `Secure WebSocket to ${self.endpointFriendlyName}`);
189+
self.bindSocketListeners();
190+
self.startup(callback);
191+
});
192+
} else {
193+
// Use WebSocket
194+
const options = utils.extend({
195+
transport: this.options.transport,
196+
highWaterMark: this.options.socketOptions.coalescingThreshold,
197+
handshakeTimeout: this.options.socketOptions.connectTimeout,
198+
}, this.options.webSocketOptions);
199+
200+
if (!options.protocols) {
201+
options.protocols = ['cql'];
202+
}
185203

186-
if (this.options.sni) {
187-
sslOptions.servername = this._serverName;
204+
this.netClient = new WebSocketWrapper(options);
205+
206+
this.netClient.connect(this.port, this.address, function connectCallback() {
207+
self.log('verbose', `WebSocket connected to ${self.endpointFriendlyName}`);
208+
self.bindSocketListeners();
209+
self.startup(callback);
210+
});
188211
}
212+
} else {
213+
// Use Socket
214+
if (!this.options.sslOptions) {
215+
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });
216+
217+
this.netClient.connect(this.port, this.address, function connectCallback() {
218+
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
219+
self.bindSocketListeners();
220+
self.startup(callback);
221+
});
222+
} else {
223+
// Use Socket with TLS
224+
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);
189225

190-
this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
191-
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
192-
self.bindSocketListeners();
193-
self.startup(callback);
194-
});
226+
if (this.options.sni) {
227+
sslOptions.servername = this._serverName;
228+
}
195229

196-
// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
197-
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
230+
this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
231+
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
232+
self.bindSocketListeners();
233+
self.startup(callback);
234+
});
235+
236+
// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
237+
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
238+
}
198239
}
199240

200241
this.netClient.once('error', function socketError(err) {

lib/websocket.js

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict';
2+
3+
const { EventEmitter } = require('events');
4+
const { WebSocket } = require('ws');
5+
6+
/**
7+
* WebSocketWrapper is a wrapper on the `ws.Websocket` which implements
8+
* `net.Socket` interface to be used by the `cassandra.Connection`
9+
*/
10+
class WebSocketWrapper extends EventEmitter {
11+
/**
12+
* Creates a websocket wrapper instance. To connect use `connect` method
13+
* @param {object} options client options for a websocket
14+
*/
15+
constructor(options) {
16+
super();
17+
this.options = options;
18+
}
19+
20+
/**
21+
* Creates an instance of a websocket and connects
22+
* @param {String} port
23+
* @param {String} address
24+
* @param {() => void} connectionCallback is called when connection is successfully established
25+
* @returns {WebSocketWrapper} wrapper itself
26+
*/
27+
connect(port, address, connectionCallback) {
28+
const schema = this.options.transport.toLowerCase() === 'securewebsocket' ? 'wss' : 'ws';
29+
30+
this.ws = new WebSocket(schema+'://'+address+':'+port, this.options.protocols, this.options);
31+
32+
if (connectionCallback) {
33+
this.ws.on('open', connectionCallback);
34+
}
35+
36+
const stream = WebSocket.createWebSocketStream(this.ws, this.options);
37+
38+
stream.on('error', err => {
39+
this.emit('error', err);
40+
});
41+
stream.on('drain', () => {
42+
this.emit('drain');
43+
});
44+
stream.on('close', () => {
45+
this.emit('close');
46+
});
47+
stream.on('end', () => {
48+
this.emit('end');
49+
});
50+
51+
this.write = stream.write.bind(stream);
52+
this.pipe = stream.pipe.bind(stream);
53+
this.end = stream.end.bind(stream);
54+
this.destroy = stream.destroy.bind(stream);
55+
56+
return this;
57+
}
58+
59+
/**
60+
* It is not implemented because `ws` lib doesn't provide API to work with
61+
*/
62+
setTimeout() {}
63+
64+
/**
65+
* It is not implemented because `ws` lib doesn't provide API to work with
66+
*/
67+
setKeepAlive() {}
68+
69+
/**
70+
* It is not implemented because `ws` lib doesn't provide API to work with
71+
*/
72+
setNoDelay() {}
73+
}
74+
75+
module.exports.WebSocketWrapper = WebSocketWrapper;

package-lock.json

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
"@types/long": "^4.0.0",
2424
"@types/node": ">=8",
2525
"adm-zip": "^0.5.3",
26-
"long": "^2.2.0"
26+
"long": "^2.2.0",
27+
"ws": "^8.13.0"
2728
},
2829
"devDependencies": {
2930
"chai": "4.2.0",

0 commit comments

Comments
 (0)