Skip to content
This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Commit 1fe5848

Browse files
committed
Merge pull request #42 from ipfs/exporter/duplex
export is now a readable object stream
2 parents 89a02de + 984b42d commit 1fe5848

File tree

4 files changed

+128
-77
lines changed

4 files changed

+128
-77
lines changed

README.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ add.on('end', () => {
6666
// Calling write on the importer to add the file/object tuples
6767

6868
add.write(input)
69+
add.write(input2)
6970
add.end()
7071
```
7172

@@ -121,13 +122,13 @@ const repo = new ipfsRepo('', { stores: memStore })
121122
const blocks = new ipfsBlockService(repo)
122123
const dag = new ipfsMerkleDag.DAGService(blocks)
123124
124-
// Create an export event with the hash you want to export and a dag service
125+
// Create an export readable object stream with the hash you want to export and a dag service
125126
126127
const exportEvent = Exporter(hash, dag)
127128
128129
// Pipe the return stream to console
129130
130-
exportEvent.on('file', (result) => {
131+
exportEvent.on('data', (result) => {
131132
result.stream.pipe(process.stdout)
132133
}
133134
```
@@ -137,8 +138,7 @@ exportEvent.on('file', (result) => {
137138
const Importer = require('ipfs-unixfs-engine').exporter
138139
```
139140

140-
The exporter is an event emitter that returns a stream of the file found
141-
by the multihash of the file from the dag service.
141+
The exporter is a readable stream in object mode that returns an object ```{ stream: stream, path: 'path' }``` by the multihash of the file from the dag service.
142142

143143

144144
## install

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"ipfs-merkle-dag": "^0.5.0",
5858
"ipfs-unixfs": "^0.1.0",
5959
"readable-stream": "^1.1.13",
60+
"run-series": "^1.1.4",
6061
"through2": "^2.0.0"
6162
},
6263
"contributors": [
@@ -67,4 +68,4 @@
6768
"greenkeeperio-bot <[email protected]>",
6869
"nginnever <[email protected]>"
6970
]
70-
}
71+
}

src/exporter.js

+62-58
Original file line numberDiff line numberDiff line change
@@ -4,42 +4,32 @@ const debug = require('debug')
44
const log = debug('exporter')
55
log.err = debug('exporter:error')
66
const UnixFS = require('ipfs-unixfs')
7+
const series = require('run-series')
78
const async = require('async')
8-
const events = require('events')
99
const Readable = require('readable-stream').Readable
1010
const pathj = require('path')
11+
const util = require('util')
1112

12-
exports = module.exports = exporter
13+
exports = module.exports = Exporter
1314

14-
function exporter (hash, dagService, options, callback) {
15-
if (typeof options === 'function') {
16-
callback = options
17-
options = {}
15+
util.inherits(Exporter, Readable)
16+
17+
function Exporter (hash, dagService, options) {
18+
if (!(this instanceof Exporter)) {
19+
return new Exporter(hash, dagService, options)
1820
}
1921

20-
const ee = new events.EventEmitter()
21-
dagService.get(hash, (err, fetchedNode) => {
22-
if (err) {
23-
if (callback) {
24-
return callback(err)
25-
}
26-
return
27-
}
28-
const data = UnixFS.unmarshal(fetchedNode.data)
29-
const type = data.type
30-
if (type === 'directory') {
31-
dirExporter(fetchedNode, hash, callback)
32-
}
33-
if (type === 'file') {
34-
fileExporter(fetchedNode, hash, false, callback)
35-
}
36-
})
37-
return ee
22+
Readable.call(this, { objectMode: true })
3823

39-
function fileExporter (node, name, dir, callback) {
24+
this.options = options || {}
25+
26+
this._read = (n) => {}
27+
28+
let fileExporter = (node, name, callback) => {
4029
let init
4130

42-
if (typeof dir === 'function') { callback = dir; dir = {} }
31+
if (!callback) { callback = function noop () {} }
32+
4333
var rs = new Readable()
4434
if (node.links.length === 0) {
4535
const unmarshaledData = UnixFS.unmarshal(node.data)
@@ -52,10 +42,8 @@ function exporter (hash, dagService, options, callback) {
5242
rs.push(unmarshaledData.data)
5343
rs.push(null)
5444
}
55-
ee.emit('file', { stream: rs, path: name, dir: dir })
56-
if (callback) {
57-
callback()
58-
}
45+
this.push({ stream: rs, path: name })
46+
callback()
5947
return
6048
} else {
6149
init = false
@@ -64,36 +52,40 @@ function exporter (hash, dagService, options, callback) {
6452
return
6553
}
6654
init = true
67-
async.forEachSeries(node.links, (link, callback) => {
68-
dagService.get(link.hash, (err, res) => {
69-
if (err) {
70-
callback(err)
71-
}
72-
var unmarshaledData = UnixFS.unmarshal(res.data)
73-
rs.push(unmarshaledData.data)
74-
callback()
75-
})
76-
}, (err) => {
55+
56+
const array = node.links.map((link) => {
57+
return (cb) => {
58+
dagService.get(link.hash, (err, res) => {
59+
if (err) {
60+
cb(err)
61+
}
62+
var unmarshaledData = UnixFS.unmarshal(res.data)
63+
rs.push(unmarshaledData.data)
64+
cb()
65+
})
66+
}
67+
})
68+
series(array, (err, res) => {
7769
if (err) {
78-
if (callback) {
79-
return callback(err)
80-
}
70+
callback()
8171
return
8272
}
8373
rs.push(null)
84-
if (callback) {
85-
callback()
86-
}
74+
callback()
8775
return
8876
})
8977
}
90-
ee.emit('file', { stream: rs, path: name, dir: dir })
78+
this.push({ stream: rs, path: name })
79+
callback()
80+
return
9181
}
9282
}
9383

94-
function dirExporter (node, name, callback) {
84+
let dirExporter = (node, name, callback) => {
9585
let init
9686

87+
if (!callback) { callback = function noop () {} }
88+
9789
var rs = new Readable()
9890
if (node.links.length === 0) {
9991
init = false
@@ -105,10 +97,8 @@ function exporter (hash, dagService, options, callback) {
10597
rs.push(node.data)
10698
rs.push(null)
10799
}
108-
ee.emit('file', {stream: rs, path: name})
109-
if (callback) {
110-
callback()
111-
}
100+
this.push({stream: null, path: name})
101+
callback()
112102
return
113103
} else {
114104
async.forEachSeries(node.links, (link, callback) => {
@@ -127,16 +117,30 @@ function exporter (hash, dagService, options, callback) {
127117
})
128118
}, (err) => {
129119
if (err) {
130-
if (callback) {
131-
return callback(err)
132-
}
133-
return
134-
}
135-
if (callback) {
136120
callback()
121+
return
137122
}
123+
callback()
138124
return
139125
})
140126
}
141127
}
128+
129+
dagService.get(hash, (err, fetchedNode) => {
130+
if (err) {
131+
this.emit('error', err)
132+
return
133+
}
134+
const data = UnixFS.unmarshal(fetchedNode.data)
135+
const type = data.type
136+
137+
if (type === 'directory') {
138+
dirExporter(fetchedNode, hash)
139+
}
140+
if (type === 'file') {
141+
fileExporter(fetchedNode, hash)
142+
}
143+
})
144+
145+
return this
142146
}

test/test-exporter.js

+60-14
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ const BlockService = require('ipfs-block-service')
88
const DAGService = require('ipfs-merkle-dag').DAGService
99
const UnixFS = require('ipfs-unixfs')
1010
const bl = require('bl')
11+
const fs = require('fs')
12+
const path = require('path')
1113

1214
let ds
1315

1416
module.exports = function (repo) {
1517
describe('exporter', function () {
18+
const bigFile = fs.readFileSync(path.join(__dirname, '/test-data/1.2MiB.txt'))
1619
before((done) => {
1720
const bs = new BlockService(repo)
1821
expect(bs).to.exist
@@ -25,12 +28,12 @@ module.exports = function (repo) {
2528
const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
2629
const bs = new BlockService(repo)
2730
const ds = new DAGService(bs)
28-
const testExport = exporter(hash, ds)
29-
testExport.on('file', (data) => {
30-
ds.get(hash, (err, fetchedNode) => {
31-
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
32-
expect(err).to.not.exist
33-
data.stream.pipe(bl((err, bldata) => {
31+
ds.get(hash, (err, fetchedNode) => {
32+
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
33+
expect(err).to.not.exist
34+
const testExport = exporter(hash, ds)
35+
testExport.on('data', (file) => {
36+
file.stream.pipe(bl((err, bldata) => {
3437
expect(err).to.not.exist
3538
expect(bldata).to.deep.equal(unmarsh.data)
3639
done()
@@ -44,9 +47,12 @@ module.exports = function (repo) {
4447
const bs = new BlockService(repo)
4548
const ds = new DAGService(bs)
4649
const testExport = exporter(hash, ds)
47-
testExport.on('file', (data) => {
48-
expect(data.stream).to.exist
49-
done()
50+
testExport.on('data', (file) => {
51+
file.stream.pipe(bl((err, bldata) => {
52+
expect(bldata).to.deep.equal(bigFile)
53+
expect(err).to.not.exist
54+
done()
55+
}))
5056
})
5157
})
5258

@@ -55,9 +61,12 @@ module.exports = function (repo) {
5561
const bs = new BlockService(repo)
5662
const ds = new DAGService(bs)
5763
const testExport = exporter(hash, ds)
58-
testExport.on('file', (data) => {
59-
expect(data.stream).to.exist
60-
done()
64+
testExport.on('data', (file) => {
65+
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
66+
file.stream.pipe(bl((err, bldata) => {
67+
expect(err).to.not.exist
68+
done()
69+
}))
6170
})
6271
})
6372

@@ -67,8 +76,8 @@ module.exports = function (repo) {
6776
const ds = new DAGService(bs)
6877
const testExport = exporter(hash, ds)
6978
var fsa = []
70-
testExport.on('file', (data) => {
71-
fsa.push(data)
79+
testExport.on('data', (files) => {
80+
fsa.push(files)
7281
})
7382
setTimeout(() => {
7483
expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
@@ -78,5 +87,42 @@ module.exports = function (repo) {
7887
done()
7988
}, 1000)
8089
})
90+
91+
it('returns a null stream for dir', (done) => {
92+
const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' // This hash doesn't exist in the repo
93+
const bs = new BlockService(repo)
94+
const ds = new DAGService(bs)
95+
const testExport = exporter(hash, ds)
96+
testExport.on('data', (dir) => {
97+
expect(dir.stream).to.equal(null)
98+
done()
99+
})
100+
})
101+
102+
it('fails on non existent hash', (done) => {
103+
const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKj3' // This hash doesn't exist in the repo
104+
const bs = new BlockService(repo)
105+
const ds = new DAGService(bs)
106+
const testExport = exporter(hash, ds)
107+
testExport.on('error', (err) => {
108+
const error = err.toString()
109+
expect(err).to.exist
110+
const browser = error.includes('Error: key not found:')
111+
const node = error.includes('no such file or directory')
112+
// the browser and node js return different errors
113+
if (browser) {
114+
expect(error).to.contain('Error: key not found:')
115+
done()
116+
}
117+
if (node) {
118+
expect(error).to.contain('no such file or directory')
119+
done()
120+
}
121+
if (!node && !browser) {
122+
expect(node).to.equal(true)
123+
done()
124+
}
125+
})
126+
})
81127
})
82128
}

0 commit comments

Comments
 (0)