Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

feat: reprovider #2184

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
"hashlru": "^2.3.0",
"human-to-milliseconds": "^2.0.0",
"interface-datastore": "~0.6.0",
"ipfs-bitswap": "~0.24.1",
"ipfs-bitswap": "ipfs/js-ipfs-bitswap#feat/use-ipfs-provider",
"ipfs-block": "~0.8.1",
"ipfs-block-service": "~0.15.1",
"ipfs-http-client": "^32.0.1",
Expand Down Expand Up @@ -146,6 +146,7 @@
"multihashes": "~0.4.14",
"multihashing-async": "~0.6.0",
"node-fetch": "^2.3.0",
"p-queue": "^6.0.2",
"peer-book": "~0.9.0",
"peer-id": "~0.12.0",
"peer-info": "~0.15.0",
Expand Down
18 changes: 4 additions & 14 deletions src/core/components/files-regular/refs-local-pull-stream.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
'use strict'

const CID = require('cids')
const base32 = require('base32.js')
const pull = require('pull-stream')
const pullDefer = require('pull-defer')
const { blockKeyToCid } = require('../../utils')

module.exports = function (self) {
return () => {
Expand All @@ -14,21 +13,12 @@ module.exports = function (self) {
return deferred.resolve(pull.error(err))
}

const refs = blocks.map(b => dsKeyToRef(b.key))
const refs = blocks.map(b => ({
ref: blockKeyToCid(b.key).toString()
}))
deferred.resolve(pull.values(refs))
})

return deferred
}
}

function dsKeyToRef (key) {
try {
// Block key is of the form /<base32 encoded string>
const decoder = new base32.Decoder()
const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize())
return { ref: new CID(buff).toString() }
} catch (err) {
return { err: `Could not convert block with key '${key}' to CID: ${err.message}` }
}
}
18 changes: 18 additions & 0 deletions src/core/components/start.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
'use strict'

const get = require('dlv')
const callbackify = require('callbackify')
const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')

const IPNS = require('../ipns')
const Provider = require('../provider')
const routingConfig = require('../ipns/routing/config')
const createLibp2pBundle = require('./libp2p')

Expand Down Expand Up @@ -45,17 +48,32 @@ module.exports = (self) => {
libp2p.start(err => {
if (err) return cb(err)
self.libp2p = libp2p

// create provider
self._provider = new Provider(libp2p, self._repo.blocks, get(config, 'Reprovider'))
cb()
})
})
},
(cb) => {
// start provider if libp2p routing enabled
if (!get(self._options, 'offline') &&
(get(self._options, 'libp2p.config.dht.enabled', false) || get(self._options, 'libp2p.modules.contentRouting', false))) {
const providerStart = callbackify(() => self._provider.start())

providerStart(cb)
} else {
cb()
}
},
(cb) => {
const ipnsRouting = routingConfig(self)
self._ipns = new IPNS(ipnsRouting, self._repo.datastore, self._peerInfo, self._keychain, self._options)

self._bitswap = new Bitswap(
self.libp2p,
self._repo.blocks,
self._provider,
{ statsEnabled: true }
)

Expand Down
5 changes: 5 additions & 0 deletions src/core/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ const configSchema = s({
Enabled: 'boolean?'
}))
})),
Reprovider: optional(s({
Delay: 'string?',
Interval: 'string?',
Strategy: 'string?'
})),
Bootstrap: optional(s(['multiaddr-ipfs']))
})),
ipld: 'object?',
Expand Down
123 changes: 123 additions & 0 deletions src/core/provider/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
'use strict'

const errCode = require('err-code')
const human = require('human-to-milliseconds')
const promisify = require('promisify-es6')
const assert = require('assert')

const CID = require('cids')

const Reprovider = require('./reprovider')

