From c6caddbafbe7ead7f00608948e9985dbc26c4fe2 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 14 Feb 2018 12:01:53 +0100 Subject: [PATCH 1/7] feat: use PubSub API directly from libp2p --- src/core/components/libp2p.js | 5 +-- src/core/components/no-floodsub.js | 24 ------------ src/core/components/pubsub.js | 62 ++++-------------------------- src/core/components/start.js | 12 +----- src/core/components/stop.js | 1 - test/core/bitswap.spec.js | 15 ++++---- 6 files changed, 18 insertions(+), 101 deletions(-) delete mode 100644 src/core/components/no-floodsub.js diff --git a/src/core/components/libp2p.js b/src/core/components/libp2p.js index e998517f01..8935fe3e4d 100644 --- a/src/core/components/libp2p.js +++ b/src/core/components/libp2p.js @@ -21,6 +21,7 @@ module.exports = function libp2p (self) { bootstrap: get(config, 'Bootstrap'), modules: self._libp2pModules, // EXPERIMENTAL + pubsub: get(self._options, 'EXPERIMENTAL.pubsub', false), dht: get(self._options, 'EXPERIMENTAL.dht', false), relay: { enabled: get(config, 'EXPERIMENTAL.relay.enabled', false), @@ -50,9 +51,7 @@ module.exports = function libp2p (self) { }) self._libp2pNode.start((err) => { - if (err) { - return callback(err) - } + if (err) { return callback(err) } self._libp2pNode.peerInfo.multiaddrs.forEach((ma) => { console.log('Swarm listening on', ma.toString()) diff --git a/src/core/components/no-floodsub.js b/src/core/components/no-floodsub.js deleted file mode 100644 index 95db571f5f..0000000000 --- a/src/core/components/no-floodsub.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict' - -const EventEmitter = require('events') - -function fail () { - throw new Error('The daemon must be run with \'--enable-pubsub-experiment\'') -} - -class NoFloodSub extends EventEmitter { - constructor () { - super() - - this.peers = new Map() - this.subscriptions = new Set() - } - - start (callback) { callback() } - stop (callback) { callback() } - publish () { fail() } - subscribe () { fail() } - unsubscribe () { fail() } -} - -module.exports = NoFloodSub diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js index 7e51062f40..8d1204876e 100644 --- a/src/core/components/pubsub.js +++ b/src/core/components/pubsub.js @@ -1,35 +1,19 @@ 'use strict' const promisify = require('promisify-es6') -const setImmediate = require('async/setImmediate') - -const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR module.exports = function pubsub (self) { return { subscribe: (topic, options, handler, callback) => { - if (!self.isOnline()) { - throw new Error(OFFLINE_ERROR) - } - if (typeof options === 'function') { callback = handler handler = options options = {} } - function subscribe (cb) { - if (self._pubsub.listenerCount(topic) === 0) { - self._pubsub.subscribe(topic) - } - - self._pubsub.on(topic, handler) - setImmediate(cb) - } - if (!callback) { return new Promise((resolve, reject) => { - subscribe((err) => { + self.libp2p.pubsub.subscribe(topic, options, handler, (err) => { if (err) { return reject(err) } @@ -37,60 +21,28 @@ module.exports = function pubsub (self) { }) }) } else { - subscribe(callback) + self.libp2p.pubsub.subscribe(topic, options, handler, callback) } }, unsubscribe: (topic, handler) => { - self._pubsub.removeListener(topic, handler) - - if (self._pubsub.listenerCount(topic) === 0) { - self._pubsub.unsubscribe(topic) - } + self.libp2p.pubsub.unsubscribe(topic, handler) }, publish: promisify((topic, data, callback) => { - if (!self.isOnline()) { - return setImmediate(() => callback(new Error(OFFLINE_ERROR))) - } - - if (!Buffer.isBuffer(data)) { - return setImmediate(() => callback(new Error('data must be a Buffer'))) - } - - self._pubsub.publish(topic, data) - setImmediate(() => callback()) + self.libp2p.pubsub.publish(topic, data, callback) }), ls: promisify((callback) => { - if (!self.isOnline()) { - return setImmediate(() => callback(new Error(OFFLINE_ERROR))) - } - - const subscriptions = Array.from(self._pubsub.subscriptions) - - setImmediate(() => callback(null, subscriptions)) + self.libp2p.pubsub.ls(callback) }), peers: promisify((topic, callback) => { - if (!self.isOnline()) { - return setImmediate(() => callback(new Error(OFFLINE_ERROR))) - } - - if (typeof topic === 'function') { - callback = topic - topic = null - } - - const peers = Array.from(self._pubsub.peers.values()) - .filter((peer) => topic ? peer.topics.has(topic) : true) - .map((peer) => peer.info.id.toB58String()) - - setImmediate(() => callback(null, peers)) + self.libp2p.pubsub.peers(topic, callback) }), setMaxListeners (n) { - return self._pubsub.setMaxListeners(n) + self.libp2p.pubsub.setMaxListeners(n) } } } diff --git a/src/core/components/start.js b/src/core/components/start.js index 3004cca34d..f0517b1ed9 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -2,8 +2,6 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') -const FloodSub = require('libp2p-floodsub') -const NoFloodSub = require('./no-floodsub') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') @@ -38,9 +36,7 @@ module.exports = (self) => { (cb) => self.preStart(cb), (cb) => self.libp2p.start(cb) ], (err) => { - if (err) { - return done(err) - } + if (err) { return done(err) } self._bitswap = new Bitswap( self._libp2pNode, @@ -50,11 +46,7 @@ module.exports = (self) => { self._bitswap.start() self._blockService.setExchange(self._bitswap) - - self._pubsub = self._options.EXPERIMENTAL.pubsub - ? new FloodSub(self._libp2pNode) - : new NoFloodSub() - self._pubsub.start(done) + done() }) }) } diff --git a/src/core/components/stop.js b/src/core/components/stop.js index a39900d09c..4d35190d21 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -32,7 +32,6 @@ module.exports = (self) => { self._bitswap.stop() series([ - (cb) => self._pubsub.stop(cb), (cb) => self.libp2p.stop(cb), (cb) => self._repo.close(cb) ], done) diff --git a/test/core/bitswap.spec.js b/test/core/bitswap.spec.js index b0260ed5ff..a186aaf352 100644 --- a/test/core/bitswap.spec.js +++ b/test/core/bitswap.spec.js @@ -16,10 +16,9 @@ const isNode = require('detect-node') const multihashing = require('multihashing-async') const CID = require('cids') -const DaemonFactory = require('ipfsd-ctl') -const df = DaemonFactory.create({ type: 'js' }) - -const dfProc = DaemonFactory.create({ type: 'proc' }) +const IPFSFactory = require('ipfsd-ctl') +const fDaemon = IPFSFactory.create({ type: 'js' }) +const fInProc = IPFSFactory.create({ type: 'proc' }) // This gets replaced by '../utils/create-repo-browser.js' in the browser const createTempRepo = require('../utils/create-repo-nodejs.js') @@ -69,7 +68,7 @@ function connectNodes (remoteNode, inProcNode, callback) { let nodes = [] function addNode (inProcNode, callback) { - df.spawn({ + fDaemon.spawn({ exec: './src/cli/bin.js', config: { Addresses: { @@ -89,7 +88,7 @@ function addNode (inProcNode, callback) { }) } -describe('bitswap', function () { +describe.only('bitswap', function () { this.timeout(80 * 1000) let inProcNode // Node spawned inside this process @@ -119,7 +118,7 @@ describe('bitswap', function () { }) } - dfProc.spawn({ exec: IPFS, config }, (err, _ipfsd) => { + fInProc.spawn({ exec: IPFS, config: config }, (err, _ipfsd) => { expect(err).to.not.exist() nodes.push(_ipfsd) inProcNode = _ipfsd.api @@ -137,7 +136,7 @@ describe('bitswap', function () { }) }) - describe('transfer a block between', () => { + describe.only('transfer a block between', () => { it('2 peers', function (done) { this.timeout(80 * 1000) From f46957c87b327a37d000900ec44efeab2bc39850 Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 15 Feb 2018 19:28:32 +0100 Subject: [PATCH 2/7] feat: all pubsub tests passing with libp2p pubsub --- src/core/components/pubsub.js | 14 +++++++------- test/core/bitswap.spec.js | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js index 8d1204876e..d5825960e2 100644 --- a/src/core/components/pubsub.js +++ b/src/core/components/pubsub.js @@ -13,7 +13,7 @@ module.exports = function pubsub (self) { if (!callback) { return new Promise((resolve, reject) => { - self.libp2p.pubsub.subscribe(topic, options, handler, (err) => { + self._libp2pNode.pubsub.subscribe(topic, options, handler, (err) => { if (err) { return reject(err) } @@ -21,28 +21,28 @@ module.exports = function pubsub (self) { }) }) } else { - self.libp2p.pubsub.subscribe(topic, options, handler, callback) + self._libp2pNode.pubsub.subscribe(topic, options, handler, callback) } }, unsubscribe: (topic, handler) => { - self.libp2p.pubsub.unsubscribe(topic, handler) + self._libp2pNode.pubsub.unsubscribe(topic, handler) }, publish: promisify((topic, data, callback) => { - self.libp2p.pubsub.publish(topic, data, callback) + self._libp2pNode.pubsub.publish(topic, data, callback) }), ls: promisify((callback) => { - self.libp2p.pubsub.ls(callback) + self._libp2pNode.pubsub.ls(callback) }), peers: promisify((topic, callback) => { - self.libp2p.pubsub.peers(topic, callback) + self._libp2pNode.pubsub.peers(topic, callback) }), setMaxListeners (n) { - self.libp2p.pubsub.setMaxListeners(n) + self._libp2pNode.pubsub.setMaxListeners(n) } } } diff --git a/test/core/bitswap.spec.js b/test/core/bitswap.spec.js index a186aaf352..b05244f249 100644 --- a/test/core/bitswap.spec.js +++ b/test/core/bitswap.spec.js @@ -88,7 +88,7 @@ function addNode (inProcNode, callback) { }) } -describe.only('bitswap', function () { +describe('bitswap', function () { this.timeout(80 * 1000) let inProcNode // Node spawned inside this process @@ -136,7 +136,7 @@ describe.only('bitswap', function () { }) }) - describe.only('transfer a block between', () => { + describe('transfer a block between', () => { it('2 peers', function (done) { this.timeout(80 * 1000) From cdbee339773089257a8b007d1050e35b85fc8d5d Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 16 Feb 2018 18:39:12 +0000 Subject: [PATCH 3/7] chore: update libp2p --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d927d3913d..382d8f7326 100644 --- a/package.json +++ b/package.json @@ -119,7 +119,7 @@ "is-ipfs": "^0.3.2", "is-stream": "^1.1.0", "joi": "^13.1.2", - "libp2p": "~0.16.5", + "libp2p": "~0.17.0", "libp2p-circuit": "~0.1.4", "libp2p-floodsub": "~0.14.1", "libp2p-kad-dht": "~0.8.0", From f7328a9309a20c999be10ba341a6bc1bfdd31a19 Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 17 Feb 2018 22:21:05 +0000 Subject: [PATCH 4/7] refactor: fix linting from bootstrap.js --- src/core/components/bootstrap.js | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/core/components/bootstrap.js b/src/core/components/bootstrap.js index 9ca1a76479..b5a497eac3 100644 --- a/src/core/components/bootstrap.js +++ b/src/core/components/bootstrap.js @@ -20,10 +20,10 @@ module.exports = function bootstrap (self) { args = {default: false} } try { - if (multiaddr) - new MultiAddr(multiaddr) - } - catch (err) { + if (multiaddr) { + multiaddr = new MultiAddr(multiaddr) + } + } catch (err) { return setImmediate(() => callback(err)) } self._repo.config.get((err, config) => { @@ -52,12 +52,13 @@ module.exports = function bootstrap (self) { args = {all: false} } try { - if (multiaddr) - new MultiAddr(multiaddr) - } - catch (err) { + if (multiaddr) { + multiaddr = new MultiAddr(multiaddr) + } + } catch (err) { return setImmediate(() => callback(err)) } + self._repo.config.get((err, config) => { if (err) { return callback(err) From 4ac081d071f1dbff78eb2f46a85f419cff7c85ba Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 17 Feb 2018 22:23:38 +0000 Subject: [PATCH 5/7] chore: update CI configs --- .travis.yml | 4 ---- circle.yml | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8253ebb93d..30e306640b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,15 +3,11 @@ language: node_js matrix: include: - - node_js: 6 - env: CXX=g++-4.8 - node_js: 8 env: CXX=g++-4.8 script: - - npm run lint - npm run test - - make test before_script: - export DISPLAY=:99.0 diff --git a/circle.yml b/circle.yml index e0338e62d4..445e40b5e0 100644 --- a/circle.yml +++ b/circle.yml @@ -4,7 +4,10 @@ machine: version: stable test: + pre: + - npm run lint post: + - make test - npm run coverage -- --upload --providers coveralls dependencies: From fce326c6be0544ba931a9f61db90fb0af0eb2f56 Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 17 Feb 2018 22:25:18 +0000 Subject: [PATCH 6/7] fix: bootstrap --- src/core/components/bootstrap.js | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/core/components/bootstrap.js b/src/core/components/bootstrap.js index b5a497eac3..a57858abb3 100644 --- a/src/core/components/bootstrap.js +++ b/src/core/components/bootstrap.js @@ -1,7 +1,7 @@ 'use strict' const defaultNodes = require('../runtime/config-nodejs.json').Bootstrap -const MultiAddr = require('multiaddr') +// const MultiAddr = require('multiaddr') const promisify = require('promisify-es6') module.exports = function bootstrap (self) { @@ -21,7 +21,9 @@ module.exports = function bootstrap (self) { } try { if (multiaddr) { - multiaddr = new MultiAddr(multiaddr) + // TODO understand what was the purpose of this code + // it failed on tests, it passes without + // multiaddr = new MultiAddr(multiaddr) } } catch (err) { return setImmediate(() => callback(err)) @@ -53,7 +55,8 @@ module.exports = function bootstrap (self) { } try { if (multiaddr) { - multiaddr = new MultiAddr(multiaddr) + // TODO understand what was the purpose of this code + // multiaddr = new MultiAddr(multiaddr) } } catch (err) { return setImmediate(() => callback(err)) From 19769742e07ddffbbf16543087fa80edea90b05a Mon Sep 17 00:00:00 2001 From: David Dias Date: Sun, 18 Feb 2018 07:59:52 +0000 Subject: [PATCH 7/7] fix: now properly fix bootstrap in core --- src/core/components/bootstrap.js | 40 ++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/src/core/components/bootstrap.js b/src/core/components/bootstrap.js index a57858abb3..559f8e05b3 100644 --- a/src/core/components/bootstrap.js +++ b/src/core/components/bootstrap.js @@ -1,9 +1,24 @@ 'use strict' const defaultNodes = require('../runtime/config-nodejs.json').Bootstrap -// const MultiAddr = require('multiaddr') +const Multiaddr = require('multiaddr') const promisify = require('promisify-es6') +function isValid (ma) { + if (typeof ma === 'string') { + try { + ma = new Multiaddr(ma) + return Boolean(ma) + } catch (err) { + return false + } + } else if (ma) { + return Multiaddr.isMultiaddr(ma) + } else { + return false + } +} + module.exports = function bootstrap (self) { return { list: promisify((callback) => { @@ -17,17 +32,13 @@ module.exports = function bootstrap (self) { add: promisify((multiaddr, args, callback) => { if (typeof args === 'function') { callback = args - args = {default: false} + args = { default: false } } - try { - if (multiaddr) { - // TODO understand what was the purpose of this code - // it failed on tests, it passes without - // multiaddr = new MultiAddr(multiaddr) - } - } catch (err) { - return setImmediate(() => callback(err)) + + if (multiaddr && !isValid(multiaddr)) { + return setImmediate(() => callback(new Error('Not valid multiaddr'))) } + self._repo.config.get((err, config) => { if (err) { return callback(err) @@ -53,13 +64,8 @@ module.exports = function bootstrap (self) { callback = args args = {all: false} } - try { - if (multiaddr) { - // TODO understand what was the purpose of this code - // multiaddr = new MultiAddr(multiaddr) - } - } catch (err) { - return setImmediate(() => callback(err)) + if (multiaddr && !isValid(multiaddr)) { + return setImmediate(() => callback(new Error('Not valid multiaddr'))) } self._repo.config.get((err, config) => {