Skip to content
This repository was archived by the owner on Aug 12, 2020. It is now read-only.

Commit c1acd5b

Browse files
pgte, daviddias
authored and committed
Builder refactoring, trickle builder and balanced builder (#118)
* builder refactoring. trickle builder. balanced builder * removed unused experimental builder * documented importer options * default builder strategy is now the balanced strategy * removed unused test * removed superfluous comment * fixed trickle builder * removed superfluous comment * using options.chunkerOptions for chunker-specific options * docs: corrected option name * fix: error handling in trickle reducer * using pull-pair instead of backpressure-less bespoke pair * fixed trickle builder tests * recursive streaming trickle builder * missing dep * some style corrections * importing multiple roots yields an error * reinstated testing importing using flat and balanced strategies * asserting that root node is one and only one * testing import and export using various builder strategies * fixed error propagation into push streams * simplified some iteration logic * default for maximum children per node is 174 * by default, only reduces one leaf to self if specific option is present * test results reflect new default config * testing against big files generated from a pseudo random byte stream gen * added missing dep * removed unnecessary dev dependency * go-ipfs parity: no root node with single leaf * docs: corrected the default maximum number of children nodes
1 parent 574af37 commit c1acd5b

25 files changed

+1718
-414
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,20 @@ been written into the [DAG Service][]'s storage mechanism.
150150
The input's file paths and directory structure will be preserved in the DAG
151151
Nodes.
152152

153+
### Importer options
154+
155+
In the second argument of the importer constructor you can specify the following options:
156+
157+
* `chunker` (string, defaults to `"fixed"`): the chunking strategy. Now only supports `"fixed"`
158+
* `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
159+
* `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size for the `fixed` chunker.
160+
* `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports:
161+
* `flat`: flat list of chunks
162+
* `balanced`: builds a balanced tree
163+
* `trickle`: builds [a trickle tree](https://github.com/ipfs/specs/pull/57#issuecomment-265205384)
164+
* `maxChildrenPerNode` (positive integer, defaults to `174`): the maximum children per node for the `balanced` and `trickle` DAG builder strategies
165+
* `layerRepeat` (positive integer, defaults to 4): (only applicable to the `trickle` DAG builder strategy). The maximum repetition of parent nodes for each layer of the tree.
166+
* `reduceSingleLeafToSelf` (boolean, defaults to `false`): optimization that, when a set being reduced contains only a single node, reduces the set to that node itself.
153167

154168
### Example Exporter
155169

package.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,24 @@
4848
"ipfs-repo": "^0.11.2",
4949
"ncp": "^2.0.0",
5050
"pre-commit": "^1.2.2",
51+
"pull-generate": "^2.2.0",
5152
"pull-zip": "^2.0.1",
5253
"rimraf": "^2.5.4"
5354
},
5455
"dependencies": {
5556
"async": "^2.1.4",
5657
"cids": "^0.3.5",
58+
"deep-extend": "^0.4.1",
5759
"ipfs-unixfs": "^0.1.9",
5860
"ipld-dag-pb": "^0.9.3",
5961
"ipld-resolver": "^0.4.1",
6062
"is-ipfs": "^0.2.1",
6163
"multihashes": "^0.3.1",
64+
"pull-batch": "^1.0.0",
6265
"pull-block": "^1.0.2",
66+
"pull-pair": "^1.1.0",
6367
"pull-paramap": "^1.2.1",
68+
"pull-pause": "0.0.0",
6469
"pull-pushable": "^2.0.1",
6570
"pull-stream": "^3.5.0",
6671
"pull-traverse": "^1.0.3",
@@ -76,4 +81,4 @@
7681
"jbenet <[email protected]>",
7782
"nginnever <[email protected]>"
7883
]
79-
}
84+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
'use strict'
2+
3+
const assert = require('assert')
4+
const pull = require('pull-stream')
5+
const pushable = require('pull-pushable')
6+
const pullPair = require('pull-pair')
7+
const batch = require('pull-batch')
8+
9+
module.exports = function balancedReduceToRoot (reduce, options) {
10+
const pair = pullPair()
11+
const source = pair.source
12+
13+
const result = pushable()
14+
15+
reduceToParents(source, (err, roots) => {
16+
if (err) {
17+
result.end(err)
18+
return // early
19+
}
20+
assert.equal(roots.length, 1, 'need one root')
21+
result.push(roots[0])
22+
result.end()
23+
})
24+
25+
function reduceToParents (_chunks, callback) {
26+
let chunks = _chunks
27+
if (Array.isArray(chunks)) {
28+
chunks = pull.values(chunks)
29+
}
30+
31+
pull(
32+
chunks,
33+
batch(options.maxChildrenPerNode),
34+
pull.asyncMap(reduce),
35+
pull.collect(reduced)
36+
)
37+
38+
function reduced (err, roots) {
39+
if (err) {
40+
callback(err)
41+
} else if (roots.length > 1) {
42+
reduceToParents(roots, callback)
43+
} else {
44+
callback(null, roots)
45+
}
46+
}
47+
}
48+
49+
return {
50+
sink: pair.sink,
51+
source: result
52+
}
53+
}

src/builder/balanced/index.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
'use strict'
2+
3+
const balancedReducer = require('./balanced-reducer')
4+
5+
const defaultOptions = {
6+
maxChildrenPerNode: 174
7+
}
8+
9+
module.exports = function (reduce, _options) {
10+
const options = Object.assign({}, defaultOptions, _options)
11+
return balancedReducer(reduce, options)
12+
}

src/builder/builder.js

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
'use strict'
2+
3+
const extend = require('deep-extend')
4+
const assert = require('assert')
5+
const UnixFS = require('ipfs-unixfs')
6+
const pull = require('pull-stream')
7+
const through = require('pull-through')
8+
const parallel = require('async/parallel')
9+
const waterfall = require('async/waterfall')
10+
const dagPB = require('ipld-dag-pb')
11+
const CID = require('cids')
12+
13+
const reduce = require('./reduce')
14+
15+
const DAGNode = dagPB.DAGNode
16+
17+
const defaultOptions = {
18+
chunkerOptions: {
19+
maxChunkSize: 262144
20+
}
21+
}
22+
23+
module.exports = function (createChunker, ipldResolver, createReducer, _options) {
24+
const options = extend({}, defaultOptions, _options)
25+
26+
return function (source, files) {
27+
return function (items, cb) {
28+
parallel(items.map((item) => (cb) => {
29+
if (!item.content) {
30+
// item is a directory
31+
return createAndStoreDir(item, (err, node) => {
32+
if (err) {
33+
return cb(err)
34+
}
35+
source.push(node)
36+
files.push(node)
37+
cb()
38+
})
39+
}
40+
41+
// item is a file
42+
createAndStoreFile(item, (err, node) => {
43+
if (err) {
44+
return cb(err)
45+
}
46+
source.push(node)
47+
files.push(node)
48+
cb()
49+
})
50+
}), cb)
51+
}
52+
}
53+
54+
function createAndStoreDir (item, callback) {
55+
// 1. create the empty dir dag node
56+
// 2. write it to the dag store
57+
58+
const d = new UnixFS('directory')
59+
waterfall([
60+
(cb) => DAGNode.create(d.marshal(), cb),
61+
(node, cb) => {
62+
ipldResolver.put({
63+
node: node,
64+
cid: new CID(node.multihash)
65+
}, (err) => cb(err, node))
66+
}
67+
], (err, node) => {
68+
if (err) {
69+
return callback(err)
70+
}
71+
callback(null, {
72+
path: item.path,
73+
multihash: node.multihash,
74+
size: node.size
75+
})
76+
})
77+
}
78+
79+
function createAndStoreFile (file, callback) {
80+
if (Buffer.isBuffer(file.content)) {
81+
file.content = pull.values([file.content])
82+
}
83+
84+
if (typeof file.content !== 'function') {
85+
return callback(new Error('invalid content'))
86+
}
87+
88+
const reducer = createReducer(reduce(file, ipldResolver, options), options)
89+
90+
let previous
91+
let count = 0
92+
93+
pull(
94+
file.content,
95+
createChunker(options.chunkerOptions),
96+
pull.map(chunk => new Buffer(chunk)),
97+
pull.map(buffer => new UnixFS('file', buffer)),
98+
pull.asyncMap((fileNode, callback) => {
99+
DAGNode.create(fileNode.marshal(), (err, node) => {
100+
callback(err, { DAGNode: node, fileNode: fileNode })
101+
})
102+
}),
103+
pull.asyncMap((leaf, callback) => {
104+
ipldResolver.put(
105+
{
106+
node: leaf.DAGNode,
107+
cid: new CID(leaf.DAGNode.multihash)
108+
},
109+
err => callback(err, leaf)
110+
)
111+
}),
112+
pull.map((leaf) => {
113+
return {
114+
path: file.path,
115+
multihash: leaf.DAGNode.multihash,
116+
size: leaf.DAGNode.size,
117+
leafSize: leaf.fileNode.fileSize(),
118+
name: ''
119+
}
120+
}),
121+
through( // mark as single node if only one single node
122+
function onData (data) {
123+
count++
124+
if (previous) {
125+
this.queue(previous)
126+
}
127+
previous = data
128+
},
129+
function ended () {
130+
if (previous) {
131+
if (count === 1) {
132+
previous.single = true
133+
}
134+
this.queue(previous)
135+
}
136+
this.queue(null)
137+
}
138+
),
139+
reducer,
140+
pull.collect((err, roots) => {
141+
if (err) {
142+
callback(err)
143+
} else {
144+
assert.equal(roots.length, 1, 'should result in exactly one root')
145+
callback(null, roots[0])
146+
}
147+
})
148+
)
149+
}
150+
}

src/builder/create-build-stream.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict'
2+
3+
const pullPushable = require('pull-pushable')
4+
const pullWrite = require('pull-write')
5+
6+
module.exports = function createBuildStream (createStrategy, ipldResolver, flushTree, options) {
7+
const files = []
8+
9+
const source = pullPushable()
10+
11+
const sink = pullWrite(
12+
createStrategy(source, files),
13+
null,
14+
options.highWaterMark,
15+
(err) => {
16+
if (err) {
17+
source.end(err)
18+
return // early
19+
}
20+
21+
flushTree(files, ipldResolver, source, source.end)
22+
}
23+
)
24+
25+
return {
26+
source: source,
27+
sink: sink
28+
}
29+
}

src/builder/flat/index.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict'
2+
3+
const assert = require('assert')
4+
const pull = require('pull-stream')
5+
const pushable = require('pull-pushable')
6+
const pullPair = require('pull-pair')
7+
const batch = require('pull-batch')
8+
9+
module.exports = function (reduce, options) {
10+
const pair = pullPair()
11+
const source = pair.source
12+
const result = pushable()
13+
14+
pull(
15+
source,
16+
batch(Infinity),
17+
pull.asyncMap(reduce),
18+
pull.collect((err, roots) => {
19+
if (err) {
20+
result.end(err)
21+
return // early
22+
}
23+
assert.equal(roots.length, 1, 'need one root')
24+
result.push(roots[0])
25+
result.end()
26+
})
27+
)
28+
29+
return {
30+
sink: pair.sink,
31+
source: result
32+
}
33+
}

src/builder/index.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict'
2+
3+
const assert = require('assert')
4+
const createBuildStream = require('./create-build-stream')
5+
const Builder = require('./builder')
6+
7+
const reducers = {
8+
flat: require('./flat'),
9+
balanced: require('./balanced'),
10+
trickle: require('./trickle')
11+
}
12+
13+
const defaultOptions = {
14+
strategy: 'balanced',
15+
highWaterMark: 100,
16+
reduceSingleLeafToSelf: false
17+
}
18+
19+
module.exports = function (Chunker, ipldResolver, flushTree, _options) {
20+
assert(Chunker, 'Missing chunker creator function')
21+
assert(ipldResolver, 'Missing IPLD Resolver')
22+
assert(flushTree, 'Missing flushTree argument')
23+
24+
const options = Object.assign({}, defaultOptions, _options)
25+
26+
const strategyName = options.strategy
27+
const reducer = reducers[strategyName]
28+
assert(reducer, 'Unknown importer build strategy name: ' + strategyName)
29+
30+
const createStrategy = Builder(Chunker, ipldResolver, reducer, options)
31+
32+
return createBuildStream(createStrategy, ipldResolver, flushTree, options)
33+
}

0 commit comments

Comments
 (0)