Skip to content

Commit 3979d42

Browse files
refactor: use modular async
Fixes #14
1 parent 66e07c2 commit 3979d42

File tree

10 files changed

+68
-55
lines changed

10 files changed

+68
-55
lines changed

src/decision/engine.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const debug = require('debug')
4-
const async = require('async')
4+
const eachSeries = require('async/eachSeries')
55
const mh = require('multihashes')
66
const pull = require('pull-stream')
77
const generate = require('pull-generate')
@@ -115,11 +115,11 @@ module.exports = class Engine {
115115

116116
this._processBlocks(msg.blocks, ledger)
117117
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))
118-
async.eachSeries(
118+
eachSeries(
119119
msg.wantlist.values(),
120120
this._processWantlist.bind(this, ledger, peerId),
121121
(err) => {
122-
const done = (err) => async.setImmediate(() => cb(err))
122+
const done = (err) => setImmediate(() => cb(err))
123123
if (err) return done(err)
124124
this._outbox()
125125
done()
@@ -148,7 +148,7 @@ module.exports = class Engine {
148148
log('cancel %s', mh.toB58String(entry.key))
149149
ledger.cancelWant(entry.key)
150150
this.peerRequestQueue.remove(entry.key, peerId)
151-
async.setImmediate(() => cb())
151+
setImmediate(() => cb())
152152
} else {
153153
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
154154
ledger.wants(entry.key, entry.priority)

src/index.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
'use strict'
22

3-
const async = require('async')
3+
const eachLimit = require('async/eachLimit')
4+
const series = require('async/series')
5+
const retry = require('async/retry')
6+
const parallel = require('async/parallel')
47
const debug = require('debug')
58
const log = debug('bitswap')
69
log.error = debug('bitswap:error')
@@ -64,8 +67,8 @@ module.exports = class Bitwap {
6467

6568
this.wm.cancelWants(keys)
6669

67-
async.eachLimit(iblocks.values(), 10, (block, next) => {
68-
async.series([
70+
eachLimit(iblocks.values(), 10, (block, next) => {
71+
series([
6972
(innerCb) => this._updateReceiveCounters(block, (err) => {
7073
if (err) {
7174
// ignore, as these have been handled in _updateReceiveCounters
@@ -106,7 +109,7 @@ module.exports = class Bitwap {
106109

107110
_tryPutBlock (block, times, cb) {
108111
log('trying to put block %s', block.data.toString())
109-
async.retry({times, interval: 400}, (done) => {
112+
retry({times, interval: 400}, (done) => {
110113
this.datastore.put(block, done)
111114
}, cb)
112115
}
@@ -210,7 +213,7 @@ module.exports = class Bitwap {
210213
addListeners()
211214
this.wm.wantBlocks(keys)
212215

213-
async.parallel(keys.map((key) => (cb) => {
216+
parallel(keys.map((key) => (cb) => {
214217
// We don't want to announce looking for blocks
215218
// when we might have them ourselves.
216219
this.datastore.has(key, (err, exists) => {

src/network/index.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

33
const bl = require('bl')
4-
const async = require('async')
54
const debug = require('debug')
65
const lps = require('length-prefixed-stream')
76

@@ -87,7 +86,7 @@ module.exports = class Network {
8786
// Connect to the given peer
8887
connectTo (peerId, cb) {
8988
log('connecting to %s', peerId.toB58String())
90-
const done = (err) => async.setImmediate(() => cb(err))
89+
const done = (err) => setImmediate(() => cb(err))
9190
// NOTE: For now, all this does is ensure that we are
9291
// connected. Once we have Peer Routing, we will be able
9392
// to find the Peer
@@ -102,7 +101,7 @@ module.exports = class Network {
102101
sendMessage (peerId, msg, cb) {
103102
log('sendMessage to %s', peerId.toB58String())
104103
log('msg %s', msg.full, msg.wantlist, msg.blocks)
105-
const done = (err) => async.setImmediate(() => cb(err))
104+
const done = (err) => setImmediate(() => cb(err))
106105
let peerInfo
107106
try {
108107
peerInfo = this.peerBook.getByMultihash(peerId.toBytes())

src/wantmanager/msg-queue.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const debug = require('debug')
4-
const async = require('async')
4+
const queue = require('async/queue')
55

66
const Message = require('../message')
77

@@ -14,7 +14,7 @@ module.exports = class MsgQueue {
1414
this.network = network
1515
this.refcnt = 1
1616

17-
this.queue = async.queue(this.doWork.bind(this), 1)
17+
this.queue = queue(this.doWork.bind(this), 1)
1818
// only start when `run` is called
1919
this.queue.pause()
2020
}

test/browser.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict'
22

3-
const async = require('async')
3+
const eachSeries = require('async/eachSeries')
44
const store = require('idb-plus-blob-store')
55
const _ = require('lodash')
66
const IPFSRepo = require('ipfs-repo')
@@ -29,7 +29,7 @@ function createRepo (id, done) {
2929

3030
dbs.push(id)
3131

32-
async.eachSeries(repoData, (file, cb) => {
32+
eachSeries(repoData, (file, cb) => {
3333
if (_.startsWith(file.key, 'datastore/')) {
3434
return cb()
3535
}

test/decision/engine-test.js

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ const expect = require('chai').expect
55
const PeerId = require('peer-id')
66
const _ = require('lodash')
77
const Block = require('ipfs-block')
8-
const async = require('async')
8+
const parallel = require('async/parallel')
9+
const eachLimit = require('async/eachLimit')
10+
const each = require('async/each')
11+
const series = require('async/series')
12+
const eachSeries = require('async/eachSeries')
913

1014
const Message = require('../../src/message')
1115
const Engine = require('../../src/decision/engine')
@@ -32,7 +36,7 @@ module.exports = (repo) => {
3236
})
3337

3438
it('consistent accounting', (done) => {
35-
async.parallel([
39+
parallel([
3640
(cb) => newEngine('Ernie', cb),
3741
(cb) => newEngine('Bert', cb)
3842
], (err, res) => {
@@ -41,7 +45,7 @@ module.exports = (repo) => {
4145
const sender = res[0]
4246
const receiver = res[1]
4347

44-
async.eachLimit(_.range(1000), 100, (i, cb) => {
48+
eachLimit(_.range(1000), 100, (i, cb) => {
4549
const m = new Message(false)
4650
const content = `this is message ${i}`
4751
m.addBlock(new Block(content))
@@ -80,7 +84,7 @@ module.exports = (repo) => {
8084
})
8185

8286
it('peer is added to peers when message receiver or sent', (done) => {
83-
async.parallel([
87+
parallel([
8488
(cb) => newEngine('sf', cb),
8589
(cb) => newEngine('sea', cb)
8690
], (err, res) => {
@@ -129,7 +133,7 @@ module.exports = (repo) => {
129133
repo.create('p', (err, repo) => {
130134
expect(err).to.not.exist
131135

132-
async.each(alphabet, (letter, cb) => {
136+
each(alphabet, (letter, cb) => {
133137
const block = new Block(letter)
134138
repo.datastore.put(block, cb)
135139
}, (err) => {
@@ -155,8 +159,8 @@ module.exports = (repo) => {
155159
e.messageReceived(p, cancels, cb)
156160
}
157161

158-
async.eachSeries(_.range(numRounds), (i, cb) => {
159-
async.eachSeries(testCases, (testcase, innerCb) => {
162+
eachSeries(_.range(numRounds), (i, cb) => {
163+
eachSeries(testCases, (testcase, innerCb) => {
160164
const set = testcase[0]
161165
const cancels = testcase[1]
162166
const keeps = _.difference(set, cancels)
@@ -178,7 +182,7 @@ module.exports = (repo) => {
178182
const e = new Engine(repo.datastore, network)
179183
e.start()
180184
const partner = PeerId.create({bits: 64})
181-
async.series([
185+
series([
182186
(c) => partnerWants(e, set, partner, c),
183187
(c) => partnerCancels(e, cancels, partner, c)
184188
], (err) => {

test/index-test.js

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
/* eslint max-nested-callbacks: ["error", 8]*/
33
'use strict'
44

5-
const async = require('async')
5+
const map = require('async/map')
6+
const eachSeries = require('async/eachSeries')
7+
const waterfall = require('async/waterfall')
8+
const each = require('async/each')
69
const _ = require('lodash')
710
const expect = require('chai').expect
811
const PeerId = require('peer-id')
@@ -62,7 +65,7 @@ module.exports = (repo) => {
6265
expect(bs.blocksRecvd).to.be.eql(2)
6366
expect(bs.dupBlocksRecvd).to.be.eql(0)
6467

65-
async.map([b1, b1],
68+
map([b1, b1],
6669
(val, cb) => store.get(val.key, cb),
6770
(err, res) => {
6871
if (err) return done(err)
@@ -116,7 +119,7 @@ module.exports = (repo) => {
116119
return m
117120
})
118121
let i = 0
119-
async.eachSeries(others, (other, cb) => {
122+
eachSeries(others, (other, cb) => {
120123
const msg = messages[i]
121124
i++
122125
bs._receiveMessage(other, msg, (err) => {
@@ -166,7 +169,7 @@ module.exports = (repo) => {
166169
const block = makeBlock()
167170

168171
let mockNet
169-
async.waterfall([
172+
waterfall([
170173
(cb) => utils.createMockNet(repo, 2, cb),
171174
(net, cb) => {
172175
mockNet = net
@@ -218,13 +221,13 @@ module.exports = (repo) => {
218221
if (id.toHexString() !== other.toHexString()) {
219222
err = new Error('unkown peer')
220223
}
221-
async.setImmediate(() => cb(err))
224+
setImmediate(() => cb(err))
222225
},
223226
sendMessage (id, msg, cb) {
224227
if (id.toHexString() === other.toHexString()) {
225228
bs2._receiveMessage(me, msg, cb)
226229
} else {
227-
async.setImmediate(() => cb(new Error('unkown peer')))
230+
setImmediate(() => cb(new Error('unkown peer')))
228231
}
229232
},
230233
start () {},
@@ -236,13 +239,13 @@ module.exports = (repo) => {
236239
if (id.toHexString() !== me.toHexString()) {
237240
err = new Error('unkown peer')
238241
}
239-
async.setImmediate(() => cb(err))
242+
setImmediate(() => cb(err))
240243
},
241244
sendMessage (id, msg, cb) {
242245
if (id.toHexString() === me.toHexString()) {
243246
bs1._receiveMessage(other, msg, cb)
244247
} else {
245-
async.setImmediate(() => cb(new Error('unkown peer')))
248+
setImmediate(() => cb(new Error('unkown peer')))
246249
}
247250
},
248251
start () {},
@@ -254,7 +257,7 @@ module.exports = (repo) => {
254257

255258
let store2
256259

257-
async.waterfall([
260+
waterfall([
258261
(cb) => repo.create('world', cb),
259262
(repo, cb) => {
260263
store2 = repo.datastore
@@ -335,7 +338,7 @@ module.exports = (repo) => {
335338
}
336339

337340
function hasBlocks (msg, store, cb) {
338-
async.each(Array.from(msg.blocks.values()), (b, next) => {
341+
each(Array.from(msg.blocks.values()), (b, next) => {
339342
if (!b.cancel) {
340343
store.has(b.key, next)
341344
} else {

test/network/gen-bitswap-network.node.js

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
const expect = require('chai').expect
66
const utils = require('../utils')
7-
const async = require('async')
7+
const series = require('async/series')
8+
const parallel = require('async/parallel')
9+
const each = require('async/each')
810
const _ = require('lodash')
911
const Block = require('ipfs-block')
1012
const Buffer = require('safe-buffer').Buffer
@@ -23,15 +25,15 @@ describe('gen Bitswap network', function () {
2325
return new Block(b)
2426
})
2527

26-
async.series([
28+
series([
2729
(cb) => {
28-
async.parallel(blocks.map((b) => (cb) => {
30+
parallel(blocks.map((b) => (cb) => {
2931
node.bitswap.hasBlock(b, cb)
3032
}), cb)
3133
},
3234
(cb) => {
33-
async.each(_.range(100), (i, cb) => {
34-
async.parallel(blocks.map((b) => (cb) => {
35+
each(_.range(100), (i, cb) => {
36+
parallel(blocks.map((b) => (cb) => {
3537
node.bitswap.getBlock(b.key, (err, res) => {
3638
expect(err).to.not.exist
3739
expect(res).to.be.eql(b)
@@ -80,17 +82,17 @@ describe('gen Bitswap network', function () {
8082

8183
const d = (new Date()).getTime()
8284

83-
async.parallel(_.map(nodeArr, (node, i) => (callback) => {
85+
parallel(_.map(nodeArr, (node, i) => (callback) => {
8486
node.bitswap.start()
85-
async.parallel([
87+
parallel([
8688
(finish) => {
87-
async.parallel(_.range(blockFactor).map((j) => (cb) => {
89+
parallel(_.range(blockFactor).map((j) => (cb) => {
8890
// console.log('has node:%s block %s', i, i * blockFactor + j)
8991
node.bitswap.hasBlock(blocks[i * blockFactor + j], cb)
9092
}), finish)
9193
},
9294
(finish) => {
93-
async.parallel(_.map(blocks, (b, j) => (cb) => {
95+
parallel(_.map(blocks, (b, j) => (cb) => {
9496
node.bitswap.getBlock(b.key, (err, res) => {
9597
// console.log('node:%s got block: %s', i, j)
9698
expect(err).to.not.exist
@@ -107,13 +109,13 @@ describe('gen Bitswap network', function () {
107109
})
108110
}
109111

110-
async.series(
112+
series(
111113
_.range(2).map((i) => (cb) => round(i, cb)),
112114
(err) => {
113115
// setTimeout is used to avoid closing the TCP socket while spdy is
114116
// still sending a ton of signalling data
115117
setTimeout(() => {
116-
async.parallel(nodeArr.map((node) => (cb) => {
118+
parallel(nodeArr.map((node) => (cb) => {
117119
node.bitswap.stop()
118120
node.libp2p.stop(cb)
119121
}), (err2) => {

test/node.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const ncp = require('ncp')
66
const rimraf = require('rimraf')
77
const fs = require('fs-blob-store')
88
const testRepoPath = path.join(__dirname, 'test-repo')
9-
const async = require('async')
9+
const each = require('async/each')
1010

1111
// book keeping
1212
const repos = []
@@ -24,7 +24,7 @@ function createRepo (id, done) {
2424
}
2525

2626
function removeRepos (done) {
27-
async.each(repos, (repo, cb) => {
27+
each(repos, (repo, cb) => {
2828
rimraf(repo, cb)
2929
}, done)
3030
}

0 commit comments

Comments
 (0)