Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

feat: update identify to include supported protocols #311

Merged
merged 2 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@
"fsm-event": "^2.1.0",
"hashlru": "^2.3.0",
"interface-connection": "~0.3.3",
"libp2p-circuit": "~0.3.4",
"libp2p-identify": "~0.7.5",
"libp2p-circuit": "~0.3.5",
"libp2p-identify": "~0.7.6",
"moving-average": "^1.0.0",
"multiaddr": "^6.0.4",
"multiaddr": "^6.0.6",
"multistream-select": "~0.14.4",
"once": "^1.4.0",
"peer-id": "~0.12.2",
Expand Down
1 change: 0 additions & 1 deletion src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ class ConnectionFSM extends BaseConnection {
})

this.switch.emit('peer-mux-established', this.theirPeerInfo)

this._didUpgrade(null)
})
}
Expand Down
7 changes: 5 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const EventEmitter = require('events').EventEmitter
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const series = require('async/series')
const Circuit = require('libp2p-circuit')
const TransportManager = require('./transport')
const ConnectionManager = require('./connection/manager')
const getPeerInfo = require('./get-peer-info')
Expand Down Expand Up @@ -128,7 +129,7 @@ class Switch extends EventEmitter {
return myTransports.filter((ts) => this.transports[ts].filter(myAddrs).length > 0)
// push Circuit to be the last proto to be dialed
.sort((a) => {
return a === 'Circuit' ? 1 : 0
return a === Circuit.tag ? 1 : 0
})
}

Expand All @@ -147,6 +148,7 @@ class Switch extends EventEmitter {
handlerFunc: handlerFunc,
matchFunc: matchFunc
}
this._peerInfo.protocols.add(protocol)
}

/**
Expand All @@ -159,6 +161,7 @@ class Switch extends EventEmitter {
if (this.protocols[protocol]) {
delete this.protocols[protocol]
}
this._peerInfo.protocols.delete(protocol)
}

/**
Expand All @@ -185,7 +188,7 @@ class Switch extends EventEmitter {
* @returns {boolean}
*/
hasTransports () {
const transports = Object.keys(this.transports).filter((t) => t !== 'Circuit')
const transports = Object.keys(this.transports).filter((t) => t !== Circuit.tag)
return transports && transports.length > 0
}

