Skip to content

Improve back-pressure mechanism for RxResult #882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 1, 2022

Conversation

bigmontz
Copy link
Contributor

@bigmontz bigmontz commented Feb 22, 2022

The lack of consumer oriented back-pressure mechanism in the RxJS apis was causing memory issues and other problems in the client code.

For solving this issue, the records observer returned by RxResult.records() was changed for using the async iterator as foundation. New methods for enabling the client for fine controlling the stream were added to the RxResult api. These methods are:

  • pause(): Pause the record streaming. No new record will be pushed to the stream util push or resume get called.
  • resume(): Resumes the records streaming.
  • push(): Push the next record. If the stream is not paused, this method will pause it for giving the push control to the client.

Comment on lines +1543 to +1563
const session = driver.rxSession()
const xs = []

const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
result
.records()
.pipe(map(record => record.get('x').toInt()))
.subscribe({
next: async x => {
xs.push(x)
// manual pushing reoords to the stream
// it pauses the automatic pushing
await result.push()
},
complete: async () => {
expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
await session.close().toPromise()
done()
},
error: done.fail.bind(done)
})
Copy link
Contributor Author

@bigmontz bigmontz Feb 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usage for result.push api

Comment on lines +1571 to +1596
const session = driver.rxSession()

try {
const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
const xs = await result
.records()
.pipe(
map(record => record.get('x').toInt()),
bufferCount(5), // buffer 5 records
mergeMap(async theXs => {
// pausing the records coming from the stream
result.pause()
// some costly operation
await callCostlyApi(theXs)
// resume the stream
await result.resume()
return theXs
}),
toArray()
)
.toPromise()

expect(xs).toEqual([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10]])
} finally {
await session.close().toPromise()
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Examples for pause and resume

@bigmontz bigmontz changed the title Fix backpressure in the rxSession Improve back-pressure mechanism for RxResult Feb 28, 2022
@bigmontz bigmontz marked this pull request as ready for review February 28, 2022 14:15
@bigmontz bigmontz linked an issue Feb 28, 2022 that may be closed by this pull request
@bigmontz bigmontz merged commit 17727e9 into neo4j:5.0 Mar 1, 2022
@bigmontz bigmontz deleted the 5.0-rx-backpressure branch March 1, 2022 16:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request] Pull through streaming API to prevent back-pressure
1 participant