Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 1b6f57e

Browse files
committed
feat: reprovider
1 parent 0287a6c commit 1b6f57e

File tree

10 files changed

+212
-16
lines changed

10 files changed

+212
-16
lines changed

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
"hapi-pino": "^6.0.0",
9090
"human-to-milliseconds": "^1.0.0",
9191
"interface-datastore": "~0.6.0",
92-
"ipfs-bitswap": "~0.24.1",
92+
"ipfs-bitswap": "ipfs/js-ipfs-bitswap#feat/use-ipfs-provider",
9393
"ipfs-block": "~0.8.1",
9494
"ipfs-block-service": "~0.15.1",
9595
"ipfs-http-client": "^32.0.0",
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
'use strict'
22

3-
const CID = require('cids')
4-
const base32 = require('base32.js')
53
const pull = require('pull-stream')
64
const pullDefer = require('pull-defer')
5+
const { blockKeyToCid } = require('../../utils')
76

87
module.exports = function (self) {
98
return () => {
@@ -14,21 +13,12 @@ module.exports = function (self) {
1413
return deferred.resolve(pull.error(err))
1514
}
1615

17-
const refs = blocks.map(b => dsKeyToRef(b.key))
16+
const refs = blocks.map(b => ({
17+
ref: blockKeyToCid(b.key).toString()
18+
}))
1819
deferred.resolve(pull.values(refs))
1920
})
2021

2122
return deferred
2223
}
2324
}
24-
25-
function dsKeyToRef (key) {
26-
try {
27-
// Block key is of the form /<base32 encoded string>
28-
const decoder = new base32.Decoder()
29-
const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize())
30-
return { ref: new CID(buff).toString() }
31-
} catch (err) {
32-
return { err: `Could not convert block with key '${key}' to CID: ${err.message}` }
33-
}
34-
}

src/core/components/start.js

+6
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
'use strict'
22

3+
const get = require('dlv')
34
const series = require('async/series')
45
const Bitswap = require('ipfs-bitswap')
56
const setImmediate = require('async/setImmediate')
67
const promisify = require('promisify-es6')
78

89
const IPNS = require('../ipns')
10+
const Provider = require('../provider')
911
const routingConfig = require('../ipns/routing/config')
1012
const createLibp2pBundle = require('./libp2p')
1113

