Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit a2a566b

Browse files
committed
feat: use PubSub API directly from libp2p
1 parent d945fce commit a2a566b

File tree

6 files changed

+18
-101
lines changed

6 files changed

+18
-101
lines changed

src/core/components/libp2p.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ module.exports = function libp2p (self) {
2121
bootstrap: get(config, 'Bootstrap'),
2222
modules: self._libp2pModules,
2323
// EXPERIMENTAL
24+
pubsub: get(self._options, 'EXPERIMENTAL.pubsub', false),
2425
dht: get(self._options, 'EXPERIMENTAL.dht', false),
2526
relay: {
2627
enabled: get(config, 'EXPERIMENTAL.relay.enabled', false),
@@ -50,9 +51,7 @@ module.exports = function libp2p (self) {
5051
})
5152

5253
self._libp2pNode.start((err) => {
53-
if (err) {
54-
return callback(err)
55-
}
54+
if (err) { return callback(err) }
5655

5756
self._libp2pNode.peerInfo.multiaddrs.forEach((ma) => {
5857
console.log('Swarm listening on', ma.toString())

src/core/components/no-floodsub.js

Lines changed: 0 additions & 24 deletions
This file was deleted.

src/core/components/pubsub.js

Lines changed: 7 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,96 +1,48 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4-
const setImmediate = require('async/setImmediate')
5-
6-
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
74

85
module.exports = function pubsub (self) {
96
return {
107
subscribe: (topic, options, handler, callback) => {
11-
if (!self.isOnline()) {
12-
throw new Error(OFFLINE_ERROR)
13-
}
14-
158
if (typeof options === 'function') {
169
callback = handler
1710
handler = options
1811
options = {}
1912
}
2013

21-
function subscribe (cb) {
22-
if (self._pubsub.listenerCount(topic) === 0) {
23-
self._pubsub.subscribe(topic)
24-
}
25-
26-
self._pubsub.on(topic, handler)
27-
setImmediate(cb)
28-
}
29-
3014
if (!callback) {
3115
return new Promise((resolve, reject) => {
32-
subscribe((err) => {
16+
self.libp2p.pubsub.subscribe(topic, options, handler, (err) => {
3317
if (err) {
3418
return reject(err)
3519
}
3620
resolve()
3721
})
3822
})
3923
} else {
40-
subscribe(callback)
24+
self.libp2p.pubsub.subscribe(topic, options, handler, callback)
4125
}
4226
},
4327

4428
unsubscribe: (topic, handler) => {
45-
self._pubsub.removeListener(topic, handler)
46-
47-
if (self._pubsub.listenerCount(topic) === 0) {
48-
self._pubsub.unsubscribe(topic)
49-
}
29+
self.libp2p.pubsub.unsubscribe(topic, handler)
5030
},
5131

5232
publish: promisify((topic, data, callback) => {
53-
if (!self.isOnline()) {
54-
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
55-
}
56-
57-
if (!Buffer.isBuffer(data)) {
58-
return setImmediate(() => callback(new Error('data must be a Buffer')))
59-
}
60-
61-
self._pubsub.publish(topic, data)
62-
setImmediate(() => callback())
33+
self.libp2p.pubsub.publish(topic, data, callback)
6334
}),
6435

6536
ls: promisify((callback) => {
66-
if (!self.isOnline()) {
67-
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
68-
}
69-
70-
const subscriptions = Array.from(self._pubsub.subscriptions)
71-
72-
setImmediate(() => callback(null, subscriptions))
37+
self.libp2p.pubsub.ls(callback)
7338
}),
7439

7540
peers: promisify((topic, callback) => {
76-
if (!self.isOnline()) {
77-
return setImmediate(() => callback(new Error(OFFLINE_ERROR)))
78-
}
79-
80-
if (typeof topic === 'function') {
81-
callback = topic
82-
topic = null
83-
}
84-
85-
const peers = Array.from(self._pubsub.peers.values())
86-
.filter((peer) => topic ? peer.topics.has(topic) : true)
87-
.map((peer) => peer.info.id.toB58String())
88-
89-
setImmediate(() => callback(null, peers))
41+
self.libp2p.pubsub.peers(topic, callback)
9042
}),
9143

9244
setMaxListeners (n) {
93-
return self._pubsub.setMaxListeners(n)
45+
self.libp2p.pubsub.setMaxListeners(n)
9446
}
9547
}
9648
}

src/core/components/start.js

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
const series = require('async/series')
44
const Bitswap = require('ipfs-bitswap')
5-
const FloodSub = require('libp2p-floodsub')
6-
const NoFloodSub = require('./no-floodsub')
75
const setImmediate = require('async/setImmediate')
86
const promisify = require('promisify-es6')
97

@@ -38,9 +36,7 @@ module.exports = (self) => {
3836
(cb) => self.preStart(cb),
3937
(cb) => self.libp2p.start(cb)
4038
], (err) => {
41-
if (err) {
42-
return done(err)
43-
}
39+
if (err) { return done(err) }
4440

4541
self._bitswap = new Bitswap(
4642
self._libp2pNode,
@@ -50,11 +46,7 @@ module.exports = (self) => {
5046

5147
self._bitswap.start()
5248
self._blockService.setExchange(self._bitswap)
53-
54-
self._pubsub = self._options.EXPERIMENTAL.pubsub
55-
? new FloodSub(self._libp2pNode)
56-
: new NoFloodSub()
57-
self._pubsub.start(done)
49+
done()
5850
})
5951
})
6052
}

src/core/components/stop.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ module.exports = (self) => {
3131
self._bitswap.stop()
3232

3333
series([
34-
(cb) => self._pubsub.stop(cb),
3534
(cb) => self.libp2p.stop(cb),
3635
(cb) => self._repo.close(cb)
3736
], done)

test/core/bitswap.spec.js

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ const isNode = require('detect-node')
1616
const multihashing = require('multihashing-async')
1717
const CID = require('cids')
1818

19-
const DaemonFactory = require('ipfsd-ctl')
20-
const df = DaemonFactory.create({ type: 'js' })
21-
22-
const dfProc = DaemonFactory.create({ type: 'proc' })
19+
const IPFSFactory = require('ipfsd-ctl')
20+
const fDaemon = IPFSFactory.create({ type: 'js' })
21+
const fInProc = IPFSFactory.create({ type: 'proc' })
2322

2423
// This gets replaced by '../utils/create-repo-browser.js' in the browser
2524
const createTempRepo = require('../utils/create-repo-nodejs.js')
@@ -69,7 +68,7 @@ function connectNodes (remoteNode, inProcNode, callback) {
6968
let nodes = []
7069

7170
function addNode (inProcNode, callback) {
72-
df.spawn({
71+
fDaemon.spawn({
7372
exec: `./src/cli/bin.js`,
7473
config: {
7574
Addresses: {
@@ -89,7 +88,7 @@ function addNode (inProcNode, callback) {
8988
})
9089
}
9190

92-
describe('bitswap', function () {
91+
describe.only('bitswap', function () {
9392
this.timeout(80 * 1000)
9493

9594
let inProcNode // Node spawned inside this process
@@ -119,7 +118,7 @@ describe('bitswap', function () {
119118
})
120119
}
121120

122-
dfProc.spawn({ exec: IPFS, config }, (err, _ipfsd) => {
121+
fInProc.spawn({ exec: IPFS, config: config }, (err, _ipfsd) => {
123122
expect(err).to.not.exist()
124123
nodes.push(_ipfsd)
125124
inProcNode = _ipfsd.api
@@ -137,7 +136,7 @@ describe('bitswap', function () {
137136
})
138137
})
139138

140-
describe('transfer a block between', () => {
139+
describe.only('transfer a block between', () => {
141140
it('2 peers', function (done) {
142141
this.timeout(80 * 1000)
143142

0 commit comments

Comments
 (0)