Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit d804bf0

Browse files
committed
feat: reprovider
1 parent 103e359 commit d804bf0

File tree

11 files changed

+565
-15
lines changed

11 files changed

+565
-15
lines changed

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
"hapi-pino": "^6.0.0",
9090
"human-to-milliseconds": "^1.0.0",
9191
"interface-datastore": "~0.6.0",
92-
"ipfs-bitswap": "~0.24.1",
92+
"ipfs-bitswap": "ipfs/js-ipfs-bitswap#feat/use-ipfs-provider",
9393
"ipfs-block": "~0.8.1",
9494
"ipfs-block-service": "~0.15.1",
9595
"ipfs-http-client": "^32.0.0",
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
'use strict'
22

3-
const CID = require('cids')
4-
const base32 = require('base32.js')
53
const pull = require('pull-stream')
64
const pullDefer = require('pull-defer')
5+
const { blockKeyToCid } = require('../../utils')
76

87
module.exports = function (self) {
98
return () => {
@@ -14,21 +13,12 @@ module.exports = function (self) {
1413
return deferred.resolve(pull.error(err))
1514
}
1615

17-
const refs = blocks.map(b => dsKeyToRef(b.key))
16+
const refs = blocks.map(b => ({
17+
ref: blockKeyToCid(b.key).toString()
18+
}))
1819
deferred.resolve(pull.values(refs))
1920
})
2021

2122
return deferred
2223
}
2324
}
24-
25-
function dsKeyToRef (key) {
26-
try {
27-
// Block key is of the form /<base32 encoded string>
28-
const decoder = new base32.Decoder()
29-
const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize())
30-
return { ref: new CID(buff).toString() }
31-
} catch (err) {
32-
return { err: `Could not convert block with key '${key}' to CID: ${err.message}` }
33-
}
34-
}

src/core/components/start.js

+10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
'use strict'
22

3+
const get = require('dlv')
34
const series = require('async/series')
45
const Bitswap = require('ipfs-bitswap')
56
const setImmediate = require('async/setImmediate')
67
const promisify = require('promisify-es6')
78

89
const IPNS = require('../ipns')
10+
const Provider = require('../provider')
911
const routingConfig = require('../ipns/routing/config')
1012
const createLibp2pBundle = require('./libp2p')
1113

@@ -45,6 +47,8 @@ module.exports = (self) => {
4547
libp2p.start(err => {
4648
if (err) return cb(err)
4749
self.libp2p = libp2p
50+
51+
self._provider = new Provider(libp2p, self._repo.blocks, get(config, 'Reprovider'))
4852
cb()
4953
})
5054
})
@@ -56,9 +60,15 @@ module.exports = (self) => {
5660
self._bitswap = new Bitswap(
5761
self.libp2p,
5862
self._repo.blocks,
63+
self._provider,
5964
{ statsEnabled: true }
6065
)
6166

67+
if (!get(self._options, 'offline') &&
68+
(get(self._options, 'libp2p.config.dht.enabled', false) || get(self._options, 'libp2p.modules.contentRouting', false))) {
69+
self._provider.start()
70+
}
71+
6272
self._bitswap.start()
6373
self._blockService.setExchange(self._bitswap)
6474

src/core/config.js

+4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ const configSchema = s({
6666
Enabled: 'boolean?'
6767
}))
6868
})),
69+
Reprovider: optional(s({
70+
Interval: 'string?',
71+
Strategy: 'string?'
72+
})),
6973
Bootstrap: optional(s(['multiaddr-ipfs']))
7074
})),
7175
ipld: 'object?',

src/core/provider/index.js

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
'use strict'
2+
3+
const errCode = require('err-code')
4+
const human = require('human-to-milliseconds')
5+
const promisify = require('promisify-es6')
6+
7+
const CID = require('cids')
8+
9+
const Reprovider = require('./reprovider')
10+
11+
class Provider {
12+
/**
13+
* Provider goal is to announce blocks to the network.
14+
* It keeps track of which blocks are provided, and allow them to be reprovided
15+
* @param {Libp2p} libp2p
16+
* @param {Blockstore} blockstore
17+
* @param {object} options
18+
* @memberof Provider
19+
*/
20+
constructor (libp2p, blockstore, options) {
21+
this._running = false
22+
23+
this._contentRouting = libp2p.contentRouting
24+
this._blockstore = blockstore
25+
this._options = options
26+
this.reprovider = undefined
27+
}
28+
29+
/**
30+
* Begin processing the provider work
31+
* @returns {void}
32+
*/
33+
async start () {
34+
// do not run twice
35+
if (this._running) {
36+
return
37+
}
38+
39+
this._running = true
40+
41+
// handle options
42+
const strategy = this._options.strategy || 'all'
43+
const humanInterval = this._options.Interval || '12h'
44+
const interval = await promisify((callback) => human(humanInterval, callback))()
45+
const options = {
46+
interval,
47+
strategy
48+
}
49+
50+
this.reprovider = new Reprovider(this._contentRouting, this._blockstore, options)
51+
52+
// Start reprovider
53+
this.reprovider.start()
54+
}
55+
56+
/**
57+
* Stop the provider
58+
* @returns {void}
59+
*/
60+
stop () {
61+
this._running = true
62+
63+
// stop the reprovider
64+
this.reprovider.stop()
65+
}
66+
67+
/**
68+
* Announce block to the network and add and entry to the tracker
69+
* Takes a cid and makes an attempt to announce it to the network
70+
* @param {CID} cid
71+
*/
72+
async provide (cid) {
73+
if (!CID.isCID(cid)) {
74+
throw errCode('invalid CID to provide', 'ERR_INVALID_CID')
75+
}
76+
77+
await promisify((callback) => {
78+
this._contentRouting.provide(cid, callback)
79+
})()
80+
}
81+
82+
async findProviders (cid, options) { // eslint-disable-line require-await
83+
if (!CID.isCID(cid)) {
84+
throw errCode('invalid CID to find', 'ERR_INVALID_CID')
85+
}
86+
87+
return promisify((callback) => {
88+
this._contentRouting.findProviders(cid, options, callback)
89+
})()
90+
}
91+
}
92+
93+
exports = module.exports = Provider

