Skip to content

Database Notification Improvements #1008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3d9d59a
Database Notification Improvements
bigmontz Oct 19, 2022
d1cac6e
Add tests for Notification, notificationSeverityLevel and notificatio…
bigmontz Oct 19, 2022
eec82a8
Assert not sending notification filters the current implementations o…
bigmontz Oct 20, 2022
bac205a
Implement notification filters validation from v1 to v5.0
bigmontz Oct 26, 2022
9fd8e45
Add BoltProtocolV5x1 with notification filters
bigmontz Oct 26, 2022
c2e9bcb
Add bolt 5.1 to handshake
bigmontz Oct 26, 2022
3899d8d
Add notification filters to Connection.initialize
bigmontz Oct 26, 2022
ef4320c
Add notification configuration to session and driver
bigmontz Oct 27, 2022
0d255e7
testkit
bigmontz Oct 27, 2022
8b6dcf3
Adjust categories
bigmontz Nov 1, 2022
49d070b
Update HELLO message for 5.1
bigmontz Nov 1, 2022
7594988
Small adjustment
bigmontz Nov 1, 2022
73b86af
Fix features name
bigmontz Nov 1, 2022
03d6109
Add NotificationFilter and notificationFilter to core
bigmontz Nov 3, 2022
24bbe3f
Export NotificationFilter and notificationFilter
bigmontz Nov 3, 2022
33887c9
Sanitize notification filters
bigmontz Nov 3, 2022
2dd6816
Validate notification filters while create sessions and drivers
bigmontz Nov 3, 2022
cdfe17e
Add tests to verify filters are being send to the protocol
bigmontz Nov 3, 2022
1152f59
improve filters documentation
bigmontz Nov 3, 2022
8e7cb10
Improve notification filter definition by deriving from category and …
bigmontz Nov 4, 2022
3fa1439
Adjust deno code
bigmontz Nov 4, 2022
362b141
Add configuration docs
bigmontz Nov 4, 2022
ada3d04
Fix notification filters config
bigmontz Nov 7, 2022
d4d7259
Apply suggestions from code review
bigmontz Nov 8, 2022
5106406
Addressing PR comments
bigmontz Nov 8, 2022
c89afc6
Address comments in the PR
bigmontz Nov 8, 2022
c58ceb6
Remove QUERY from Category enum
bigmontz Nov 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions packages/bolt-connection/src/bolt/bolt-protocol-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { newError } from 'neo4j-driver-core'
import { newError, json } from 'neo4j-driver-core'
// eslint-disable-next-line no-unused-vars
import { ResultStreamObserver } from './stream-observers'

Expand Down Expand Up @@ -79,4 +79,23 @@ function assertImpersonatedUserIsEmpty (impersonatedUser, onProtocolError = () =
}
}

export { assertDatabaseIsEmpty, assertTxConfigIsEmpty, assertImpersonatedUserIsEmpty }
/* Asserts that the passed-in notificationFilters is empty
* @param {string[]} notificationFilters
* @param {function (err:Error)} onProtocolError Called when it does have notificationFilters user set
* @param {any} observer
*/
function assertNotificationFiltersIsEmpty (notificationFilters, onProtocolError = () => {}, observer) {
if (notificationFilters !== undefined) {
const error = newError(
'Driver is connected to a database that does not support user notification filters. ' +
'Please upgrade to neo4j 5.3.0 or later in order to use this functionality. ' +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not yet clear if this feature will make it into 5.3 or later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will hold this comment/pr until we have the functionality merged in the server and we know which version it will be part of.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 just make sure to not forget to update the parts of the PR that mention the version (also some doc comments).

`Trying to set notifications to ${json.stringify(notificationFilters)}.`
)
// unsupported API was used, consider this a fatal error for the current connection
onProtocolError(error.message)
observer.onError(error)
throw error
}
}

