
Commit 051c0a4

feat: report migration progress (#33)
Changes the `onProgress` option to receive feedback on how far each migration has progressed with a percent complete indicator and a message to show the user.

If an `onProgress` option is passed, migrations will get a bit slower as we need to calculate the total volume of work before starting a migration in order to work out the percent complete.

Also removes datastores from runtime dependencies as these should only ever be passed in as config.

Fixes #32

BREAKING CHANGES:

- The signature of the `onProgress` callback has changed
1 parent 1ba2a18 commit 051c0a4
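
For callers, the visible change is the shape of `onProgress`. Below is a minimal sketch of driving a migration with the new callback, assuming the package is required as `ipfs-repo-migrations` and that `repoPath`/`repoOptions` are set up as the README describes; the path and the logging are illustrative only.

```js
'use strict'

const migrations = require('ipfs-repo-migrations')

// Hypothetical repo location and backend config - substitute your own.
const repoPath = '/path/to/.jsipfs'
const repoOptions = { /* datastore backends are passed in as config */ }

async function main () {
  await migrations.migrate(repoPath, repoOptions, migrations.getLatestMigrationVersion(), {
    // New signature per the README change below; previously this was
    // onProgress(migration, counter, totalMigrations).
    onProgress: (versionFrom, versionTo, percent, message) => {
      console.log(`migrating ${versionFrom} -> ${versionTo}: ${percent.toFixed(1)}% - ${message}`)
    }
  })
}

main().catch(console.error)
```

Omitting `onProgress` keeps the old, faster behaviour, since the extra pass that counts the total work is only made when a callback is supplied.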

22 files changed: +269 -355 lines

README.md (+2 -2)
@@ -29,7 +29,7 @@ This package is inspired by the [go-ipfs repo migration tool](https://github.com
 - [Usage](#usage)
   - [API](#api)
     - [`.migrate(path, repoOptions, toVersion, {ignoreLock, onProgress, isDryRun}) -> Promise<void>`](#migratepath-repooptions-toversion-ignorelock-onprogress-isdryrun---promisevoid)
-      - [`onProgress(migration, counter, totalMigrations)`](#onprogressmigration-counter-totalmigrations)
+      - [`onProgress(versionFrom, versionTo, percent, message)`](#onprogressversionfrom-versionto-percent-message)
     - [`.revert(path, repoOptions, toVersion, {ignoreLock, onProgress, isDryRun}) -> Promise<void>`](#revertpath-repooptions-toversion-ignorelock-onprogress-isdryrun---promisevoid)
     - [`getLatestMigrationVersion() -> int`](#getlatestmigrationversion---int)
 - [Creating a new migration](#creating-a-new-migration)
@@ -121,7 +121,7 @@ Executes a forward migration to a specific version, or to the latest version if
 * `options.onProgress` (function, optional) - callback that is called after finishing execution of each migration to report progress.
 * `options.isDryRun` (bool, optional) - flag that indicates if it is a dry run that should give the same output as running a migration but without making any actual changes.
 
-#### `onProgress(migration, counter, totalMigrations)`
+#### `onProgress(versionFrom, versionTo, percent, message)`
 
 Signature of the progress callback.

migrations/migration-8/index.js (+23 -35)
@@ -2,25 +2,11 @@
 
 const CID = require('cids')
 const Key = require('interface-datastore').Key
-const core = require('datastore-core')
-const ShardingStore = core.ShardingDatastore
 const mb = require('multibase')
-const utils = require('../../src/utils')
 const log = require('debug')('ipfs-repo-migrations:migration-8')
 const uint8ArrayToString = require('uint8arrays/to-string')
-
-// This function in js-ipfs-repo defaults to not using sharding
-// but the default value of the options.sharding is true hence this
-// function defaults to use sharding.
-async function maybeWithSharding (filestore, options) {
-  if (options.sharding === false) {
-    return filestore
-  }
-
-  const shard = new core.shard.NextToLast(2)
-
-  return ShardingStore.createOrOpen(filestore, shard)
-}
+const { createStore } = require('../../src/utils')
+const length = require('it-length')
 
 function keyToMultihash (key) {
   const buf = mb.decode(`b${key.toString().slice(1)}`)
@@ -46,44 +32,46 @@ function keyToCid (key) {
   return new Key(`/${uint8ArrayToString(multihash)}`.toUpperCase(), false)
 }
 
-async function process (repoPath, options, keyFunction){
-  const { StorageBackend, storageOptions } = utils.getDatastoreAndOptions(options, 'blocks')
+async function process (repoPath, repoOptions, onProgress, keyFunction) {
+  const blockstore = await createStore(repoPath, 'blocks', repoOptions)
+  await blockstore.open()
+
+  let blockCount
 
-  const baseStore = new StorageBackend(`${repoPath}/blocks`, storageOptions)
-  await baseStore.open()
-  const store = await maybeWithSharding(baseStore, storageOptions)
-  await store.open()
+  if (onProgress) {
+    blockCount = await length(blockstore.query({ keysOnly: true }))
+  }
 
   try {
     let counter = 0
 
-    for await (const block of store.query({})) {
+    for await (const block of blockstore.query({})) {
       const newKey = keyFunction(block.key)
+      counter += 1
 
       // If the Key is base32 CIDv0 then there's nothing to do
      if(newKey.toString() !== block.key.toString()) {
-        counter += 1
+        log(`Migrating Block from ${block.key} to ${newKey}`)
+        await blockstore.delete(block.key)
+        await blockstore.put(newKey, block.value)
 
-        log(`Migrating Block from ${block.key.toString()} to ${newKey.toString()}`)
-        await store.delete(block.key)
-        await store.put(newKey, block.value)
+        if (onProgress) {
+          onProgress((counter / blockCount) * 100, `Migrated Block from ${block.key} to ${newKey}`)
+        }
      }
    }
-
-    log(`Changed ${ counter } blocks`)
   } finally {
-    await store.close()
-    await baseStore.close()
+    await blockstore.close()
   }
 }
 
 module.exports = {
   version: 8,
   description: 'Transforms key names into base32 encoding and converts Block store to use bare multihashes encoded as base32',
-  migrate: (repoPath, options = {}) => {
-    return process(repoPath, options, keyToMultihash)
+  migrate: (repoPath, repoOptions, onProgress) => {
+    return process(repoPath, repoOptions, onProgress, keyToMultihash)
  },
-  revert: (repoPath, options = {}) => {
-    return process(repoPath, options, keyToCid)
+  revert: (repoPath, repoOptions, onProgress) => {
    return process(repoPath, repoOptions, onProgress, keyToCid)
  }
 }
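
The progress pattern introduced here — pre-count the keys with `it-length` only when a callback was supplied, then report a percentage as each entry is handled — is not specific to the blockstore. Below is a rough, self-contained sketch of the same pattern over any interface-datastore compatible store; `store` and `migrateKey` are placeholder names, not part of this repo.

```js
const length = require('it-length')

// Hypothetical helper illustrating the counting/reporting pattern above;
// `store` is any interface-datastore compatible store and `migrateKey`
// transforms one key, as keyToMultihash/keyToCid do in migration-8.
async function migrateWithProgress (store, migrateKey, onProgress) {
  let total

  if (onProgress) {
    // Counting keys up-front is what makes percent-complete reporting
    // possible, at the cost of an extra pass over the store.
    total = await length(store.query({ keysOnly: true }))
  }

  let processed = 0

  for await (const { key, value } of store.query({})) {
    processed++
    const newKey = migrateKey(key)

    if (newKey.toString() !== key.toString()) {
      await store.delete(key)
      await store.put(newKey, value)
    }

    if (onProgress) {
      onProgress((processed / total) * 100, `Migrated ${key} to ${newKey}`)
    }
  }
}
```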

migrations/migration-9/index.js (+41 -13)
@@ -6,16 +6,21 @@ const cbor = require('cbor')
 const multicodec = require('multicodec')
 const multibase = require('multibase')
 const pinset = require('./pin-set')
-const { createStore, cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')
+const { createStore } = require('../../src/utils')
+const { cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')
+const length = require('it-length')
 
-async function pinsToDatastore (blockstore, datastore, pinstore) {
+async function pinsToDatastore (blockstore, datastore, pinstore, onProgress) {
   const mh = await datastore.get(PIN_DS_KEY)
   const cid = new CID(mh)
-
   const pinRootBuf = await blockstore.get(cidToKey(cid))
   const pinRoot = dagpb.util.deserialize(pinRootBuf)
 
+  const pinCount = (await length(pinset.loadSet(blockstore, pinRoot, PinTypes.recursive))) + (await length(pinset.loadSet(blockstore, pinRoot, PinTypes.direct)))
+  let counter = 0
+
   for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.recursive)) {
+    counter++
     const pin = {
       depth: Infinity
     }
@@ -29,9 +34,12 @@ async function pinsToDatastore (blockstore, datastore, pinstore) {
     }
 
     await pinstore.put(cidToKey(cid), cbor.encode(pin))
+
+    onProgress((counter / pinCount) * 100, `Migrated recursive pin ${cid}`)
   }
 
   for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.direct)) {
+    counter++
     const pin = {
       depth: 0
     }
@@ -45,27 +53,47 @@ async function pinsToDatastore (blockstore, datastore, pinstore) {
     }
 
     await pinstore.put(cidToKey(cid), cbor.encode(pin))
+
+    onProgress((counter / pinCount) * 100, `Migrated direct pin ${cid}`)
   }
 
   await blockstore.delete(cidToKey(cid))
   await datastore.delete(PIN_DS_KEY)
 }
 
-async function pinsToDAG (blockstore, datastore, pinstore) {
+async function pinsToDAG (blockstore, datastore, pinstore, onProgress) {
   let recursivePins = []
   let directPins = []
 
+  let pinCount
+
+  if (onProgress) {
+    pinCount = await length(pinstore.query({ keysOnly: true }))
+  }
+
+  let counter = 0
+
   for await (const { key, value } of pinstore.query({})) {
+    counter++
     const pin = cbor.decode(value)
     const cid = new CID(pin.version || 0, pin.codec && multicodec.getName(pin.codec) || 'dag-pb', multibase.decode('b' + key.toString().split('/').pop()))
 
     if (pin.depth === 0) {
+      if (onProgress) {
+        onProgress((counter / pinCount) * 100, `Reverted direct pin ${cid}`)
+      }
+
       directPins.push(cid)
     } else {
+      if (onProgress) {
+        onProgress((counter / pinCount) * 100, `Reverted recursive pin ${cid}`)
+      }
+
       recursivePins.push(cid)
     }
   }
 
+  onProgress(100, 'Updating pin root')
   const pinRoot = new dagpb.DAGNode(new Uint8Array(), [
     await pinset.storeSet(blockstore, PinTypes.recursive, recursivePins),
     await pinset.storeSet(blockstore, PinTypes.direct, directPins)
@@ -79,17 +107,17 @@ async function pinsToDAG (blockstore, datastore, pinstore) {
   await datastore.put(PIN_DS_KEY, cid.multihash)
 }
 
-async function process (repoPath, options, fn) {
-  const blockstore = await createStore(repoPath, 'blocks', options)
-  const datastore = await createStore(repoPath, 'datastore', options)
-  const pinstore = await createStore(repoPath, 'pins', options)
+async function process (repoPath, repoOptions, onProgress, fn) {
+  const blockstore = await createStore(repoPath, 'blocks', repoOptions)
+  const datastore = await createStore(repoPath, 'datastore', repoOptions)
+  const pinstore = await createStore(repoPath, 'pins', repoOptions)
 
   await blockstore.open()
   await datastore.open()
   await pinstore.open()
 
   try {
-    await fn(blockstore, datastore, pinstore)
+    await fn(blockstore, datastore, pinstore, onProgress)
   } finally {
     await pinstore.close()
     await datastore.close()
@@ -100,10 +128,10 @@ async function process (repoPath, options, fn) {
 module.exports = {
   version: 9,
   description: 'Migrates pins to datastore',
-  migrate: (repoPath, options = {}) => {
-    return process(repoPath, options, pinsToDatastore)
+  migrate: (repoPath, repoOptions, onProgress) => {
+    return process(repoPath, repoOptions, onProgress, pinsToDatastore)
  },
-  revert: (repoPath, options = {}) => {
-    return process(repoPath, options, pinsToDAG)
+  revert: (repoPath, repoOptions, onProgress) => {
+    return process(repoPath, repoOptions, onProgress, pinsToDAG)
  }
 }
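
Taken together with migration-8, every migration now implements the same contract: `migrate`/`revert` receive `(repoPath, repoOptions, onProgress)` and report `(percent, message)` pairs. Combining those into the README's four-argument `onProgress(versionFrom, versionTo, percent, message)` is presumably left to the top-level migrator — an assumption, since that code is not part of this diff. A skeleton of the shape a new migration would take:

```js
// Skeleton of a migration module under the new calling convention.
// repoOptions carries the datastore backends (they are config, not
// runtime dependencies); onProgress may be undefined and, when present,
// is called with (percentComplete, message) as in migrations 8 and 9.
module.exports = {
  version: 10, // hypothetical next version number
  description: 'Example migration skeleton',
  migrate: async (repoPath, repoOptions, onProgress) => {
    // ...do the forward migration, reporting progress as work completes
    if (onProgress) {
      onProgress(100, 'Nothing to do')
    }
  },
  revert: async (repoPath, repoOptions, onProgress) => {
    // ...undo the migration
    if (onProgress) {
      onProgress(100, 'Nothing to undo')
    }
  }
}
```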

migrations/migration-9/utils.js (-26)
@@ -1,8 +1,5 @@
 'use strict'
 
-const core = require('datastore-core')
-const ShardingStore = core.ShardingDatastore
-const utils = require('../../src/utils')
 const multibase = require('multibase')
 const { Key } = require('interface-datastore')
 const multihashes = require('multihashing-async').multihash
@@ -21,34 +18,11 @@ function cidToKey (cid) {
   return new Key(`/${multibase.encoding('base32upper').encode(cid.multihash)}`)
 }
 
-// This function in js-ipfs-repo defaults to not using sharding
-// but the default value of the options.sharding is true hence this
-// function defaults to use sharding.
-async function maybeWithSharding (filestore, options) {
-  if (options.sharding === false) {
-    return filestore
-  }
-
-  const shard = new core.shard.NextToLast(2)
-
-  return ShardingStore.createOrOpen(filestore, shard)
-}
-
-const createStore = async (location, name, options) => {
-  const { StorageBackend, storageOptions } = utils.getDatastoreAndOptions(options, name)
-
-  let store = new StorageBackend(`${location}/${name}`, storageOptions)
-  store = maybeWithSharding(store, storageOptions)
-
-  return store
-}
-
 module.exports = {
   PIN_DS_KEY,
   DEFAULT_FANOUT,
   MAX_ITEMS,
   EMPTY_KEY,
   PinTypes,
-  createStore,
   cidToKey
 }

package.json (+4 -6)
@@ -44,12 +44,11 @@
     "cbor": "^5.0.2",
     "cids": "^1.0.0",
     "datastore-core": "^2.0.0",
-    "datastore-fs": "^2.0.0",
-    "datastore-level": "^2.0.0",
     "debug": "^4.1.0",
     "fnv1a": "^1.0.1",
     "interface-datastore": "^2.0.0",
     "ipld-dag-pb": "^0.20.0",
+    "it-length": "0.0.2",
     "multibase": "^3.0.0",
     "multicodec": "^2.0.0",
     "multihashing-async": "^2.0.0",
@@ -59,11 +58,10 @@
     "varint": "^5.0.0"
   },
   "devDependencies": {
-    "aegir": "^25.0.0",
-    "chai": "^4.2.0",
-    "chai-as-promised": "^7.1.1",
+    "aegir": "^26.0.0",
     "datastore-car": "^1.2.0",
-    "dirty-chai": "^2.0.1",
+    "datastore-fs": "^2.0.1",
+    "datastore-level": "^2.0.0",
     "it-all": "^1.0.2",
     "just-safe-set": "^2.1.0",
     "ncp": "^2.0.0",