class Provider {
/**
* Provider goal is to announce blocks to the network.
* It keeps track of which blocks are provided, and allow them to be reprovided
* @param {Libp2p} libp2p libp2p instance
* @param {Blockstore} blockstore blockstore instance
* @param {object} options reprovider options
* @param {string} options.delay reprovider initial delay in human friendly time
* @param {string} options.interval reprovider interval in human friendly time
* @param {string} options.strategy reprovider strategy
*/
constructor (libp2p, blockstore, options = {}) {
// Assert options
this._validateOptions(options)

this._running = false

this._contentRouting = libp2p.contentRouting
this._blockstore = blockstore

// handle options (config uses uppercase)
const humanDelay = options.Delay || options.delay || '15s'
const delay = human(humanDelay)
const humanInterval = options.Interval || options.interval || '12h'
const interval = human(humanInterval)
const strategy = options.Strategy || options.strategy || 'all'

this._options = {
delay,
interval,
strategy
}

this.reprovider = new Reprovider(this._contentRouting, this._blockstore, this._options)

}

/**
* Begin processing the provider work
* @returns {void}
*/
async start () {
// do not run twice
if (this._running) {
return
}

this._running = true

// Start reprovider
this.reprovider.start()
}

/**
* Stop the provider
* @returns {void}
*/
stop () {
this._running = false

// stop the reprovider
this.reprovider.stop()
}

/**
* Announce block to the network
* Takes a cid and makes an attempt to announce it to the network
* @param {CID} cid
*/
async provide (cid) {
if (!CID.isCID(cid)) {
throw errCode('invalid CID to provide', 'ERR_INVALID_CID')
}

await promisify((callback) => {
this._contentRouting.provide(cid, callback)
})()
}

/**
* Find providers of a block in the network
* @param {CID} cid cid of the block
* @param {object} options
* @param {number} options.timeout - how long the query should maximally run, in ms (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {Promise}
*/
async findProviders (cid, options) { // eslint-disable-line require-await
if (!CID.isCID(cid)) {
throw errCode('invalid CID to find', 'ERR_INVALID_CID')
}

return promisify((callback) => {
this._contentRouting.findProviders(cid, options, callback)
})()
}

// Validate Provider options
_validateOptions (options) {
const delay = (options.Delay || options.delay)
assert(delay && parseInt(delay) !== 0, '0 delay is not a valid value for reprovider')

const interval = (options.Interval || options.interval)
assert(interval && parseInt(interval) !== 0, '0 interval is not a valid value for reprovider')

const strategy = (options.Strategy || options.strategy)
assert(strategy && (strategy === 'all' || strategy === 'pinned' || strategy === 'roots'),
'Reprovider must have one of the following strategies: `all`, `pinned` or `roots`')
}
}

exports = module.exports = Provider
74 changes: 74 additions & 0 deletions src/core/provider/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'use strict'

const { default: PQueue } = require('p-queue')

const debug = require('debug')
const log = debug('ipfs:provider')
log.error = debug('ipfs:provider:error')

class WorkerQueue {
/**
* Creates an instance of a WorkerQueue.
* @param {function} executeWork
* @param {number} [concurrency=3]
*/
constructor (executeWork, concurrency = 3) {
this.executeWork = executeWork
this._concurrency = concurrency

this.running = false
this.queue = new PQueue({ concurrency })
}

/**
* Use the queue from async to keep `concurrency` amount items running
* @param {Block[]} blocks
*/
async execute (blocks) {
this.running = true

// Fill queue with the processing blocks function
this.queue.addAll(blocks.map((block) => async () => this._processNext(block))) // eslint-disable-line require-await

// Wait for finishing
await this.queue.onIdle()

this.stop()
}

/**
* Stop the worker
*/
stop () {
if (!this.running) {
return
}

this.running = false
this.queue.clear()
}

/**
* Process the next block in the queue.
* @param {Block} block
*/
async _processNext (block) {
if (!this.running) {
return
}

// Execute work
log('queue:work')

let execErr
try {
await this.executeWork(block)
} catch (err) {
execErr = err
}

log('queue:work:done', execErr)
}
}

exports = module.exports = WorkerQueue
Loading