Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 7a36211

Browse files
committed
fix: Do not load all of a DAG into memory when pinning
Given a `CID`, the `dag._getRecursive` method returns a list of all descendants of the node with the passed `CID`. This can cause enormous memory usage when importing large datasets. Where this method is invoked the results are either a) discarded or b) used to calculate the `CID`s of the nodes, which is then bad for memory *and* CPU usage. This PR removes the buffering and `CID` recalculating for a nice speedup when adding large datasets. fixes #2310
1 parent 3cade67 commit 7a36211

File tree

3 files changed

+50
-62
lines changed

3 files changed

+50
-62
lines changed

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@
118118
"is-pull-stream": "~0.0.0",
119119
"is-stream": "^2.0.0",
120120
"iso-url": "~0.4.6",
121-
"just-flatten-it": "^2.1.0",
122121
"just-safe-set": "^2.1.0",
123122
"kind-of": "^6.0.2",
124123
"libp2p": "~0.25.4",

src/core/components/dag.js

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
44
const CID = require('cids')
55
const pull = require('pull-stream')
66
const iterToPull = require('async-iterator-to-pull-stream')
7-
const mapAsync = require('async/map')
87
const setImmediate = require('async/setImmediate')
9-
const flattenDeep = require('just-flatten-it')
108
const errCode = require('err-code')
119
const multicodec = require('multicodec')
1210

@@ -180,38 +178,6 @@ module.exports = function dag (self) {
180178
iterToPull(self._ipld.tree(cid, path, options)),
181179
pull.collect(callback)
182180
)
183-
}),
184-
185-
// TODO - use IPLD selectors once they are implemented
186-
_getRecursive: promisify((multihash, options, callback) => {
187-
// gets flat array of all DAGNodes in tree given by multihash
188-
189-
if (typeof options === 'function') {
190-
callback = options
191-
options = {}
192-
}
193-
194-
options = options || {}
195-
196-
let cid
197-
198-
try {
199-
cid = new CID(multihash)
200-
} catch (err) {
201-
return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID')))
202-
}
203-
204-
self.dag.get(cid, '', options, (err, res) => {
205-
if (err) { return callback(err) }
206-
207-
mapAsync(res.value.Links, (link, cb) => {
208-
self.dag._getRecursive(link.Hash, options, cb)
209-
}, (err, nodes) => {
210-
// console.log('nodes:', nodes)
211-
if (err) return callback(err)
212-
callback(null, flattenDeep([res.value, nodes]))
213-
})
214-
})
215181
})
216182
}
217183
}

src/core/components/pin.js

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
'use strict'
33

44
const promisify = require('promisify-es6')
5-
const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
5+
const { DAGNode, DAGLink } = require('ipld-dag-pb')
66
const CID = require('cids')
77
const map = require('async/map')
88
const mapSeries = require('async/mapSeries')
@@ -12,6 +12,7 @@ const eachLimit = require('async/eachLimit')
1212
const waterfall = require('async/waterfall')
1313
const detectLimit = require('async/detectLimit')
1414
const setImmediate = require('async/setImmediate')
15+
const queue = require('async/queue')
1516
const { Key } = require('interface-datastore')
1617
const errCode = require('err-code')
1718
const multibase = require('multibase')
@@ -52,30 +53,49 @@ module.exports = (self) => {
5253
const recursiveKeys = () =>
5354
Array.from(recursivePins).map(key => new CID(key).buffer)
5455

55-
function getIndirectKeys (callback) {
56-
const indirectKeys = new Set()
57-
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
58-
dag._getRecursive(multihash, (err, nodes) => {
56+
function walkDag ({ cid, onCid = () => {} }, cb) {
57+
const q = queue(function ({ cid }, done) {
58+
dag.get(cid, { preload: false }, function (err, result) {
5959
if (err) {
60-
return cb(err)
60+
return done(err)
6161
}
6262

63-
map(nodes, (node, cb) => util.cid(util.serialize(node), {
64-
cidVersion: 0
65-
}).then(cid => cb(null, cid), cb), (err, cids) => {
66-
if (err) {
67-
return cb(err)
68-
}
63+
onCid(cid)
6964

70-
cids
71-
.map(cid => cid.toString())
72-
// recursive pins pre-empt indirect pins
73-
.filter(key => !recursivePins.has(key))
74-
.forEach(key => indirectKeys.add(key))
65+
if (result.value.Links) {
66+
q.push(result.value.Links.map(link => ({
67+
cid: link.Hash
68+
})))
69+
}
7570

76-
cb()
77-
})
71+
done()
7872
})
73+
}, concurrencyLimit)
74+
q.drain = () => {
75+
cb()
76+
}
77+
q.error = (err) => {
78+
q.kill()
79+
cb(err)
80+
}
81+
q.push({ cid })
82+
}
83+
84+
function getIndirectKeys (callback) {
85+
const indirectKeys = new Set()
86+
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
87+
// load every hash in the graph
88+
walkDag({
89+
cid: new CID(multihash),
90+
onCid: (cid) => {
91+
cid = cid.toString()
92+
93+
// recursive pins pre-empt indirect pins
94+
if (!recursivePins.has(cid)) {
95+
indirectKeys.add(cid)
96+
}
97+
}
98+
}, cb)
7999
}, (err) => {
80100
if (err) { return callback(err) }
81101
callback(null, Array.from(indirectKeys))
@@ -184,7 +204,9 @@ module.exports = (self) => {
184204

185205
// verify that each hash can be pinned
186206
map(mhs, (multihash, cb) => {
187-
const key = toB58String(multihash)
207+
const cid = new CID(multihash)
208+
const key = cid.toBaseEncodedString()
209+
188210
if (recursive) {
189211
if (recursivePins.has(key)) {
190212
// it's already pinned recursively
@@ -193,11 +215,10 @@ module.exports = (self) => {
193215

194216
// entire graph of nested links should be pinned,
195217
// so make sure we have all the objects
196-
dag._getRecursive(key, { preload: options.preload }, (err) => {
197-
if (err) { return cb(err) }
198-
// found all objects, we can add the pin
199-
return cb(null, key)
200-
})
218+
walkDag({
219+
dag,
220+
cid
221+
}, (err) => cb(err, key))
201222
} else {
202223
if (recursivePins.has(key)) {
203224
// recursive supersedes direct, can't have both
@@ -209,8 +230,10 @@ module.exports = (self) => {
209230
}
210231

211232
// make sure we have the object
212-
dag.get(new CID(multihash), { preload: options.preload }, (err) => {
213-
if (err) { return cb(err) }
233+
dag.get(cid, (err) => {
234+
if (err) {
235+
return cb(err)
236+
}
214237
// found the object, we can add the pin
215238
return cb(null, key)
216239
})

0 commit comments

Comments
 (0)