@@ -45,6 +47,8 @@ module.exports = (self) => {
4547
libp2p.start(err => {
4648
if (err) return cb(err)
4749
self.libp2p = libp2p
50+
51+
self._provider = new Provider(libp2p, self._repo.blocks, get(config, 'Reprovider'))
4852
cb()
4953
})
5054
})
@@ -56,9 +60,11 @@ module.exports = (self) => {
5660
self._bitswap = new Bitswap(
5761
self.libp2p,
5862
self._repo.blocks,
63+
self._provider,
5964
{ statsEnabled: true }
6065
)
6166

67+
self._provider.start()
6268
self._bitswap.start()
6369
self._blockService.setExchange(self._bitswap)
6470

src/core/provider/index.js

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
'use strict'
2+
3+
const errCode = require('err-code')
4+
const promisify = require('promisify-es6')
5+
6+
const CID = require('cids')
7+
8+
const Reprovider = require('./reprovider')
9+
10+
class Provider {
11+
/**
12+
* Provider goal is to announce blocks to the network.
13+
* It keeps track of which blocks are provided, and allow them to be reprovided
14+
* @param {Libp2p} libp2p
15+
* @param {Blockstore} blockstore
16+
* @param {object} options
17+
* @memberof Provider
18+
*/
19+
constructor(libp2p, blockstore, options) {
20+
// the CIDs for which provide announcements should be made
21+
// this.tracker = [] // TODO queue
22+
this._running = false
23+
24+
this._contentRouting = libp2p.contentRouting
25+
this._blockstore = blockstore
26+
27+
// TODO Convert
28+
console.log('opts', options)
29+
30+
this.reprovider = new Reprovider(this._contentRouting, this._blockstore, options)
31+
}
32+
33+
/**
34+
* Begin processing the provider work
35+
*/
36+
start () {
37+
// do not run twice
38+
if (this._running) {
39+
return
40+
}
41+
42+
this._running = true
43+
44+
// Start reprovider
45+
this.reprovider.start()
46+
}
47+
48+
/**
49+
* Stop the provider
50+
*/
51+
stop () {
52+
this._running = true
53+
54+
// stop the reprovider
55+
return this.reprovider.stop()
56+
}
57+
58+
/**
59+
* Announce block to the network and add and entry to the tracker
60+
* Takes a cid and makes an attempt to announce it to the network
61+
*/
62+
async provide (cid) {
63+
if (!CID.isCID(key)) {
64+
throw errCode('invalid CID to provide', 'ERR_INVALID_CID')
65+
}
66+
67+
await promisify((callback) => {
68+
this._contentRouting.provide(cid, callback)
69+
})()
70+
}
71+
72+
async findProviders (cid, options) {
73+
if (!CID.isCID(key)) {
74+
throw errCode('invalid CID to find', 'ERR_INVALID_CID')
75+
}
76+
77+
return promisify((callback) => {
78+
this._contentRouting.findProviders(cid, options, callback)
79+
})()
80+
}
81+
}
82+
83+
exports = module.exports = Provider

src/core/provider/queue.js

Whitespace-only changes.

src/core/provider/reprovider.js

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
5+
const { blockKeyToCid } = require('../utils')
6+
7+
// TODO Provide queue?
8+
9+
// const initialDelay = 10000
10+
const initialDelay = 5000
11+
12+
class Reprovider {
13+
/**
14+
* Reprovider goal is t§o reannounce blocks to the network.
15+
* @param {*} contentRouting
16+
* @param {Blockstore} blockstore
17+
* @param {object} options
18+
* @memberof Reprovider
19+
*/
20+
constructor (contentRouting, blockstore, options) {
21+
this._contentRouting = contentRouting
22+
this._blockstore = blockstore // use query({})
23+
this._options = options
24+
25+
this._timeoutId = undefined
26+
}
27+
28+
/**
29+
* Begin processing the reprovider work and waiting for reprovide triggers
30+
* @returns {void}
31+
*/
32+
start () {
33+
// Start doing reprovides after the initial delay
34+
this._timeoutId = setTimeout(() => {
35+
// start runner immediately
36+
this._runPeriodically()
37+
}, initialDelay)
38+
}
39+
40+
/**
41+
* Stops the reprovider. Any active reprovide actions should be aborted
42+
*/
43+
stop() {
44+
if (this._timeoutId) {
45+
clearTimeout(this._timeoutId)
46+
this._timeoutId = undefined
47+
}
48+
}
49+
50+
/**
51+
* Run reprovide on every `options.interval` ms
52+
* @returns {void}
53+
*/
54+
async _runPeriodically () {
55+
while (this._timeoutId) {
56+
const blocks = await promisify((callback) => this._blockstore.query({}, callback))()
57+
58+
await this._reprovide(blocks)
59+
60+
await new Promise(resolve => {
61+
this._timeoutId = setTimeout(resolve, 20000)
62+
})
63+
64+
/*
65+
// Each subsequent walk should run on a `this._options.interval` interval
66+
await new Promise(resolve => {
67+
this._timeoutId = setTimeout(resolve, this._options.interval)
68+
}) */
69+
}
70+
}
71+
72+
/**
73+
* provide all keys to libp2p content routing
74+
*/
75+
async _reprovide (blocks) {
76+
for (let i = 0; i < blocks.length && this._timeoutId; i++) {
77+
const cid = blockKeyToCid(blocks[i].key.toBuffer())
78+
79+
console.log('cid')
80+
// TODO: needs the DHT
81+
// await promisify((callback) => {
82+
// this._contentRouting.provide(cid, callback)
83+
// })()
84+
}
85+
console.log('end')
86+
}
87+
88+
_dsKeyToCid (key) {
89+
90+
}
91+
}
92+
93+
exports = module.exports = Reprovider

src/core/runtime/config-browser.js

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ module.exports = () => ({
2626
'/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic',
2727
'/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6'
2828
],
29+
Reprovider: {
30+
Interval: '12h',
31+
Strategy: 'all'
32+
},
2933
Swarm: {
3034
ConnMgr: {
3135
LowWater: 200,

src/core/runtime/config-nodejs.js

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ module.exports = () => ({
3939
'/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic',
4040
'/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6'
4141
],
42+
Reprovider: {
43+
Interval: '12h',
44+
Strategy: 'all'
45+
},
4246
Swarm: {
4347
ConnMgr: {
4448
LowWater: 200,

src/core/utils.js

+16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const promisify = require('promisify-es6')
44
const map = require('async/map')
55
const isIpfs = require('is-ipfs')
66
const CID = require('cids')
7+
const base32 = require('base32.js')
78

89
const ERR_BAD_PATH = 'ERR_BAD_PATH'
910
exports.OFFLINE_ERROR = 'This command must be run in online mode. Try running \'ipfs daemon\' first.'
@@ -134,7 +135,22 @@ const resolvePath = promisify(function (objectAPI, ipfsPaths, callback) {
134135
}
135136
}, callback)
136137
})
138+
/**
139+
* Convert a block key to cid
140+
* @param {Key} key form /<base32 encoded string>
141+
* @returns {CID}
142+
*/
143+
function blockKeyToCid (key) {
144+
try {
145+
const decoder = new base32.Decoder()
146+
const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize())
147+
return new CID(buff)
148+
} catch (err) {
149+
return { err: `Could not convert block with key '${key}' to CID: ${err.message}` }
150+
}
151+
}
137152

138153
exports.normalizePath = normalizePath
139154
exports.parseIpfsPath = parseIpfsPath
140155
exports.resolvePath = resolvePath
156+
exports.blockKeyToCid = blockKeyToCid

test/core/kad-dht.node.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ function createNode (callback) {
3232
}, callback)
3333
}
3434

35-
describe('kad-dht is routing content and peers correctly', () => {
35+
describe.only('kad-dht is routing content and peers correctly', () => {
3636
let nodeA
3737
let nodeB
3838
let nodeC

0 commit comments

Comments
 (0)