Skip to content
This repository was archived by the owner on Oct 1, 2021. It is now read-only.

feat: report migration progress #33

Merged
merged 1 commit into from
Aug 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ This package is inspired by the [go-ipfs repo migration tool](https://github.com
- [Usage](#usage)
- [API](#api)
- [`.migrate(path, repoOptions, toVersion, {ignoreLock, onProgress, isDryRun}) -> Promise<void>`](#migratepath-repooptions-toversion-ignorelock-onprogress-isdryrun---promisevoid)
- [`onProgress(migration, counter, totalMigrations)`](#onprogressmigration-counter-totalmigrations)
- [`onProgress(versionFrom, versionTo, percent, message)`](#onprogressversionfrom-versionto-percent-message)
- [`.revert(path, repoOptions, toVersion, {ignoreLock, onProgress, isDryRun}) -> Promise<void>`](#revertpath-repooptions-toversion-ignorelock-onprogress-isdryrun---promisevoid)
- [`getLatestMigrationVersion() -> int`](#getlatestmigrationversion---int)
- [Creating a new migration](#creating-a-new-migration)
Expand Down Expand Up @@ -121,7 +121,7 @@ Executes a forward migration to a specific version, or to the latest version if
* `options.onProgress` (function, optional) - callback that is called after finishing execution of each migration to report progress.
* `options.isDryRun` (bool, optional) - flag that indicates if it is a dry run that should give the same output as running a migration but without making any actual changes.

#### `onProgress(migration, counter, totalMigrations)`
#### `onProgress(versionFrom, versionTo, percent, message)`

Signature of the progress callback.

Expand Down
58 changes: 23 additions & 35 deletions migrations/migration-8/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,11 @@

const CID = require('cids')
const Key = require('interface-datastore').Key
const core = require('datastore-core')
const ShardingStore = core.ShardingDatastore
const mb = require('multibase')
const utils = require('../../src/utils')
const log = require('debug')('ipfs-repo-migrations:migration-8')
const uint8ArrayToString = require('uint8arrays/to-string')

// This function in js-ipfs-repo defaults to not using sharding
// but the default value of the options.sharding is true hence this
// function defaults to use sharding.
async function maybeWithSharding (filestore, options) {
if (options.sharding === false) {
return filestore
}

const shard = new core.shard.NextToLast(2)

return ShardingStore.createOrOpen(filestore, shard)
}
const { createStore } = require('../../src/utils')
const length = require('it-length')

function keyToMultihash (key) {
const buf = mb.decode(`b${key.toString().slice(1)}`)
Expand All @@ -46,44 +32,46 @@ function keyToCid (key) {
return new Key(`/${uint8ArrayToString(multihash)}`.toUpperCase(), false)
}

async function process (repoPath, options, keyFunction){
const { StorageBackend, storageOptions } = utils.getDatastoreAndOptions(options, 'blocks')
async function process (repoPath, repoOptions, onProgress, keyFunction) {
const blockstore = await createStore(repoPath, 'blocks', repoOptions)
await blockstore.open()

let blockCount

const baseStore = new StorageBackend(`${repoPath}/blocks`, storageOptions)
await baseStore.open()
const store = await maybeWithSharding(baseStore, storageOptions)
await store.open()
if (onProgress) {
blockCount = await length(blockstore.query({ keysOnly: true }))
}

try {
let counter = 0

for await (const block of store.query({})) {
for await (const block of blockstore.query({})) {
const newKey = keyFunction(block.key)
counter += 1

// If the Key is base32 CIDv0 then there's nothing to do
if(newKey.toString() !== block.key.toString()) {
counter += 1
log(`Migrating Block from ${block.key} to ${newKey}`)
await blockstore.delete(block.key)
await blockstore.put(newKey, block.value)

log(`Migrating Block from ${block.key.toString()} to ${newKey.toString()}`)
await store.delete(block.key)
await store.put(newKey, block.value)
if (onProgress) {
onProgress((counter / blockCount) * 100, `Migrated Block from ${block.key} to ${newKey}`)
}
}
}

log(`Changed ${ counter } blocks`)
} finally {
await store.close()
await baseStore.close()
await blockstore.close()
}
}

module.exports = {
version: 8,
description: 'Transforms key names into base32 encoding and converts Block store to use bare multihashes encoded as base32',
migrate: (repoPath, options = {}) => {
return process(repoPath, options, keyToMultihash)
migrate: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, keyToMultihash)
},
revert: (repoPath, options = {}) => {
return process(repoPath, options, keyToCid)
revert: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, keyToCid)
}
}
54 changes: 41 additions & 13 deletions migrations/migration-9/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ const cbor = require('cbor')
const multicodec = require('multicodec')
const multibase = require('multibase')
const pinset = require('./pin-set')
const { createStore, cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')
const { createStore } = require('../../src/utils')
const { cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')
const length = require('it-length')

async function pinsToDatastore (blockstore, datastore, pinstore) {
async function pinsToDatastore (blockstore, datastore, pinstore, onProgress) {
const mh = await datastore.get(PIN_DS_KEY)
const cid = new CID(mh)

const pinRootBuf = await blockstore.get(cidToKey(cid))
const pinRoot = dagpb.util.deserialize(pinRootBuf)

const pinCount = (await length(pinset.loadSet(blockstore, pinRoot, PinTypes.recursive))) + (await length(pinset.loadSet(blockstore, pinRoot, PinTypes.direct)))
let counter = 0

for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.recursive)) {
counter++
const pin = {
depth: Infinity
}
Expand All @@ -29,9 +34,12 @@ async function pinsToDatastore (blockstore, datastore, pinstore) {
}

await pinstore.put(cidToKey(cid), cbor.encode(pin))

onProgress((counter / pinCount) * 100, `Migrated recursive pin ${cid}`)
}

for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.direct)) {
counter++
const pin = {
depth: 0
}
Expand All @@ -45,27 +53,47 @@ async function pinsToDatastore (blockstore, datastore, pinstore) {
}

await pinstore.put(cidToKey(cid), cbor.encode(pin))

onProgress((counter / pinCount) * 100, `Migrated direct pin ${cid}`)
}

await blockstore.delete(cidToKey(cid))
await datastore.delete(PIN_DS_KEY)
}

async function pinsToDAG (blockstore, datastore, pinstore) {
async function pinsToDAG (blockstore, datastore, pinstore, onProgress) {
let recursivePins = []
let directPins = []

let pinCount

if (onProgress) {
pinCount = await length(pinstore.query({ keysOnly: true }))
}

let counter = 0

for await (const { key, value } of pinstore.query({})) {
counter++
const pin = cbor.decode(value)
const cid = new CID(pin.version || 0, pin.codec && multicodec.getName(pin.codec) || 'dag-pb', multibase.decode('b' + key.toString().split('/').pop()))

if (pin.depth === 0) {
if (onProgress) {
onProgress((counter / pinCount) * 100, `Reverted direct pin ${cid}`)
}

directPins.push(cid)
} else {
if (onProgress) {
onProgress((counter / pinCount) * 100, `Reverted recursive pin ${cid}`)
}

recursivePins.push(cid)
}
}

onProgress(100, 'Updating pin root')
const pinRoot = new dagpb.DAGNode(new Uint8Array(), [
await pinset.storeSet(blockstore, PinTypes.recursive, recursivePins),
await pinset.storeSet(blockstore, PinTypes.direct, directPins)
Expand All @@ -79,17 +107,17 @@ async function pinsToDAG (blockstore, datastore, pinstore) {
await datastore.put(PIN_DS_KEY, cid.multihash)
}

async function process (repoPath, options, fn) {
const blockstore = await createStore(repoPath, 'blocks', options)
const datastore = await createStore(repoPath, 'datastore', options)
const pinstore = await createStore(repoPath, 'pins', options)
async function process (repoPath, repoOptions, onProgress, fn) {
const blockstore = await createStore(repoPath, 'blocks', repoOptions)
const datastore = await createStore(repoPath, 'datastore', repoOptions)
const pinstore = await createStore(repoPath, 'pins', repoOptions)

await blockstore.open()
await datastore.open()
await pinstore.open()

try {
await fn(blockstore, datastore, pinstore)
await fn(blockstore, datastore, pinstore, onProgress)
} finally {
await pinstore.close()
await datastore.close()
Expand All @@ -100,10 +128,10 @@ async function process (repoPath, options, fn) {
module.exports = {
version: 9,
description: 'Migrates pins to datastore',
migrate: (repoPath, options = {}) => {
return process(repoPath, options, pinsToDatastore)
migrate: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, pinsToDatastore)
},
revert: (repoPath, options = {}) => {
return process(repoPath, options, pinsToDAG)
revert: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, pinsToDAG)
}
}
26 changes: 0 additions & 26 deletions migrations/migration-9/utils.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
'use strict'

const core = require('datastore-core')
const ShardingStore = core.ShardingDatastore
const utils = require('../../src/utils')
const multibase = require('multibase')
const { Key } = require('interface-datastore')
const multihashes = require('multihashing-async').multihash
Expand All @@ -21,34 +18,11 @@ function cidToKey (cid) {
return new Key(`/${multibase.encoding('base32upper').encode(cid.multihash)}`)
}

// This function in js-ipfs-repo defaults to not using sharding
// but the default value of the options.sharding is true hence this
// function defaults to use sharding.
async function maybeWithSharding (filestore, options) {
if (options.sharding === false) {
return filestore
}

const shard = new core.shard.NextToLast(2)

return ShardingStore.createOrOpen(filestore, shard)
}

const createStore = async (location, name, options) => {
const { StorageBackend, storageOptions } = utils.getDatastoreAndOptions(options, name)

let store = new StorageBackend(`${location}/${name}`, storageOptions)
store = maybeWithSharding(store, storageOptions)

return store
}

module.exports = {
PIN_DS_KEY,
DEFAULT_FANOUT,
MAX_ITEMS,
EMPTY_KEY,
PinTypes,
createStore,
cidToKey
}
10 changes: 4 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@
"cbor": "^5.0.2",
"cids": "^1.0.0",
"datastore-core": "^2.0.0",
"datastore-fs": "^2.0.0",
"datastore-level": "^2.0.0",
"debug": "^4.1.0",
"fnv1a": "^1.0.1",
"interface-datastore": "^2.0.0",
"ipld-dag-pb": "^0.20.0",
"it-length": "0.0.2",
"multibase": "^3.0.0",
"multicodec": "^2.0.0",
"multihashing-async": "^2.0.0",
Expand All @@ -59,11 +58,10 @@
"varint": "^5.0.0"
},
"devDependencies": {
"aegir": "^25.0.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"aegir": "^26.0.0",
"datastore-car": "^1.2.0",
"dirty-chai": "^2.0.1",
"datastore-fs": "^2.0.1",
"datastore-level": "^2.0.0",
"it-all": "^1.0.2",
"just-safe-set": "^2.1.0",
"ncp": "^2.0.0",
Expand Down
Loading