forked from ipfs-inactive/js-ipfs-unixfs-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
98 lines (84 loc) · 2.21 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
'use strict'
const pause = require('pull-pause')
const pull = require('pull-stream')
const writable = require('pull-write')
const pushable = require('pull-pushable')
const assert = require('assert')
const setImmediate = require('async/setImmediate')
const DAGBuilder = require('../builder')
const createTreeBuilder = require('./tree-builder')
// Chunker implementations, keyed by the name callers pass as `options.chunker`.
const fixedSizeChunker = require('../chunker/fixed-size')
const chunkers = { fixed: fixedSizeChunker }

// Option values used when the caller does not override them.
const defaultOptions = { chunker: 'fixed' }
module.exports = function (ipldResolver, _options) {
const options = Object.assign({}, defaultOptions, _options)
const Chunker = chunkers[options.chunker]
assert(Chunker, 'Unknkown chunker named ' + options.chunker)
let pending = 0
const waitingPending = []
const entry = {
sink: writable(
(nodes, callback) => {
pending += nodes.length
nodes.forEach((node) => entry.source.push(node))
setImmediate(callback)
},
null,
1,
(err) => entry.source.end(err)
),
source: pushable()
}
const dagStream = DAGBuilder(Chunker, ipldResolver, options)
const treeBuilder = createTreeBuilder(ipldResolver, options)
const treeBuilderStream = treeBuilder.stream()
const pausable = pause(() => {})
// TODO: transform this entry -> pausable -> <custom async transform> -> exit
// into a generic NPM package named something like pull-pause-and-drain
pull(
entry,
pausable,
dagStream,
pull.map((node) => {
pending--
if (!pending) {
process.nextTick(() => {
while (waitingPending.length) {
waitingPending.shift()()
}
})
}
return node
}),
treeBuilderStream
)
return {
sink: entry.sink,
source: treeBuilderStream.source,
flush: flush
}
function flush (callback) {
pausable.pause()
// wait until all the files entered were
// transformed into DAG nodes
if (!pending) {
proceed()
} else {
waitingPending.push(proceed)
}
function proceed () {
treeBuilder.flush((err, hash) => {
if (err) {
treeBuilderStream.source.end(err)
callback(err)
return
}
pausable.resume()
callback(null, hash)
})
}
}
}