Expand Down
10 changes: 10 additions & 0 deletions test/circuit-relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ describe(`circuit`, function () {
], (err) => {
if (err) return done(err)

if (bootstrapSwitch._peerBook.getAllArray().length === 4) {
return done()
}

done = once(done)
// Wait for everyone to connect, before we try relaying
bootstrapSwitch.on('peer-mux-established', () => {
Expand All @@ -263,6 +267,10 @@ describe(`circuit`, function () {
})
}))

before('wait so hop status can be negotiated', function (done) {
setTimeout(done, 1000)
})

after(function (done) {
parallel([
(cb) => bootstrapSwitch.stop(cb),
Expand Down Expand Up @@ -294,6 +302,7 @@ describe(`circuit`, function () {
done()
}
})

tcpSwitch1.dial(wsPeer1, (err, connection) => {
expect(err).to.not.exist()
// We're not dialing a protocol, so we won't get a connection back
Expand Down Expand Up @@ -323,6 +332,7 @@ describe(`circuit`, function () {
done()
}
})

wsSwitch2.dial(tcpPeer1, (err, connection) => {
expect(err).to.not.exist()
// We're not dialing a protocol, so we won't get a connection back
Expand Down
84 changes: 69 additions & 15 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const TCP = require('libp2p-tcp')
const secio = require('libp2p-secio')
const multiplex = require('pull-mplex')
const pull = require('pull-stream')
const identify = require('libp2p-identify')

const utils = require('./utils')
const createInfos = utils.createInfos
Expand All @@ -24,6 +25,7 @@ describe('dialFSM', () => {
let switchC
let peerAId
let peerBId
let protocol

before((done) => createInfos(3, (err, infos) => {
expect(err).to.not.exist()
Expand Down Expand Up @@ -76,10 +78,18 @@ describe('dialFSM', () => {
], done)
})

afterEach(() => {
switchA.unhandle(protocol)
switchB.unhandle(protocol)
switchC.unhandle(protocol)
protocol = null
})

it('should emit `error:connection_attempt_failed` when a transport fails to dial', (done) => {
switchC.handle('/warn/1.0.0', () => { })
protocol = '/warn/1.0.0'
switchC.handle(protocol, () => { })

switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
connFSM.once('error:connection_attempt_failed', (errors) => {
expect(errors).to.be.an('array')
Expand All @@ -90,9 +100,10 @@ describe('dialFSM', () => {
})

it('should emit an `error` event when a it cannot dial a peer', (done) => {
switchC.handle('/error/1.0.0', () => { })
protocol = '/error/1.0.0'
switchC.handle(protocol, () => { })

switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
connFSM.once('error', (err) => {
expect(err).to.be.exist()
Expand All @@ -103,9 +114,10 @@ describe('dialFSM', () => {
})

it('should emit a `closed` event when closed', (done) => {
switchB.handle('/closed/1.0.0', () => { })
protocol = '/closed/1.0.0'
switchB.handle(protocol, () => { })

switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()

connFSM.once('close', () => {
Expand All @@ -120,11 +132,49 @@ describe('dialFSM', () => {
})
})

it('should have the peers protocols once connected', (done) => {
protocol = '/lscheck/1.0.0'
switchB.handle(protocol, () => { })

expect(4).checks(done)

switchB.once('peer-mux-established', (peerInfo) => {
const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
// Verify the dialer knows the receiver's protocols
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
protocol
]).mark()
// Verify the receiver knows the dialer's protocols
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
]).mark()

switchA.hangUp(switchB._peerInfo)
})

switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist().mark()

connFSM.once('close', () => {
// Just mark that close was called
expect(true).to.eql(true).mark()
})
})
})

it('should close when the receiver closes', (done) => {
protocol = '/closed/1.0.0'
switchB.handle(protocol, () => { })

// wait for the expects to happen
expect(2).checks(done)
expect(2).checks(() => {
done()
})

switchB.handle('/closed/1.0.0', () => { })
switchB.on('peer-mux-established', (peerInfo) => {
if (peerInfo.id.toB58String() === peerAId) {
switchB.removeAllListeners('peer-mux-established')
Expand All @@ -133,7 +183,7 @@ describe('dialFSM', () => {
}
})

switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()

connFSM.once('close', () => {
Expand Down Expand Up @@ -161,8 +211,11 @@ describe('dialFSM', () => {
})

it('parallel dials to one another should disconnect on hangup', function (done) {
switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
this.timeout(10e3)
protocol = '/parallel/1.0.0'

switchA.handle(protocol, (_, conn) => { pull(conn, conn) })
switchB.handle(protocol, (_, conn) => { pull(conn, conn) })

expect(switchA.connection.getAllById(peerBId)).to.have.length(0)

Expand All @@ -180,12 +233,12 @@ describe('dialFSM', () => {
expect(peerInfo.id.toB58String()).to.eql(peerAId).mark()
})

switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', (err, connFSM) => {
switchA.dialFSM(switchB._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
// Hold the dial from A, until switch B is done dialing to ensure
// we have both incoming and outgoing connections
connFSM._state.on('DIALING:leave', (cb) => {
switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', (err, connB) => {
switchB.dialFSM(switchA._peerInfo, protocol, (err, connB) => {
expect(err).to.not.exist()
connB.on('muxed', cb)
})
Expand All @@ -201,8 +254,9 @@ describe('dialFSM', () => {
})

it('parallel dials to one another should disconnect on stop', (done) => {
switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
protocol = '/parallel/1.0.0'
switchA.handle(protocol, (_, conn) => { pull(conn, conn) })
switchB.handle(protocol, (_, conn) => { pull(conn, conn) })

// 4 close checks and 1 hangup check
expect(5).checks(() => {
Expand Down
26 changes: 24 additions & 2 deletions test/identify.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ describe('Identify', () => {
switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
expect(err).to.not.exist()
let data = Buffer.from('data that cant be had')

let data = Buffer.from('data that can be had')
pull(
pull.values([data]),
conn,
Expand All @@ -100,6 +101,27 @@ describe('Identify', () => {
})
})

it('should get protocols for one another', (done) => {
switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
expect(err).to.not.exist()

const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
])
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
'/id-test/1.0.0'
])

done()
})
})

it('should close connection when identify fails', (done) => {
const stub = sinon.stub(identify, 'listener').callsFake((conn) => {
conn.getObservedAddrs((err, observedAddrs) => {
Expand All @@ -125,7 +147,7 @@ describe('Identify', () => {
})
})

expect(2).check(done)
expect(2).checks(done)

switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err, connFSM) => {
Expand Down