export { assertDatabaseIsEmpty, assertTxConfigIsEmpty, assertImpersonatedUserIsEmpty, assertNotificationFiltersIsEmpty }
16 changes: 14 additions & 2 deletions packages/bolt-connection/src/bolt/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import {
assertDatabaseIsEmpty,
assertTxConfigIsEmpty,
assertImpersonatedUserIsEmpty
assertImpersonatedUserIsEmpty,
assertNotificationFiltersIsEmpty
} from './bolt-protocol-util'
// eslint-disable-next-line no-unused-vars
import { Chunker } from '../channel'
Expand Down Expand Up @@ -149,14 +150,18 @@ export default class BoltProtocol {
* @param {Object} param.authToken the authentication token.
* @param {function(err: Error)} param.onError the callback to invoke on error.
* @param {function()} param.onComplete the callback to invoke on completion.
* @param {?string[]} param.notificationFilters the filtering for notifications.
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
*/
initialize ({ userAgent, authToken, onError, onComplete } = {}) {
initialize ({ userAgent, authToken, onError, onComplete, notificationFilters } = {}) {
const observer = new LoginObserver({
onError: error => this._onLoginError(error, onError),
onCompleted: metadata => this._onLoginCompleted(metadata, onComplete)
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)

this.write(RequestMessage.init(userAgent, authToken), observer, true)

return observer
Expand All @@ -177,6 +182,7 @@ export default class BoltProtocol {
* @param {string} param.database the target database name.
* @param {string} param.mode the access mode.
* @param {string} param.impersonatedUser the impersonated user
* @param {?string[]} param.notificationFilters the filtering for notifications.
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
Expand All @@ -189,6 +195,7 @@ export default class BoltProtocol {
database,
mode,
impersonatedUser,
notificationFilters,
beforeError,
afterError,
beforeComplete,
Expand All @@ -203,6 +210,7 @@ export default class BoltProtocol {
database,
mode,
impersonatedUser,
notificationFilters,
beforeError,
afterError,
beforeComplete,
Expand Down Expand Up @@ -285,6 +293,7 @@ export default class BoltProtocol {
* @param {TxConfig} param.txConfig the transaction configuration.
* @param {string} param.database the target database name.
* @param {string} param.impersonatedUser the impersonated user
* @param {?string[]} param.notificationFilters the filtering for notifications.
* @param {string} param.mode the access mode.
* @param {function(keys: string[])} param.beforeKeys the callback to invoke before handling the keys.
* @param {function(keys: string[])} param.afterKeys the callback to invoke after handling the keys.
Expand All @@ -304,6 +313,7 @@ export default class BoltProtocol {
database,
mode,
impersonatedUser,
notificationFilters,
beforeKeys,
afterKeys,
beforeError,
Expand All @@ -327,6 +337,8 @@ export default class BoltProtocol {
lowRecordWatermark
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)
// bookmarks and mode are ignored in this version of the protocol
assertTxConfigIsEmpty(txConfig, this._onProtocolError, observer)
// passing in a database name on this protocol version throws an error
Expand Down
13 changes: 11 additions & 2 deletions packages/bolt-connection/src/bolt/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
import BoltProtocolV2 from './bolt-protocol-v2'
import RequestMessage from './request-message'
import { assertDatabaseIsEmpty, assertImpersonatedUserIsEmpty } from './bolt-protocol-util'
import { assertDatabaseIsEmpty, assertImpersonatedUserIsEmpty, assertNotificationFiltersIsEmpty } from './bolt-protocol-util'
import {
StreamObserver,
LoginObserver,
Expand Down Expand Up @@ -69,12 +69,15 @@ export default class BoltProtocol extends BoltProtocolV2 {
return metadata
}

initialize ({ userAgent, authToken, onError, onComplete } = {}) {
initialize ({ userAgent, authToken, onError, onComplete, notificationFilters } = {}) {
const observer = new LoginObserver({
onError: error => this._onLoginError(error, onError),
onCompleted: metadata => this._onLoginCompleted(metadata, onComplete)
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)

this.write(RequestMessage.hello(userAgent, authToken), observer, true)

return observer
Expand All @@ -89,6 +92,7 @@ export default class BoltProtocol extends BoltProtocolV2 {
txConfig,
database,
impersonatedUser,
notificationFilters,
mode,
beforeError,
afterError,
Expand All @@ -104,6 +108,8 @@ export default class BoltProtocol extends BoltProtocolV2 {
})
observer.prepareToHandleSingleResponse()

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)
// passing in a database name on this protocol version throws an error
assertDatabaseIsEmpty(database, this._onProtocolError, observer)
// passing impersonated user on this protocol version throws an error
Expand Down Expand Up @@ -166,6 +172,7 @@ export default class BoltProtocol extends BoltProtocolV2 {
txConfig,
database,
impersonatedUser,
notificationFilters,
mode,
beforeKeys,
afterKeys,
Expand All @@ -190,6 +197,8 @@ export default class BoltProtocol extends BoltProtocolV2 {
lowRecordWatermark
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)
// passing in a database name on this protocol version throws an error
assertDatabaseIsEmpty(database, this._onProtocolError, observer)
// passing impersonated user on this protocol version throws an error
Expand Down
8 changes: 7 additions & 1 deletion packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
import BoltProtocolV3 from './bolt-protocol-v3'
import RequestMessage from './request-message'
import { assertImpersonatedUserIsEmpty } from './bolt-protocol-util'
import { assertImpersonatedUserIsEmpty, assertNotificationFiltersIsEmpty } from './bolt-protocol-util'
import {
ResultStreamObserver,
ProcedureRouteObserver
Expand Down Expand Up @@ -56,6 +56,7 @@ export default class BoltProtocol extends BoltProtocolV3 {
txConfig,
database,
impersonatedUser,
notificationFilters,
mode,
beforeError,
afterError,
Expand All @@ -71,6 +72,8 @@ export default class BoltProtocol extends BoltProtocolV3 {
})
observer.prepareToHandleSingleResponse()

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)
// passing impersonated user on this protocol version throws an error
assertImpersonatedUserIsEmpty(impersonatedUser, this._onProtocolError, observer)

Expand All @@ -91,6 +94,7 @@ export default class BoltProtocol extends BoltProtocolV3 {
txConfig,
database,
impersonatedUser,
notificationFilters,
mode,
beforeKeys,
afterKeys,
Expand Down Expand Up @@ -121,6 +125,8 @@ export default class BoltProtocol extends BoltProtocolV3 {
lowRecordWatermark
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)
// passing impersonated user on this protocol version throws an error
assertImpersonatedUserIsEmpty(impersonatedUser, this._onProtocolError, observer)

Expand Down
6 changes: 5 additions & 1 deletion packages/bolt-connection/src/bolt/bolt-protocol-v4x1.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { internal } from 'neo4j-driver-core'

import transformersFactories from './bolt-protocol-v4x1.transformer'
import Transformer from './transformer'
import { assertNotificationFiltersIsEmpty } from './bolt-protocol-util'

const {
constants: { BOLT_PROTOCOL_V4_1 }
Expand Down Expand Up @@ -72,12 +73,15 @@ export default class BoltProtocol extends BoltProtocolV4 {
return this._transformer
}

initialize ({ userAgent, authToken, onError, onComplete } = {}) {
initialize ({ userAgent, authToken, onError, onComplete, notificationFilters } = {}) {
const observer = new LoginObserver({
onError: error => this._onLoginError(error, onError),
onCompleted: metadata => this._onLoginCompleted(metadata, onComplete)
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)

this.write(
RequestMessage.hello(userAgent, authToken, this._serversideRouting),
observer,
Expand Down
7 changes: 6 additions & 1 deletion packages/bolt-connection/src/bolt/bolt-protocol-v4x3.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import utcTransformersFactories from './bolt-protocol-v5x0.utc.transformer'
import Transformer from './transformer'

import { internal } from 'neo4j-driver-core'
import { assertNotificationFiltersIsEmpty } from './bolt-protocol-util'

const {
bookmarks: { Bookmarks },
Expand Down Expand Up @@ -85,9 +86,10 @@ export default class BoltProtocol extends BoltProtocolV42 {
* @param {any} param0.authToken The auth token
* @param {function(error)} param0.onError On error callback
* @param {function(onComplte)} param0.onComplete On complete callback
* @param {?string[]} param0.notificationFilters the filtering for notifications.
* @returns {LoginObserver} The Login observer
*/
initialize ({ userAgent, authToken, onError, onComplete } = {}) {
initialize ({ userAgent, authToken, onError, onComplete, notificationFilters } = {}) {
const observer = new LoginObserver({
onError: error => this._onLoginError(error, onError),
onCompleted: metadata => {
Expand All @@ -98,6 +100,9 @@ export default class BoltProtocol extends BoltProtocolV42 {
}
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)

this.write(
RequestMessage.hello(userAgent, authToken, this._serversideRouting, ['utc']),
observer,
Expand Down
10 changes: 10 additions & 0 deletions packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import transformersFactories from './bolt-protocol-v4x4.transformer'
import utcTransformersFactories from './bolt-protocol-v5x0.utc.transformer'
import Transformer from './transformer'

import { assertNotificationFiltersIsEmpty } from './bolt-protocol-util'

const {
constants: { BOLT_PROTOCOL_V4_4, FETCH_ALL },
bookmarks: { Bookmarks }
Expand Down Expand Up @@ -87,6 +89,7 @@ export default class BoltProtocol extends BoltProtocolV43 {
database,
mode,
impersonatedUser,
notificationFilters,
beforeKeys,
afterKeys,
beforeError,
Expand Down Expand Up @@ -116,6 +119,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
lowRecordWatermark
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)

const flushRun = reactive
this.write(
RequestMessage.runWithMetadata(query, parameters, {
Expand All @@ -142,6 +148,7 @@ export default class BoltProtocol extends BoltProtocolV43 {
database,
mode,
impersonatedUser,
notificationFilters,
beforeError,
afterError,
beforeComplete,
Expand All @@ -156,6 +163,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
})
observer.prepareToHandleSingleResponse()

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)

this.write(
RequestMessage.begin({ bookmarks, txConfig, database, mode, impersonatedUser }),
observer,
Expand Down
7 changes: 6 additions & 1 deletion packages/bolt-connection/src/bolt/bolt-protocol-v5x0.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import RequestMessage from './request-message'
import { LoginObserver } from './stream-observers'

import { internal } from 'neo4j-driver-core'
import { assertNotificationFiltersIsEmpty } from './bolt-protocol-util'

const {
constants: { BOLT_PROTOCOL_V5_0 }
Expand All @@ -49,14 +50,18 @@ export default class BoltProtocol extends BoltProtocolV44 {
* @param {any} param0.authToken The auth token
* @param {function(error)} param0.onError On error callback
* @param {function(onComplte)} param0.onComplete On complete callback
* @param {?string[]} param0.notificationFilters the filtering for notifications.
* @returns {LoginObserver} The Login observer
*/
initialize ({ userAgent, authToken, onError, onComplete } = {}) {
initialize ({ userAgent, authToken, onError, onComplete, notificationFilters } = {}) {
const observer = new LoginObserver({
onError: error => this._onLoginError(error, onError),
onCompleted: metadata => this._onLoginCompleted(metadata, onComplete)
})

// passing notification filters user on this protocol version throws an error
assertNotificationFiltersIsEmpty(notificationFilters, this._onProtocolError, observer)

this.write(
RequestMessage.hello(userAgent, authToken, this._serversideRouting),
observer,
Expand Down
Loading