Skip to content

Commit ffe245e

Browse files
committed
Introduce the hability of awaiting for eager begining a tx
1 parent b99d313 commit ffe245e

File tree

9 files changed

+686
-78
lines changed

9 files changed

+686
-78
lines changed

packages/core/src/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ import Result, { QueryResult, ResultObserver } from './result'
6969
import ConnectionProvider from './connection-provider'
7070
import Connection from './connection'
7171
import Transaction from './transaction'
72+
import TransactionPromise from './transaction-promise'
7273
import Session, { TransactionConfig } from './session'
7374
import Driver, * as driver from './driver'
7475
import auth from './auth'
@@ -134,6 +135,7 @@ const forExport = {
134135
Stats,
135136
Result,
136137
Transaction,
138+
TransactionPromise,
137139
Session,
138140
Driver,
139141
Connection,
@@ -191,6 +193,7 @@ export {
191193
ConnectionProvider,
192194
Connection,
193195
Transaction,
196+
TransactionPromise,
194197
Session,
195198
Driver,
196199
types,

packages/core/src/session.ts

+5-4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import ConnectionProvider from './connection-provider'
3131
import { Query, SessionMode } from './types'
3232
import Connection from './connection'
3333
import { NumberOrInteger } from './graph-types'
34+
import TransactionPromise from './transaction-promise'
3435

3536
type ConnectionConsumer = (connection: Connection | void) => any | undefined
3637
type TransactionWork<T> = (tx: Transaction) => Promise<T> | T
@@ -239,9 +240,9 @@ class Session {
239240
* While a transaction is open the session cannot be used to run queries outside the transaction.
240241
*
241242
* @param {TransactionConfig} [transactionConfig] - Configuration for the new auto-commit transaction.
242-
* @returns {Transaction} New Transaction.
243+
* @returns {TransactionPromise} New Transaction.
243244
*/
244-
beginTransaction(transactionConfig?: TransactionConfig): Transaction {
245+
beginTransaction(transactionConfig?: TransactionConfig): TransactionPromise {
245246
// this function needs to support bookmarks parameter for backwards compatibility
246247
// parameter was of type {string|string[]} and represented either a single or multiple bookmarks
247248
// that's why we need to check parameter type and decide how to interpret the value
@@ -255,7 +256,7 @@ class Session {
255256
return this._beginTransaction(this._mode, txConfig)
256257
}
257258

258-
_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): Transaction {
259+
_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): TransactionPromise {
259260
if (!this._open) {
260261
throw newError('Cannot begin a transaction on a closed session.')
261262
}
@@ -271,7 +272,7 @@ class Session {
271272
connectionHolder.initializeConnection()
272273
this._hasTx = true
273274

274-
const tx = new Transaction({
275+
const tx = new TransactionPromise({
275276
connectionHolder,
276277
impersonatedUser: this._impersonatedUser,
277278
onClose: this._transactionClosed.bind(this),
+193
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import Transaction from "./transaction"
21+
import {
22+
ConnectionHolder
23+
} from './internal/connection-holder'
24+
25+
import { Bookmarks } from './internal/bookmarks'
26+
import { TxConfig } from "./internal/tx-config";
27+
28+
/**
29+
* Represents a {@link Promise<Transaction>} object and a {@link Transaction} object.
30+
*
31+
* Resolving this object promise verifies the result of the transaction begin and returns the {@link Transaction} object in case of success.
32+
*
33+
* The object can still also used as {@link Transaction} for convenience. The result of begin will be checked
34+
* during the next API calls in the object as it is in the transaction.
35+
*
36+
* @access public
37+
*/
38+
class TransactionPromise extends Transaction implements Promise<Transaction>{
39+
[Symbol.toStringTag]: string = "TransactionPromise"
40+
private _beginError?: Error;
41+
private _beginMetadata?: any;
42+
private _beginPromise?: Promise<Transaction>;
43+
private _reject?: (error: Error) => void;
44+
private _resolve?: (value?: Transaction | PromiseLike<Transaction> | undefined) => void;
45+
46+
/**
47+
* @constructor
48+
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
49+
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
50+
* @param {function(bookmarks: Bookmarks)} onBookmarks callback invoked when new bookmark is produced.
51+
* * @param {function()} onConnection - Function to be called when a connection is obtained to ensure the conneciton
52+
* is not yet released.
53+
* @param {boolean} reactive whether this transaction generates reactive streams
54+
* @param {number} fetchSize - the record fetch size in each pulling batch.
55+
* @param {string} impersonatedUser - The name of the user which should be impersonated for the duration of the session.
56+
*/
57+
constructor({
58+
connectionHolder,
59+
onClose,
60+
onBookmarks,
61+
onConnection,
62+
reactive,
63+
fetchSize,
64+
impersonatedUser,
65+
highRecordWatermark,
66+
lowRecordWatermark
67+
}: {
68+
connectionHolder: ConnectionHolder
69+
onClose: () => void
70+
onBookmarks: (bookmarks: Bookmarks) => void
71+
onConnection: () => void
72+
reactive: boolean
73+
fetchSize: number
74+
impersonatedUser?: string,
75+
highRecordWatermark: number,
76+
lowRecordWatermark: number
77+
}) {
78+
super({
79+
connectionHolder,
80+
onClose,
81+
onBookmarks,
82+
onConnection,
83+
reactive,
84+
fetchSize,
85+
impersonatedUser,
86+
highRecordWatermark,
87+
lowRecordWatermark
88+
})
89+
}
90+
91+
/**
92+
* Waits for the begin to complete.
93+
*
94+
* @param {function(transaction: Transaction)} onFulfilled - function to be called when finished.
95+
* @param {function(error: {message:string, code:string})} onRejected - function to be called upon errors.
96+
* @return {Promise} promise.
97+
*/
98+
then<TResult1 = Transaction, TResult2 = never>(
99+
onfulfilled?:
100+
((value: Transaction) => TResult1 | PromiseLike<TResult1>)
101+
| null,
102+
onrejected?:
103+
((reason: any) => TResult2 | PromiseLike<TResult2>)
104+
| null
105+
): Promise<TResult1 | TResult2> {
106+
return this._getOrCreateBeginPromise().then(onfulfilled, onrejected);
107+
}
108+
109+
/**
110+
* Catch errors when using promises.
111+
*
112+
* @param {function(error: Neo4jError)} onRejected - Function to be called upon errors.
113+
* @return {Promise} promise.
114+
*/
115+
catch<TResult = never>(onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | null): Promise<any> {
116+
return this._getOrCreateBeginPromise().catch(onrejected);
117+
}
118+
119+
/**
120+
* Called when finally the begin is done
121+
*
122+
* @param {function()|null} onfinally - function when the promise finished
123+
* @return {Promise} promise.
124+
*/
125+
finally(onfinally?: (() => void) | null): Promise<Transaction> {
126+
return this._getOrCreateBeginPromise().finally(onfinally);
127+
}
128+
129+
private _getOrCreateBeginPromise(): Promise<Transaction> {
130+
if (!this._beginPromise) {
131+
this._beginPromise = new Promise((resolve, reject) => {
132+
this._resolve = resolve;
133+
this._reject = reject;
134+
if (this._beginError) {
135+
reject(this._beginError);
136+
}
137+
if (this._beginMetadata) {
138+
resolve(this._toTransaction());
139+
}
140+
});
141+
}
142+
return this._beginPromise;
143+
}
144+
145+
/**
146+
* @access private
147+
*/
148+
private _toTransaction(): Transaction {
149+
//@ts-ignore
150+
return {
151+
...this,
152+
run: super.run.bind(this),
153+
commit: super.commit.bind(this),
154+
rollback: super.rollback.bind(this),
155+
close: super.close.bind(this),
156+
isOpen: super.isOpen.bind(this),
157+
_begin: this._begin.bind(this),
158+
}
159+
}
160+
161+
/**
162+
* @access private
163+
*/
164+
_begin(bookmarks: string | Bookmarks | string[], txConfig: TxConfig): void {
165+
return super._begin(bookmarks, txConfig, {
166+
onError: this._onBeginError.bind(this),
167+
onComplete: this._onBeginMetadata.bind(this)
168+
});
169+
}
170+
171+
/**
172+
* @access private
173+
*/
174+
private _onBeginError(error: Error): void {
175+
this._beginError = error;
176+
if (this._reject) {
177+
this._reject(error);
178+
}
179+
}
180+
181+
/**
182+
* @access private
183+
*/
184+
private _onBeginMetadata(metadata: any): void {
185+
this._beginMetadata = metadata || {};
186+
if (this._resolve) {
187+
this._resolve(this._toTransaction());
188+
}
189+
}
190+
191+
}
192+
193+
export default TransactionPromise

packages/core/src/transaction.ts

+22-4
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ class Transaction {
109109
* @param {TxConfig} txConfig
110110
* @returns {void}
111111
*/
112-
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig): void {
112+
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig, events?: {
113+
onError: (error: Error) => void
114+
onComplete: (metadata: any) => void
115+
}): void {
113116
this._connectionHolder
114117
.getConnection()
115118
.then(connection => {
@@ -121,14 +124,29 @@ class Transaction {
121124
mode: this._connectionHolder.mode(),
122125
database: this._connectionHolder.database(),
123126
impersonatedUser: this._impersonatedUser,
124-
beforeError: this._onError,
125-
afterComplete: this._onComplete
127+
beforeError: (error: Error) => {
128+
if (events) {
129+
events.onError(error)
130+
}
131+
return this._onError(error).catch(() => {})
132+
},
133+
afterComplete: (metadata: any) => {
134+
if (events) {
135+
events.onComplete(metadata)
136+
}
137+
return this._onComplete(metadata)
138+
}
126139
})
127140
} else {
128141
throw newError('No connection available')
129142
}
130143
})
131-
.catch(error => this._onError(error))
144+
.catch(error => {
145+
if (events) {
146+
events.onError(error)
147+
}
148+
this._onError(error).catch(() => {})
149+
})
132150
}
133151

134152
/**

packages/core/test/session.test.ts

+30-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
import { ConnectionProvider, Session, Connection } from '../src'
19+
import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction } from '../src'
2020
import { bookmarks } from '../src/internal'
2121
import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants'
2222
import FakeConnection from './utils/connection.fake'
@@ -227,6 +227,35 @@ describe('session', () => {
227227
expect(session.lastBookmarks()).toEqual(bookmarks.values())
228228
})
229229
})
230+
231+
describe('.beginTransaction()', () => {
232+
it('should return a TransactionPromise', () => {
233+
const session = newSessionWithConnection(newFakeConnection(), false, 1000)
234+
235+
const tx: Transaction = session.beginTransaction()
236+
237+
expect(tx).toBeInstanceOf(TransactionPromise)
238+
})
239+
240+
it('should resolves a Transaction', async () => {
241+
const connection = newFakeConnection()
242+
const protocol = connection.protocol()
243+
// @ts-ignore
244+
connection.protocol = () => {
245+
return {
246+
...protocol,
247+
beginTransaction: (params: { afterComplete: () => {} }) => {
248+
params.afterComplete()
249+
}
250+
}
251+
}
252+
const session = newSessionWithConnection(connection, false, 1000)
253+
254+
const tx: Transaction = await session.beginTransaction()
255+
256+
expect(tx).toBeDefined()
257+
})
258+
})
230259
})
231260

232261
function newSessionWithConnection(

0 commit comments

Comments
 (0)