Skip to content

Commit 7c3f7e0

Browse files
authored
feat: discover and connect to closest peers (#798)
1 parent c6fd23a commit 7c3f7e0

File tree

7 files changed

+401
-27
lines changed

7 files changed

+401
-27
lines changed

doc/API.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [`contentRouting.get`](#contentroutingget)
2121
* [`contentRouting.getMany`](#contentroutinggetmany)
2222
* [`peerRouting.findPeer`](#peerroutingfindpeer)
23+
* [`peerRouting.getClosestPeers`](#peerroutinggetclosestpeers)
2324
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
2425
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
2526
* [`peerStore.addressBook.get`](#peerstoreaddressbookget)
@@ -99,6 +100,7 @@ Creates an instance of Libp2p.
99100
| [options.keychain] | [`object`](./CONFIGURATION.md#setup-with-keychain) | keychain [configuration](./CONFIGURATION.md#setup-with-keychain) |
100101
| [options.metrics] | [`object`](./CONFIGURATION.md#configuring-metrics) | libp2p Metrics [configuration](./CONFIGURATION.md#configuring-metrics) |
101102
| [options.peerId] | [`PeerId`][peer-id] | peerId instance (it will be created if not provided) |
103+
| [options.peerRouting] | [`object`](./CONFIGURATION.md#setup-with-content-and-peer-routing) | libp2p Peer routing service [configuration](./CONFIGURATION.md#setup-with-content-and-peer-routing) |
102104
| [options.peerStore] | [`object`](./CONFIGURATION.md#configuring-peerstore) | libp2p PeerStore [configuration](./CONFIGURATION.md#configuring-peerstore) |
103105

104106
For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md).
@@ -675,6 +677,36 @@ Iterates over all peer routers in series to find the given peer. If the DHT is e
675677
const peer = await libp2p.peerRouting.findPeer(peerId, options)
676678
```
677679

680+
### peerRouting.getClosestPeers
681+
682+
Iterates over all content routers in series to get the closest peers of the given key.
683+
Once a content router succeeds, the iteration will stop. If the DHT is enabled, it will be queried first.
684+
685+
`libp2p.peerRouting.getClosestPeers(cid, options)`
686+
687+
#### Parameters
688+
689+
| Name | Type | Description |
690+
|------|------|-------------|
691+
| key | `Uint8Array` | A CID like key |
692+
| options | `object` | operation options |
693+
| options.timeout | `number` | How long the query can take (ms). |
694+
695+
#### Returns
696+
697+
| Type | Description |
698+
|------|-------------|
699+
| `AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }` | Async iterator for peer data |
700+
701+
#### Example
702+
703+
```js
704+
// Iterate over the closest peers found for the given key
705+
for await (const peer of libp2p.peerRouting.getClosestPeers(key)) {
706+
console.log(peer.id, peer.multiaddrs)
707+
}
708+
```
709+
678710
### peerStore.addressBook.add
679711

680712
Adds known `multiaddrs` of a given peer. If the peer is not known, it will be set with the provided multiaddrs.

doc/CONFIGURATION.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,14 @@ const node = await Libp2p.create({
397397
new DelegatedPeerRouter()
398398
],
399399
},
400-
peerId
400+
peerId,
401+
peerRouting: { // Peer routing configuration
402+
refreshManager: { // Refresh known and connected closest peers
403+
enabled: true, // Should find the closest peers.
404+
interval: 6e5, // Interval for getting the new for closest peers of 10min
405+
bootDelay: 10e3 // Delay for the initial query for closest peers
406+
}
407+
}
401408
})
402409
```
403410

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
"protons": "^2.0.0",
8080
"retimer": "^2.0.0",
8181
"sanitize-filename": "^1.6.3",
82+
"set-delayed-interval": "^1.0.0",
8283
"streaming-iterables": "^5.0.2",
8384
"timeout-abort-controller": "^1.1.1",
8485
"varint": "^5.0.0",
@@ -92,6 +93,7 @@
9293
"chai-string": "^1.5.0",
9394
"delay": "^4.3.0",
9495
"interop-libp2p": "^0.3.0",
96+
"into-stream": "^6.0.0",
9597
"ipfs-http-client": "^47.0.1",
9698
"it-concat": "^1.0.0",
9799
"it-pair": "^1.0.0",

src/config.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ const DefaultConfig = {
3737
persistence: false,
3838
threshold: 5
3939
},
40+
peerRouting: {
41+
refreshManager: {
42+
enabled: true,
43+
interval: 6e5,
44+
bootDelay: 10e3
45+
}
46+
},
4047
config: {
4148
dht: {
4249
enabled: false,

src/index.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ log.error = debug('libp2p:error')
99
const errCode = require('err-code')
1010
const PeerId = require('peer-id')
1111

12-
const peerRouting = require('./peer-routing')
12+
const PeerRouting = require('./peer-routing')
1313
const contentRouting = require('./content-routing')
1414
const getPeer = require('./get-peer')
1515
const { validate: validateConfig } = require('./config')
@@ -193,7 +193,7 @@ class Libp2p extends EventEmitter {
193193

194194
// Attach remaining APIs
195195
// peer and content routing will automatically get modules from _modules and _dht
196-
this.peerRouting = peerRouting(this)
196+
this.peerRouting = new PeerRouting(this)
197197
this.contentRouting = contentRouting(this)
198198

199199
// Mount default protocols
@@ -250,8 +250,8 @@ class Libp2p extends EventEmitter {
250250
try {
251251
this._isStarted = false
252252

253-
// Relay
254253
this.relay && this.relay.stop()
254+
this.peerRouting.stop()
255255

256256
for (const service of this._discovery.values()) {
257257
service.removeListener('peer', this._onDiscoveryPeer)
@@ -496,6 +496,8 @@ class Libp2p extends EventEmitter {
496496

497497
// Relay
498498
this.relay && this.relay.start()
499+
500+
this.peerRouting.start()
499501
}
500502

501503
/**

src/peer-routing.js

Lines changed: 109 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,126 @@
11
'use strict'
22

33
const errCode = require('err-code')
4+
const debug = require('debug')
5+
const log = debug('libp2p:peer-routing')
6+
log.error = debug('libp2p:peer-routing:error')
7+
8+
const all = require('it-all')
49
const pAny = require('p-any')
10+
const {
11+
setDelayedInterval,
12+
clearDelayedInterval
13+
} = require('set-delayed-interval')
14+
15+
/**
16+
* Responsible for managing the usage of the available Peer Routing modules.
17+
*/
18+
class PeerRouting {
19+
/**
20+
* @class
21+
* @param {Libp2p} libp2p
22+
*/
23+
constructor (libp2p) {
24+
this._peerId = libp2p.peerId
25+
this._peerStore = libp2p.peerStore
26+
this._routers = libp2p._modules.peerRouting || []
27+
28+
// If we have the dht, make it first
29+
if (libp2p._dht) {
30+
this._routers.unshift(libp2p._dht)
31+
}
32+
33+
this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager
34+
35+
this._findClosestPeersTask = this._findClosestPeersTask.bind(this)
36+
}
37+
38+
/**
39+
* Start peer routing service.
40+
*/
41+
start () {
42+
if (!this._routers.length || this._timeoutId || !this._refreshManagerOptions.enabled) {
43+
return
44+
}
45+
46+
this._timeoutId = setDelayedInterval(
47+
this._findClosestPeersTask, this._refreshManagerOptions.interval, this._refreshManagerOptions.bootDelay
48+
)
49+
}
550

6-
module.exports = (node) => {
7-
const routers = node._modules.peerRouting || []
51+
/**
52+
* Recurrent task to find closest peers and add their addresses to the Address Book.
53+
*/
54+
async _findClosestPeersTask () {
55+
try {
56+
for await (const { id, multiaddrs } of this.getClosestPeers(this._peerId.id)) {
57+
this._peerStore.addressBook.add(id, multiaddrs)
58+
}
59+
} catch (err) {
60+
log.error(err)
61+
}
62+
}
863

9-
// If we have the dht, make it first
10-
if (node._dht) {
11-
routers.unshift(node._dht)
64+
/**
65+
* Stop peer routing service.
66+
*/
67+
stop () {
68+
clearDelayedInterval(this._timeoutId)
1269
}
1370

14-
return {
15-
/**
16-
* Iterates over all peer routers in series to find the given peer.
17-
*
18-
* @param {string} id - The id of the peer to find
19-
* @param {object} [options]
20-
* @param {number} [options.timeout] - How long the query should run
21-
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
22-
*/
23-
findPeer: async (id, options) => { // eslint-disable-line require-await
24-
if (!routers.length) {
25-
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
71+
/**
72+
* Iterates over all peer routers in series to find the given peer.
73+
*
74+
* @param {string} id - The id of the peer to find
75+
* @param {object} [options]
76+
* @param {number} [options.timeout] - How long the query should run
77+
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
78+
*/
79+
async findPeer (id, options) { // eslint-disable-line require-await
80+
if (!this._routers.length) {
81+
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
82+
}
83+
84+
return pAny(this._routers.map(async (router) => {
85+
const result = await router.findPeer(id, options)
86+
87+
// If we don't have a result, we need to provide an error to keep trying
88+
if (!result || Object.keys(result).length === 0) {
89+
throw errCode(new Error('not found'), 'NOT_FOUND')
2690
}
2791

28-
return pAny(routers.map(async (router) => {
29-
const result = await router.findPeer(id, options)
92+
return result
93+
}))
94+
}
95+
96+
/**
97+
* Attempt to find the closest peers on the network to the given key.
98+
*
99+
* @param {Uint8Array} key - A CID like key
100+
* @param {Object} [options]
101+
* @param {number} [options.timeout=30e3] - How long the query can take.
102+
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
103+
*/
104+
async * getClosestPeers (key, options = { timeout: 30e3 }) {
105+
if (!this._routers.length) {
106+
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
107+
}
30108

31-
// If we don't have a result, we need to provide an error to keep trying
32-
if (!result || Object.keys(result).length === 0) {
109+
const result = await pAny(
110+
this._routers.map(async (router) => {
111+
const peers = await all(router.getClosestPeers(key, options))
112+
113+
if (!peers || !peers.length) {
33114
throw errCode(new Error('not found'), 'NOT_FOUND')
34115
}
116+
return peers
117+
})
118+
)
35119

36-
return result
37-
}))
120+
for (const peer of result) {
121+
yield peer
38122
}
39123
}
40124
}
125+
126+
module.exports = PeerRouting

0 commit comments

Comments
 (0)