Transform a resultSet into a readable stream #219
@jgoux did you get any further with this? |
From the documentation you quoted, I believe it wants you to keep pushing data until you either reach the end or it tells you to stop (when push returns false). That way, the other end of the pipe will keep getting data until it pauses the flow (or the stream ends).

rs._read = async function() {
  let row;
  do {
    row = await getRow();
    if (!row) {
      result.resultSet.close((e) => { if (e) throw e; });
      connection.release((e) => { if (e) throw e; });
      return rs.push(null);
    }
  } while (rs.push(row));
} |
Even with the corrected stream interface, I have problems. For me, not all of the data pushed seems to actually go through the pipe, and the stream never emits its end/close events. Also, piping the stream directly only results in one record being fetched, while wrapping the pipe in a Promise (like your example usage) somehow returns more. Very odd. I strongly suspect it has something to do with transpiling async/awaits.

I started implementing it in non-transpiled code, but quickly understood why you took the route you did: getRow doesn't actually return the data, it only provides the callback that you need to get data from an unrelated event (_read). Ugh... Funnily enough, we could probably implement it pretty easily if getRow were synchronous. @cjbj, any chance we can avoid callback hell and have a nice, pretty Stream interface like blobs/clobs? Bonus points if you can eliminate some backend traffic by grabbing whatever rows are "ready" on each DB call (up to a certain limit to prevent memory bloat, of course).

EDIT:

const { promisify } = require('util'); // or any promisify implementation
const { Readable } = require('stream');
const oracle = require('oracledb');

async function executeQuery(connectionParameters, query, queryParameters) {
const getConnection = promisify(oracle.getConnection.bind(oracle));
const connection = await getConnection(connectionParameters);
const exQuery = promisify(connection.execute.bind(connection));
const result = await exQuery(query, queryParameters, { resultSet: true, outFormat: oracle.OBJECT });
const getRow = promisify(result.resultSet.getRow.bind(result.resultSet));
const rs = Readable({ objectMode: true });
rs._read = async function(size) {
var row = null;
do {
row = await getRow();
if (!row) {
result.resultSet.close((e) => { if (e) throw e });
connection.release((e) => { if (e) throw e });
return rs.push(null);
}
} while (rs.push(row));
}
return rs;
}

Might want to pass that down as a parameter or something. Or, if you don't want to wrestle with transpiling:

// oracleStream.js
var util = require('util');
var Readable = require('stream').Readable;
var oracle = require('oracledb');

util.inherits(OracleStream, Readable);
function OracleStream(connectionParameters, query, queryParameters, opt) {
opt.objectMode = true;
opt.resultSet = true;
Readable.call(this, opt);
this._isReading = true; // guard: the ResultSet must only be read sequentially
this._rowsPerFetch = opt.rowsPerFetch || 1; // rows requested per getRows() call
oracle.getConnection(connectionParameters, function(err, connection) {
if (err) { console.error(err.message); return; }
this._connection = connection;
this._connection.execute(query, queryParameters, opt, function(err, result) {
if (err) {
console.error(err.message);
this._connection.release((e) => { if (e) throw e });
return;
}
this._resultSet = result.resultSet;
this.fetchRows();
}.bind(this));
}.bind(this));
}
OracleStream.prototype._read = function(size) {
if (!this._isReading) {
this._isReading = true;
this.fetchRows();
}
}
OracleStream.prototype.fetchRows = function() {
this._resultSet.getRows(this._rowsPerFetch, function (err, rows) {
if (err || !rows || rows.length == 0) {
if (err) {
console.error(err.message);
}
this._resultSet.close(function(err) {
if (err) throw err;
this._connection.release((e) => { if (e) throw e });
}.bind(this));
this.push(null);
this._isReading = false;
return;
}
var lowWater = true;
for (var i = 0; i < rows.length; i++) {
lowWater = this.push(rows[i]);
}
if (lowWater) {
this.fetchRows();
} else {
this._isReading = false;
}
}.bind(this));
}
exports.createReadStream = function(connectionParameters, query, queryParameters, opt) {
return new OracleStream(connectionParameters, query, queryParameters, opt);
}
// Example Usage
const oracleStream = require('./oracleStream.js');
const query$ = oracleStream.createReadStream(dbConfig, query, queryParameters, { outFormat: oracle.OBJECT, rowsPerFetch: 2 });
const json$ = JSONStream.stringify();
const file$ = fs.createWriteStream("./example.json");
query$.pipe(json$).pipe(file$); |
Based on other _read implementations I've seen, it appears that the most common strategy is to only push one "chunk" until the next read is called (like the OP's implementation). However, in my experimentation I only saw _read called once at the start of the pipe for tiny streams (since the buffers didn't fill up), which supports the idea that you should keep filling up the pipe until you reach the high water mark. For large amounts of data (that reach the high water mark during the stream), _read was called several times sequentially to resume the flow (think of an impatient user clicking a button a bunch of times).

In my initial implementation, I forgot the this._isReading = true; line in _read, which caused fetchRows to be called several times in parallel. A limitation of oracledb is that the resultSet can only be accessed sequentially; attempting to get data in parallel throws an error indicating such.

In summary, I believe the OP's error was indeed in the _read implementation: specifically, preventive measures are needed to ensure that _read doesn't fire parallel getRow calls. My second implementation (the non-transpiled version) handles this issue by preventing parallel calls. If Node.js were multi-threaded, this would have a race condition on the running check, but you should be safe since it handles things on a single thread (and children via clusters would each have their own request they're handling). Either use my second solution, or add an _isReading check to the transpiled solution, and I suspect your problem should go away. |
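For reference, a minimal sketch of the guard described above, applied to the earlier async/await executeQuery snippet (the _fetching flag name is illustrative; getRow, result, connection, and rs are the locals from that snippet):

rs._fetching = false;
rs._read = async function() {
  if (rs._fetching) return;        // ignore overlapping _read calls
  rs._fetching = true;
  try {
    let row;
    do {
      row = await getRow();        // only one getRow call in flight at a time
      if (!row) {
        result.resultSet.close((e) => { if (e) throw e; });
        connection.release((e) => { if (e) throw e; });
        rs.push(null);
        return;
      }
    } while (rs.push(row));
  } finally {
    rs._fetching = false;          // let the next _read resume fetching
  }
};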
@leguma many thanks. I need to test it out. Regarding your question in #219 (comment), I know we have other things on the list before we revisit the node-oracledb API. The network optimization already exists in the form of pre-fetching. |
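For anyone looking for that pre-fetch knob, a minimal sketch of how it is exposed (this assumes the node-oracledb 1.x prefetchRows property/option; query, binds, and the already-obtained connection stand in for your own code, and the value of 100 is just an example):

var oracledb = require('oracledb');

oracledb.prefetchRows = 100;   // driver-wide default: rows fetched per network round trip

connection.execute(query, binds,
  { resultSet: true, prefetchRows: 100 },   // or override per statement
  function (err, result) {
    // subsequent resultSet.getRows() calls are served from the prefetch buffer where possible
  });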
Thanks @leguma for your time invested in this issue. |
I now had the same requirement to handle huge amounts of results, so I modified the oracledb wrapper with the ability to stream bulks of rows (currently 100 rows at a time, but this can be changed via the bulkRowsAmount option).

connection.query('SELECT * FROM departments WHERE manager_id > :id', [110], {
streamResults: true,
bulkRowsAmount: 100 //The number of rows to fetch (for streaming, that's the max number of rows the callback will get on each streaming invocation)
}, function onResults(error, results) {
if (error) {
//handle error...
} else if (results.length) {
//handle next bulk of results
} else {
//all rows read
}
}); |
updated the example above |
@sagiegurari Also, the desire was to use the Node.js streams interface, so we can do things like pipe these into other streams. For an example, see the original post. |
@leguma the difference is that with a resultSet you need to call getRows over and over until you finish. |
Updated streamResults to splitResults to be clearer based on your feedback, and I'll probably add the streaming that I wrote into the lib later on to enable piping. |
While it is a bit nicer to have the callback attached directly, it doesn't really take that many more lines of code with ResultSets (I'm counting ~10). Also, you don't have to call getRows everywhere; in the documentation, a recursive call is given as an example, which basically works like a callback in this case (see the sketch below). Giving us the opportunity to call getRows manually allows us to be flexible and throttle/pause reading data if necessary, which is critically important for implementing the Node.js stream interface. If I were to try to implement the same with a mandatory onResults callback, then we'd be forced to store all the data in the event that the Node.js streams' buffers get filled and we have to pause writing to them. That means we'd run out of memory in a hurry. I'm interested to see the stream implementation you've mentioned adding. Perhaps you've resolved these issues there.

As a side note, I did some profiling with the fetch size for resultSets. It seems that the optimal fetch size differs based on several factors. From some preliminary (by no means exhaustive) testing, it seems like a fetch size of 2-3 works best for my streaming implementation for a simple query. When I have some free time, I might compile a performance analysis with different query complexities and fetch sizes. |
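For readers who haven't seen it, the recursive ResultSet pattern mentioned above looks roughly like this (paraphrased from the node-oracledb examples; the function name and the row handling here are illustrative):

function fetchRowsFromRS(connection, resultSet, numRows) {
  resultSet.getRows(numRows, function (err, rows) {
    if (err || rows.length === 0) {
      // finished (or failed): release the resources
      resultSet.close(function () {
        connection.release(function () {});
      });
      return;
    }
    rows.forEach(function (row) {
      // process each row here
    });
    // ask for the next batch only after this one is handled; this is the
    // throttling point the stream implementations in this thread hook into
    fetchRowsFromRS(connection, resultSet, numRows);
  });
}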
The thing you are forgetting is that the example you gave doesn't account for LOB handling, which adds more async code. The stream implementation I had was pretty basic and I worked with events and not pipe, so it is not tested for that. |
Right you are: my implementation even forces the response to be returned as objects, which may be less than optimal (probably resulting in a significant overhead for large amounts of data). This was mostly for convenience in passing the data to a jsonify stream. It would probably be more efficient to use the non-object version of execute, using the header row as a JSON template and mapping each record into it (a sketch of that mapping follows below). I digress.

Luckily, LOBs are already streams in node-oracledb. Haven't tried 'em myself, but the syntax seems pretty easy for reading them. Writing them does look quite verbose, with non-intuitive configuration requirements (autoCommit=false and bound out vars), but I think it was done that way to have a flexible, generic execute function. A prettified interface with most of that stuff preconfigured is quite desirable. Something like:

// Get CLOB read stream
const clobStream$ = oracleStream.createLOBReadStream(dbConfig, query, queryParameters, { type: oracledb.CLOB });
// Get BLOB write stream
const blobWriteStream$ = oracleStream.createLOBWriteStream(dbConfig, query, queryParameters, { type: oracledb.BLOB });

Also, it would be cool to get a streaming write interface set up for non-LOBs. Even more interesting might be making a duplex stream where you can pipe read/write data in and out. Example use case: you need to run selects on a large set of data, so you pipe the query data in, it runs the query on each item (or chunk of items), and pipes the data out. However, I don't really anticipate that as a commonly needed operation. More of a fun project. |
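On the "header row as a template" idea from the previous comment, a minimal sketch of mapping default array-format rows into objects using the query's metaData (the helper name is illustrative):

function rowsToObjects(metaData, rows) {
  var names = metaData.map(function (col) { return col.name; });
  return rows.map(function (row) {
    var obj = {};
    names.forEach(function (name, i) { obj[name] = row[i]; });
    return obj;
  });
}

// e.g. rowsToObjects(result.metaData, rowsFetchedWithGetRows)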
Example usage of the aforementioned write/duplex streams:

// No query params are passed into write/duplex streams, since those will be piped in
// Example of writing a massive number of records piped in from another stream
const oracleWriteStream$ = oracleStream.createWriteStream(dbConfig, writeQuery, { });
writeQueryParameterStream$.pipe(oracleWriteStream$);
// Example of running a bunch of selects as resultSets and piping the results to a stream
const oracleDuplexStream$ = oracleStream.createDuplexStream(dbConfig, selectQuery, { rowsPerFetch: 2 });
readQueryParameterStream$.pipe(oracleDuplexStream$).pipe(outputStream$); |
I merged the stream code to the lib:

//stream all rows (options.streamResults)
connection.query('SELECT * FROM departments WHERE manager_id > :id', [110], {
streamResults: true
}, function onResults(error, stream) {
if (error) {
//handle error...
} else {
//listen to fetched rows via data event or just pipe to another handler
stream.on('data', function (row) {
//use row object
});
//listen to other events such as end/close/error....
}
});

You can see example usage in the tests. I never used it with a pipe before, so I didn't test that, but since it is a Node.js read stream it should work fine. Hope it helps. I'm not planning to implement the write stream, as I currently don't have any such use case, but with the way things are going maybe it will become relevant in the coming weeks, as we might have a need for it. |
@jgoux maybe you can check if the stream capability helps you with your original issue |
Is each stream passed to the callback independent? If so, you could probably pipe each one to an output stream (as long as you're OK with out-of-order data). However, as I mentioned, you will run into memory issues if the output pipes cannot process data fast enough (which I think is a sure bet, since I was able to hit the high water mark in testing with only 100 records).

I recommend taking a gander at the Node.js documentation on stream implementation. Specifically, notice the way that writable streams tell input streams to pause writing when their buffers get full. The input streams' buffers fill up while they're paused too, and they will stop reading once those are filled up. When you do a huge query and get millions of records, that's <=10k pipes with full buffers you're looking at. |
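To make that pause/resume handshake concrete, here is a minimal sketch in plain Node.js stream terms (nothing oracledb-specific): write() returns false once the writable's buffer passes its high water mark, and the producer should wait for 'drain' before continuing.

function writeRows(rows, writable, done) {
  var i = 0;
  (function next() {
    while (i < rows.length) {
      if (!writable.write(rows[i++])) {   // buffer is over the high water mark
        writable.once('drain', next);     // resume only when it has emptied out
        return;
      }
    }
    done();
  })();
}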
connection.query will return a new stream that pushes all the data of that specific query. As for memory, since it is a Node.js read stream it supports pause/resume, so it should be OK if the Node.js piping works well. |
Ok, took a look at your solution and it's nearly the same as the one I provided (using a resultSet as the backend, etc.). Funny how much is cleared up by taking a look at the code. There are some important differences, though:

It doesn't eagerly push data into the stream until its high water mark is reached. Not a big deal; as I mentioned earlier, I've seen other implementations doing the same thing (despite the streams documentation indicating that you should keep pushing). Not sure if this would result in any significant performance difference. Probably it is insignificant.

I automatically close the connection when the stream is finished. It took me a bit to figure out why my endpoint kept dying after serving a request (I had to add an end event to close it with yours). Merely a taste issue.

Another difference I noticed is that null fields returned by simple-oracledb are omitted from the object, whereas oracledb returns them as null. It looks like that's due to your implementation of read, where you're doing the same logic that the OBJECT outFormat does in oracledb. Any reason for this?

Finally, and most critically important: your implementation may have the same issue that the OP and I had. There's the possibility that _read can be called more than once by the output pipe, resulting in parallel getRows calls (and an error). Then again, I could be wrong, since that might just be an artifact of my eager pushing.

Here's a comparison of code usage, for fun:

// simple-oracledb usage
oracle.getConnection(dbConfig, function onConnection(err, connection) {
if (err) {
console.error(err);
} else {
connection.query(query, queryParams, { streamResults: true }, function onResults(err, query$) {
if (err) {
console.error(err);
} else {
query$.on('end', function () {
connection.release();
});
const json$ = JSONStream.stringify();
const file$ = fs.createWriteStream("./example.json");
query$.pipe(json$).pipe(file$);
}
});
}
});
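
// equivalent usage with the oracleStream wrapper, for comparison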
const query$ = oracleStream.createReadStream(dbConfig, query, queryParameters, { outFormat: oracle.OBJECT, rowsPerFetch: 2 });
const json$ = JSONStream.stringify();
const file$ = fs.createWriteStream("./example.json");
query$.pipe(json$).pipe(file$); |
You are right, I'm not pushing more and more in the _read. Maybe I should change it, but it works for me and I ran it with 45k rows. Basically it means that the abstract Node.js read stream is calling _read and managing the load. Not sure if that is a good thing or not... especially if that process is loaded with other requests doing the same on other connections. I'm tending towards leaving it like that. The 'end' event, and basically the other events as well (except error), are managed by the Node.js implementation, so it doesn't matter which Oracle stream provider you use; if they implemented it based on Node.js, the events will look the same. As for the objects and nulls, that's actually the main reason I wrote the wrapper in the first place. |
outFormat won't convert LOBs to objects. |
Ok, managed to get some time for some performance testing. I used an expensive set of queries run in parallel, all piped together into a massive result set, then sent to a front-end, averaged over ~50 calls. For my calls, I used a fetch size of 1 to mimic simple-oracledb, despite getting slightly better performance with a fetch size of 2-3.

Using my light wrapper

Using the simple-oracledb wrapper

My experimentation revealed this fact about Node streams: a _read expects at least one result to be pushed for each call. If you somehow do not respond to a _read call, either by programmatic mistake or race condition, the stream will be forever in a waiting status. As such, that makes eager fetching a little worrisome, because you could theoretically be pushing data, reach the high water mark, and then have _read called before the getRows callback ends (resetting the _isReading flag). However, this is extremely unlikely, since not only is Node single-threaded, but also _read is only called when the buffer is empty (I believe), so you'll never get it if you're at the high water mark. Theoretically, you might be able to simulate this with some ultra-fast buffer processor, assuming you can get the executing thread to swap between callbacks.

In summary, it seems that eager fetching performs slightly better. As for why the simple-oracledb libs are slower: I suspect it might be due to the object serialization there (which might be optimized in the oracledb lib), plus there may be some delay due to the callbacky nature of obtaining the streams. Below is the comparison of the server-side implementation of the endpoints, so my usage can be fact-checked. The simple-oracledb implementation is super ugly because you only get the streams in callbacks. :(

// oracleStream implementation
const claims$ = oracleStream.createReadStream(dbConfig, CLAIMS_QUERY, [request.params.familyMemberId], { outFormat: oracle.OBJECT, rowsPerFetch: 1 });
const coverages$ = oracleStream.createReadStream(dbConfig, COVERAGE_QUERY, [request.params.familyMemberId], { outFormat: oracle.OBJECT, rowsPerFetch: 1 });
const accums$ = oracleStream.createReadStream(dbConfig, BENEFIT_ACCUMS_QUERY, [request.params.familyMemberId], { outFormat: oracle.OBJECT, rowsPerFetch: 1 });
const benefits$ = oracleStream.createReadStream(dbConfig, BENEFITS_QUERY, [request.params.familyMemberId], { outFormat: oracle.OBJECT, rowsPerFetch: 1 });
const insane$ = oracleStream.createReadStream(dbConfig, INSANE_QUERY, [request.params.familyMemberId], { outFormat: oracle.OBJECT, rowsPerFetch: 1 });
const bigStream$ = new ss({ objectMode: true });
bigStream$.write(claims$);
bigStream$.write(coverages$);
bigStream$.write(accums$);
bigStream$.write(benefits$);
bigStream$.write(insane$);
bigStream$.end();
// JSONStream doesn't implement the newer streams2 interface, so we have to wrap it to keep hapi... happy.
const json$ = new Readable().wrap(JSONStream.stringify());
reply(bigStream$.pipe(json$));

// simple-oracledb implementation
oracle.getConnection(dbConfig, function onConnection(err, connection) {
if (err) {
console.error(err);
} else {
connection.query(CLAIMS_QUERY, [request.params.familyMemberId], { streamResults: true }, function onResults(err, claims$) {
if (err) {
console.error(err);
} else {
claims$.on('end', function () {
connection.release();
});
oracle.getConnection(dbConfig, function onConnection(err, connection) {
if (err) {
console.error(err);
} else {
connection.query(COVERAGE_QUERY, [request.params.familyMemberId], { streamResults: true }, function onResults(err, coverages$) {
if (err) {
console.error(err);
} else {
coverages$.on('end', function () {
connection.release();
});
oracle.getConnection(dbConfig, function onConnection(err, connection) {
if (err) {
console.error(err);
} else {
connection.query(BENEFIT_ACCUMS_QUERY, [request.params.familyMemberId], { streamResults: true }, function onResults(err, accums$) {
if (err) {
console.error(err);
} else {
accums$.on('end', function () {
connection.release();
});
oracle.getConnection(dbConfig, function onConnection(err, connection) {
if (err) {
console.error(err);
} else {
connection.query(BENEFITS_QUERY, [request.params.familyMemberId], { streamResults: true }, function onResults(err, benefits$) {
if (err) {
console.error(err);
} else {
benefits$.on('end', function () {
connection.release();
});
oracle.getConnection(dbConfig, function onConnection(err, connection) {
if (err) {
console.error(err);
} else {
connection.query(INSANE_QUERY, [request.params.familyMemberId], { streamResults: true }, function onResults(err, insane$) {
if (err) {
console.error(err);
} else {
insane$.on('end', function () {
connection.release();
});
const bigStream$ = new ss({ objectMode: true });
bigStream$.write(claims$);
bigStream$.write(coverages$);
bigStream$.write(accums$);
bigStream$.write(benefits$);
bigStream$.write(insane$);
bigStream$.end();
// JSONStream doesn't implement the newer streams2 interface, so we have to wrap it to keep hapi... happy.
const json$ = new Readable().wrap(JSONStream.stringify());
reply(bigStream$.pipe(json$));
}
});
}
});
}
});
}
});
}
});
}
});
}
});
}
});
}
});
}
}); |
// build one insert task per set of bind params
var tasks = [];
bindParamsArray.forEach(function createTask(bindParams) {
  tasks.push(function executeTask(asyncCallback) {
    self.insert(sql, bindParams, options, asyncCallback);
  });
});

// run all insert tasks in parallel via the async library, then hand the
// combined error/results to the shared batch callback
asyncLib.parallel(tasks, function onBatchDone(error, results) {
  var batchCallback = self.createCallback(callback, commit, results);
  batchCallback(error);
});
|
Firstly, in no way was I arguing; I was pursuing my own questions as to the efficiency of certain methods of streaming data and the fundamental functionality of the Streams API. Secondly, the callback hell was intentional, as it illustrates a good point regarding using streams directly as callbacks rather than relying on Promises, etc. I sense a language barrier causing some of the confusion, since all responses have been kind of tangential to the overall drift of the conversation. My intent was to share my findings with those similarly curious. Here is the summary:

- Eager fetching (pushing until push() returns false) performed slightly better than pushing one batch per _read.
- _read must never trigger parallel getRow/getRows calls on the same ResultSet; guard it with a flag.
- Every _read call must eventually lead to at least one push, or the stream stalls forever.
- For my simple queries, a fetch size of 2-3 rows performed best.

And the fun TODOs:

- A fuller performance analysis across query complexities and fetch sizes.
- A prettified LOB read/write stream interface.
- Write and duplex streams for non-LOB queries.
|
@cjbj In the end, is there a possibility in the future to get a Readable stream from a query execution inside node-oracledb? |
@jgoux you can do the below even now. @cjbj I'll try to see if I can move that part of the code so I'll be able to push a PR to oracledb without the C++ (sorry).

//do this one time
var oracledb = require('oracledb');
var SimpleOracleDB = require('simple-oracledb');
SimpleOracleDB.extend(oracledb);
//some application code, to get a connection.....
//stream all rows (options.streamResults)
var stream = connection.query('SELECT * FROM departments WHERE manager_id > :id', [110], {
streamResults: true
});
//listen to fetched rows via data event or just pipe to another handler
stream.on('data', function (row) {
//use row object (all LOBs are now either string for CLOBs or node Buffer for BLOBs)
console.log(row.myField);
});
//listen to other events such as end/close/error.... |
@sagiegurari that would be great |
see new PR #321 |
Closing (a bit late). ResultSet streaming was implemented in 1.8 thanks to @sagiegurari.
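For readers landing here later, a rough sketch of the streaming API that eventually shipped (this assumes the connection.queryStream() method added around node-oracledb 1.8; check the current documentation for exact behavior and cleanup semantics):

var stream = connection.queryStream(
  'SELECT * FROM departments WHERE manager_id > :id', [110]);

stream.on('error', function (err) {
  // handle the error
});

stream.on('data', function (row) {
  // each row is emitted as it is fetched from the underlying ResultSet
});

stream.on('end', function () {
  // all rows consumed; release the connection here
  connection.release(function (err) { if (err) console.error(err.message); });
});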
Hello,
I want to turn a resultSet into a readable stream (like node-mssql and node-mysql provide through their pipe methods), so I can pipe it to whatever I want to do with the results (millions of lines).

Here is my function to execute a query and then return a readable stream:

Here is how I use it:
My issue is that at a certain point, the fetching stops without errors.
For the same query, sometimes there are 10K rows written to the file, then only 2K, then 14K...
I'm pretty sure it's due to my _read implementation, but I can't figure out what's wrong with it. In the Node.js docs (https://nodejs.org/api/stream.html#stream_readable_read_size_1) this part is unclear to me:
Thanks in advance.
EDIT: Here is an example of an implementation: https://github.com/felixge/node-mysql/blob/de5913227dbbaacbbdf22ee38689d090e1451be9/lib/protocol/sequences/Query.js#L188