Skip to content

Commit 4a9af6d

Browse files
authored
Merge pull request #403 from lutovich/1.7-bolt-v3
Bolt V3 support & transaction configuration
2 parents 3c42269 + 2ca7ac6 commit 4a9af6d

29 files changed

+1105
-152
lines changed

src/v1/driver.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class Driver {
192192
session(mode, bookmarkOrBookmarks) {
193193
const sessionMode = Driver._validateSessionMode(mode);
194194
const connectionProvider = this._getOrCreateConnectionProvider();
195-
const bookmark = new Bookmark(bookmarkOrBookmarks);
195+
const bookmark = bookmarkOrBookmarks ? new Bookmark(bookmarkOrBookmarks) : Bookmark.empty();
196196
return new Session(sessionMode, connectionProvider, bookmark, this._config);
197197
}
198198

src/v1/internal/bolt-protocol-v1.js

+41-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
import RequestMessage from './request-message';
2020
import * as v1 from './packstream-v1';
21+
import {newError} from '../error';
22+
import Bookmark from './bookmark';
23+
import TxConfig from './tx-config';
2124

2225
export default class BoltProtocol {
2326

@@ -49,6 +52,15 @@ export default class BoltProtocol {
4952
return this._unpacker;
5053
}
5154

55+
/**
56+
* Transform metadata received in SUCCESS message before it is passed to the handler.
57+
* @param {object} metadata the received metadata.
58+
* @return {object} transformed metadata.
59+
*/
60+
transformMetadata(metadata) {
61+
return metadata;
62+
}
63+
5264
/**
5365
* Perform initialization and authentication of the underlying connection.
5466
* @param {string} clientName the client name.
@@ -63,9 +75,12 @@ export default class BoltProtocol {
6375
/**
6476
* Begin an explicit transaction.
6577
* @param {Bookmark} bookmark the bookmark.
78+
* @param {TxConfig} txConfig the configuration.
6679
* @param {StreamObserver} observer the response observer.
6780
*/
68-
beginTransaction(bookmark, observer) {
81+
beginTransaction(bookmark, txConfig, observer) {
82+
assertTxConfigIsEmpty(txConfig, this._connection, observer);
83+
6984
const runMessage = RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters());
7085
const pullAllMessage = RequestMessage.pullAll();
7186

@@ -78,24 +93,29 @@ export default class BoltProtocol {
7893
* @param {StreamObserver} observer the response observer.
7994
*/
8095
commitTransaction(observer) {
81-
this.run('COMMIT', {}, observer);
96+
this.run('COMMIT', {}, Bookmark.empty(), TxConfig.empty(), observer);
8297
}
8398

8499
/**
85100
* Rollback the explicit transaction.
86101
* @param {StreamObserver} observer the response observer.
87102
*/
88103
rollbackTransaction(observer) {
89-
this.run('ROLLBACK', {}, observer);
104+
this.run('ROLLBACK', {}, Bookmark.empty(), TxConfig.empty(), observer);
90105
}
91106

92107
/**
93108
* Send a Cypher statement through the underlying connection.
94109
* @param {string} statement the cypher statement.
95110
* @param {object} parameters the statement parameters.
111+
* @param {Bookmark} bookmark the bookmark.
112+
* @param {TxConfig} txConfig the auto-commit transaction configuration.
96113
* @param {StreamObserver} observer the response observer.
97114
*/
98-
run(statement, parameters, observer) {
115+
run(statement, parameters, bookmark, txConfig, observer) {
116+
// bookmark is ignored in this version of the protocol
117+
assertTxConfigIsEmpty(txConfig, this._connection, observer);
118+
99119
const runMessage = RequestMessage.run(statement, parameters);
100120
const pullAllMessage = RequestMessage.pullAll();
101121

@@ -120,3 +140,20 @@ export default class BoltProtocol {
120140
return new v1.Unpacker(disableLosslessIntegers);
121141
}
122142
}
143+
144+
/**
145+
* @param {TxConfig} txConfig the auto-commit transaction configuration.
146+
* @param {Connection} connection the connection.
147+
* @param {StreamObserver} observer the response observer.
148+
*/
149+
function assertTxConfigIsEmpty(txConfig, connection, observer) {
150+
if (!txConfig.isEmpty()) {
151+
const error = newError('Driver is connected to the database that does not support transaction configuration. ' +
152+
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality');
153+
154+
// unsupported API was used, consider this a fatal error for the current connection
155+
connection._handleFatalError(error);
156+
observer.onError(error);
157+
throw error;
158+
}
159+
}

src/v1/internal/bolt-protocol-v3.js

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import BoltProtocolV2 from './bolt-protocol-v2';
20+
import RequestMessage from './request-message';
21+
22+
export default class BoltProtocol extends BoltProtocolV2 {
23+
24+
constructor(connection, chunker, disableLosslessIntegers) {
25+
super(connection, chunker, disableLosslessIntegers);
26+
}
27+
28+
transformMetadata(metadata) {
29+
if (metadata.t_first) {
30+
// Bolt V3 uses shorter key 't_first' to represent 'result_available_after'
31+
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
32+
metadata.result_available_after = metadata.t_first;
33+
delete metadata.t_first;
34+
}
35+
if (metadata.t_last) {
36+
// Bolt V3 uses shorter key 't_last' to represent 'result_consumed_after'
37+
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
38+
metadata.result_consumed_after = metadata.t_last;
39+
delete metadata.t_last;
40+
}
41+
return metadata;
42+
}
43+
44+
initialize(userAgent, authToken, observer) {
45+
prepareToHandleSingleResponse(observer);
46+
const message = RequestMessage.hello(userAgent, authToken);
47+
this._connection.write(message, observer, true);
48+
}
49+
50+
beginTransaction(bookmark, txConfig, observer) {
51+
prepareToHandleSingleResponse(observer);
52+
const message = RequestMessage.begin(bookmark, txConfig);
53+
this._connection.write(message, observer, true);
54+
}
55+
56+
commitTransaction(observer) {
57+
prepareToHandleSingleResponse(observer);
58+
const message = RequestMessage.commit();
59+
this._connection.write(message, observer, true);
60+
}
61+
62+
rollbackTransaction(observer) {
63+
prepareToHandleSingleResponse(observer);
64+
const message = RequestMessage.rollback();
65+
this._connection.write(message, observer, true);
66+
}
67+
68+
run(statement, parameters, bookmark, txConfig, observer) {
69+
const runMessage = RequestMessage.runWithMetadata(statement, parameters, bookmark, txConfig);
70+
const pullAllMessage = RequestMessage.pullAll();
71+
72+
this._connection.write(runMessage, observer, false);
73+
this._connection.write(pullAllMessage, observer, true);
74+
}
75+
}
76+
77+
function prepareToHandleSingleResponse(observer) {
78+
if (observer && typeof observer.prepareToHandleSingleResponse === 'function') {
79+
observer.prepareToHandleSingleResponse();
80+
}
81+
}

src/v1/internal/bookmark.js

+14
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ export default class Bookmark {
3636
this._maxValue = maxBookmark(this._values);
3737
}
3838

39+
static empty() {
40+
return EMPTY_BOOKMARK;
41+
}
42+
3943
/**
4044
* Check if the given bookmark is meaningful and can be send to the database.
4145
* @return {boolean} returns <code>true</code> bookmark has a value, <code>false</code> otherwise.
@@ -52,6 +56,14 @@ export default class Bookmark {
5256
return this._maxValue;
5357
}
5458

59+
/**
60+
* Get all bookmark values as an array.
61+
* @return {string[]} all values.
62+
*/
63+
values() {
64+
return this._values;
65+
}
66+
5567
/**
5668
* Get this bookmark as an object for begin transaction call.
5769
* @return {object} the value of this bookmark as object.
@@ -72,6 +84,8 @@ export default class Bookmark {
7284
}
7385
}
7486

87+
const EMPTY_BOOKMARK = new Bookmark(null);
88+
7589
/**
7690
* Converts given value to an array.
7791
* @param {string|string[]} [value=undefined] argument to convert.

src/v1/internal/connection.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ export default class Connection {
225225
* failing, and the connection getting ejected from the session pool.
226226
*
227227
* @param error an error object, forwarded to all current and future subscribers
228-
* @protected
229228
*/
230229
_handleFatalError(error) {
231230
this._isBroken = true;
@@ -267,7 +266,8 @@ export default class Connection {
267266
this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`);
268267
}
269268
try {
270-
this._currentObserver.onCompleted( payload );
269+
const metadata = this._protocol.transformMetadata(payload);
270+
this._currentObserver.onCompleted(metadata);
271271
} finally {
272272
this._updateCurrentObserver();
273273
}

src/v1/internal/protocol-handshaker.js

+14-10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {alloc} from './buf';
2121
import {newError} from '../error';
2222
import BoltProtocolV1 from './bolt-protocol-v1';
2323
import BoltProtocolV2 from './bolt-protocol-v2';
24+
import BoltProtocolV3 from './bolt-protocol-v3';
2425

2526
const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP"
2627
const BOLT_MAGIC_PREAMBLE = 0x6060B017;
@@ -69,15 +70,18 @@ export default class ProtocolHandshaker {
6970
* @private
7071
*/
7172
_createProtocolWithVersion(version) {
72-
if (version === 1) {
73-
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
74-
} else if (version === 2) {
75-
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
76-
} else if (version === HTTP_MAGIC_PREAMBLE) {
77-
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
78-
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
79-
} else {
80-
throw newError('Unknown Bolt protocol version: ' + version);
73+
switch (version) {
74+
case 1:
75+
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
76+
case 2:
77+
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
78+
case 3:
79+
return new BoltProtocolV3(this._connection, this._chunker, this._disableLosslessIntegers);
80+
case HTTP_MAGIC_PREAMBLE:
81+
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
82+
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
83+
default:
84+
throw newError('Unknown Bolt protocol version: ' + version);
8185
}
8286
}
8387
}
@@ -93,10 +97,10 @@ function newHandshakeBuffer() {
9397
handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE);
9498

9599
//proposed versions
100+
handshakeBuffer.writeInt32(3);
96101
handshakeBuffer.writeInt32(2);
97102
handshakeBuffer.writeInt32(1);
98103
handshakeBuffer.writeInt32(0);
99-
handshakeBuffer.writeInt32(0);
100104

101105
// reset the reader position
102106
handshakeBuffer.reset();

src/v1/internal/request-message.js

+82-3
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818
*/
1919

2020
// Signature bytes for each request message type
21-
const INIT = 0x01; // 0000 0001 // INIT <user_agent>
21+
const INIT = 0x01; // 0000 0001 // INIT <user_agent> <authentication_token>
2222
const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused
2323
const RESET = 0x0F; // 0000 1111 // RESET
2424
const RUN = 0x10; // 0001 0000 // RUN <statement> <parameters>
25-
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD * - unused
26-
const PULL_ALL = 0x3F; // 0011 1111 // PULL *
25+
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD_ALL - unused
26+
const PULL_ALL = 0x3F; // 0011 1111 // PULL_ALL
27+
28+
const HELLO = 0x01; // 0000 0001 // HELLO <metadata>
29+
const BEGIN = 0x11; // 0001 0001 // BEGIN <metadata>
30+
const COMMIT = 0x12; // 0001 0010 // COMMIT
31+
const ROLLBACK = 0x13; // 0001 0011 // ROLLBACK
2732

2833
export default class RequestMessage {
2934

@@ -68,8 +73,82 @@ export default class RequestMessage {
6873
static reset() {
6974
return RESET_MESSAGE;
7075
}
76+
77+
/**
78+
* Create a new HELLO message.
79+
* @param {string} userAgent the user agent.
80+
* @param {object} authToken the authentication token.
81+
* @return {RequestMessage} new HELLO message.
82+
*/
83+
static hello(userAgent, authToken) {
84+
const metadata = Object.assign({user_agent: userAgent}, authToken);
85+
return new RequestMessage(HELLO, [metadata], () => `HELLO {user_agent: '${userAgent}', ...}`);
86+
}
87+
88+
/**
89+
* Create a new BEGIN message.
90+
* @param {Bookmark} bookmark the bookmark.
91+
* @param {TxConfig} txConfig the configuration.
92+
* @return {RequestMessage} new BEGIN message.
93+
*/
94+
static begin(bookmark, txConfig) {
95+
const metadata = buildTxMetadata(bookmark, txConfig);
96+
return new RequestMessage(BEGIN, [metadata], () => `BEGIN ${JSON.stringify(metadata)}`);
97+
}
98+
99+
/**
100+
* Get a COMMIT message.
101+
* @return {RequestMessage} the COMMIT message.
102+
*/
103+
static commit() {
104+
return COMMIT_MESSAGE;
105+
}
106+
107+
/**
108+
* Get a ROLLBACK message.
109+
* @return {RequestMessage} the ROLLBACK message.
110+
*/
111+
static rollback() {
112+
return ROLLBACK_MESSAGE;
113+
}
114+
115+
/**
116+
* Create a new RUN message with additional metadata.
117+
* @param {string} statement the cypher statement.
118+
* @param {object} parameters the statement parameters.
119+
* @param {Bookmark} bookmark the bookmark.
120+
* @param {TxConfig} txConfig the configuration.
121+
* @return {RequestMessage} new RUN message with additional metadata.
122+
*/
123+
static runWithMetadata(statement, parameters, bookmark, txConfig) {
124+
const metadata = buildTxMetadata(bookmark, txConfig);
125+
return new RequestMessage(RUN, [statement, parameters, metadata],
126+
() => `RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(metadata)}`);
127+
}
128+
}
129+
130+
/**
131+
* Create an object that represent transaction metadata.
132+
* @param {Bookmark} bookmark the bookmark.
133+
* @param {TxConfig} txConfig the configuration.
134+
* @return {object} a metadata object.
135+
*/
136+
function buildTxMetadata(bookmark, txConfig) {
137+
const metadata = {};
138+
if (!bookmark.isEmpty()) {
139+
metadata['bookmarks'] = bookmark.values();
140+
}
141+
if (txConfig.timeout) {
142+
metadata['tx_timeout'] = txConfig.timeout;
143+
}
144+
if (txConfig.metadata) {
145+
metadata['tx_metadata'] = txConfig.metadata;
146+
}
147+
return metadata;
71148
}
72149

73150
// constants for messages that never change
74151
const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL');
75152
const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET');
153+
const COMMIT_MESSAGE = new RequestMessage(COMMIT, [], () => 'COMMIT');
154+
const ROLLBACK_MESSAGE = new RequestMessage(ROLLBACK, [], () => 'ROLLBACK');

0 commit comments

Comments
 (0)