Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

feat: use PubSub API directly from libp2p #1215

Merged
merged 7 commits into from
Feb 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ language: node_js

matrix:
include:
- node_js: 6
env: CXX=g++-4.8
- node_js: 8
env: CXX=g++-4.8

script:
- npm run lint
- npm run test
- make test

before_script:
- export DISPLAY=:99.0
Expand Down
3 changes: 3 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ machine:
version: stable

test:
pre:
- npm run lint
post:
- make test
- npm run coverage -- --upload --providers coveralls

dependencies:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
"is-ipfs": "^0.3.2",
"is-stream": "^1.1.0",
"joi": "^13.1.2",
"libp2p": "~0.16.5",
"libp2p": "~0.17.0",
"libp2p-circuit": "~0.1.4",
"libp2p-floodsub": "~0.14.1",
"libp2p-kad-dht": "~0.8.0",
Expand Down
38 changes: 24 additions & 14 deletions src/core/components/bootstrap.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
'use strict'

const defaultNodes = require('../runtime/config-nodejs.json').Bootstrap
const MultiAddr = require('multiaddr')
const Multiaddr = require('multiaddr')
const promisify = require('promisify-es6')

function isValid (ma) {
if (typeof ma === 'string') {
try {
ma = new Multiaddr(ma)
return Boolean(ma)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This idea comes from type checking discussion. It would be cool if isValid() functions would return false if invalid and return a valid object whatever got checked (in this case a Multiaddr).

That way you could also use isValid() to make sure you always get a valid Multiaddr object, no matter if it was already one or of if it was a string.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This got upgraded in #1227

} catch (err) {
return false
}
} else if (ma) {
return Multiaddr.isMultiaddr(ma)
} else {
return false
}
}

