Skip to content

Commit b3ce989

Browse files
committed
Removing unneeded properties and move watermark logic to the session
1 parent 55cf2c1 commit b3ce989

File tree

10 files changed

+123
-78
lines changed

10 files changed

+123
-78
lines changed

packages/bolt-connection/src/bolt/bolt-protocol-v1.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,9 @@ export default class BoltProtocol {
278278
afterError,
279279
beforeComplete,
280280
afterComplete,
281-
flush = true
281+
flush = true,
282+
highRecordWatermark = Number.MAX_VALUE,
283+
lowRecordWatermark = Number.MAX_VALUE
282284
} = {}
283285
) {
284286
const observer = new ResultStreamObserver({
@@ -288,7 +290,9 @@ export default class BoltProtocol {
288290
beforeError,
289291
afterError,
290292
beforeComplete,
291-
afterComplete
293+
afterComplete,
294+
highRecordWatermark,
295+
lowRecordWatermark
292296
})
293297

294298
// bookmark and mode are ignored in this version of the protocol

packages/bolt-connection/src/bolt/bolt-protocol-v3.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ export default class BoltProtocol extends BoltProtocolV2 {
163163
afterError,
164164
beforeComplete,
165165
afterComplete,
166-
flush = true
166+
flush = true,
167+
highRecordWatermark = Number.MAX_VALUE,
168+
lowRecordWatermark = Number.MAX_VALUE
167169
} = {}
168170
) {
169171
const observer = new ResultStreamObserver({
@@ -173,7 +175,9 @@ export default class BoltProtocol extends BoltProtocolV2 {
173175
beforeError,
174176
afterError,
175177
beforeComplete,
176-
afterComplete
178+
afterComplete,
179+
highRecordWatermark,
180+
lowRecordWatermark
177181
})
178182

179183
// passing in a database name on this protocol version throws an error

packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ export default class BoltProtocol extends BoltProtocolV3 {
9090
afterComplete,
9191
flush = true,
9292
reactive = false,
93-
fetchSize = FETCH_ALL
93+
fetchSize = FETCH_ALL,
94+
highRecordWatermark = Number.MAX_VALUE,
95+
lowRecordWatermark = Number.MAX_VALUE
9496
} = {}
9597
) {
9698
const observer = new ResultStreamObserver({
@@ -104,7 +106,9 @@ export default class BoltProtocol extends BoltProtocolV3 {
104106
beforeError,
105107
afterError,
106108
beforeComplete,
107-
afterComplete
109+
afterComplete,
110+
highRecordWatermark,
111+
lowRecordWatermark
108112
})
109113

110114
// passing impersonated user on this protocol version throws an error

packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
8484
afterComplete,
8585
flush = true,
8686
reactive = false,
87-
fetchSize = FETCH_ALL
87+
fetchSize = FETCH_ALL,
88+
highRecordWatermark = Number.MAX_VALUE,
89+
lowRecordWatermark = Number.MAX_VALUE
8890
} = {}
8991
) {
9092
const observer = new ResultStreamObserver({
@@ -98,7 +100,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
98100
beforeError,
99101
afterError,
100102
beforeComplete,
101-
afterComplete
103+
afterComplete,
104+
highRecordWatermark,
105+
lowRecordWatermark
102106
})
103107

104108
const flushRun = reactive

packages/bolt-connection/src/bolt/stream-observers.js

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ class ResultStreamObserver extends StreamObserver {
6868
afterKeys,
6969
beforeComplete,
7070
afterComplete,
71-
server
71+
server,
72+
highRecordWatermark = Number.MAX_VALUE,
73+
lowRecordWatermark = Number.MAX_VALUE
7274
} = {}) {
7375
super()
7476

@@ -94,8 +96,10 @@ class ResultStreamObserver extends StreamObserver {
9496
this._discardFunction = discardFunction
9597
this._discard = false
9698
this._fetchSize = fetchSize
99+
this._lowRecordWatermark = lowRecordWatermark
100+
this._highRecordWatermark = highRecordWatermark
97101
this._setState(reactive ? _states.READY : _states.READY_STREAMING)
98-
this._setupAuoPull(fetchSize)
102+
this._setupAuoPull()
99103
this._pullMode = false;
100104
}
101105

@@ -107,17 +111,6 @@ class ResultStreamObserver extends StreamObserver {
107111
return this._state.pull(this)
108112
}
109113

110-
isReady() {
111-
return this._state === _states.READY
112-
}
113-
114-
getWatermaks () {
115-
return {
116-
high: this._highRecordWatermark,
117-
low: this._lowRecordWatermark
118-
}
119-
}
120-
121114
/**
122115
* Will be called on every record that comes in and transform a raw record
123116
* to a Object. If user-provided observer is present, pass transformed record
@@ -392,15 +385,8 @@ class ResultStreamObserver extends StreamObserver {
392385
this._state = state
393386
}
394387

395-
_setupAuoPull (fetchSize) {
388+
_setupAuoPull () {
396389
this._autoPull = true
397-
if (fetchSize === FETCH_ALL) {
398-
this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull
399-
this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull
400-
} else {
401-
this._lowRecordWatermark = 0.3 * fetchSize
402-
this._highRecordWatermark = 0.7 * fetchSize
403-
}
404390
}
405391
}
406392

packages/core/src/internal/observers.ts

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,6 @@ export interface ResultStreamObserver extends StreamObserver {
9393

9494
pull(): void
9595

96-
isReady(): boolean
97-
98-
getWatermaks(): { high: number; low: number }
99-
10096
/**
10197
* Mark this observer as if it has completed with no metadata.
10298
*/
@@ -131,14 +127,6 @@ export class CompletedObserver implements ResultStreamObserver {
131127
// do nothing
132128
}
133129

134-
isReady(): boolean {
135-
return false;
136-
}
137-
138-
getWatermaks(): { high: number; low: number } {
139-
return { high: 0, low: 0 };
140-
}
141-
142130
setPullMode(_: boolean): void {
143131
// do nothing
144132
}
@@ -194,14 +182,6 @@ export class FailedObserver implements ResultStreamObserver {
194182
// do nothing
195183
}
196184

197-
isReady(): boolean {
198-
return false;
199-
}
200-
201-
getWatermaks(): { high: number; low: number } {
202-
return { high: 0, low: 0 };
203-
}
204-
205185
setPullMode(_: boolean): void {
206186
// do nothing
207187
}

packages/core/src/result.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class Result implements Promise<QueryResult> {
103103
private _connectionHolder: connectionHolder.ConnectionHolder
104104
private _keys: string[] | null
105105
private _summary: ResultSummary | null
106+
private _watermarks: { high: number; low: number }
106107

107108
/**
108109
* Inject the observer to be used.
@@ -117,7 +118,8 @@ class Result implements Promise<QueryResult> {
117118
streamObserverPromise: Promise<observer.ResultStreamObserver>,
118119
query: Query,
119120
parameters?: any,
120-
connectionHolder?: connectionHolder.ConnectionHolder
121+
connectionHolder?: connectionHolder.ConnectionHolder,
122+
watermarks: { high: number; low: number } = { high: Number.MAX_VALUE, low: Number.MAX_VALUE }
121123
) {
122124
this._stack = captureStacktrace()
123125
this._streamObserverPromise = streamObserverPromise
@@ -127,6 +129,7 @@ class Result implements Promise<QueryResult> {
127129
this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER
128130
this._keys = null
129131
this._summary = null
132+
this._watermarks = watermarks
130133
}
131134

132135
/**
@@ -253,9 +256,9 @@ class Result implements Promise<QueryResult> {
253256
}
254257

255258
const streaming = await this._subscribe(observer, true)
256-
const watermarks = streaming.getWatermaks()
259+
257260
const pullIfNeeded = () => {
258-
if (observer.queueSize <= watermarks.high) {
261+
if (observer.queueSize <= this._watermarks.high) {
259262
streaming.pull()
260263
}
261264
}

packages/core/src/session.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
import { ResultStreamObserver, FailedObserver } from './internal/observers'
2020
import { validateQueryAndParameters } from './internal/util'
21+
import { FETCH_ALL } from './internal/constants'
2122
import { newError } from './error'
2223
import Result from './result'
2324
import Transaction from './transaction'
@@ -60,6 +61,8 @@ class Session {
6061
private _impersonatedUser?: string
6162
private _onComplete: (meta: any) => void
6263
private _databaseNameResolved: boolean
64+
private _lowRecordWatermark: number
65+
private _highRecordWatermark: number
6366

6467
/**
6568
* @constructor
@@ -157,7 +160,9 @@ class Session {
157160
impersonatedUser: this._impersonatedUser,
158161
afterComplete: this._onComplete,
159162
reactive: this._reactive,
160-
fetchSize: this._fetchSize
163+
fetchSize: this._fetchSize,
164+
lowRecordWatermark: this._lowRecordWatermark,
165+
highRecordWatermark: this._highRecordWatermark
161166
})
162167
})
163168
}
@@ -192,7 +197,8 @@ class Session {
192197
})
193198
)
194199
}
195-
return new Result(observerPromise, query, parameters, connectionHolder)
200+
const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark }
201+
return new Result(observerPromise, query, parameters, connectionHolder, watermarks)
196202
}
197203

198204
async _acquireConnection(connectionConsumer: ConnectionConsumer) {
@@ -269,7 +275,9 @@ class Session {
269275
onBookmark: this._updateBookmark.bind(this),
270276
onConnection: this._assertSessionIsOpen.bind(this),
271277
reactive: this._reactive,
272-
fetchSize: this._fetchSize
278+
fetchSize: this._fetchSize,
279+
lowRecordWatermark: this._lowRecordWatermark,
280+
highRecordWatermark: this._highRecordWatermark
273281
})
274282
tx._begin(this._lastBookmark, txConfig)
275283
return tx
@@ -418,6 +426,20 @@ class Session {
418426
this._updateBookmark(new Bookmark(meta.bookmark))
419427
}
420428

429+
/**
430+
* @private
431+
* @returns {void}
432+
*/
433+
private _setupWatermark(): void {
434+
if (this._fetchSize === FETCH_ALL) {
435+
this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull
436+
this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull
437+
} else {
438+
this._lowRecordWatermark = 0.3 * this._fetchSize
439+
this._highRecordWatermark = 0.7 * this._fetchSize
440+
}
441+
}
442+
421443
/**
422444
* @protected
423445
*/

0 commit comments

Comments
 (0)