diff --git a/package.json b/package.json index 39f0b88..8e743ae 100644 --- a/package.json +++ b/package.json @@ -68,10 +68,10 @@ "fsm-event": "^2.1.0", "hashlru": "^2.3.0", "interface-connection": "~0.3.3", - "libp2p-circuit": "~0.3.4", - "libp2p-identify": "~0.7.5", + "libp2p-circuit": "~0.3.5", + "libp2p-identify": "~0.7.6", "moving-average": "^1.0.0", - "multiaddr": "^6.0.4", + "multiaddr": "^6.0.6", "multistream-select": "~0.14.4", "once": "^1.4.0", "peer-id": "~0.12.2", diff --git a/src/connection/index.js b/src/connection/index.js index a7b8b05..b960d4d 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -389,7 +389,6 @@ class ConnectionFSM extends BaseConnection { }) this.switch.emit('peer-mux-established', this.theirPeerInfo) - this._didUpgrade(null) }) } diff --git a/src/index.js b/src/index.js index 4658b3e..1ae9cad 100644 --- a/src/index.js +++ b/src/index.js @@ -5,6 +5,7 @@ const EventEmitter = require('events').EventEmitter const each = require('async/each') const eachSeries = require('async/eachSeries') const series = require('async/series') +const Circuit = require('libp2p-circuit') const TransportManager = require('./transport') const ConnectionManager = require('./connection/manager') const getPeerInfo = require('./get-peer-info') @@ -128,7 +129,7 @@ class Switch extends EventEmitter { return myTransports.filter((ts) => this.transports[ts].filter(myAddrs).length > 0) // push Circuit to be the last proto to be dialed .sort((a) => { - return a === 'Circuit' ? 1 : 0 + return a === Circuit.tag ? 1 : 0 }) } @@ -147,6 +148,7 @@ class Switch extends EventEmitter { handlerFunc: handlerFunc, matchFunc: matchFunc } + this._peerInfo.protocols.add(protocol) } /** @@ -159,6 +161,7 @@ class Switch extends EventEmitter { if (this.protocols[protocol]) { delete this.protocols[protocol] } + this._peerInfo.protocols.delete(protocol) } /** @@ -185,7 +188,7 @@ class Switch extends EventEmitter { * @returns {boolean} */ hasTransports () { - const transports = Object.keys(this.transports).filter((t) => t !== 'Circuit') + const transports = Object.keys(this.transports).filter((t) => t !== Circuit.tag) return transports && transports.length > 0 } diff --git a/test/circuit-relay.node.js b/test/circuit-relay.node.js index 0c3c305..66a10db 100644 --- a/test/circuit-relay.node.js +++ b/test/circuit-relay.node.js @@ -252,6 +252,10 @@ describe(`circuit`, function () { ], (err) => { if (err) return done(err) + if (bootstrapSwitch._peerBook.getAllArray().length === 4) { + return done() + } + done = once(done) // Wait for everyone to connect, before we try relaying bootstrapSwitch.on('peer-mux-established', () => { @@ -263,6 +267,10 @@ describe(`circuit`, function () { }) })) + before('wait so hop status can be negotiated', function (done) { + setTimeout(done, 1000) + }) + after(function (done) { parallel([ (cb) => bootstrapSwitch.stop(cb), @@ -294,6 +302,7 @@ describe(`circuit`, function () { done() } }) + tcpSwitch1.dial(wsPeer1, (err, connection) => { expect(err).to.not.exist() // We're not dialing a protocol, so we won't get a connection back @@ -323,6 +332,7 @@ describe(`circuit`, function () { done() } }) + wsSwitch2.dial(tcpPeer1, (err, connection) => { expect(err).to.not.exist() // We're not dialing a protocol, so we won't get a connection back diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 8ab4d8e..0382891 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -13,6 +13,7 @@ const TCP = require('libp2p-tcp') const secio = require('libp2p-secio') const multiplex = require('pull-mplex') const pull = require('pull-stream') +const identify = require('libp2p-identify') const utils = require('./utils') const createInfos = utils.createInfos @@ -24,6 +25,7 @@ describe('dialFSM', () => { let switchC let peerAId let peerBId + let protocol before((done) => createInfos(3, (err, infos) => { expect(err).to.not.exist() @@ -76,10 +78,18 @@ describe('dialFSM', () => { ], done) }) + afterEach(() => { + switchA.unhandle(protocol) + switchB.unhandle(protocol) + switchC.unhandle(protocol) + protocol = null + }) + it('should emit `error:connection_attempt_failed` when a transport fails to dial', (done) => { - switchC.handle('/warn/1.0.0', () => { }) + protocol = '/warn/1.0.0' + switchC.handle(protocol, () => { }) - switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', (err, connFSM) => { + switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => { expect(err).to.not.exist() connFSM.once('error:connection_attempt_failed', (errors) => { expect(errors).to.be.an('array') @@ -90,9 +100,10 @@ describe('dialFSM', () => { }) it('should emit an `error` event when a it cannot dial a peer', (done) => { - switchC.handle('/error/1.0.0', () => { }) + protocol = '/error/1.0.0' + switchC.handle(protocol, () => { }) - switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', (err, connFSM) => { + switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => { expect(err).to.not.exist() connFSM.once('error', (err) => { expect(err).to.be.exist() @@ -103,9 +114,10 @@ describe('dialFSM', () => { }) it('should emit a `closed` event when closed', (done) => { - switchB.handle('/closed/1.0.0', () => { }) + protocol = '/closed/1.0.0' + switchB.handle(protocol, () => { }) - switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => { + switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => { expect(err).to.not.exist() connFSM.once('close', () => { @@ -120,11 +132,49 @@ describe('dialFSM', () => { }) }) + it('should have the peers protocols once connected', (done) => { + protocol = '/lscheck/1.0.0' + switchB.handle(protocol, () => { }) + + expect(4).checks(done) + + switchB.once('peer-mux-established', (peerInfo) => { + const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String()) + const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String()) + // Verify the dialer knows the receiver's protocols + expect(Array.from(peerB.protocols)).to.eql([ + multiplex.multicodec, + identify.multicodec, + protocol + ]).mark() + // Verify the receiver knows the dialer's protocols + expect(Array.from(peerA.protocols)).to.eql([ + multiplex.multicodec, + identify.multicodec + ]).mark() + + switchA.hangUp(switchB._peerInfo) + }) + + switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => { + expect(err).to.not.exist().mark() + + connFSM.once('close', () => { + // Just mark that close was called + expect(true).to.eql(true).mark() + }) + }) + }) + it('should close when the receiver closes', (done) => { + protocol = '/closed/1.0.0' + switchB.handle(protocol, () => { }) + // wait for the expects to happen - expect(2).checks(done) + expect(2).checks(() => { + done() + }) - switchB.handle('/closed/1.0.0', () => { }) switchB.on('peer-mux-established', (peerInfo) => { if (peerInfo.id.toB58String() === peerAId) { switchB.removeAllListeners('peer-mux-established') @@ -133,7 +183,7 @@ describe('dialFSM', () => { } }) - switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => { + switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => { expect(err).to.not.exist() connFSM.once('close', () => { @@ -161,8 +211,11 @@ describe('dialFSM', () => { }) it('parallel dials to one another should disconnect on hangup', function (done) { - switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) - switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + this.timeout(10e3) + protocol = '/parallel/1.0.0' + + switchA.handle(protocol, (_, conn) => { pull(conn, conn) }) + switchB.handle(protocol, (_, conn) => { pull(conn, conn) }) expect(switchA.connection.getAllById(peerBId)).to.have.length(0) @@ -180,12 +233,12 @@ describe('dialFSM', () => { expect(peerInfo.id.toB58String()).to.eql(peerAId).mark() }) - switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', (err, connFSM) => { + switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => { expect(err).to.not.exist() // Hold the dial from A, until switch B is done dialing to ensure // we have both incoming and outgoing connections connFSM._state.on('DIALING:leave', (cb) => { - switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', (err, connB) => { + switchB.dialFSM(switchA._peerInfo, protocol, (err, connB) => { expect(err).to.not.exist() connB.on('muxed', cb) }) @@ -201,8 +254,9 @@ describe('dialFSM', () => { }) it('parallel dials to one another should disconnect on stop', (done) => { - switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) - switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + protocol = '/parallel/1.0.0' + switchA.handle(protocol, (_, conn) => { pull(conn, conn) }) + switchB.handle(protocol, (_, conn) => { pull(conn, conn) }) // 4 close checks and 1 hangup check expect(5).checks(() => { diff --git a/test/identify.node.js b/test/identify.node.js index e454d47..74d11f9 100644 --- a/test/identify.node.js +++ b/test/identify.node.js @@ -87,7 +87,8 @@ describe('Identify', () => { switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn)) switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => { expect(err).to.not.exist() - let data = Buffer.from('data that cant be had') + + let data = Buffer.from('data that can be had') pull( pull.values([data]), conn, @@ -100,6 +101,27 @@ describe('Identify', () => { }) }) + it('should get protocols for one another', (done) => { + switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn)) + switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => { + expect(err).to.not.exist() + + const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String()) + const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String()) + expect(Array.from(peerB.protocols)).to.eql([ + multiplex.multicodec, + identify.multicodec + ]) + expect(Array.from(peerA.protocols)).to.eql([ + multiplex.multicodec, + identify.multicodec, + '/id-test/1.0.0' + ]) + + done() + }) + }) + it('should close connection when identify fails', (done) => { const stub = sinon.stub(identify, 'listener').callsFake((conn) => { conn.getObservedAddrs((err, observedAddrs) => { @@ -125,7 +147,7 @@ describe('Identify', () => { }) }) - expect(2).check(done) + expect(2).checks(done) switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn)) switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err, connFSM) => {