Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

feat: add basic dial queue to avoid many connections to peer #310

Merged
merged 12 commits into from
Mar 20, 2019
Merged
1 change: 0 additions & 1 deletion src/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onPrivatized () {
this.log('successfully privatized incoming connection')
this.emit('private', this.conn)
}

Expand Down
33 changes: 17 additions & 16 deletions src/connection/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,32 @@ function listener (_switch) {
* @param {function} handler A custom handler to use
* @returns {function(Connection)} A connection handler function
*/
return (transportKey, handler) => {
return function (transportKey, handler) {
/**
* Takes a base connection and manages listening behavior
*
* @param {Connection} conn The connection to manage
* @returns {void}
*/
return (conn) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
return function (conn) {
log('received incoming connection for transport %s', transportKey)
conn.getPeerInfo((_, peerInfo) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
const connFSM = new IncomingConnection({ connection, _switch, transportKey, peerInfo })

log('received incoming connection')
const connFSM = new IncomingConnection({ connection, _switch, transportKey })
connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
connFSM.protect()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.protect()
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ const withIs = require('class-is')
const BaseConnection = require('./base')

class IncomingConnectionFSM extends BaseConnection {
constructor ({ connection, _switch, transportKey }) {
constructor ({ connection, _switch, transportKey, peerInfo }) {
super({
_switch,
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.conn = connection
this.theirPeerInfo = null
this.theirPeerInfo = peerInfo || null
this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null
this.ourPeerInfo = this.switch._peerInfo
this.transportKey = transportKey
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)
Expand Down
4 changes: 1 addition & 3 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDisconnecting () {
this.log('disconnecting from %s', this.theirB58Id)
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))

// Issue disconnects on both Peers
if (this.theirPeerInfo) {
Expand Down Expand Up @@ -366,8 +366,6 @@ class ConnectionFSM extends BaseConnection {
const conn = observeConnection(null, key, _conn, this.switch.observer)

this.muxer = this.switch.muxers[key].dialer(conn)
// this.switch.muxedConns[this.theirB58Id] = this
this.switch.connection.add(this)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this no longer added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now controlled by the dialer queue.


this.muxer.once('close', () => {
this.close()
Expand Down
8 changes: 6 additions & 2 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ class ConnectionManager {
*/
getOne (peerId) {
if (this.connections[peerId]) {
// TODO: Maybe select the best?
return this.connections[peerId][0]
// Only return muxed connections
for (var i = 0; i < this.connections[peerId].length; i++) {
if (this.connections[peerId][i].getState() === 'MUXED') {
return this.connections[peerId][i]
}
}
}
return null
}
Expand Down
110 changes: 0 additions & 110 deletions src/dialer.js

This file was deleted.

72 changes: 72 additions & 0 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use strict'

const DialerQueue = require('./queue')
const getPeerInfo = require('../get-peer-info')

module.exports = function (_switch) {
const dialerQueue = new DialerQueue(_switch)

_switch.state.on('STOPPING:enter', abort)

/**
* @param {object} options
* @param {PeerInfo} options.peerInfo
* @param {string} options.protocol
* @param {boolean} options.useFSM If `callback` should return a ConnectionFSM
* @param {function(Error, Connection)} options.callback
* @returns {void}
*/
function _dial ({ peerInfo, protocol, useFSM, callback }) {
if (typeof protocol === 'function') {
callback = protocol
protocol = null
}

try {
peerInfo = getPeerInfo(peerInfo, _switch._peerBook)
} catch (err) {
return callback(err)
}

// Add it to the queue, it will automatically get executed
dialerQueue.add({ peerInfo, protocol, useFSM, callback })
}

/**
* Aborts all dials that are queued. This should
* only be used when the Switch is being stopped
*
* @param {function} callback
*/
function abort (callback) {
dialerQueue.abort()
callback()
}

/**
* Adds the dial request to the queue for the given `peerInfo`
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, Connection)} callback
*/
function dial (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: false, callback })
}

/**
* Behaves like dial, except it calls back with a ConnectionFSM
*
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, ConnectionFSM)} callback
*/
function dialFSM (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: true, callback })
}

return {
dial,
dialFSM,
abort
}
}
Loading