src/core/provider/queue.js

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
'use strict'
2+
3+
const queue = require('async/queue')
4+
5+
const debug = require('debug')
6+
const log = debug('ipfs:provider')
7+
log.error = debug('ipfs:provider:error')
8+
9+
class WorkerQueue {
10+
/**
11+
* Creates an instance of a WorkerQueue.
12+
* @param {function} executeWork
13+
* @param {number} [concurrency=3]
14+
*/
15+
constructor (executeWork, concurrency = 3) {
16+
this.executeWork = executeWork
17+
this._concurrency = concurrency
18+
19+
this.running = false
20+
this.queue = this._setupQueue()
21+
}
22+
23+
/**
24+
* Create the underlying async queue.
25+
* @returns {queue}
26+
*/
27+
_setupQueue () {
28+
const q = queue(async (block) => {
29+
await this._processNext(block)
30+
}, this._concurrency)
31+
32+
// If there is an error, stop the worker
33+
q.error = (err) => {
34+
log.error(err)
35+
this.stop(err)
36+
}
37+
38+
q.buffer = 0
39+
40+
return q
41+
}
42+
43+
/**
44+
* Use the queue from async to keep `concurrency` amount items running
45+
* @param {Block[]} blocks
46+
* @returns {Promise}
47+
*/
48+
async execute (blocks) {
49+
this.running = true
50+
51+
// store the promise resolution functions to be resolved at end of queue
52+
this.execution = {}
53+
const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject }))
54+
55+
// When all blocks have been processed, stop the worker
56+
this.queue.drain = () => {
57+
log('queue:drain')
58+
this.stop()
59+
}
60+
61+
// Fill queue with blocks
62+
this.queue.push(blocks)
63+
64+
await execPromise
65+
}
66+
67+
/**
68+
* Stop the worker, optionally an error is thrown if received
69+
*
70+
* @param {object} error
71+
*/
72+
stop (error) {
73+
if (!this.running) {
74+
return
75+
}
76+
77+
this.running = false
78+
this.queue.kill()
79+
80+
if (error) {
81+
this.execution && this.execution.reject(error)
82+
} else {
83+
this.execution && this.execution.resolve()
84+
}
85+
}
86+
87+
/**
88+
* Process the next block in the queue.
89+
* @param {Block} block
90+
*/
91+
async _processNext (block) {
92+
if (!this.running) {
93+
return
94+
}
95+
96+
// Execute work
97+
log('queue:work')
98+
99+
let execErr
100+
try {
101+
await this.executeWork(block)
102+
} catch (err) {
103+
execErr = err
104+
}
105+
106+
log('queue:work:done', execErr)
107+
}
108+
}
109+
110+
exports = module.exports = WorkerQueue

src/core/provider/reprovider.js

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const WorkerQueue = require('./queue')
5+
6+
const { blockKeyToCid } = require('../utils')
7+
8+
// const initialDelay = 15000
9+
const initialDelay = 3000
10+
11+
class Reprovider {
12+
/**
13+
* Reprovider goal is to reannounce blocks to the network.
14+
* @param {object} contentRouting
15+
* @param {Blockstore} blockstore
16+
* @param {object} options
17+
* @memberof Reprovider
18+
*/
19+
constructor (contentRouting, blockstore, options) {
20+
this._contentRouting = contentRouting
21+
this._blockstore = blockstore
22+
this._options = options
23+
24+
this._timeoutId = undefined
25+
this._worker = new WorkerQueue(this._provideBlock)
26+
}
27+
28+
/**
29+
* Begin processing the reprovider work and waiting for reprovide triggers
30+
* @returns {void}
31+
*/
32+
start () {
33+
// Start doing reprovides after the initial delay
34+
this._timeoutId = setTimeout(() => {
35+
this._runPeriodically()
36+
}, initialDelay)
37+
}
38+
39+
/**
40+
* Stops the reprovider. Any active reprovide actions should be aborted
41+
* @returns {void}
42+
*/
43+
stop () {
44+
if (this._timeoutId) {
45+
clearTimeout(this._timeoutId)
46+
this._timeoutId = undefined
47+
}
48+
this._worker.stop()
49+
}
50+
51+
/**
52+
* Run reprovide on every `options.interval` ms
53+
* @returns {void}
54+
*/
55+
async _runPeriodically () {
56+
while (this._timeoutId) {
57+
const blocks = await promisify((callback) => this._blockstore.query({}, callback))()
58+
59+
// TODO strategy logic here
60+
if (this._options.strategy === 'pinned') {
61+
62+
} else if (this._options.strategy === 'pinned') {
63+
64+
}
65+
66+
await this._worker.execute(blocks)
67+
68+
// Each subsequent walk should run on a `this._options.interval` interval
69+
await new Promise(resolve => {
70+
this._timeoutId = setTimeout(resolve, this._options.interval)
71+
})
72+
}
73+
}
74+
75+
/**
76+
* Do the reprovide work to libp2p content routing
77+
* @param {Block} block
78+
* @returns {void}
79+
*/
80+
async _provideBlock (block) {
81+
const cid = blockKeyToCid(block.key.toBuffer())
82+
83+
await promisify((callback) => {
84+
this._contentRouting.provide(cid, callback)
85+
})()
86+
}
87+
}
88+
89+
exports = module.exports = Reprovider

0 commit comments

Comments
 (0)