11'use strict'
22
33const FSM = require ( 'fsm-event' )
4- const EventEmitter = require ( 'events' ) . EventEmitter
4+ const { EventEmitter } = require ( 'events' )
55const debug = require ( 'debug' )
66const log = debug ( 'libp2p' )
77log . error = debug ( 'libp2p:error' )
88const errCode = require ( 'err-code' )
99const promisify = require ( 'promisify-es6' )
1010
1111const each = require ( 'async/each' )
12- const nextTick = require ( 'async/nextTick' )
1312
14- const PeerBook = require ( 'peer-book' )
1513const PeerInfo = require ( 'peer-info' )
1614const multiaddr = require ( 'multiaddr' )
1715const Switch = require ( './switch' )
@@ -29,6 +27,8 @@ const { codes } = require('./errors')
2927const Dialer = require ( './dialer' )
3028const TransportManager = require ( './transport-manager' )
3129const Upgrader = require ( './upgrader' )
30+ const Registrar = require ( './registrar' )
31+ const PeerStore = require ( './peer-store' )
3232
3333const notStarted = ( action , state ) => {
3434 return errCode (
@@ -54,25 +54,31 @@ class Libp2p extends EventEmitter {
5454
5555 this . datastore = this . _options . datastore
5656 this . peerInfo = this . _options . peerInfo
57- this . peerBook = this . _options . peerBook || new PeerBook ( )
5857
5958 this . _modules = this . _options . modules
6059 this . _config = this . _options . config
6160 this . _transport = [ ] // Transport instances/references
6261 this . _discovery = [ ] // Discovery service instances/references
6362
63+ this . peerStore = new PeerStore ( )
64+
6465 // create the switch, and listen for errors
65- this . _switch = new Switch ( this . peerInfo , this . peerBook , this . _options . switch )
66+ this . _switch = new Switch ( this . peerInfo , this . peerStore , this . _options . switch )
6667
6768 // Setup the Upgrader
6869 this . upgrader = new Upgrader ( {
6970 localPeer : this . peerInfo . id ,
7071 onConnection : ( connection ) => {
7172 const peerInfo = getPeerInfo ( connection . remotePeer )
73+
74+ this . peerStore . put ( peerInfo )
75+ this . registrar . onConnect ( peerInfo , connection )
7276 this . emit ( 'peer:connect' , peerInfo )
7377 } ,
7478 onConnectionEnd : ( connection ) => {
7579 const peerInfo = getPeerInfo ( connection . remotePeer )
80+
81+ this . registrar . onDisconnect ( peerInfo )
7682 this . emit ( 'peer:disconnect' , peerInfo )
7783 }
7884 } )
@@ -106,6 +112,10 @@ class Libp2p extends EventEmitter {
106112 transportManager : this . transportManager
107113 } )
108114
115+ this . registrar = new Registrar ( this . peerStore )
116+ this . handle = this . handle . bind ( this )
117+ this . registrar . handle = this . handle
118+
109119 // Attach private network protector
110120 if ( this . _modules . connProtector ) {
111121 this . _switch . protector = this . _modules . connProtector
@@ -124,7 +134,7 @@ class Libp2p extends EventEmitter {
124134 }
125135
126136 // start pubsub
127- if ( this . _modules . pubsub && this . _config . pubsub . enabled !== false ) {
137+ if ( this . _modules . pubsub ) {
128138 this . pubsub = pubsub ( this , this . _modules . pubsub , this . _config . pubsub )
129139 }
130140
@@ -179,7 +189,7 @@ class Libp2p extends EventEmitter {
179189
180190 // Once we start, emit and dial any peers we may have already discovered
181191 this . state . on ( 'STARTED' , ( ) => {
182- this . peerBook . getAllArray ( ) . forEach ( ( peerInfo ) => {
192+ this . peerStore . getAllArray ( ) . forEach ( ( peerInfo ) => {
183193 this . emit ( 'peer:discovery' , peerInfo )
184194 this . _maybeConnect ( peerInfo )
185195 } )
@@ -228,6 +238,7 @@ class Libp2p extends EventEmitter {
228238 this . state ( 'stop' )
229239
230240 try {
241+ this . pubsub && await this . pubsub . stop ( )
231242 await this . transportManager . close ( )
232243 await this . _switch . stop ( )
233244 } catch ( err ) {
@@ -245,7 +256,7 @@ class Libp2p extends EventEmitter {
245256
246257 /**
247258 * Dials to the provided peer. If successful, the `PeerInfo` of the
248- * peer will be added to the nodes `PeerBook `
259+ * peer will be added to the nodes `PeerStore `
249260 *
250261 * @param {PeerInfo|PeerId|Multiaddr|string } peer The peer to dial
251262 * @param {object } options
@@ -258,7 +269,7 @@ class Libp2p extends EventEmitter {
258269
259270 /**
260271 * Dials to the provided peer and handshakes with the given protocol.
261- * If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook `,
272+ * If successful, the `PeerInfo` of the peer will be added to the nodes `PeerStore `,
262273 * and the `Connection` will be sent in the callback
263274 *
264275 * @async
@@ -279,7 +290,13 @@ class Libp2p extends EventEmitter {
279290
280291 // If a protocol was provided, create a new stream
281292 if ( protocols ) {
282- return connection . newStream ( protocols )
293+ const stream = await connection . newStream ( protocols )
294+ const peerInfo = getPeerInfo ( connection . remotePeer )
295+
296+ peerInfo . protocols . add ( stream . protocol )
297+ this . peerStore . put ( peerInfo )
298+
299+ return stream
283300 }
284301
285302 return connection
@@ -350,10 +367,16 @@ class Libp2p extends EventEmitter {
350367 const multiaddrs = this . peerInfo . multiaddrs . toArray ( )
351368
352369 // Start parallel tasks
370+ const tasks = [
371+ this . transportManager . listen ( multiaddrs )
372+ ]
373+
374+ if ( this . _config . pubsub . enabled ) {
375+ this . pubsub && this . pubsub . start ( )
376+ }
377+
353378 try {
354- await Promise . all ( [
355- this . transportManager . listen ( multiaddrs )
356- ] )
379+ await Promise . all ( tasks )
357380 } catch ( err ) {
358381 log . error ( err )
359382 this . emit ( 'error' , err )
@@ -369,12 +392,6 @@ class Libp2p extends EventEmitter {
369392 * the `peer:discovery` event. If auto dial is enabled for libp2p
370393 * and the current connection count is under the low watermark, the
371394 * peer will be dialed.
372- *
373- * TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
374- * it would be ideal if only new peers were emitted. Currently, with
375- * other modules adding peers to the `PeerBook` we have no way of knowing
376- * if a peer is new or not, so it has to be emitted.
377- *
378395 * @private
379396 * @param {PeerInfo } peerInfo
380397 */
@@ -383,7 +400,7 @@ class Libp2p extends EventEmitter {
383400 log . error ( new Error ( codes . ERR_DISCOVERED_SELF ) )
384401 return
385402 }
386- peerInfo = this . peerBook . put ( peerInfo )
403+ peerInfo = this . peerStore . put ( peerInfo )
387404
388405 if ( ! this . isStarted ( ) ) return
389406
@@ -454,16 +471,15 @@ module.exports = Libp2p
454471 * Like `new Libp2p(options)` except it will create a `PeerInfo`
455472 * instance if one is not provided in options.
456473 * @param {object } options Libp2p configuration options
457- * @param {function(Error, Libp2p) } callback
458- * @returns {void }
474+ * @returns {Libp2p }
459475 */
460- module . exports . createLibp2p = promisify ( ( options , callback ) => {
476+ module . exports . create = async ( options = { } ) => {
461477 if ( options . peerInfo ) {
462- return nextTick ( callback , null , new Libp2p ( options ) )
478+ return new Libp2p ( options )
463479 }
464- PeerInfo . create ( ( err , peerInfo ) => {
465- if ( err ) return callback ( err )
466- options . peerInfo = peerInfo
467- callback ( null , new Libp2p ( options ) )
468- } )
469- } )
480+
481+ const peerInfo = await PeerInfo . create ( )
482+
483+ options . peerInfo = peerInfo
484+ return new Libp2p ( options )
485+ }
0 commit comments