diff --git a/package.json b/package.json index cc2e3843e4..30f6bc04e8 100644 --- a/package.json +++ b/package.json @@ -72,7 +72,7 @@ "execa": "^1.0.0", "form-data": "^2.3.3", "hat": "0.0.3", - "interface-ipfs-core": "~0.103.0", + "interface-ipfs-core": "ipfs/interface-js-ipfs-core#feat/gc", "ipfsd-ctl": "~0.42.0", "libp2p-websocket-star": "~0.10.2", "ncp": "^2.0.0", @@ -87,7 +87,7 @@ "@hapi/joi": "^15.0.1", "async": "^2.6.1", "async-iterator-all": "0.0.2", - "async-iterator-to-pull-stream": "^1.1.0", + "async-iterator-to-pull-stream": "^1.3.0", "async-iterator-to-stream": "^1.1.0", "base32.js": "~0.1.0", "bignumber.js": "^8.0.2", @@ -117,7 +117,7 @@ "ipfs-bitswap": "~0.24.0", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.15.1", - "ipfs-http-client": "^32.0.0", + "ipfs-http-client": "ipfs/js-ipfs-http-client#feat/gc", "ipfs-http-response": "~0.3.0", "ipfs-mfs": "~0.11.2", "ipfs-multipart": "~0.1.0", @@ -139,6 +139,7 @@ "is-pull-stream": "~0.0.0", "is-stream": "^2.0.0", "iso-url": "~0.4.6", + "joi": "^14.3.1", "just-flatten-it": "^2.1.0", "just-safe-set": "^2.1.0", "kind-of": "^6.0.2", @@ -159,6 +160,7 @@ "merge-options": "^1.0.1", "mime-types": "^2.1.21", "mkdirp": "~0.5.1", + "mortice": "dirkmc/mortice#fix/read-then-write", "multiaddr": "^6.0.5", "multiaddr-to-uri": "^4.0.1", "multibase": "~0.6.0", @@ -166,6 +168,7 @@ "multihashes": "~0.4.14", "multihashing-async": "~0.6.0", "node-fetch": "^2.3.0", + "p-event": "^4.1.0", "peer-book": "~0.9.0", "peer-id": "~0.12.0", "peer-info": "~0.15.0", diff --git a/src/cli/commands/repo/gc.js b/src/cli/commands/repo/gc.js index ec3b547e93..d45f4fdd07 100644 --- a/src/cli/commands/repo/gc.js +++ b/src/cli/commands/repo/gc.js @@ -1,16 +1,37 @@ 'use strict' +const { print } = require('../../utils') + module.exports = { command: 'gc', describe: 'Perform a garbage collection sweep on the repo.', - builder: {}, + builder: { + quiet: { + alias: 'q', + desc: 'Write minimal output', + type: 'boolean', + default: false + }, + 
'stream-errors': { + desc: 'Output individual errors thrown when deleting blocks.', + type: 'boolean', + default: false + } + }, - handler (argv) { - argv.resolve((async () => { - const ipfs = await argv.getIpfs() - await ipfs.repo.gc() + handler ({ getIpfs, quiet, streamErrors, resolve }) { + resolve((async () => { + const ipfs = await getIpfs() + const res = await ipfs.repo.gc() + for (const r of res) { + if (r.err) { + streamErrors && print(r.err, true, true) + } else { + print((quiet ? '' : 'Removed ') + r.cid) + } + } })()) } } diff --git a/src/core/components/block.js b/src/core/components/block.js index 6a1bb960d1..2a7196bbc3 100644 --- a/src/core/components/block.js +++ b/src/core/components/block.js @@ -81,17 +81,19 @@ module.exports = function block (self) { cb(null, new Block(block, cid)) }) }, - (block, cb) => self._blockService.put(block, (err) => { - if (err) { - return cb(err) - } + (block, cb) => self._gcLock.readLock((_cb) => { + self._blockService.put(block, (err) => { + if (err) { + return _cb(err) + } - if (options.preload !== false) { - self._preload(block.cid) - } + if (options.preload !== false) { + self._preload(block.cid) + } - cb(null, block) - }) + _cb(null, block) + }) + }, cb) ], callback) }), rm: promisify((cid, callback) => { @@ -100,7 +102,8 @@ module.exports = function block (self) { } catch (err) { return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID'))) } - self._blockService.delete(cid, callback) + + self._gcLock.writeLock((cb) => self._blockService.delete(cid, cb), callback) }), stat: promisify((cid, options, callback) => { if (typeof options === 'function') { diff --git a/src/core/components/files-regular/add-pull-stream.js b/src/core/components/files-regular/add-pull-stream.js index c976fcf5df..d00e4d5138 100644 --- a/src/core/components/files-regular/add-pull-stream.js +++ b/src/core/components/files-regular/add-pull-stream.js @@ -112,7 +112,7 @@ function pinFile (file, self, opts, cb) { const isRootDir = 
!file.path.includes('/') const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg if (shouldPin) { - return self.pin.add(file.hash, { preload: false }, err => cb(err, file)) + return self.pin.add(file.hash, { preload: false, lock: false }, err => cb(err, file)) } else { cb(null, file) } @@ -152,7 +152,7 @@ module.exports = function (self) { } opts.progress = progress - return pull( + return self._gcLock.pullReadLock(() => pull( pullMap(content => normalizeContent(content, opts)), pullFlatten(), pullMap(file => ({ @@ -163,6 +163,6 @@ module.exports = function (self) { pullAsyncMap((file, cb) => prepareFile(file, self, opts, cb)), pullMap(file => preloadFile(file, self, opts)), pullAsyncMap((file, cb) => pinFile(file, self, opts, cb)) - ) + )) } } diff --git a/src/core/components/object.js b/src/core/components/object.js index 9db82771eb..5295a76ec8 100644 --- a/src/core/components/object.js +++ b/src/core/components/object.js @@ -242,19 +242,21 @@ module.exports = function object (self) { } function next () { - self._ipld.put(node, multicodec.DAG_PB, { - cidVersion: 0, - hashAlg: multicodec.SHA2_256 - }).then( - (cid) => { - if (options.preload !== false) { - self._preload(cid) - } - - callback(null, cid) - }, - (error) => callback(error) - ) + self._gcLock.readLock((cb) => { + self._ipld.put(node, multicodec.DAG_PB, { + cidVersion: 0, + hashAlg: multicodec.SHA2_256 + }).then( + (cid) => { + if (options.preload !== false) { + self._preload(cid) + } + + cb(null, cid) + }, + cb + ) + }, callback) } }), diff --git a/src/core/components/pin.js b/src/core/components/pin.js index 0afa939e38..40d661e9a0 100644 --- a/src/core/components/pin.js +++ b/src/core/components/pin.js @@ -2,171 +2,25 @@ 'use strict' const promisify = require('promisify-es6') -const { DAGNode, DAGLink, util } = require('ipld-dag-pb') const CID = require('cids') const map = require('async/map') const mapSeries = require('async/mapSeries') -const series = require('async/series') -const 
parallel = require('async/parallel') -const eachLimit = require('async/eachLimit') const waterfall = require('async/waterfall') -const detectLimit = require('async/detectLimit') const setImmediate = require('async/setImmediate') -const { Key } = require('interface-datastore') const errCode = require('err-code') const multibase = require('multibase') -const multicodec = require('multicodec') -const createPinSet = require('./pin-set') const { resolvePath } = require('../utils') - -// arbitrary limit to the number of concurrent dag operations -const concurrencyLimit = 300 -const pinDataStoreKey = new Key('/local/pins') +const PinManager = require('./pin/pin-manager') +const PinTypes = PinManager.PinTypes function toB58String (hash) { return new CID(hash).toBaseEncodedString() } -function invalidPinTypeErr (type) { - const errMsg = `Invalid type '${type}', must be one of {direct, indirect, recursive, all}` - return errCode(new Error(errMsg), 'ERR_INVALID_PIN_TYPE') -} - module.exports = (self) => { - const repo = self._repo const dag = self.dag - const pinset = createPinSet(dag) - const types = { - direct: 'direct', - recursive: 'recursive', - indirect: 'indirect', - all: 'all' - } - - let directPins = new Set() - let recursivePins = new Set() - - const directKeys = () => - Array.from(directPins).map(key => new CID(key).buffer) - const recursiveKeys = () => - Array.from(recursivePins).map(key => new CID(key).buffer) - - function getIndirectKeys (callback) { - const indirectKeys = new Set() - eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => { - dag._getRecursive(multihash, (err, nodes) => { - if (err) { - return cb(err) - } - - map(nodes, (node, cb) => util.cid(util.serialize(node), { - cidVersion: 0 - }).then(cid => cb(null, cid), cb), (err, cids) => { - if (err) { - return cb(err) - } - - cids - .map(cid => cid.toString()) - // recursive pins pre-empt indirect pins - .filter(key => !recursivePins.has(key)) - .forEach(key => indirectKeys.add(key)) - - 
cb() - }) - }) - }, (err) => { - if (err) { return callback(err) } - callback(null, Array.from(indirectKeys)) - }) - } - - // Encode and write pin key sets to the datastore: - // a DAGLink for each of the recursive and direct pinsets - // a DAGNode holding those as DAGLinks, a kind of root pin - function flushPins (callback) { - let dLink, rLink, root - series([ - // create a DAGLink to the node with direct pins - cb => waterfall([ - cb => pinset.storeSet(directKeys(), cb), - ({ node, cid }, cb) => { - try { - cb(null, new DAGLink(types.direct, node.size, cid)) - } catch (err) { - cb(err) - } - }, - (link, cb) => { dLink = link; cb(null) } - ], cb), - - // create a DAGLink to the node with recursive pins - cb => waterfall([ - cb => pinset.storeSet(recursiveKeys(), cb), - ({ node, cid }, cb) => { - try { - cb(null, new DAGLink(types.recursive, node.size, cid)) - } catch (err) { - cb(err) - } - }, - (link, cb) => { rLink = link; cb(null) } - ], cb), - - // the pin-set nodes link to a special 'empty' node, so make sure it exists - cb => { - let empty - - try { - empty = DAGNode.create(Buffer.alloc(0)) - } catch (err) { - return cb(err) - } - - dag.put(empty, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }, cb) - }, - - // create a root node with DAGLinks to the direct and recursive DAGs - cb => { - let node - - try { - node = DAGNode.create(Buffer.alloc(0), [dLink, rLink]) - } catch (err) { - return cb(err) - } - - root = node - dag.put(root, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }, (err, cid) => { - if (!err) { - root.multihash = cid.buffer - } - cb(err) - }) - }, - - // hack for CLI tests - cb => repo.closed ? 
repo.open(cb) : cb(null, null), - - // save root to datastore under a consistent key - cb => repo.datastore.put(pinDataStoreKey, root.multihash, cb) - ], (err, res) => { - if (err) { return callback(err) } - self.log(`Flushed pins with root: ${root}`) - return callback(null, root) - }) - } + const pinManager = new PinManager(self._repo, dag, self._options.repoOwner, self.log) const pin = { add: promisify((paths, options, callback) => { @@ -182,52 +36,68 @@ module.exports = (self) => { resolvePath(self.object, paths, (err, mhs) => { if (err) { return callback(err) } - // verify that each hash can be pinned - map(mhs, (multihash, cb) => { - const key = toB58String(multihash) - if (recursive) { - if (recursivePins.has(key)) { - // it's already pinned recursively - return cb(null, key) + const pinAdd = (pinComplete) => { + // verify that each hash can be pinned + map(mhs, (multihash, cb) => { + const key = toB58String(multihash) + if (recursive) { + if (pinManager.recursivePins.has(key)) { + // it's already pinned recursively + return cb(null, null) + } + + // entire graph of nested links should be pinned, + // so make sure we have all the objects + dag._getRecursive(key, { preload: options.preload }, (err) => { + if (err) { return cb(err) } + // found all objects, we can add the pin + return cb(null, key) + }) + } else { + if (pinManager.recursivePins.has(key)) { + // recursive supersedes direct, can't have both + return cb(new Error(`${key} already pinned recursively`)) + } + if (pinManager.directPins.has(key)) { + // already directly pinned + return cb(null, null) + } + + // make sure we have the object + dag.get(new CID(multihash), { preload: options.preload }, (err) => { + if (err) { return cb(err) } + // found the object, we can add the pin + return cb(null, key) + }) } + }, (err, results) => { + if (err) { return pinComplete(err) } - // entire graph of nested links should be pinned, - // so make sure we have all the objects - dag._getRecursive(key, { preload: 
options.preload }, (err) => { - if (err) { return cb(err) } - // found all objects, we can add the pin - return cb(null, key) - }) - } else { - if (recursivePins.has(key)) { - // recursive supersedes direct, can't have both - return cb(new Error(`${key} already pinned recursively`)) - } - if (directPins.has(key)) { - // already directly pinned - return cb(null, key) + const flushComplete = (err) => { + if (err) { return pinComplete(err) } + pinComplete(null, mhs.map(mh => ({ hash: toB58String(mh) }))) } - // make sure we have the object - dag.get(new CID(multihash), { preload: options.preload }, (err) => { - if (err) { return cb(err) } - // found the object, we can add the pin - return cb(null, key) - }) - } - }, (err, results) => { - if (err) { return callback(err) } - - // update the pin sets in memory - const pinset = recursive ? recursivePins : directPins - results.forEach(key => pinset.add(key)) + // each result is either a key or null if there is already a pin + results = results.filter(Boolean) + if (!results.length) { return flushComplete() } - // persist updated pin sets to datastore - flushPins((err, root) => { - if (err) { return callback(err) } - callback(null, results.map(hash => ({ hash }))) + if (recursive) { + pinManager.addRecursivePins(results, flushComplete) + } else { + pinManager.addDirectPins(results, flushComplete) + } }) - }) + } + + // When adding a file, we take a lock that gets released after pinning + // is complete, so don't take a second lock here + const lock = options.lock !== false + if (lock) { + self._gcLock.readLock(pinAdd, callback) + } else { + pinAdd(callback) + } }) }), @@ -249,55 +119,47 @@ module.exports = (self) => { resolvePath(self.object, paths, (err, mhs) => { if (err) { return callback(err) } - // verify that each hash can be unpinned - map(mhs, (multihash, cb) => { - pin._isPinnedWithType(multihash, types.all, (err, res) => { - if (err) { return cb(err) } - const { pinned, reason } = res - const key = 
toB58String(multihash) - if (!pinned) { - return cb(new Error(`${key} is not pinned`)) - } - - switch (reason) { - case (types.recursive): - if (recursive) { + self._gcLock.readLock((lockCb) => { + // verify that each hash can be unpinned + map(mhs, (multihash, cb) => { + pinManager.isPinnedWithType(multihash, PinTypes.all, (err, res) => { + if (err) { return cb(err) } + const { pinned, reason } = res + const key = toB58String(multihash) + if (!pinned) { + return cb(new Error(`${key} is not pinned`)) + } + + switch (reason) { + case (PinTypes.recursive): + if (recursive) { + return cb(null, key) + } else { + return cb(new Error(`${key} is pinned recursively`)) + } + case (PinTypes.direct): return cb(null, key) - } else { - return cb(new Error(`${key} is pinned recursively`)) - } - case (types.direct): - return cb(null, key) - default: - return cb(new Error( - `${key} is pinned indirectly under ${reason}` - )) - } - }) - }, (err, results) => { - if (err) { return callback(err) } - - // update the pin sets in memory - results.forEach(key => { - if (recursive && recursivePins.has(key)) { - recursivePins.delete(key) - } else { - directPins.delete(key) - } - }) + default: + return cb(new Error( + `${key} is pinned indirectly under ${reason}` + )) + } + }) + }, (err, results) => { + if (err) { return lockCb(err) } - // persist updated pin sets to datastore - flushPins((err, root) => { - if (err) { return callback(err) } - self.log(`Removed pins: ${results}`) - callback(null, results.map(hash => ({ hash }))) + pinManager.rmPins(results, recursive, (err) => { + if (err) { return lockCb(err) } + self.log(`Removed pins: ${results}`) + lockCb(null, mhs.map(mh => ({ hash: toB58String(mh) }))) + }) }) - }) + }, callback) }) }), ls: promisify((paths, options, callback) => { - let type = types.all + let type = PinTypes.all if (typeof paths === 'function') { callback = paths options = {} @@ -314,27 +176,28 @@ module.exports = (self) => { options = options || {} if (options.type) { 
- if (typeof options.type !== 'string') { - return setImmediate(() => callback(invalidPinTypeErr(options.type))) + type = options.type + if (typeof options.type === 'string') { + type = options.type.toLowerCase() + } + const err = PinManager.checkPinType(type) + if (err) { + return setImmediate(() => callback(err)) } - type = options.type.toLowerCase() - } - if (!Object.keys(types).includes(type)) { - return setImmediate(() => callback(invalidPinTypeErr(type))) } if (paths) { // check the pinned state of specific hashes waterfall([ (cb) => resolvePath(self.object, paths, cb), - (hashes, cb) => mapSeries(hashes, (hash, done) => pin._isPinnedWithType(hash, types.all, done), cb), + (hashes, cb) => mapSeries(hashes, (hash, done) => pinManager.isPinnedWithType(hash, PinTypes.all, done), cb), (results, cb) => { results = results .filter(result => result.pinned) .map(({ key, reason }) => { switch (reason) { - case types.direct: - case types.recursive: + case PinTypes.direct: + case PinTypes.recursive: return { hash: key, type: reason @@ -342,7 +205,7 @@ module.exports = (self) => { default: return { hash: key, - type: `${types.indirect} through ${reason}` + type: `${PinTypes.indirect} through ${reason}` } } }) @@ -357,34 +220,34 @@ module.exports = (self) => { } else { // show all pinned items of type let pins = [] - if (type === types.direct || type === types.all) { + if (type === PinTypes.direct || type === PinTypes.all) { pins = pins.concat( - Array.from(directPins).map(hash => ({ - type: types.direct, + Array.from(pinManager.directPins).map(hash => ({ + type: PinTypes.direct, hash })) ) } - if (type === types.recursive || type === types.all) { + if (type === PinTypes.recursive || type === PinTypes.all) { pins = pins.concat( - Array.from(recursivePins).map(hash => ({ - type: types.recursive, + Array.from(pinManager.recursivePins).map(hash => ({ + type: PinTypes.recursive, hash })) ) } - if (type === types.indirect || type === types.all) { - getIndirectKeys((err, 
indirects) => { + if (type === PinTypes.indirect || type === PinTypes.all) { + pinManager.getIndirectKeys((err, indirects) => { if (err) { return callback(err) } pins = pins // if something is pinned both directly and indirectly, // report the indirect entry .filter(({ hash }) => !indirects.includes(hash) || - (indirects.includes(hash) && !directPins.has(hash)) + (indirects.includes(hash) && !pinManager.directPins.has(hash)) ) .concat(indirects.map(hash => ({ - type: types.indirect, + type: PinTypes.indirect, hash }))) return callback(null, pins) @@ -395,93 +258,9 @@ module.exports = (self) => { } }), - _isPinnedWithType: promisify((multihash, type, callback) => { - const key = toB58String(multihash) - const { recursive, direct, all } = types - - // recursive - if ((type === recursive || type === all) && recursivePins.has(key)) { - return callback(null, { - key, - pinned: true, - reason: recursive - }) - } - - if (type === recursive) { - return callback(null, { - key, - pinned: false - }) - } - - // direct - if ((type === direct || type === all) && directPins.has(key)) { - return callback(null, { - key, - pinned: true, - reason: direct - }) - } - - if (type === direct) { - return callback(null, { - key, - pinned: false - }) - } - - // indirect (default) - // check each recursive key to see if multihash is under it - // arbitrary limit, enables handling 1000s of pins. - detectLimit(recursiveKeys().map(key => new CID(key)), concurrencyLimit, (cid, cb) => { - waterfall([ - (done) => dag.get(cid, '', { preload: false }, done), - (result, done) => done(null, result.value), - (node, done) => pinset.hasDescendant(node, key, done) - ], cb) - }, (err, cid) => callback(err, { - key, - pinned: Boolean(cid), - reason: cid - })) - }), - - _load: promisify(callback => { - waterfall([ - // hack for CLI tests - (cb) => repo.closed ? repo.datastore.open(cb) : cb(null, null), - (_, cb) => repo.datastore.has(pinDataStoreKey, cb), - (has, cb) => has ? 
cb() : cb(new Error('No pins to load')), - (cb) => repo.datastore.get(pinDataStoreKey, cb), - (mh, cb) => { - dag.get(new CID(mh), '', { preload: false }, cb) - } - ], (err, pinRoot) => { - if (err) { - if (err.message === 'No pins to load') { - self.log('No pins to load') - return callback() - } else { - return callback(err) - } - } - - parallel([ - cb => pinset.loadSet(pinRoot.value, types.recursive, cb), - cb => pinset.loadSet(pinRoot.value, types.direct, cb) - ], (err, keys) => { - if (err) { return callback(err) } - const [ rKeys, dKeys ] = keys - - directPins = new Set(dKeys.map(toB58String)) - recursivePins = new Set(rKeys.map(toB58String)) - - self.log('Loaded pins from the datastore') - return callback(null) - }) - }) - }) + _isPinnedWithType: promisify(pinManager.isPinnedWithType.bind(pinManager)), + _getInternalBlocks: promisify(pinManager.getInternalBlocks.bind(pinManager)), + _load: promisify(pinManager.load.bind(pinManager)) } return pin diff --git a/src/core/components/pin/gc.js b/src/core/components/pin/gc.js new file mode 100644 index 0000000000..342fb75f91 --- /dev/null +++ b/src/core/components/pin/gc.js @@ -0,0 +1,153 @@ +'use strict' + +const promisify = require('promisify-es6') +const CID = require('cids') +const base32 = require('base32.js') +const parallel = require('async/parallel') +const mapLimit = require('async/mapLimit') +const { Key } = require('interface-datastore') +const log = require('debug')('ipfs:gc') + +// Limit on the number of parallel block remove operations +const BLOCK_RM_CONCURRENCY = 256 +const MFS_ROOT_DS_KEY = new Key('/local/filesroot') + +// Perform mark and sweep garbage collection +module.exports = function gc (self) { + return promisify(async (callback) => { + const start = Date.now() + log(`Creating set of marked blocks`) + + self._gcLock.writeLock((lockCb) => { + parallel([ + // Get all blocks from the blockstore + (cb) => self._repo.blocks.query({ keysOnly: true }, cb), + // Mark all blocks that are being used 
+ (cb) => createMarkedSet(self, cb) + ], (err, [blocks, markedSet]) => { + if (err) { + log(`Error - ${err.message}`) + return lockCb(err) + } + + // Delete blocks that are not being used + deleteUnmarkedBlocks(self, markedSet, blocks, start, (err, res) => { + if (err) { + log(`Error - ${err.message}`) + return lockCb(err) + } + lockCb(null, res) + }) + }) + }, callback) + }) +} + +// Get Set of CIDs of blocks to keep +function createMarkedSet (ipfs, callback) { + parallel([ + // All pins, direct and indirect + (cb) => ipfs.pin.ls((err, pins) => { + if (err) { + return cb(new Error(`Could not list pinned blocks: ${err.message}`)) + } + log(`Found ${pins.length} pinned blocks`) + const cids = pins.map(p => new CID(p.hash)) + // log(' ' + cids.join('\n ')) + cb(null, cids) + }), + + // Blocks used internally by the pinner + (cb) => ipfs.pin._getInternalBlocks((err, cids) => { + if (err) { + return cb(new Error(`Could not list pinner internal blocks: ${err.message}`)) + } + log(`Found ${cids.length} pinner internal blocks`) + // log(' ' + cids.join('\n ')) + cb(null, cids) + }), + + // The MFS root and all its descendants + (cb) => ipfs._repo.root.get(MFS_ROOT_DS_KEY, (err, mh) => { + if (err) { + if (err.code === 'ERR_NOT_FOUND') { + log(`No blocks in MFS`) + return cb(null, []) + } + return cb(new Error(`Could not get MFS root from datastore: ${err.message}`)) + } + + getDescendants(ipfs, new CID(mh), cb) + }) + ], (err, res) => { + if (err) { + return callback(err) + } + + const cids = [].concat(...res).map(cid => cid.toV1().toString('base32')) + return callback(null, new Set(cids)) + }) +} + +// Recursively get descendants of the given CID +function getDescendants (ipfs, cid, callback) { + ipfs.refs(cid, { recursive: true }, (err, refs) => { + if (err) { + return callback(new Error(`Could not get MFS root descendants from store: ${err.message}`)) + } + const cids = [cid, ...refs.map(r => new CID(r.ref))] + log(`Found ${cids.length} MFS blocks`) + // log(' ' + 
cids.join('\n ')) + callback(null, cids) + }) +} + +// Delete all blocks that are not marked as in use +function deleteUnmarkedBlocks (ipfs, markedSet, blocks, start, callback) { + // Iterate through all blocks and find those that are not in the marked set + // The blocks variable has the form { { key: Key() }, { key: Key() }, ... } + const unreferenced = [] + const res = [] + let errCount = 0 + for (const { key: k } of blocks) { + try { + const cid = dsKeyToCid(k) + const b32 = cid.toV1().toString('base32') + if (!markedSet.has(b32)) { + unreferenced.push(cid) + } + } catch (err) { + errCount++ + const msg = `Could not convert block with key '${k}' to CID: ${err.message}` + log(msg) + res.push({ err: new Error(msg) }) + } + } + + const msg = `Marked set has ${markedSet.size} unique blocks. Blockstore has ${blocks.length} blocks. ` + + `Deleting ${unreferenced.length} blocks.` + (errCount ? ` (${errCount} errors)` : '') + log(msg) + // log(' ' + unreferenced.join('\n ')) + + mapLimit(unreferenced, BLOCK_RM_CONCURRENCY, (cid, cb) => { + // Delete blocks from blockstore + ipfs._repo.blocks.delete(cid, (err) => { + const res = { + cid: cid.toString(), + err: err && new Error(`Could not delete block with CID ${cid}: ${err.message}`) + } + cb(null, res) + }) + }, (_, delRes) => { + log(`Complete (${Date.now() - start}ms)`) + + callback(null, res.concat(delRes)) + }) +} + +function dsKeyToCid (key) { + // Block key is of the form / + const decoder = new base32.Decoder() + const buff = decoder.write(key.toString().slice(1)).finalize() + return new CID(Buffer.from(buff)) +} diff --git a/src/core/components/pin/lock.js b/src/core/components/pin/lock.js new file mode 100644 index 0000000000..3b33f10297 --- /dev/null +++ b/src/core/components/pin/lock.js @@ -0,0 +1,135 @@ +'use strict' + +const mortice = require('mortice') +const pull = require('pull-stream') +const EventEmitter = require('events') +const debug = require('debug') + +class Lock extends EventEmitter { + 
constructor (repoOwner, debugName) { + super() + + // Ensure that we get a different mutex for each instance of Lock + const randId = (~~(Math.random() * 1e9)).toString(36) + Date.now() + this.mutex = mortice(randId, { + singleProcess: repoOwner + }) + + this.lockId = 0 + this.log = debug(debugName || 'lock') + } + + readLock (lockedFn, cb) { + return this.lock('readLock', lockedFn, cb) + } + + writeLock (lockedFn, cb) { + return this.lock('writeLock', lockedFn, cb) + } + + lock (type, lockedFn, cb) { + if (typeof lockedFn !== 'function') { + throw new Error(`first argument to ${type} must be a function`) + } + if (typeof cb !== 'function') { + throw new Error(`second argument to ${type} must be a callback function`) + } + + const lockId = this.lockId++ + this.log(`[${lockId}] ${type} requested`) + this.emit(`${type} request`, lockId) + const locked = () => new Promise((resolve, reject) => { + this.emit(`${type} start`, lockId) + this.log(`[${lockId}] ${type} started`) + lockedFn((err, res) => { + this.emit(`${type} release`, lockId) + this.log(`[${lockId}] ${type} released`) + err ? 
reject(err) : resolve(res) + }) + }) + + const lock = this.mutex[type](locked) + return lock.then(res => cb(null, res)).catch(cb) + } + + pullReadLock (lockedPullFn) { + return this.pullLock('readLock', lockedPullFn) + } + + pullWriteLock (lockedPullFn) { + return this.pullLock('writeLock', lockedPullFn) + } + + pullLock (type, lockedPullFn) { + const pullLocker = new PullLocker(this, this.mutex, type, this.lockId++, this.log) + + return pull( + pullLocker.take(), + lockedPullFn(), + pullLocker.release() + ) + } +} + +class PullLocker { + constructor (emitter, mutex, type, lockId, log) { + this.emitter = emitter + this.mutex = mutex + this.type = type + this.lockId = lockId + this.log = log + + // This Promise resolves when the mutex gives us permission to start + // running the locked piece of code + this.lockReady = new Promise((resolve) => { + this.lockReadyResolver = resolve + }) + } + + // Returns a Promise that resolves when the locked piece of code completes + locked () { + return new Promise((resolve, reject) => { + this.releaseLock = (err) => err ? 
reject(err) : resolve() + + this.log(`[${this.lockId}] ${this.type} (pull) started`) + this.emitter.emit(`${this.type} start`, this.lockId) + + // The locked piece of code is ready to start, so resolve the + // this.lockReady Promise (created in the constructor) + this.lockReadyResolver() + }) + } + + // Requests a lock and then waits for the mutex to give us permission to run + // the locked piece of code + take () { + return pull( + pull.asyncMap((i, cb) => { + if (!this.lock) { + this.log(`[${this.lockId}] ${this.type} (pull) requested`) + this.emitter.emit(`${this.type} request`, this.lockId) + // Request the lock + this.lock = this.mutex[this.type](() => this.locked()) + // If there is an error, it gets passed through to the caller using + // pull streams, so here we just catch the error and ignore it so + // that there isn't an UnhandledPromiseRejectionWarning + this.lock.catch(() => {}) + } + + // Wait for the mutex to give us permission + this.lockReady.then(() => cb(null, i)) + }) + ) + } + + // Releases the lock + release () { + return pull.through(null, (err) => { + this.log(`[${this.lockId}] ${this.type} (pull) released`) + this.emitter.emit(`${this.type} release`, this.lockId) + this.releaseLock(err) + }) + } +} + +module.exports = Lock diff --git a/src/core/components/pin/pin-manager.js b/src/core/components/pin/pin-manager.js new file mode 100644 index 0000000000..fba7aeaf49 --- /dev/null +++ b/src/core/components/pin/pin-manager.js @@ -0,0 +1,350 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const { DAGNode, DAGLink, util } = require('ipld-dag-pb') +const CID = require('cids') +const map = require('async/map') +const series = require('async/series') +const parallel = require('async/parallel') +const eachLimit = require('async/eachLimit') +const waterfall = require('async/waterfall') +const detectLimit = require('async/detectLimit') +const { Key } = require('interface-datastore') +const errCode = require('err-code') +const 
multicodec = require('multicodec') + +const createPinSet = require('./pin-set') +const Lock = require('./lock') + +// arbitrary limit to the number of concurrent dag operations +const concurrencyLimit = 300 +const PIN_DS_KEY = new Key('/local/pins') + +function toB58String (hash) { + return new CID(hash).toBaseEncodedString() +} + +function invalidPinTypeErr (type) { + const errMsg = `Invalid type '${type}', must be one of {direct, indirect, recursive, all}` + return errCode(new Error(errMsg), 'ERR_INVALID_PIN_TYPE') +} + +const PinTypes = { + direct: 'direct', + recursive: 'recursive', + indirect: 'indirect', + all: 'all' +} + +class PinManager { + constructor (repo, dag, repoOwner, log) { + this.repo = repo + this.dag = dag + this.log = log + this.pinset = createPinSet(dag) + this.directPins = new Set() + this.recursivePins = new Set() + this._lock = new Lock(repoOwner, 'ipfs:pin-manager:lock') + } + + directKeys () { + return Array.from(this.directPins).map(key => new CID(key).buffer) + } + + recursiveKeys () { + return Array.from(this.recursivePins).map(key => new CID(key).buffer) + } + + getIndirectKeys (callback) { + this._lock.readLock((lockCb) => { + const indirectKeys = new Set() + eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => { + this.dag._getRecursive(multihash, (err, nodes) => { + if (err) { + return cb(err) + } + + map(nodes, (node, cb) => util.cid(util.serialize(node), { + cidVersion: 0 + }).then(cid => cb(null, cid), cb), (err, cids) => { + if (err) { + return cb(err) + } + + cids + .map(cid => cid.toString()) + // recursive pins pre-empt indirect pins + .filter(key => !this.recursivePins.has(key)) + .forEach(key => indirectKeys.add(key)) + + cb() + }) + }) + }, (err) => { + if (err) { return lockCb(err) } + lockCb(null, Array.from(indirectKeys)) + }) + }, callback) + } + + addRecursivePins (keys, callback) { + this._addPins(keys, this.recursivePins, callback) + } + + addDirectPins (keys, callback) { + this._addPins(keys, 
this.directPins, callback) + } + + _addPins (keys, pinSet, callback) { + this._lock.writeLock((lockCb) => { + keys = keys.filter(key => !pinSet.has(key)) + if (!keys.length) return lockCb(null, []) + + for (const key of keys) { + pinSet.add(key) + } + this._flushPins(lockCb) + }, callback) + } + + rmPins (keys, recursive, callback) { + if (!keys.length) return callback(null, []) + + this._lock.writeLock((lockCb) => { + for (const key of keys) { + if (recursive && this.recursivePins.has(key)) { + this.recursivePins.delete(key) + } else { + this.directPins.delete(key) + } + } + + this._flushPins(lockCb) + }, callback) + } + + // Encode and write pin key sets to the datastore: + // a DAGLink for each of the recursive and direct pinsets + // a DAGNode holding those as DAGLinks, a kind of root pin + // Note: should only be called within a lock + _flushPins (callback) { + let dLink, rLink, root + series([ + // create a DAGLink to the node with direct pins + cb => waterfall([ + cb => this.pinset.storeSet(this.directKeys(), cb), + ({ node, cid }, cb) => { + try { + cb(null, new DAGLink(PinTypes.direct, node.size, cid)) + } catch (err) { + cb(err) + } + }, + (link, cb) => { dLink = link; cb(null) } + ], cb), + + // create a DAGLink to the node with recursive pins + cb => waterfall([ + cb => this.pinset.storeSet(this.recursiveKeys(), cb), + ({ node, cid }, cb) => { + try { + cb(null, new DAGLink(PinTypes.recursive, node.size, cid)) + } catch (err) { + cb(err) + } + }, + (link, cb) => { rLink = link; cb(null) } + ], cb), + + // the pin-set nodes link to a special 'empty' node, so make sure it exists + cb => { + let empty + + try { + empty = DAGNode.create(Buffer.alloc(0)) + } catch (err) { + return cb(err) + } + + this.dag.put(empty, { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }, cb) + }, + + // create a root node with DAGLinks to the direct and recursive DAGs + cb => { + let node + + try { + node = 
DAGNode.create(Buffer.alloc(0), [dLink, rLink]) + } catch (err) { + return cb(err) + } + + root = node + this.dag.put(root, { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }, (err, cid) => { + if (!err) { + root.multihash = cid.buffer + } + cb(err) + }) + }, + + // hack for CLI tests + cb => this.repo.closed ? this.repo.open(cb) : cb(null, null), + + // save root to datastore under a consistent key + cb => this.repo.datastore.put(PIN_DS_KEY, root.multihash, cb) + ], (err, res) => { + if (err) { return callback(err) } + this.log(`Flushed pins with root: ${root}`) + return callback(null, root) + }) + } + + load (callback) { + this._lock.writeLock((lockCb) => { + waterfall([ + // hack for CLI tests + (cb) => this.repo.closed ? this.repo.datastore.open(cb) : cb(null, null), + (_, cb) => this.repo.datastore.has(PIN_DS_KEY, cb), + (has, cb) => has ? cb() : cb(new Error('No pins to load')), + (cb) => this.repo.datastore.get(PIN_DS_KEY, cb), + (mh, cb) => { + this.dag.get(new CID(mh), '', { preload: false }, cb) + } + ], (err, pinRoot) => { + if (err) { + if (err.message === 'No pins to load') { + this.log('No pins to load') + return lockCb() + } else { + return lockCb(err) + } + } + + parallel([ + cb => this.pinset.loadSet(pinRoot.value, PinTypes.recursive, cb), + cb => this.pinset.loadSet(pinRoot.value, PinTypes.direct, cb) + ], (err, keys) => { + if (err) { return lockCb(err) } + const [ rKeys, dKeys ] = keys + + this.directPins = new Set(dKeys.map(toB58String)) + this.recursivePins = new Set(rKeys.map(toB58String)) + + this.log('Loaded pins from the datastore') + return lockCb(null) + }) + }) + }, callback) + } + + isPinnedWithType (multihash, type, callback) { + const key = toB58String(multihash) + const { recursive, direct, all } = PinTypes + + // recursive + if ((type === recursive || type === all) && this.recursivePins.has(key)) { + return callback(null, { + key, + pinned: true, + reason: recursive + }) + } + + if 
(type === recursive) { + return callback(null, { + key, + pinned: false + }) + } + + // direct + if ((type === direct || type === all) && this.directPins.has(key)) { + return callback(null, { + key, + pinned: true, + reason: direct + }) + } + + if (type === direct) { + return callback(null, { + key, + pinned: false + }) + } + + this._lock.readLock((lockCb) => { + // indirect (default) + // check each recursive key to see if multihash is under it + // arbitrary limit, enables handling 1000s of pins. + detectLimit(this.recursiveKeys().map(key => new CID(key)), concurrencyLimit, (cid, cb) => { + waterfall([ + (done) => this.dag.get(cid, '', { preload: false }, done), + (result, done) => done(null, result.value), + (node, done) => this.pinset.hasDescendant(node, key, done) + ], cb) + }, (err, cid) => lockCb(err, { + key, + pinned: Boolean(cid), + reason: cid + })) + }, callback) + } + + // Gets CIDs of blocks used internally by the pinner + getInternalBlocks (callback) { + this._lock.writeLock((lockCb) => { + this.repo.datastore.get(PIN_DS_KEY, (err, mh) => { + if (err) { + if (err.code === 'ERR_NOT_FOUND') { + this.log(`No pinned blocks`) + return lockCb(null, []) + } + return lockCb(new Error(`Could not get pin sets root from datastore: ${err.message}`)) + } + + const cid = new CID(mh) + this.dag.get(cid, '', { preload: false }, (err, obj) => { + if (err) { + return lockCb(new Error(`Could not get pin sets from store: ${err.message}`)) + } + + // The pinner stores an object that has two links to pin sets: + // 1. The directly pinned CIDs + // 2. 
The recursively pinned CIDs + // If large enough, these pin sets may have links to buckets to hold + // the pins + this.pinset.getInternalCids(obj.value, (err, cids) => { + if (err) { + return lockCb(new Error(`Could not get pinner internal cids: ${err.message}`)) + } + + lockCb(null, cids.concat(cid)) + }) + }) + }) + }, callback) + } + + // Returns an error if the pin type is invalid + static checkPinType (type) { + if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) { + return invalidPinTypeErr(type) + } + } +} + +PinManager.PinTypes = PinTypes + +module.exports = PinManager diff --git a/src/core/components/pin-set.js b/src/core/components/pin/pin-set.js similarity index 89% rename from src/core/components/pin-set.js rename to src/core/components/pin/pin-set.js index 6f3a9f98dc..04a389bf2c 100644 --- a/src/core/components/pin-set.js +++ b/src/core/components/pin/pin-set.js @@ -8,6 +8,7 @@ const varint = require('varint') const { DAGNode, DAGLink } = require('ipld-dag-pb') const multicodec = require('multicodec') const someSeries = require('async/someSeries') +const eachSeries = require('async/eachSeries') const eachOfSeries = require('async/eachOfSeries') const pbSchema = require('./pin.proto') @@ -230,15 +231,15 @@ exports = module.exports = function (dag) { dag.get(link.Hash, '', { preload: false }, (err, res) => { if (err) { return callback(err) } const keys = [] - const step = link => keys.push(link.Hash.buffer) - pinSet.walkItems(res.value, step, err => { + const stepPin = link => keys.push(link.Hash.buffer) + pinSet.walkItems(res.value, { stepPin }, err => { if (err) { return callback(err) } return callback(null, keys) }) }) }, - walkItems: (node, step, callback) => { + walkItems: (node, { stepPin = () => {}, stepBin = () => {} }, callback) => { let pbh try { pbh = readHeader(node) @@ -253,19 +254,37 @@ exports = module.exports = function (dag) { const linkHash = link.Hash.buffer if (!emptyKey.equals(linkHash)) { + stepBin(link, idx, 
pbh.data) + // walk the links of this fanout bin return dag.get(linkHash, '', { preload: false }, (err, res) => { if (err) { return eachCb(err) } - pinSet.walkItems(res.value, step, eachCb) + pinSet.walkItems(res.value, { stepPin, stepBin }, eachCb) }) } } else { // otherwise, the link is a pin - step(link, idx, pbh.data) + stepPin(link, idx, pbh.data) } eachCb(null) }, callback) + }, + + getInternalCids: (rootNode, callback) => { + // "Empty block" used by the pinner + const cids = [new CID(emptyKey)] + + const stepBin = link => cids.push(link.Hash) + eachSeries(rootNode.Links, (topLevelLink, cb) => { + cids.push(topLevelLink.Hash) + + dag.get(topLevelLink.Hash, '', { preload: false }, (err, res) => { + if (err) { return cb(err) } + + pinSet.walkItems(res.value, { stepBin }, cb) + }) + }, (err) => callback(err, cids)) } } return pinSet diff --git a/src/core/components/pin.proto.js b/src/core/components/pin/pin.proto.js similarity index 100% rename from src/core/components/pin.proto.js rename to src/core/components/pin/pin.proto.js diff --git a/src/core/components/repo.js b/src/core/components/repo.js index 23116d8cf5..25b7cf02ea 100644 --- a/src/core/components/repo.js +++ b/src/core/components/repo.js @@ -38,14 +38,7 @@ module.exports = function repo (self) { }) }), - gc: promisify((options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback(new Error('Not implemented')) - }), + gc: require('./pin/gc')(self), stat: promisify((options, callback) => { if (typeof options === 'function') { diff --git a/src/core/index.js b/src/core/index.js index d457cb6149..f8f2b53b16 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -26,6 +26,7 @@ const defaultRepo = require('./runtime/repo-nodejs') const preload = require('./preload') const mfsPreload = require('./mfs-preload') const ipldOptions = require('./runtime/ipld-nodejs') +const Lock = require('./components/pin/lock') class IPFS extends EventEmitter { 
constructor (options) { @@ -79,6 +80,7 @@ class IPFS extends EventEmitter { this._ipns = undefined // eslint-disable-next-line no-console this._print = this._options.silent ? this.log : console.log + this._gcLock = new Lock(this._options.repoOwner, 'ipfs:gc:lock') // IPFS Core exposed components // - for booting up a node diff --git a/src/http/api/resources/repo.js b/src/http/api/resources/repo.js index 998431111a..9ca4734a1c 100644 --- a/src/http/api/resources/repo.js +++ b/src/http/api/resources/repo.js @@ -1,9 +1,25 @@ 'use strict' -exports.gc = async (request, h) => { - const { ipfs } = request.server.app - await ipfs.repo.gc() - return h.response() +const Joi = require('joi') + +exports.gc = { + validate: { + query: Joi.object().keys({ + 'stream-errors': Joi.boolean().default(false) + }).unknown() + }, + + async handler (request, h) { + const streamErrors = request.query['stream-errors'] + const { ipfs } = request.server.app + const res = await ipfs.repo.gc() + + const filtered = res.filter(r => !r.err || streamErrors) + const response = filtered.map(r => { + return { Err: r.err, Key: !r.err && { '/': r.cid } } + }) + return h.response(response) + } } exports.version = async (request, h) => { diff --git a/src/http/api/routes/repo.js b/src/http/api/routes/repo.js index 21b306e51e..5f2212385c 100644 --- a/src/http/api/routes/repo.js +++ b/src/http/api/routes/repo.js @@ -12,6 +12,14 @@ module.exports = [ method: '*', path: '/api/v0/repo/stat', handler: resources.repo.stat + }, + { + method: '*', + path: '/api/v0/repo/gc', + options: { + validate: resources.repo.gc.validate + }, + handler: resources.repo.gc.handler } // TODO: implement the missing spec https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/REPO.md ] diff --git a/test/cli/repo.js b/test/cli/repo.js index 17c04aaaa3..b49c6ea396 100644 --- a/test/cli/repo.js +++ b/test/cli/repo.js @@ -3,9 +3,10 @@ const expect = require('chai').expect const repoVersion = 
require('ipfs-repo').repoVersion - const runOnAndOff = require('../utils/on-and-off') +const fixturePath = 'test/fixtures/planets/solar-system.md' + describe('repo', () => runOnAndOff((thing) => { let ipfs @@ -18,4 +19,30 @@ describe('repo', () => runOnAndOff((thing) => { expect(out).to.eql(`${repoVersion}\n`) }) }) + + // Note: There are more comprehensive GC tests in interface-js-ipfs-core + it('should run garbage collection', async function () { + this.timeout(60000) + + // Add a file to IPFS + const cid = (await ipfs(`add -Q ${fixturePath}`)).trim() + + // File hash should be in refs local + const localRefs = await ipfs('refs local') + expect(localRefs.split('\n')).includes(cid) + + // Run GC, file should not have been removed because it's pinned + const gcOut = await ipfs('repo gc') + expect(gcOut.split('\n')).not.includes('Removed ' + cid) + + // Unpin file + await ipfs('pin rm ' + cid) + + // Run GC, file should now be removed + const gcOutAfterUnpin = await ipfs('repo gc') + expect(gcOutAfterUnpin.split('\n')).to.includes('Removed ' + cid) + + const localRefsAfterGc = await ipfs('refs local') + expect(localRefsAfterGc.split('\n')).not.includes(cid) + }) })) diff --git a/test/core/gc.spec.js b/test/core/gc.spec.js new file mode 100644 index 0000000000..cf9593c6a6 --- /dev/null +++ b/test/core/gc.spec.js @@ -0,0 +1,245 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const isNode = require('detect-node') +const pEvent = require('p-event') +const IPFS = require('../../src/core') +const createTempRepo = require('../utils/create-repo-nodejs') + +describe('gc', function () { + const fixtures = [{ + path: 'test/my/path1', + content: Buffer.from('path1') + }, { + path: 'test/my/path2', + content: Buffer.from('path2') + }, { + path: 'test/my/path3', + content: Buffer.from('path3') + }, { + path: 
'test/my/path4', + content: Buffer.from('path4') + }] + + let ipfs + let repo + + before(function (done) { + this.timeout(20 * 1000) + repo = createTempRepo() + let config = { Bootstrap: [] } + if (isNode) { + config.Addresses = { + Swarm: ['/ip4/127.0.0.1/tcp/0'] + } + } + ipfs = new IPFS({ repo, config }) + ipfs.on('ready', done) + }) + + after(function (done) { + this.timeout(60 * 1000) + ipfs.stop(done) + }) + + after((done) => repo.teardown(done)) + + const blockAddTests = [{ + name: 'add', + add1: () => ipfs.add(fixtures[0], { pin: false }), + add2: () => ipfs.add(fixtures[1], { pin: false }), + resToCid: (res) => res[0].hash + }, { + name: 'object put', + add1: () => ipfs.object.put({ Data: 'obj put 1', Links: [] }), + add2: () => ipfs.object.put({ Data: 'obj put 2', Links: [] }), + resToCid: (res) => res.toString() + }, { + name: 'block put', + add1: () => ipfs.block.put(Buffer.from('block put 1'), null), + add2: () => ipfs.block.put(Buffer.from('block put 2'), null), + resToCid: (res) => res.cid.toString() + }] + + describe('locks', function () { + for (const test of blockAddTests) { + // eslint-disable-next-line no-loop-func + it(`garbage collection should wait for pending ${test.name} to finish`, async () => { + // Add blocks to IPFS + // Note: add operation will take a read lock + const addLockRequested = pEvent(ipfs._gcLock, 'readLock request') + const add1 = test.add1() + + // Once add lock has been requested, start GC + await addLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(ipfs._gcLock, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second add + await gcStarted + const add2 = test.add2() + + const deleted = (await gc).map(i => i.cid) + const add1Res = test.resToCid(await add1) + const add2Res = test.resToCid(await add2) + + // Should have garbage collected blocks from first add, because GC should + // have waited for first add to finish + expect(deleted).includes(add1Res) + + // 
Should not have garbage collected blocks from second add, because + // second add should have waited for GC to finish + expect(deleted).not.includes(add2Res) + }) + } + + it('garbage collection should wait for pending add + pin to finish', async () => { + // Add blocks to IPFS + // Note: add operation will take a read lock + const addLockRequested = pEvent(ipfs._gcLock, 'readLock request') + const add1 = ipfs.add(fixtures[2], { pin: true }) + + // Once add lock has been requested, start GC + await addLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(ipfs._gcLock, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second add + await gcStarted + const add2 = ipfs.add(fixtures[3], { pin: true }) + + const deleted = (await gc).map(i => i.cid) + const add1Res = (await add1)[0].hash + const add2Res = (await add2)[0].hash + + // Should not have garbage collected blocks from first add, because GC should + // have waited for first add + pin to finish (protected by pin) + expect(deleted).not.includes(add1Res) + + // Should not have garbage collected blocks from second add, because + // second add should have waited for GC to finish + expect(deleted).not.includes(add2Res) + }) + + it('garbage collection should wait for pending block rm to finish', async () => { + // Add two blocks so that we can remove them + const cid1 = (await ipfs.block.put(Buffer.from('block to rm 1'), null)).cid + const cid2 = (await ipfs.block.put(Buffer.from('block to rm 2'), null)).cid + + // Remove first block from IPFS + // Note: block rm will take a write lock + const rmLockRequested = pEvent(ipfs._gcLock, 'writeLock request') + const rm1 = ipfs.block.rm(cid1) + + // Once rm lock has been requested, start GC + await rmLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(ipfs._gcLock, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second rm + await gcStarted + const rm2 = 
ipfs.block.rm(cid2)
+
+      const deleted = (await gc).map(i => i.cid)
+      await rm1
+
+      // Second rm should fail because GC has already removed that block
+      try {
+        await rm2
+      } catch (err) {
+        expect(err.code).eql('ERR_DB_DELETE_FAILED')
+      }
+
+      // Confirm the second block has been removed
+      const localRefs = (await ipfs.refs.local()).map(r => r.ref)
+      expect(localRefs).not.includes(cid2.toString())
+
+      // Should not have garbage collected block from first block put, because
+      // GC should have waited for first rm (removing first block put) to finish
+      expect(deleted).not.includes(cid1.toString())
+
+      // Should have garbage collected block from second block put, because GC
+      // should have completed before second rm (removing second block put)
+      expect(deleted).includes(cid2.toString())
+    })
+
+    it('garbage collection should wait for pending pin add to finish', async () => {
+      // Add two blocks so that we can pin them
+      const cid1 = (await ipfs.block.put(Buffer.from('block to pin add 1'), null)).cid
+      const cid2 = (await ipfs.block.put(Buffer.from('block to pin add 2'), null)).cid
+
+      // Pin first block
+      // Note: pin add will take a read lock
+      const pinLockRequested = pEvent(ipfs._gcLock, 'readLock request')
+      const pin1 = ipfs.pin.add(cid1)
+
+      // Once pin lock has been requested, start GC
+      await pinLockRequested
+      const gc = ipfs.repo.gc()
+      const deleted = (await gc).map(i => i.cid)
+      await pin1
+
+      // TODO: Adding pin for removed block never returns, which means the lock
+      // never gets released
+      // const pin2 = ipfs.pin.add(cid2)
+
+      // Confirm the second block has been removed
+      const localRefs = (await ipfs.refs.local()).map(r => r.ref)
+      expect(localRefs).not.includes(cid2.toString())
+
+      // Should not have garbage collected block from first block put, because
+      // GC should have waited for pin (protecting first block put) to finish
+      expect(deleted).not.includes(cid1.toString())
+
+      // Should have garbage collected block from second block put, 
because GC + // should have completed before second pin + expect(deleted).includes(cid2.toString()) + }) + + it('garbage collection should wait for pending pin rm to finish', async () => { + // Add two blocks so that we can pin them + const cid1 = (await ipfs.block.put(Buffer.from('block to pin rm 1'), null)).cid + const cid2 = (await ipfs.block.put(Buffer.from('block to pin rm 2'), null)).cid + + // Pin blocks + await ipfs.pin.add(cid1) + await ipfs.pin.add(cid2) + + // Unpin first block + // Note: pin rm will take a read lock + const pinLockRequested = pEvent(ipfs._gcLock, 'readLock request') + const pinRm1 = ipfs.pin.rm(cid1) + + // Once pin lock has been requested, start GC + await pinLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(ipfs._gcLock, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second pin rm + await gcStarted + const pinRm2 = ipfs.pin.rm(cid2) + + const deleted = (await gc).map(i => i.cid) + await pinRm1 + await pinRm2 + + // Should have garbage collected block from first block put, because + // GC should have waited for pin rm (unpinning first block put) to finish + expect(deleted).includes(cid1.toString()) + + // Should not have garbage collected block from second block put, because + // GC should have completed before second block was unpinned + expect(deleted).not.includes(cid2.toString()) + }) + }) +}) diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index df572c19e2..bade8cafcd 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -172,15 +172,7 @@ describe('interface-ipfs-core tests', function () { } }) - tests.repo(defaultCommonFactory, { - skip: [ - // repo.gc - { - name: 'gc', - reason: 'TODO: repo.gc is not implemented in js-ipfs yet!' 
- } - ] - }) + tests.repo(defaultCommonFactory) tests.stats(defaultCommonFactory) diff --git a/test/core/lock.spec.js b/test/core/lock.spec.js new file mode 100644 index 0000000000..32d2d5792d --- /dev/null +++ b/test/core/lock.spec.js @@ -0,0 +1,297 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') +const pull = require('pull-stream') +const Lock = require('../../src/core/components/pin/lock') + +const cbTakeLock = (type, lock, out, id, duration) => { + return (cb) => lock[type + 'Lock']((lockCb) => { + out.push(`${type} ${id} start`) + setTimeout(() => { + out.push(`${type} ${id} end`) + lockCb() + }, duration) + }, cb) +} +const cbReadLock = (lock, out, id, duration) => { + return cbTakeLock('read', lock, out, id, duration) +} +const cbWriteLock = (lock, out, id, duration) => { + return cbTakeLock('write', lock, out, id, duration) +} +const cbTakeLockError = (type, lock, out, errs, id, duration) => { + return (cb) => lock[type + 'Lock']((lockCb) => { + out.push(`${type} ${id} start`) + setTimeout(() => { + out.push(`${type} ${id} error`) + lockCb(new Error('err')) + }, duration) + }, (err) => { + errs.push(err) + cb() + }) +} +const cbReadLockError = (lock, out, errs, id, duration) => { + return cbTakeLockError('read', lock, out, errs, id, duration) +} +const cbWriteLockError = (lock, out, errs, id, duration) => { + return cbTakeLockError('write', lock, out, errs, id, duration) +} + +const pullTakeLock = (type, lock, out, id, duration) => { + const lockFn = type === 'read' ? 
'pullReadLock' : 'pullWriteLock' + const vals = ['a', 'b', 'c'] + return (cb) => { + pull( + pull.values(vals), + lock[lockFn](() => { + let started = false + return pull( + pull.through((i) => { + if (!started) { + out.push(`${type} ${id} start`) + started = true + } + }), + pull.asyncMap((i, cb) => { + setTimeout(() => cb(null, i), duration / vals.length) + }) + ) + }), + pull.collect(() => { + out.push(`${type} ${id} end`) + cb() + }) + ) + } +} +const pullReadLock = (lock, out, id, duration) => { + return pullTakeLock('read', lock, out, id, duration) +} +const pullWriteLock = (lock, out, id, duration) => { + return pullTakeLock('write', lock, out, id, duration) +} +const pullTakeLockError = (type, lock, out, errs, id, duration) => { + const lockFn = type === 'read' ? 'pullReadLock' : 'pullWriteLock' + const vals = ['a', 'b', 'c'] + return (cb) => { + pull( + pull.values(vals), + lock[lockFn](() => { + let started = false + return pull( + pull.through((i) => { + if (!started) { + out.push(`${type} ${id} start`) + started = true + } + }), + pull.asyncMap((i, cb) => { + setTimeout(() => cb(new Error('err')), duration) + }) + ) + }), + pull.collect((err) => { + out.push(`${type} ${id} error`) + errs.push(err) + cb() + }) + ) + } +} +const pullReadLockError = (lock, out, errs, id, duration) => { + return pullTakeLockError('read', lock, out, errs, id, duration) +} +const pullWriteLockError = (lock, out, errs, id, duration) => { + return pullTakeLockError('write', lock, out, errs, id, duration) +} + +const expectResult = (out, exp, errs, expErrCount, done) => { + if (typeof errs === 'function') { + done = errs + } + return () => { + try { + expect(out).to.eql(exp) + if (typeof expErrCount === 'number') { + expect(errs.length).to.eql(expErrCount) + for (const e of errs) { + expect(e.message).to.eql('err') + } + } + } catch (err) { + return done(err) + } + done() + } +} + +const runTests = (suiteName, { readLock, writeLock, readLockError, writeLockError }) => { + 
describe(suiteName, () => { + it('multiple simultaneous reads', (done) => { + const lock = new Lock() + const out = [] + parallel([ + readLock(lock, out, 1, 100), + readLock(lock, out, 2, 200), + readLock(lock, out, 3, 300) + ], expectResult(out, [ + 'read 1 start', + 'read 2 start', + 'read 3 start', + 'read 1 end', + 'read 2 end', + 'read 3 end' + ], done)) + }) + + it('multiple simultaneous writes', (done) => { + const lock = new Lock() + const out = [] + parallel([ + writeLock(lock, out, 1, 100), + writeLock(lock, out, 2, 200), + writeLock(lock, out, 3, 300) + ], expectResult(out, [ + 'write 1 start', + 'write 1 end', + 'write 2 start', + 'write 2 end', + 'write 3 start', + 'write 3 end' + ], done)) + }) + + it('read then write then read', (done) => { + const lock = new Lock() + const out = [] + parallel([ + readLock(lock, out, 1, 100), + writeLock(lock, out, 1, 100), + readLock(lock, out, 2, 100) + ], expectResult(out, [ + 'read 1 start', + 'read 1 end', + 'write 1 start', + 'write 1 end', + 'read 2 start', + 'read 2 end' + ], done)) + }) + + it('write then read then write', (done) => { + const lock = new Lock() + const out = [] + parallel([ + writeLock(lock, out, 1, 100), + readLock(lock, out, 1, 100), + writeLock(lock, out, 2, 100) + ], expectResult(out, [ + 'write 1 start', + 'write 1 end', + 'read 1 start', + 'read 1 end', + 'write 2 start', + 'write 2 end' + ], done)) + }) + + it('two simultaneous reads then write then read', (done) => { + const lock = new Lock() + const out = [] + parallel([ + readLock(lock, out, 1, 100), + readLock(lock, out, 2, 200), + writeLock(lock, out, 1, 100), + readLock(lock, out, 3, 100) + ], expectResult(out, [ + 'read 1 start', + 'read 2 start', + 'read 1 end', + 'read 2 end', + 'write 1 start', + 'write 1 end', + 'read 3 start', + 'read 3 end' + ], done)) + }) + + it('two simultaneous writes then read then write', (done) => { + const lock = new Lock() + const out = [] + parallel([ + writeLock(lock, out, 1, 100), + 
writeLock(lock, out, 2, 100), + readLock(lock, out, 1, 100), + writeLock(lock, out, 3, 100) + ], expectResult(out, [ + 'write 1 start', + 'write 1 end', + 'write 2 start', + 'write 2 end', + 'read 1 start', + 'read 1 end', + 'write 3 start', + 'write 3 end' + ], done)) + }) + + it('simultaneous reads with error then write', (done) => { + const lock = new Lock() + const out = [] + const errs = [] + parallel([ + readLockError(lock, out, errs, 1, 100), + readLock(lock, out, 2, 200), + writeLock(lock, out, 1, 100) + ], expectResult(out, [ + 'read 1 start', + 'read 2 start', + 'read 1 error', + 'read 2 end', + 'write 1 start', + 'write 1 end' + ], errs, 1, done)) + }) + + it('simultaneous writes with error then read', (done) => { + const lock = new Lock() + const out = [] + const errs = [] + parallel([ + writeLockError(lock, out, errs, 1, 100), + writeLock(lock, out, 2, 100), + readLock(lock, out, 1, 100) + ], expectResult(out, [ + 'write 1 start', + 'write 1 error', + 'write 2 start', + 'write 2 end', + 'read 1 start', + 'read 1 end' + ], errs, 1, done)) + }) + }) +} + +describe('lock', function () { + runTests('cb style lock', { + readLock: cbReadLock, + writeLock: cbWriteLock, + readLockError: cbReadLockError, + writeLockError: cbWriteLockError + }) + + runTests('pull stream style lock', { + readLock: pullReadLock, + writeLock: pullWriteLock, + readLockError: pullReadLockError, + writeLockError: pullWriteLockError + }) +}) diff --git a/test/core/pin-set.js b/test/core/pin-set.js index 088d248a12..73bfb69044 100644 --- a/test/core/pin-set.js +++ b/test/core/pin-set.js @@ -19,9 +19,10 @@ const { const CID = require('cids') const IPFS = require('../../src/core') -const createPinSet = require('../../src/core/components/pin-set') +const createPinSet = require('../../src/core/components/pin/pin-set') const createTempRepo = require('../utils/create-repo-nodejs') +const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' const defaultFanout = 256 const maxItems = 
8192 @@ -185,7 +186,7 @@ describe('pinSet', function () { createNode('datum', (err, node) => { expect(err).to.not.exist() - pinSet.walkItems(node, () => {}, (err, res) => { + pinSet.walkItems(node, {}, (err, res) => { expect(err).to.exist() expect(res).to.not.exist() done() @@ -193,9 +194,33 @@ describe('pinSet', function () { }) }) + it('visits all links of a root node', function (done) { + this.timeout(90 * 1000) + + const seenPins = [] + const stepPin = (link, idx, data) => seenPins.push({ link, idx, data }) + const seenBins = [] + const stepBin = (link, idx, data) => seenBins.push({ link, idx, data }) + + createNodes(maxItems + 1, (err, nodes) => { + expect(err).to.not.exist() + + pinSet.storeSet(nodes, (err, result) => { + expect(err).to.not.exist() + + pinSet.walkItems(result.node, { stepPin, stepBin }, err => { + expect(err).to.not.exist() + expect(seenPins).to.have.length(maxItems + 1) + expect(seenBins).to.have.length(defaultFanout) + done() + }) + }) + }) + }) + it('visits all non-fanout links of a root node', function (done) { const seen = [] - const walker = (link, idx, data) => seen.push({ link, idx, data }) + const stepPin = (link, idx, data) => seen.push({ link, idx, data }) createNodes(defaultFanout, (err, nodes) => { expect(err).to.not.exist() @@ -203,7 +228,7 @@ describe('pinSet', function () { pinSet.storeSet(nodes, (err, result) => { expect(err).to.not.exist() - pinSet.walkItems(result.node, walker, err => { + pinSet.walkItems(result.node, { stepPin }, err => { expect(err).to.not.exist() expect(seen).to.have.length(defaultFanout) expect(seen[0].idx).to.eql(defaultFanout) @@ -217,4 +242,26 @@ describe('pinSet', function () { }) }) }) + + describe('getInternalCids', function () { + it('gets all links and empty key CID', function (done) { + createNodes(defaultFanout, (err, nodes) => { + expect(err).to.not.exist() + + pinSet.storeSet(nodes, (err, result) => { + expect(err).to.not.exist() + + const rootNode = DAGNode.create('pins', [{ Hash: 
result.cid }]) + pinSet.getInternalCids(rootNode, (err, cids) => { + expect(err).to.not.exist() + expect(cids.length).to.eql(2) + const cidStrs = cids.map(c => c.toString()) + expect(cidStrs).includes(emptyKeyHash) + expect(cidStrs).includes(result.cid.toString()) + done() + }) + }) + }) + }) + }) }) diff --git a/test/core/pin.js b/test/core/pin.js index b6ad6a220c..b23b5aeb8a 100644 --- a/test/core/pin.js +++ b/test/core/pin.js @@ -331,4 +331,42 @@ describe('pin', function () { .then(ls => expect(ls.length).to.eql(1)) }) }) + + describe('locking', function () { + beforeEach(clearPins) + + const resolveAsync = (pFn) => { + return new Promise((resolve) => setTimeout(() => pFn().then(resolve))) + } + + it('concurrent adds', async function () { + const promises = [] + promises.push(pin.add(pins.mercuryWiki, { recursive: false })) + promises.push(resolveAsync(() => pin.add(pins.solarWiki))) + await Promise.all(promises) + const addLs = await pin.ls() + expect(addLs.length).to.eql(2) + }) + + it('concurrent rms', async function () { + const promises = [] + await pin.add(pins.mercuryWiki) + await pin.add(pins.solarWiki) + promises.push(pin.rm(pins.mercuryWiki)) + promises.push(resolveAsync(() => pin.rm(pins.solarWiki))) + await Promise.all(promises) + const rmLs = await pin.ls() + expect(rmLs.length).to.eql(0) + }) + + it('concurrent add and rm', async function () { + const promises = [] + await pin.add(pins.mercuryWiki) + promises.push(pin.add(pins.solarWiki)) + promises.push(resolveAsync(() => pin.rm(pins.mercuryWiki))) + await Promise.all(promises) + const addRmLs = await pin.ls() + expect(addRmLs.length).to.eql(1) + }) + }) }) diff --git a/test/http-api/interface.js b/test/http-api/interface.js index 43c6ff6a12..5afbce1f7d 100644 --- a/test/http-api/interface.js +++ b/test/http-api/interface.js @@ -109,15 +109,7 @@ describe('interface-ipfs-core over ipfs-http-client tests', () => { } })) - tests.repo(defaultCommonFactory, { - skip: [ - // repo.gc - { - name: 'gc', - 
reason: 'TODO: repo.gc is not implemented in js-ipfs yet!' - } - ] - }) + tests.repo(defaultCommonFactory) tests.stats(defaultCommonFactory)