module.exports = function bootstrap (self) {
return {
list: promisify((callback) => {
Expand All @@ -17,15 +32,13 @@ module.exports = function bootstrap (self) {
add: promisify((multiaddr, args, callback) => {
if (typeof args === 'function') {
callback = args
args = {default: false}
}
try {
if (multiaddr)
new MultiAddr(multiaddr)
args = { default: false }
}
catch (err) {
return setImmediate(() => callback(err))

if (multiaddr && !isValid(multiaddr)) {
return setImmediate(() => callback(new Error('Not valid multiaddr')))
}

self._repo.config.get((err, config) => {
if (err) {
return callback(err)
Expand All @@ -51,13 +64,10 @@ module.exports = function bootstrap (self) {
callback = args
args = {all: false}
}
try {
if (multiaddr)
new MultiAddr(multiaddr)
}
catch (err) {
return setImmediate(() => callback(err))
if (multiaddr && !isValid(multiaddr)) {
return setImmediate(() => callback(new Error('Not valid multiaddr')))
}

self._repo.config.get((err, config) => {
if (err) {
return callback(err)
Expand Down
5 changes: 2 additions & 3 deletions src/core/components/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module.exports = function libp2p (self) {
bootstrap: get(config, 'Bootstrap'),
modules: self._libp2pModules,
// EXPERIMENTAL
pubsub: get(self._options, 'EXPERIMENTAL.pubsub', false),
dht: get(self._options, 'EXPERIMENTAL.dht', false),
relay: {
enabled: get(config, 'EXPERIMENTAL.relay.enabled', false),
Expand Down Expand Up @@ -50,9 +51,7 @@ module.exports = function libp2p (self) {
})

self._libp2pNode.start((err) => {
if (err) {
return callback(err)
}
if (err) { return callback(err) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I prefer not having unrelated formatting changes together with real code changes.


self._libp2pNode.peerInfo.multiaddrs.forEach((ma) => {
console.log('Swarm listening on', ma.toString())
Expand Down
24 changes: 0 additions & 24 deletions src/core/components/no-floodsub.js

This file was deleted.

62 changes: 7 additions & 55 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,96 +1,48 @@
'use strict'

const promisify = require('promisify-es6')
const setImmediate = require('async/setImmediate')

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR

module.exports = function pubsub (self) {
return {
subscribe: (topic, options, handler, callback) => {
if (!self.isOnline()) {
throw new Error(OFFLINE_ERROR)
}

if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}

function subscribe (cb) {
if (self._pubsub.listenerCount(topic) === 0) {
self._pubsub.subscribe(topic)
}

self._pubsub.on(topic, handler)
setImmediate(cb)
}

if (!callback) {
return new Promise((resolve, reject) => {
subscribe((err) => {
self._libp2pNode.pubsub.subscribe(topic, options, handler, (err) => {
if (err) {
return reject(err)
}
resolve()
})
})
} else {
subscribe(callback)
self._libp2pNode.pubsub.subscribe(topic, options, handler, callback)
}
},

unsubscribe: (topic, handler) => {
self._pubsub.removeListener(topic, handler)

if (self._pubsub.listenerCount(topic) === 0) {
self._pubsub.unsubscribe(topic)
}
self._libp2pNode.pubsub.unsubscribe(topic, handler)
},

publish: promisify((topic, data, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
}

if (!Buffer.isBuffer(data)) {
return setImmediate(() => callback(new Error('data must be a Buffer')))
}

self._pubsub.publish(topic, data)
setImmediate(() => callback())
self._libp2pNode.pubsub.publish(topic, data, callback)
}),

ls: promisify((callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
}

const subscriptions = Array.from(self._pubsub.subscriptions)

setImmediate(() => callback(null, subscriptions))
self._libp2pNode.pubsub.ls(callback)
}),

peers: promisify((topic, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
}

if (typeof topic === 'function') {
callback = topic
topic = null
}

const peers = Array.from(self._pubsub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())

setImmediate(() => callback(null, peers))
self._libp2pNode.pubsub.peers(topic, callback)
}),

setMaxListeners (n) {
return self._pubsub.setMaxListeners(n)
self._libp2pNode.pubsub.setMaxListeners(n)
}
}
}
12 changes: 2 additions & 10 deletions src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')
const NoFloodSub = require('./no-floodsub')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')

Expand Down Expand Up @@ -38,9 +36,7 @@ module.exports = (self) => {
(cb) => self.preStart(cb),
(cb) => self.libp2p.start(cb)
], (err) => {
if (err) {
return done(err)
}
if (err) { return done(err) }

self._bitswap = new Bitswap(
self._libp2pNode,
Expand All @@ -50,11 +46,7 @@ module.exports = (self) => {

self._bitswap.start()
self._blockService.setExchange(self._bitswap)

self._pubsub = self._options.EXPERIMENTAL.pubsub
? new FloodSub(self._libp2pNode)
: new NoFloodSub()
self._pubsub.start(done)
done()
})
})
}
1 change: 0 additions & 1 deletion src/core/components/stop.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ module.exports = (self) => {
self._bitswap.stop()

series([
(cb) => self._pubsub.stop(cb),
(cb) => self.libp2p.stop(cb),
(cb) => self._repo.close(cb)
], done)
Expand Down
11 changes: 5 additions & 6 deletions test/core/bitswap.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ const isNode = require('detect-node')
const multihashing = require('multihashing-async')
const CID = require('cids')

const DaemonFactory = require('ipfsd-ctl')
const df = DaemonFactory.create({ type: 'js' })

const dfProc = DaemonFactory.create({ type: 'proc' })
const IPFSFactory = require('ipfsd-ctl')
const fDaemon = IPFSFactory.create({ type: 'js' })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find fDaemon a strange, hard to read, uncommon name. What about just Deamon or jsDeamon.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a Factory. I could call it factoryDaemon if that makes more sense for you

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it matter that's a factory? If you really want to know, I think it's apparent from IPFSFactor.create() that it is a factory.
This don't find this super important, so if you don't find it worth the change, feel free to leave it as it currently is.

const fInProc = IPFSFactory.create({ type: 'proc' })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find fInProc a strange, hard to read, meaningless, uncommon name. What about just InProc or InProcDeamon.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #1215 (comment). I can use the full word if that works better :)


// This gets replaced by '../utils/create-repo-browser.js' in the browser
const createTempRepo = require('../utils/create-repo-nodejs.js')
Expand Down Expand Up @@ -69,7 +68,7 @@ function connectNodes (remoteNode, inProcNode, callback) {
let nodes = []

function addNode (inProcNode, callback) {
df.spawn({
fDaemon.spawn({
exec: './src/cli/bin.js',
config: {
Addresses: {
Expand Down Expand Up @@ -119,7 +118,7 @@ describe('bitswap', function () {
})
}

dfProc.spawn({ exec: IPFS, config }, (err, _ipfsd) => {
fInProc.spawn({ exec: IPFS, config: config }, (err, _ipfsd) => {
expect(err).to.not.exist()
nodes.push(_ipfsd)
inProcNode = _ipfsd.api
Expand Down