
Commit db1b95e

Add 'packages/pg-query-stream/' from commit '9ced05e8aab65f3fdf1a67add87bfc9035e487e8'
git-subtree-dir: packages/pg-query-stream
git-subtree-mainline: cccf84e
git-subtree-split: 9ced05e
2 parents (cccf84e + 9ced05e), commit db1b95e

25 files changed, +2242 lines added, 0 removed

packages/pg-query-stream/.eslintrc

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
{
  "extends": "standard",
  "env": {
    "mocha": true
  },
  "rules": {
    "no-new-func": "off"
  }
}

packages/pg-query-stream/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
node_modules

packages/pg-query-stream/.travis.yml

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
language: node_js
dist: trusty
node_js:
  - "8"
  - "10"
  - "12"
env:
  - PGUSER=postgres
services:
  - postgresql
addons:
  postgresql: "9.6"
before_script:
  - psql -c 'create database travis;' -U postgres | true

packages/pg-query-stream/LICENSE

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

packages/pg-query-stream/Makefile

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
.PHONY: publish-patch test

test:
	npm test

patch: test
	npm version patch -m "Bump version"
	git push origin master --tags
	npm publish

minor: test
	npm version minor -m "Bump version"
	git push origin master --tags
	npm publish

packages/pg-query-stream/README.md

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
# pg-query-stream

[![Build Status](https://travis-ci.org/brianc/node-pg-query-stream.svg)](https://travis-ci.org/brianc/node-pg-query-stream)

Receive result rows from [pg](https://github.com/brianc/node-postgres) as a readable (object) stream.


## installation

```bash
$ npm install pg --save
$ npm install pg-query-stream --save
```

_requires pg>=2.8.1_


## use

```js
const pg = require('pg')
const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

//pipe 1,000,000 rows to stdout without blowing up your memory usage
pg.connect((err, client, done) => {
  if (err) throw err;
  const query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000])
  const stream = client.query(query)
  //release the client when the stream is finished
  stream.on('end', done)
  stream.pipe(JSONStream.stringify()).pipe(process.stdout)
})
```

The stream uses a cursor on the server so it efficiently keeps only a low number of rows in memory.

This is especially useful when doing [ETL](http://en.wikipedia.org/wiki/Extract,_transform,_load) on a huge table. Using manual `limit` and `offset` queries to fake out async iteration through your data is cumbersome, and _way way way_ slower than using a cursor.

_note: this module only works with the JavaScript client, and does not work with the native bindings. libpq doesn't expose the protocol at a level where a cursor can be manipulated directly_

## contribution

I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it. If it's not way insane we'll merge it in too: isn't open source awesome?

## license

The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
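
As a side note (not part of the committed README): the same streaming example can be written against the pool API that the test suite in this commit relies on. This is a minimal sketch, assuming pg >= 6 where `Pool` is exported from the `pg` package:

```js
const { Pool } = require('pg')
const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

const pool = new Pool()
pool.connect((err, client, release) => {
  if (err) throw err
  const query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000])
  const stream = client.query(query)
  // return the client to the pool once the stream has ended
  stream.on('end', release)
  stream.pipe(JSONStream.stringify()).pipe(process.stdout)
})
```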

packages/pg-query-stream/index.js

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
'use strict'
var Cursor = require('pg-cursor')
var Readable = require('stream').Readable

class PgQueryStream extends Readable {
  constructor (text, values, options) {
    super(Object.assign({ objectMode: true }, options))
    this.cursor = new Cursor(text, values, options)
    this._reading = false
    this._closed = false
    this.batchSize = (options || {}).batchSize || 100

    // delegate Submittable callbacks to cursor
    this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
    this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
    this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
    this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
    this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
    this.handleError = this.cursor.handleError.bind(this.cursor)
  }

  submit (connection) {
    this.cursor.submit(connection)
  }

  close (callback) {
    this._closed = true
    const cb = callback || (() => this.emit('close'))
    this.cursor.close(cb)
  }

  _read (size) {
    if (this._reading || this._closed) {
      return false
    }
    this._reading = true
    const readAmount = Math.max(size, this.batchSize)
    this.cursor.read(readAmount, (err, rows) => {
      if (this._closed) {
        return
      }
      if (err) {
        return this.emit('error', err)
      }
      // if we get a 0 length array we've read to the end of the cursor
      if (!rows.length) {
        this._closed = true
        setImmediate(() => this.emit('close'))
        return this.push(null)
      }

      // push each row into the stream
      this._reading = false
      for (var i = 0; i < rows.length; i++) {
        this.push(rows[i])
      }
    })
  }
}

module.exports = PgQueryStream
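
For reference (not part of the commit), a quick usage sketch of the options handled above: `batchSize` sets how many rows each `cursor.read()` fetches (default 100), while any remaining options such as `highWaterMark` are passed through to `stream.Readable` via `Object.assign`:

```js
const QueryStream = require('pg-query-stream')

// fetch up to 500 rows per cursor round trip and let the Readable buffer up to 500 rows
const stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [100000], {
  batchSize: 500,
  highWaterMark: 500
})
// hand the stream to client.query(); pg then routes protocol messages to the
// handleRowDescription/handleDataRow/... callbacks delegated to pg-cursor above
```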

packages/pg-query-stream/package.json

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
{
  "name": "pg-query-stream",
  "version": "2.0.1",
  "description": "Postgres query result returned as readable stream",
  "main": "index.js",
  "scripts": {
    "test": "mocha"
  },
  "repository": {
    "type": "git",
    "url": "git://github.com/brianc/node-pg-query-stream.git"
  },
  "keywords": [
    "postgres",
    "pg",
    "query",
    "stream"
  ],
  "author": "Brian M. Carlson",
  "license": "MIT",
  "bugs": {
    "url": "https://github.com/brianc/node-pg-query-stream/issues"
  },
  "devDependencies": {
    "JSONStream": "~0.7.1",
    "concat-stream": "~1.0.1",
    "eslint": "^4.4.0",
    "eslint-config-standard": "^10.2.1",
    "eslint-plugin-import": "^2.7.0",
    "eslint-plugin-node": "^5.1.1",
    "eslint-plugin-promise": "^3.5.0",
    "eslint-plugin-standard": "^3.0.1",
    "mocha": "^6.2.2",
    "pg": "^7.5.0",
    "prettier": "^1.18.2",
    "stream-spec": "~0.3.5",
    "stream-tester": "0.0.5",
    "through": "~2.3.4"
  },
  "prettier": {
    "semi": false,
    "printWidth": 120,
    "trailingComma": "es5",
    "singleQuote": true
  },
  "dependencies": {
    "pg-cursor": "^2.0.1"
  }
}
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
const QueryStream = require('../')
const pg = require('pg')
const assert = require('assert')

const queryText = 'SELECT * FROM generate_series(0, 200) num'
describe('Async iterator', () => {
  it('works', async () => {
    const stream = new QueryStream(queryText, [])
    const client = new pg.Client()
    await client.connect()
    const query = client.query(stream)
    const rows = []
    for await (const row of query) {
      rows.push(row)
    }
    assert.equal(rows.length, 201)
    await client.end()
  })

  it('can async iterate and then do a query afterwards', async () => {
    const stream = new QueryStream(queryText, [])
    const client = new pg.Client()
    await client.connect()
    const query = client.query(stream)
    const iteratorRows = []
    for await (const row of query) {
      iteratorRows.push(row)
    }
    assert.equal(iteratorRows.length, 201)
    const { rows } = await client.query('SELECT NOW()')
    assert.equal(rows.length, 1)
    await client.end()
  })

  it('can async iterate multiple times with a pool', async () => {
    const pool = new pg.Pool({ max: 1 })

    const allRows = []
    const run = async () => {
      // get the client
      const client = await pool.connect()
      // stream some rows
      const stream = new QueryStream(queryText, [])
      const iteratorRows = []
      client.query(stream)
      for await (const row of stream) {
        iteratorRows.push(row)
        allRows.push(row)
      }
      assert.equal(iteratorRows.length, 201)
      client.release()
    }
    await Promise.all([run(), run(), run()])
    assert.equal(allRows.length, 603)
    await pool.end()
  })
})
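
Side note: with `max: 1` the pool hands its single client to the three `run()` calls one at a time, and each run iterates the full 201-row series, so the final assertion expects 3 × 201 = 603 rows in `allRows`.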
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
// only newer versions of node support async iterator
if (!process.version.startsWith('v8')) {
  require('./async-iterator.es6')
}
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
var assert = require('assert')
var concat = require('concat-stream')

var QueryStream = require('../')
var helper = require('./helper')

helper('close', function (client) {
  it('emits close', function (done) {
    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
    var query = client.query(stream)
    query.pipe(concat(function () {}))
    query.on('close', done)
  })
})

helper('early close', function (client) {
  it('can be closed early', function (done) {
    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
    var query = client.query(stream)
    var readCount = 0
    query.on('readable', function () {
      readCount++
      query.read()
    })
    query.once('readable', function () {
      query.close()
    })
    query.on('close', function () {
      assert(readCount < 10, 'should not have read more than 10 rows')
      done()
    })
  })
})

helper('close callback', function (client) {
  it('notifies an optional callback when the connection is closed', function (done) {
    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
    var query = client.query(stream)
    query.once('readable', function () { // only reading once
      query.read()
    })
    query.once('readable', function () {
      query.close(function () {
        // nothing to assert. This test will time out if the callback does not work.
        done()
      })
    })
    query.on('close', function () {
      assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
    })
  })
})
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
var assert = require('assert')
var concat = require('concat-stream')
var through = require('through')
var helper = require('./helper')

var QueryStream = require('../')

helper('concat', function (client) {
  it('concats correctly', function (done) {
    var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
    var query = client.query(stream)
    query.pipe(through(function (row) {
      this.push(row.num)
    })).pipe(concat(function (result) {
      var total = result.reduce(function (prev, cur) {
        return prev + cur
      })
      assert.equal(total, 20100)
    }))
    stream.on('end', done)
  })
})
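
Side note: the asserted total is the sum of the series 0 through 200, i.e. 200 · 201 / 2 = 20100.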
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
var assert = require('assert')
var QueryStream = require('../')

var stream = new QueryStream('SELECT NOW()', [], {
  highWaterMark: 999,
  batchSize: 88
})

assert.equal(stream._readableState.highWaterMark, 999)
assert.equal(stream.batchSize, 88)
