From f3dfceb620bc134902ec18589d3a42149fc0d868 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 18 Aug 2016 10:48:04 +0200 Subject: [PATCH] refactor: migrate to pull-streams - pull-streams - remove highland - update deps --- .travis.yml | 2 +- API.md | 53 ++-- package.json | 31 +-- src/decision/engine.js | 94 ++++---- src/decision/peer-request-queue.js | 6 +- src/index.js | 293 ++++++++++++----------- src/network/index.js | 68 +++--- src/wantmanager/index.js | 26 +- src/wantmanager/msg-queue.js | 37 +-- test/browser.js | 20 +- test/decision/engine-test.js | 192 ++++++++------- test/index-test.js | 172 ++++++++----- test/network/gen-bitswap-network.node.js | 75 +++--- test/network/network.node.js | 13 +- test/node.js | 8 +- test/utils.js | 27 ++- 16 files changed, 625 insertions(+), 492 deletions(-) diff --git a/.travis.yml b/.travis.yml index e1d6320b..0c44ec89 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ sudo: false language: node_js node_js: - 4 - - 5 + - stable # Make sure we have new NPM. before_install: diff --git a/API.md b/API.md index d02bcf94..023e4cad 100644 --- a/API.md +++ b/API.md @@ -6,39 +6,45 @@ - `id: PeerId`, the id of the local instance. - `libp2p: Libp2p`, instance of the local network stack. -- `datastore: Datastore`, instance of the local database (`IpfsRepo.datastore`) +- `blockstore: Datastore`, instance of the local database (`IpfsRepo.blockstore`) Create a new instance. -### `getBlock(key, cb)` -- `key: Multihash` -- `cb: Function` +### `getStream(key)` -Fetch a single block. +- `key: Multihash|Array` -> Note: This is safe guarded so that the network is not asked -> for blocks that are in the local `datastore`. +Returns a source `pull-stream`. Values emitted are the received blocks. -### `getBlocks(keys, cb)` - -- `keys: []Multihash` -- `cb: Function` +Example: -Fetch multiple blocks. The `cb` is called with a result object of the form ```js -{ - [key1]: {error: errorOrUndefined, block: blockOrUndefined}, - [key2]: {error: errorOrUndefined, block: blockOrUndefined}, - ... -} +// Single block +pull( + bitswap.getStream(key), + pull.collect((err, blocks) => { + // blocks === [block] + }) +) + +// Many blocks +pull( + bitswap.getStream([key1, key2, key3]), + pull.collect((err, blocks) => { + // blocks === [block1, block2, block3] + }) +) ``` -Where `key` is the multihash of the block. -### `unwantBlocks(keys)` +> Note: This is safe guarded so that the network is not asked +> for blocks that are in the local `datastore`. + -- `keys: []Multihash` +### `unwant(keys)` + +- `keys: Mutlihash|[]Multihash` Cancel previously requested keys, forcefully. That means they are removed from the wantlist independent of how many other resources requested these keys. Callbacks @@ -46,12 +52,15 @@ attached to `getBlock` are errored with `Error('manual unwant: key')`. ### `cancelWants(keys)` -- `keys: []Multihash` +- `keys: Multihash|[]Multihash` Cancel previously requested keys. +### `putStream()` + +Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored. -### `hasBlock(block, cb)` +### `put(block, cb)` - `block: IpfsBlock` - `cb: Function` diff --git a/package.json b/package.json index 67601489..8f91ad3b 100644 --- a/package.json +++ b/package.json @@ -33,35 +33,38 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "abstract-blob-store": "^3.2.0", "aegir": "^8.0.1", "buffer-loader": "0.0.1", "chai": "^3.5.0", - "fs-blob-store": "^5.2.1", - "idb-plus-blob-store": "^1.1.2", - "ipfs-repo": "^0.8.0", - "libp2p-ipfs": "^0.12.0", - "lodash": "^4.13.1", + "fs-pull-blob-store": "^0.3.0", + "idb-pull-blob-store": "^0.4.0", + "interface-pull-blob-store": "^0.5.0", + "ipfs-repo": "^0.9.0", + "libp2p-ipfs": "^0.13.0", + "lodash": "^4.15.0", "multiaddr": "^2.0.3", "ncp": "^2.0.0", "peer-book": "^0.3.0", "peer-id": "^0.7.0", "peer-info": "^0.7.1", - "rimraf": "^2.5.2", + "rimraf": "^2.5.4", "safe-buffer": "^5.0.1" }, "dependencies": { - "async": "^2.0.0-rc.5", - "bl": "^1.1.2", + "async": "^2.0.1", "debug": "^2.2.0", "heap": "^0.2.6", - "highland": "^3.0.0-beta.1", "ipfs-block": "^0.3.0", - "length-prefixed-stream": "^1.5.0", - "lodash.isequalwith": "^4.2.0", + "lodash.isequalwith": "^4.4.0", "lodash.isundefined": "^3.0.1", "multihashes": "^0.2.2", - "protocol-buffers": "^3.1.6" + "protocol-buffers": "^3.1.6", + "pull-defer": "^0.2.2", + "pull-generate": "^2.2.0", + "pull-length-prefixed": "^1.2.0", + "pull-paramap": "^1.1.6", + "pull-pushable": "^2.0.1", + "pull-stream": "^3.4.5" }, "contributors": [ "David Dias ", @@ -69,4 +72,4 @@ "Stephen Whitmore ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/decision/engine.js b/src/decision/engine.js index 1d3ce31c..545a3b4b 100644 --- a/src/decision/engine.js +++ b/src/decision/engine.js @@ -1,9 +1,9 @@ 'use strict' const debug = require('debug') -const _ = require('highland') -const async = require('async') const mh = require('multihashes') +const pull = require('pull-stream') +const generate = require('pull-generate') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') @@ -14,8 +14,8 @@ const PeerRequestQueue = require('./peer-request-queue') const Ledger = require('./ledger') module.exports = class Engine { - constructor (datastore, network) { - this.datastore = datastore + constructor (blockstore, network) { + this.blockstore = blockstore this.network = network // A list of of ledgers by their partner id @@ -45,34 +45,43 @@ module.exports = class Engine { _outbox () { if (!this._running) return - const doIt = (cb) => { - _((push, next) => { - if (!this._running) return push(null, _.nil) - const nextTask = this.peerRequestQueue.pop() + const doIt = (cb) => pull( + generate(null, (state, cb) => { + log('generating', this._running) + if (!this._running) { + return cb(true) + } - if (!nextTask) return push(null, _.nil) + const nextTask = this.peerRequestQueue.pop() + log('got task', nextTask) + if (!nextTask) { + return cb(true) + } - this.datastore.get(nextTask.entry.key, (err, block) => { - if (err || !block) { - nextTask.done() - } else { - push(null, { + pull( + this.blockstore.getStream(nextTask.entry.key), + pull.collect((err, blocks) => { + log('generated', blocks) + const block = blocks[0] + if (err || !block) { + nextTask.done() + return cb(null, false) + } + + cb(null, { peer: nextTask.target, block: block, sent: () => { nextTask.done() } }) - } - - next() - }) - }) - .flatMap((envelope) => { - return _.wrapCallback(this._sendBlock.bind(this))(envelope) - }) - .done(cb) - } + }) + ) + }), + pull.filter(Boolean), + pull.asyncMap(this._sendBlock.bind(this)), + pull.onEnd(cb) + ) if (!this._timer) { this._timer = setTimeout(() => { @@ -97,12 +106,13 @@ module.exports = class Engine { // Handle incoming messages messageReceived (peerId, msg, cb) { + const ledger = this._findOrCreate(peerId) + if (msg.empty) { log('received empty message from %s', peerId.toB58String()) + return cb() } - const ledger = this._findOrCreate(peerId) - // If the message was a full wantlist clear the current one if (msg.full) { ledger.wantlist = new Wantlist() @@ -110,27 +120,29 @@ module.exports = class Engine { this._processBlocks(msg.blocks, ledger) log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString())) - async.eachSeries( - msg.wantlist.values(), - this._processWantlist.bind(this, ledger, peerId), - (err) => { - const done = (err) => async.setImmediate(() => cb(err)) - if (err) return done(err) + + pull( + pull.values(Array.from(msg.wantlist.values())), + pull.asyncMap((entry, cb) => { + this._processWantlist(ledger, peerId, entry, cb) + }), + pull.onEnd((err) => { + if (err) return cb(err) this._outbox() - done() - } + cb() + }) ) } - receivedBlock (block) { - this._processBlock(block) + receivedBlock (key) { + this._processBlock(key) this._outbox() } - _processBlock (block) { + _processBlock (key) { // Check all connected peers if they want the block we received for (let l of this.ledgerMap.values()) { - const entry = l.wantlistContains(block.key) + const entry = l.wantlistContains(key) if (entry) { this.peerRequestQueue.push(entry, l.partner) @@ -143,13 +155,13 @@ module.exports = class Engine { log('cancel %s', mh.toB58String(entry.key)) ledger.cancelWant(entry.key) this.peerRequestQueue.remove(entry.key, peerId) - async.setImmediate(() => cb()) + setImmediate(() => cb()) } else { log('wants %s - %s', mh.toB58String(entry.key), entry.priority) ledger.wants(entry.key, entry.priority) // If we already have the block, serve it - this.datastore.has(entry.key, (err, exists) => { + this.blockstore.has(entry.key, (err, exists) => { if (err) { log('failed existence check %s', mh.toB58String(entry.key)) } else if (exists) { @@ -166,7 +178,7 @@ module.exports = class Engine { log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length) ledger.receivedBytes(block.data.length) - this.receivedBlock(block) + this.receivedBlock(block.key) } } diff --git a/src/decision/peer-request-queue.js b/src/decision/peer-request-queue.js index 62e2fda3..aec41693 100644 --- a/src/decision/peer-request-queue.js +++ b/src/decision/peer-request-queue.js @@ -152,14 +152,14 @@ function taskKey (peerId, key) { function partnerCompare (a, b) { // having no blocks in their wantlist means lowest priority - // having both of these checks ensures stability of the sort + // having both of these checks ensures stability of the sort if (a.requests === 0) return false if (b.requests === 0) return true if (a.active === b.active) { // sorting by taskQueue.size() aids in cleaning out trash entries faster - // if we sorted instead by requests, one peer could potentially build up - // a huge number of cancelled entries in the queue resulting in a memory leak + // if we sorted instead by requests, one peer could potentially build up + // a huge number of cancelled entries in the queue resulting in a memory leak return a.taskQueue.size() > b.taskQueue.size() } diff --git a/src/index.js b/src/index.js index 94bf16ec..32df2149 100644 --- a/src/index.js +++ b/src/index.js @@ -1,11 +1,15 @@ 'use strict' -const async = require('async') +const series = require('async/series') +const retry = require('async/retry') const debug = require('debug') const log = debug('bitswap') log.error = debug('bitswap:error') const EventEmitter = require('events').EventEmitter const mh = require('multihashes') +const pull = require('pull-stream') +const paramap = require('pull-paramap') +const defer = require('pull-defer/source') const cs = require('./constants') const WantManager = require('./wantmanager') @@ -13,7 +17,7 @@ const Network = require('./network') const decision = require('./decision') module.exports = class Bitwap { - constructor (p, libp2p, datastore, peerBook) { + constructor (p, libp2p, blockstore, peerBook) { // the ID of the peer to act on behalf of this.self = p @@ -21,9 +25,9 @@ module.exports = class Bitwap { this.network = new Network(libp2p, peerBook, this) // local database - this.datastore = datastore + this.blockstore = blockstore - this.engine = new decision.Engine(datastore, this.network) + this.engine = new decision.Engine(blockstore, this.network) // handle message sending this.wm = new WantManager(this.network) @@ -45,52 +49,61 @@ module.exports = class Bitwap { log('failed to receive message', incoming) } - const iblocks = incoming.blocks + const iblocks = Array.from(incoming.blocks.values()) - if (iblocks.size === 0) { + if (iblocks.length === 0) { return cb() } // quickly send out cancels, reduces chances of duplicate block receives - const keys = [] - for (let block of iblocks.values()) { - const found = this.wm.wl.contains(block.key) - if (!found) { - log('received un-askes-for %s from %s', mh.toB58String(block.key), peerId.toB58String()) - } else { - keys.push(block.key) - } - } - this.wm.cancelWants(keys) - - async.eachLimit(iblocks.values(), 10, (block, next) => { - async.series([ - (innerCb) => this._updateReceiveCounters(block, (err) => { - if (err) { - // ignore, as these have been handled in _updateReceiveCounters - return innerCb() - } - - log('got block from %s', peerId.toB58String(), block.data.toString()) - innerCb() - }), - (innerCb) => this.hasBlock(block, (err) => { - if (err) { - log.error('receiveMessage hasBlock error: %s', err.message) - } - innerCb() - }) - ], next) - }, cb) + pull( + pull.values(iblocks), + pull.map((block) => block.key), + pull.filter((key) => this.wm.wl.contains(key)), + pull.collect((err, keys) => { + if (err) { + return log.error(err) + } + this.wm.cancelWants(keys) + }) + ) + + pull( + pull.values(iblocks), + paramap(this._handleReceivedBlock.bind(this, peerId), 10), + pull.onEnd(cb) + ) }) } + _handleReceivedBlock (peerId, block, cb) { + log('handling block', block) + series([ + (cb) => this._updateReceiveCounters(block, (err) => { + if (err) { + // ignore, as these have been handled + // in _updateReceiveCounters + return cb() + } + + log('got block from %s', peerId.toB58String(), block.data.toString()) + cb() + }), + (cb) => this.put(block, (err) => { + if (err) { + log.error('receiveMessage put error: %s', err.message) + } + cb() + }) + ], cb) + } + _updateReceiveCounters (block, cb) { this.blocksRecvd ++ - this.datastore.has(block.key, (err, has) => { + this.blockstore.has(block.key, (err, has) => { if (err) { - log('datastore.has error: %s', err.message) + log('blockstore.has error: %s', err.message) return cb(err) } @@ -106,8 +119,12 @@ module.exports = class Bitwap { _tryPutBlock (block, times, cb) { log('trying to put block %s', block.data.toString()) - async.retry({times, interval: 400}, (done) => { - this.datastore.put(block, done) + retry({times, interval: 400}, (done) => { + pull( + pull.values([block]), + this.blockstore.putStream(), + pull.onEnd(done) + ) }, cb) } @@ -127,120 +144,89 @@ module.exports = class Bitwap { this.engine.peerDisconnected(peerId) } - // getBlock attempts to retrieve a particular block with key `key` from peers - getBlock (key, cb) { - const keyS = mh.toB58String(key) - log('getBlock.start %s', keyS) - const done = (err, block) => { - if (err) { - log('getBlock.fail %s', keyS) - return cb(err) - } - - if (!block) { - log('getBlock.fail %s', keyS) - return cb(new Error('Empty block received')) - } - - log('getBlock.end %s', keyS) - cb(null, block) - } - - this.getBlocks([key], (results) => { - const err = results[keyS].error - const block = results[keyS].block - - done(err, block) - }) - } - // return the current wantlist for a given `peerId` wantlistForPeer (peerId) { return this.engine.wantlistForPeer(peerId) } - getBlocks (keys, cb) { - const results = {} + getStream (keys) { + if (!Array.isArray(keys)) { + return this._getStreamSingle(keys) + } + + return pull( + pull.values(keys), + paramap((key, cb) => { + pull( + this._getStreamSingle(key), + pull.collect(cb) + ) + }), + pull.flatten() + ) + } + + _getStreamSingle (key) { const unwantListeners = {} const blockListeners = {} const unwantEvent = (key) => `unwant:${key}` const blockEvent = (key) => `block:${key}` + const d = defer() - const cleanupListeners = () => { - keys.forEach((key) => { - const keyS = mh.toB58String(key) + const cleanupListener = (key) => { + const keyS = mh.toB58String(key) + + if (unwantListeners[keyS]) { this.notifications.removeListener(unwantEvent(keyS), unwantListeners[keyS]) + delete unwantListeners[keyS] + } + + if (blockListeners[keyS]) { this.notifications.removeListener(blockEvent(keyS), blockListeners[keyS]) - }) + delete blockListeners[keyS] + } } - const addListeners = () => { - keys.forEach((key) => { - const keyS = mh.toB58String(key) - unwantListeners[keyS] = () => { - finish(keyS, new Error(`manual unwant: ${keyS}`)) - } + const addListener = (key) => { + const keyS = mh.toB58String(key) + unwantListeners[keyS] = () => { + log(`manual unwant: ${keyS}`) + cleanupListener(key) + this.wm.cancelWants([key]) + d.resolve(pull.empty()) + } - blockListeners[keyS] = (block) => { - finish(keyS, null, block) - } + blockListeners[keyS] = (block) => { + this.wm.cancelWants([block.key]) + cleanupListener(key) + d.resolve(pull.values([block])) + } - this.notifications.once(unwantEvent(keyS), unwantListeners[keyS]) - this.notifications.once(blockEvent(keyS), blockListeners[keyS]) - }) + this.notifications.once(unwantEvent(keyS), unwantListeners[keyS]) + this.notifications.once(blockEvent(keyS), blockListeners[keyS]) } - let finished = false - const finish = (key, err, block) => { - results[key] = { - error: err, - block: block + this.blockstore.has(key, (err, exists) => { + if (err) { + return d.resolve(pull.error(err)) } - - if (Object.keys(results).length === keys.length) { - cleanupListeners() - - if (!finished) { - cb(results) - } - finished = true + if (exists) { + return d.resolve(this.blockstore.getStream(key)) } - } - - addListeners() - this.wm.wantBlocks(keys) - - async.parallel(keys.map((key) => (cb) => { - // We don't want to announce looking for blocks - // when we might have them ourselves. - this.datastore.has(key, (err, exists) => { - if (err) { - log('error in datastore.has: ', err.message) - return cb() - } - - if (!exists) { - return cb() - } - - this.datastore.get(key, (err, res) => { - if (err) { - log('error in datastore.get: ', err.message) - } - if (!err && res) { - finish(mh.toB58String(key), null, res) - this.wm.cancelWants([key]) - } + addListener(key) + this.wm.wantBlocks([key]) + }) - cb() - }) - }) - })) + return d } // removes the given keys from the want list independent of any ref counts - unwantBlocks (keys) { + unwant (keys) { + if (!Array.isArray(keys)) { + keys = [keys] + } + this.wm.unwantBlocks(keys) keys.forEach((key) => { this.notifications.emit(`unwant:${mh.toB58String(key)}`) @@ -249,23 +235,46 @@ module.exports = class Bitwap { // removes the given keys from the want list cancelWants (keys) { + if (!Array.isArray(keys)) { + keys = [keys] + } this.wm.cancelWants(keys) } - // announces the existance of a block to this service - hasBlock (block, cb) { - cb = cb || (() => {}) + putStream () { + return pull( + pull.asyncMap((block, cb) => { + this.blockstore.has(block.key, (err, exists) => { + if (err) return cb(err) + cb(null, [block, exists]) + }) + }), + pull.filter((val) => !val[1]), + pull.map((val) => { + const block = val[0] + + return pull( + pull.values([block]), + this.blockstore.putStream(), + pull.through((meta) => { + const key = block.key + log('put block: %s', mh.toB58String(key)) + this.notifications.emit(`block:${mh.toB58String(key)}`, block) + this.engine.receivedBlock(key) + }) + ) + }), + pull.flatten() + ) + } - this._tryPutBlock(block, 4, (err) => { - if (err) { - log.error('Error writing block to datastore: %s', err.message) - return cb(err) - } - log('put block: %s', mh.toB58String(block.key)) - this.notifications.emit(`block:${mh.toB58String(block.key)}`, block) - this.engine.receivedBlock(block) - cb() - }) + // announces the existance of a block to this service + put (block, cb) { + pull( + pull.values([block]), + this.putStream(), + pull.onEnd(cb) + ) } getWantlist () { diff --git a/src/network/index.js b/src/network/index.js index 3300da33..b34873cd 100644 --- a/src/network/index.js +++ b/src/network/index.js @@ -1,9 +1,8 @@ 'use strict' -const bl = require('bl') -const async = require('async') const debug = require('debug') -const lps = require('length-prefixed-stream') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') const Message = require('../message') const cs = require('../constants') @@ -50,30 +49,28 @@ module.exports = class Network { } _onConnection (conn) { - const decode = lps.decode() - conn.pipe(decode).pipe(bl((err, data) => { - conn.end() - if (err) { - return this.bitswap._receiveError(err) - } - let msg - try { - msg = Message.fromProto(data) - } catch (err) { - return this.bitswap._receiveError(err) - } - conn.getPeerInfo((err, peerInfo) => { + pull( + conn, + lp.decode(), + pull.collect((err, msgs) => msgs.forEach((data) => { + log('raw message', data) if (err) { return this.bitswap._receiveError(err) } - this.bitswap._receiveMessage(peerInfo.id, msg) - }) - })) - - conn.on('error', (err) => { - this.bitswap._receiveError(err) - conn.end() - }) + let msg + try { + msg = Message.fromProto(data) + } catch (err) { + return this.bitswap._receiveError(err) + } + conn.getPeerInfo((err, peerInfo) => { + if (err) { + return this.bitswap._receiveError(err) + } + this.bitswap._receiveMessage(peerInfo.id, msg) + }) + })) + ) } _onPeerMux (peerInfo) { @@ -87,7 +84,7 @@ module.exports = class Network { // Connect to the given peer connectTo (peerId, cb) { log('connecting to %s', peerId.toB58String()) - const done = (err) => async.setImmediate(() => cb(err)) + const done = (err) => setImmediate(() => cb(err)) // NOTE: For now, all this does is ensure that we are // connected. Once we have Peer Routing, we will be able // to find the Peer @@ -101,27 +98,24 @@ module.exports = class Network { // Send the given msg (instance of Message) to the given peer sendMessage (peerId, msg, cb) { log('sendMessage to %s', peerId.toB58String()) - log('msg %s', msg.full, msg.wantlist, msg.blocks) - const done = (err) => async.setImmediate(() => cb(err)) + log('msg', msg) let peerInfo try { peerInfo = this.peerBook.getByMultihash(peerId.toBytes()) } catch (err) { - return done(err) + return cb(err) } this.libp2p.dialByPeerInfo(peerInfo, PROTOCOL_IDENTIFIER, (err, conn) => { if (err) { - return done(err) + return cb(err) } - - conn.once('error', (err) => done(err)) - conn.once('finish', done) - - const encode = lps.encode() - encode.pipe(conn) - encode.write(msg.toProto()) - encode.end() + pull( + pull.values([msg.toProto()]), + lp.encode(), + conn + ) + cb() }) } } diff --git a/src/wantmanager/index.js b/src/wantmanager/index.js index 1f0c77d8..ebce56fe 100644 --- a/src/wantmanager/index.js +++ b/src/wantmanager/index.js @@ -1,7 +1,8 @@ 'use strict' const debug = require('debug') -const _ = require('highland') +const pull = require('pull-stream') +const mh = require('multihashes') const Message = require('../message') const Wantlist = require('../wantlist') @@ -25,12 +26,13 @@ module.exports = class Wantmanager { _addEntries (keys, cancel, force) { let i = -1 - _(keys) - .map((key) => { + pull( + pull.values(keys), + pull.map((key) => { i++ return new Message.Entry(key, cs.kMaxPriority - i, cancel) - }) - .tap((e) => { + }), + pull.through((e) => { // add changes to our wantlist if (e.cancel) { if (force) { @@ -39,15 +41,18 @@ module.exports = class Wantmanager { this.wl.remove(e.key) } } else { + log('adding to wl', mh.toB58String(e.key), e.priority) this.wl.add(e.key, e.priority) } - }) - .toArray((entries) => { + }), + pull.collect((err, entries) => { + if (err) throw err // broadcast changes for (let p of this.peers.values()) { p.addEntries(entries, false) } }) + ) } _startPeerHandler (peerId) { @@ -65,6 +70,7 @@ module.exports = class Wantmanager { for (let entry of this.wl.entries()) { fullwantlist.addEntry(entry[1].key, entry[1].priority) } + mq.addMessage(fullwantlist) this.peers.set(peerId.toB58String(), mq) @@ -90,19 +96,19 @@ module.exports = class Wantmanager { // add all the keys to the wantlist wantBlocks (keys) { - log('want blocks:', keys) + log('want blocks:', keys.map((k) => mh.toB58String(k))) this._addEntries(keys, false) } // remove blocks of all the given keys without respecting refcounts unwantBlocks (keys) { - log('unwant blocks:', keys) + log('unwant blocks:', keys.map((k) => mh.toB58String(k))) this._addEntries(keys, true, true) } // cancel wanting all of the given keys cancelWants (keys) { - log('cancel wants: ', keys) + log('cancel wants: ', keys.map((k) => mh.toB58String(k))) this._addEntries(keys, true) } diff --git a/src/wantmanager/msg-queue.js b/src/wantmanager/msg-queue.js index d8cbae5c..aa61954d 100644 --- a/src/wantmanager/msg-queue.js +++ b/src/wantmanager/msg-queue.js @@ -1,7 +1,8 @@ 'use strict' const debug = require('debug') -const async = require('async') +const pull = require('pull-stream') +const pushable = require('pull-pushable') const Message = require('../message') @@ -14,13 +15,11 @@ module.exports = class MsgQueue { this.network = network this.refcnt = 1 - this.queue = async.queue(this.doWork.bind(this), 1) - // only start when `run` is called - this.queue.pause() + this.queue = pushable() } addMessage (msg) { - log('addMessage: %s', this.p.toB58String()) + log('addMessage: %s', this.p.toB58String(), msg) this.queue.push(msg) } @@ -39,35 +38,39 @@ module.exports = class MsgQueue { } doWork (wlm, cb) { - log('doWork: %s', this.p.toB58String()) + log('doWork: %s', this.p.toB58String(), wlm) + if (wlm.empty) return cb() this.network.connectTo(this.p, (err) => { if (err) { log.error('cant connect to peer %s: %s', this.p.toB58String(), err.message) return cb() } - + log('sending message', wlm) this.network.sendMessage(this.p, wlm, (err) => { if (err) { log.error('send error: %s', err.message) } - cb() }) }) } run () { - this.queue.resume() + log('starting queue') + + pull( + this.queue, + pull.asyncMap(this.doWork.bind(this)), + pull.onEnd((err) => { + if (err) { + log.error('error processing message queue', err) + } + this.queue = pushable() + }) + ) } stop () { - const done = () => { - this.queue.kill() - this.queue.pause() - } - - // Give the queue up to 1s time to finish things - this.queue.drain = done - setTimeout(done, 1000) + this.queue.end() } } diff --git a/test/browser.js b/test/browser.js index b42a8e57..8792239d 100644 --- a/test/browser.js +++ b/test/browser.js @@ -1,9 +1,10 @@ 'use strict' -const async = require('async') -const store = require('idb-plus-blob-store') +const eachSeries = require('async/eachSeries') +const Store = require('idb-pull-blob-store') const _ = require('lodash') const IPFSRepo = require('ipfs-repo') +const pull = require('pull-stream') const repoContext = require.context('buffer!./test-repo', true) @@ -24,12 +25,12 @@ function createRepo (id, done) { }) }) - const mainBlob = store(id) - const blocksBlob = store(`${id}/blocks`) + const mainBlob = new Store(id) + const blocksBlob = new Store(`${id}/blocks`) dbs.push(id) - async.eachSeries(repoData, (file, cb) => { + eachSeries(repoData, (file, cb) => { if (_.startsWith(file.key, 'datastore/')) { return cb() } @@ -39,11 +40,12 @@ function createRepo (id, done) { const key = blocks ? file.key.replace(/^blocks\//, '') : file.key - blob.createWriteStream({ - key: key - }).end(file.value, cb) + pull( + pull.values([file.value]), + blob.write(key, cb) + ) }, () => { - const repo = new IPFSRepo(id, {stores: store}) + const repo = new IPFSRepo(id, {stores: Store}) done(null, repo) }) } diff --git a/test/decision/engine-test.js b/test/decision/engine-test.js index a2b8f833..c47c8811 100644 --- a/test/decision/engine-test.js +++ b/test/decision/engine-test.js @@ -5,7 +5,11 @@ const expect = require('chai').expect const PeerId = require('peer-id') const _ = require('lodash') const Block = require('ipfs-block') -const async = require('async') +const parallel = require('async/parallel') +const series = require('async/series') +const eachSeries = require('async/eachSeries') +const pull = require('pull-stream') +const paramap = require('pull-paramap') const Message = require('../../src/message') const Engine = require('../../src/decision/engine') @@ -16,7 +20,7 @@ module.exports = (repo) => { function newEngine (id, done) { repo.create(id, (err, repo) => { if (err) return done(err) - const engine = new Engine(repo.datastore, mockNetwork()) + const engine = new Engine(repo.blockstore, mockNetwork()) engine.start() done(null, { @@ -32,7 +36,7 @@ module.exports = (repo) => { }) it('consistent accounting', (done) => { - async.parallel([ + parallel([ (cb) => newEngine('Ernie', cb), (cb) => newEngine('Bert', cb) ], (err, res) => { @@ -41,46 +45,50 @@ module.exports = (repo) => { const sender = res[0] const receiver = res[1] - async.eachLimit(_.range(1000), 100, (i, cb) => { - const m = new Message(false) - const content = `this is message ${i}` - m.addBlock(new Block(content)) - sender.engine.messageSent(receiver.peer, m) - receiver.engine.messageReceived(sender.peer, m, cb) - }, (err) => { - expect(err).to.not.exist - - expect( - sender.engine.numBytesSentTo(receiver.peer) - ).to.be.above( - 0 - ) - - expect( - sender.engine.numBytesSentTo(receiver.peer) - ).to.be.eql( - receiver.engine.numBytesReceivedFrom(sender.peer) - ) - - expect( - receiver.engine.numBytesSentTo(sender.peer) - ).to.be.eql( - 0 - ) - - expect( - sender.engine.numBytesReceivedFrom(receiver.peer) - ).to.be.eql( - 0 - ) - - done() - }) + pull( + pull.values(_.range(1000)), + paramap((i, cb) => { + const m = new Message(false) + const content = `this is message ${i}` + m.addBlock(new Block(content)) + sender.engine.messageSent(receiver.peer, m) + receiver.engine.messageReceived(sender.peer, m, cb) + }, 100), + pull.onEnd((err) => { + expect(err).to.not.exist + + expect( + sender.engine.numBytesSentTo(receiver.peer) + ).to.be.above( + 0 + ) + + expect( + sender.engine.numBytesSentTo(receiver.peer) + ).to.be.eql( + receiver.engine.numBytesReceivedFrom(sender.peer) + ) + + expect( + receiver.engine.numBytesSentTo(sender.peer) + ).to.be.eql( + 0 + ) + + expect( + sender.engine.numBytesReceivedFrom(receiver.peer) + ).to.be.eql( + 0 + ) + + done() + }) + ) }) }) it('peer is added to peers when message receiver or sent', (done) => { - async.parallel([ + parallel([ (cb) => newEngine('sf', cb), (cb) => newEngine('sea', cb) ], (err, res) => { @@ -129,60 +137,66 @@ module.exports = (repo) => { repo.create('p', (err, repo) => { expect(err).to.not.exist - async.each(alphabet, (letter, cb) => { - const block = new Block(letter) - repo.datastore.put(block, cb) - }, (err) => { - expect(err).to.not.exist + pull( + pull.values(alphabet), + pull.map((l) => new Block(l)), + repo.blockstore.putStream(), + pull.onEnd((err) => { + expect(err).to.not.exist + + const partnerWants = (e, keys, p, cb) => { + const add = new Message(false) + keys.forEach((letter, i) => { + const block = new Block(letter) + add.addEntry(block.key, Math.pow(2, 32) - 1 - i) + }) - const partnerWants = (e, keys, p, cb) => { - const add = new Message(false) - keys.forEach((letter, i) => { - const block = new Block(letter) - add.addEntry(block.key, Math.pow(2, 32) - 1 - i) - }) - - e.messageReceived(p, add, cb) - } - - const partnerCancels = (e, keys, p, cb) => { - const cancels = new Message(false) - keys.forEach((k) => { - const block = new Block(k) - cancels.cancel(block.key) - }) - - e.messageReceived(p, cancels, cb) - } - - async.eachSeries(_.range(numRounds), (i, cb) => { - async.eachSeries(testCases, (testcase, innerCb) => { - const set = testcase[0] - const cancels = testcase[1] - const keeps = _.difference(set, cancels) - - const network = mockNetwork(keeps.length, (res) => { - const msgs = _.flatten(res.messages.map( - (m) => Array.from(m[1].blocks.values()) - .map((b) => b.data.toString()) - )) + e.messageReceived(p, add, cb) + } - expect(msgs).to.be.eql(keeps) - innerCb() + const partnerCancels = (e, keys, p, cb) => { + const cancels = new Message(false) + keys.forEach((k) => { + const block = new Block(k) + cancels.cancel(block.key) }) - const e = new Engine(repo.datastore, network) - e.start() - const partner = PeerId.create({bits: 64}) - async.series([ - (c) => partnerWants(e, set, partner, c), - (c) => partnerCancels(e, cancels, partner, c) - ], (err) => { - if (err) throw err - }) - }, cb) - }, done) - }) + e.messageReceived(p, cancels, cb) + } + + eachSeries(_.range(numRounds), (i, cb) => { + eachSeries(testCases, (testcase, innerCb) => { + const set = testcase[0] + const cancels = testcase[1] + const keeps = _.difference(set, cancels) + + const messageToString = (m) => { + return Array.from(m[1].blocks.values()) + .map((b) => b.data.toString()) + } + const stringifyMessages = (messages) => { + return _.flatten(messages.map(messageToString)) + } + + const network = mockNetwork(keeps.length, (res) => { + const msgs = stringifyMessages(res.messages) + expect(msgs).to.be.eql(keeps) + innerCb() + }) + + const e = new Engine(repo.blockstore, network) + e.start() + const partner = PeerId.create({bits: 64}) + series([ + (c) => partnerWants(e, set, partner, c), + (c) => partnerCancels(e, cancels, partner, c) + ], (err) => { + if (err) throw err + }) + }, cb) + }, done) + }) + ) }) }) }) diff --git a/test/index-test.js b/test/index-test.js index af44159a..515e76e9 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -1,13 +1,17 @@ /* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ 'use strict' -const async = require('async') +const eachSeries = require('async/eachSeries') +const waterfall = require('async/waterfall') +const each = require('async/each') const _ = require('lodash') const expect = require('chai').expect const PeerId = require('peer-id') const Block = require('ipfs-block') const mh = require('multihashes') const PeerBook = require('peer-book') +const pull = require('pull-stream') const Message = require('../src/message') const Bitswap = require('../src') @@ -33,7 +37,7 @@ module.exports = (repo) => { beforeEach((done) => { repo.create('hello', (err, r) => { if (err) return done(err) - store = r.datastore + store = r.blockstore done() }) }) @@ -61,18 +65,17 @@ module.exports = (repo) => { expect(bs.blocksRecvd).to.be.eql(2) expect(bs.dupBlocksRecvd).to.be.eql(0) - async.parallel([ - (cb) => store.get(b1.key, (err, res) => { - if (err) cb(err) - expect(res).to.be.eql(b1) - cb() - }), - (cb) => store.get(b1.key, (err, res) => { - if (err) return cb(err) - expect(res).to.be.eql(b1) - cb() + pull( + pull.values([b1, b1]), + pull.map((block) => store.getStream(block.key)), + pull.flatten(), + pull.collect((err, blocks) => { + if (err) return done(err) + + expect(blocks).to.be.eql([b1, b1]) + done() }) - ], done) + ) }) }) @@ -96,6 +99,7 @@ module.exports = (repo) => { expect(bs.dupBlocksRecvd).to.be.eql(0) const wl = bs.wantlistForPeer(other) + expect(wl.has(mh.toB58String(b1.key))).to.be.eql(true) expect(wl.has(mh.toB58String(b2.key))).to.be.eql(true) @@ -118,7 +122,7 @@ module.exports = (repo) => { return m }) let i = 0 - async.eachSeries(others, (other, cb) => { + eachSeries(others, (other, cb) => { const msg = messages[i] i++ bs._receiveMessage(other, msg, (err) => { @@ -129,13 +133,13 @@ module.exports = (repo) => { }) }) - describe('getBlock', () => { + describe('getStream', () => { let store before((done) => { repo.create('hello', (err, r) => { if (err) return done(err) - store = r.datastore + store = r.blockstore done() }) }) @@ -147,18 +151,54 @@ module.exports = (repo) => { it('block exists locally', (done) => { const me = PeerId.create({bits: 64}) const block = makeBlock() - store.put(block, (err) => { - if (err) throw err - const book = new PeerBook() - const bs = new Bitswap(me, libp2pMock, store, book) - - bs.getBlock(block.key, (err, res) => { - if (err) throw err + pull( + pull.values([block]), + store.putStream(), + pull.onEnd((err) => { + if (err) return done(err) + + const book = new PeerBook() + const bs = new Bitswap(me, libp2pMock, store, book) + + pull( + bs.getStream(block.key), + pull.collect((err, res) => { + if (err) return done(err) + + expect(res).to.be.eql([block]) + done() + }) + ) + }) + ) + }) - expect(res).to.be.eql(block) - done() + it('blocks exist locally', (done) => { + const me = PeerId.create({bits: 64}) + const b1 = makeBlock() + const b2 = makeBlock() + const b3 = makeBlock() + + pull( + pull.values([b1, b2, b3]), + store.putStream(), + pull.onEnd((err) => { + if (err) return done(err) + + const book = new PeerBook() + const bs = new Bitswap(me, libp2pMock, store, book) + + pull( + bs.getStream([b1.key, b2.key, b3.key]), + pull.collect((err, res) => { + if (err) return done(err) + + expect(res).to.be.eql([b1, b2, b3]) + done() + }) + ) }) - }) + ) }) // Not sure if I understand what is going on here @@ -168,7 +208,7 @@ module.exports = (repo) => { const block = makeBlock() let mockNet - async.waterfall([ + waterfall([ (cb) => utils.createMockNet(repo, 2, cb), (net, cb) => { mockNet = net @@ -177,7 +217,13 @@ module.exports = (repo) => { (val, cb) => { mockNet.bitswaps[0]._onPeerConnected(mockNet.ids[1]) mockNet.bitswaps[1]._onPeerConnected(mockNet.ids[0]) - mockNet.bitswaps[0].getBlock(block.key, cb) + pull( + mockNet.bitswaps[0].getStream(block.key), + pull.collect((err, res) => { + if (err) return cb(err) + cb(null, res[0]) + }) + ) }, (res, cb) => { expect(res).to.be.eql(res) @@ -197,13 +243,17 @@ module.exports = (repo) => { bs.engine.network = net bs.start() - bs.getBlock(block.key, (err, res) => { - if (err) throw err - expect(res).to.be.eql(block) - done() - }) + pull( + bs.getStream(block.key), + pull.collect((err, res) => { + if (err) throw err + expect(res).to.be.eql([block]) + done() + }) + ) + setTimeout(() => { - bs.hasBlock(block, () => {}) + bs.put(block, () => {}) }, 200) }) @@ -220,13 +270,13 @@ module.exports = (repo) => { if (id.toHexString() !== other.toHexString()) { err = new Error('unkown peer') } - async.setImmediate(() => cb(err)) + setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === other.toHexString()) { bs2._receiveMessage(me, msg, cb) } else { - async.setImmediate(() => cb(new Error('unkown peer'))) + setImmediate(() => cb(new Error('unkown peer'))) } }, start () {}, @@ -238,13 +288,13 @@ module.exports = (repo) => { if (id.toHexString() !== me.toHexString()) { err = new Error('unkown peer') } - async.setImmediate(() => cb(err)) + setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === me.toHexString()) { bs1._receiveMessage(other, msg, cb) } else { - async.setImmediate(() => cb(new Error('unkown peer'))) + setImmediate(() => cb(new Error('unkown peer'))) } }, start () {}, @@ -256,19 +306,25 @@ module.exports = (repo) => { let store2 - async.waterfall([ + waterfall([ (cb) => repo.create('world', cb), (repo, cb) => { - store2 = repo.datastore + store2 = repo.blockstore bs2 = new Bitswap(other, libp2pMock, store2, new PeerBook()) utils.applyNetwork(bs2, n2) bs2.start() bs1._onPeerConnected(other) bs2._onPeerConnected(me) - bs1.getBlock(block.key, cb) + pull( + bs1.getStream(block.key), + pull.collect((err, res) => { + if (err) return cb(err) + cb(null, res[0]) + }) + ) setTimeout(() => { - bs2.hasBlock(block) + bs2.put(block) }, 1000) }, (res, cb) => { @@ -293,12 +349,12 @@ module.exports = (repo) => { }) }) - describe('unwantBlocks', () => { + describe('unwant', () => { let store beforeEach((done) => { repo.create('hello', (err, r) => { if (err) return done(err) - store = r.datastore + store = r.blockstore done() }) }) @@ -321,23 +377,31 @@ module.exports = (repo) => { } } - bs.getBlock(b.key, (err, res) => { - expect(err.message).to.be.eql(`manual unwant: ${mh.toB58String(b.key)}`) - finish() - }) - bs.getBlock(b.key, (err, res) => { - expect(err.message).to.be.eql(`manual unwant: ${mh.toB58String(b.key)}`) - finish() - }) + pull( + bs.getStream(b.key), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.be.empty + finish() + }) + ) + pull( + bs.getStream(b.key), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.be.empty + finish() + }) + ) - bs.unwantBlocks([b.key]) + setTimeout(() => bs.unwant(b.key), 10) }) }) }) } function hasBlocks (msg, store, cb) { - async.each(Array.from(msg.blocks.values()), (b, next) => { + each(Array.from(msg.blocks.values()), (b, next) => { if (!b.cancel) { store.has(b.key, next) } else { diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index 371840fd..8c8725dd 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -1,14 +1,19 @@ /* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ 'use strict' const expect = require('chai').expect const utils = require('../utils') -const async = require('async') +const series = require('async/series') +const parallel = require('async/parallel') +const each = require('async/each') const _ = require('lodash') const Block = require('ipfs-block') const Buffer = require('safe-buffer').Buffer +const pull = require('pull-stream') describe('gen Bitswap network', function () { + // CI is very slow this.timeout(300 * 1000) it('retrieves local blocks', (done) => { @@ -22,21 +27,26 @@ describe('gen Bitswap network', function () { return new Block(b) }) - async.series([ + series([ (cb) => { - async.parallel(blocks.map((b) => (cb) => { - node.bitswap.hasBlock(b, cb) - }), cb) + pull( + pull.values(blocks), + node.bitswap.putStream(), + pull.onEnd(cb) + ) }, (cb) => { - async.each(_.range(100), (i, cb) => { - async.parallel(blocks.map((b) => (cb) => { - node.bitswap.getBlock(b.key, (err, res) => { - expect(err).to.not.exist - expect(res).to.be.eql(b) + each(_.range(100), (i, cb) => { + pull( + node.bitswap.getStream( + blocks.map((b) => b.key) + ), + pull.collect((err, res) => { + if (err) return cb(err) + expect(res).to.have.length(blocks.length) cb() }) - }), cb) + ) }, cb) } ], (err) => { @@ -50,10 +60,9 @@ describe('gen Bitswap network', function () { }) // const counts = [2, 3, 4, 5, 10] - const counts = [2, 3, 5] + const counts = [2] - // TODO: Enable once we figured out why this is failing on CI - describe.skip('distributed blocks', () => { + describe('distributed blocks', () => { counts.forEach((n) => { it(`with ${n} nodes`, (done) => { utils.genBitswapNetwork(n, (err, nodeArr) => { @@ -79,40 +88,46 @@ describe('gen Bitswap network', function () { const d = (new Date()).getTime() - async.parallel(_.map(nodeArr, (node, i) => (callback) => { + parallel(_.map(nodeArr, (node, i) => (callback) => { node.bitswap.start() - async.parallel([ + parallel([ (finish) => { - async.parallel(_.range(blockFactor).map((j) => (cb) => { - // console.log('has node:%s block %s', i, i * blockFactor + j) - node.bitswap.hasBlock(blocks[i * blockFactor + j], cb) - }), finish) + pull( + pull.values( + _.range(blockFactor) + ), + pull.map((j) => blocks[i * blockFactor + j]), + node.bitswap.putStream(), + pull.onEnd(finish) + ) }, (finish) => { - async.parallel(_.map(blocks, (b, j) => (cb) => { - node.bitswap.getBlock(b.key, (err, res) => { - // console.log('node:%s got block: %s', i, j) - expect(err).to.not.exist - expect(res).to.be.eql(b) - cb() + pull( + node.bitswap.getStream( + blocks.map((b) => b.key) + ), + pull.collect((err, res) => { + if (err) return finish(err) + expect(res).to.have.length(blocks.length) + finish() }) - }), finish) + ) } ], callback) }), (err) => { if (err) return cb(err) - console.log('time -- %s', (new Date()).getTime() - d) + console.log(' time -- %s', (new Date()).getTime() - d) cb() }) } - async.series( + series( _.range(2).map((i) => (cb) => round(i, cb)), (err) => { // setTimeout is used to avoid closing the TCP socket while spdy is // still sending a ton of signalling data setTimeout(() => { - async.parallel(nodeArr.map((node) => (cb) => { + parallel(nodeArr.map((node) => (cb) => { node.bitswap.stop() node.libp2p.stop(cb) }), (err2) => { diff --git a/test/network/network.node.js b/test/network/network.node.js index d32ee114..7eff6878 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -8,7 +8,8 @@ const multiaddr = require('multiaddr') const expect = require('chai').expect const PeerBook = require('peer-book') const Block = require('ipfs-block') -const lps = require('length-prefixed-stream') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') const Network = require('../../src/network') const Message = require('../../src/message') @@ -151,11 +152,11 @@ describe('network', function () { libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap', (err, conn) => { expect(err).to.not.exist - const msgEncoded = msg.toProto() - const enc = lps.encode() - enc.pipe(conn) - enc.write(msgEncoded) - enc.end() + pull( + pull.values([msg.toProto()]), + lp.encode(), + conn + ) }) }) diff --git a/test/node.js b/test/node.js index 2842a8f6..f109f698 100644 --- a/test/node.js +++ b/test/node.js @@ -4,9 +4,9 @@ const IPFSRepo = require('ipfs-repo') const path = require('path') const ncp = require('ncp') const rimraf = require('rimraf') -const fs = require('fs-blob-store') +const Store = require('fs-pull-blob-store') const testRepoPath = path.join(__dirname, 'test-repo') -const async = require('async') +const each = require('async/each') // book keeping const repos = [] @@ -17,14 +17,14 @@ function createRepo (id, done) { ncp(testRepoPath, repoPath, (err) => { if (err) return done(err) - const repo = new IPFSRepo(repoPath, {stores: fs}) + const repo = new IPFSRepo(repoPath, {stores: Store}) repos.push(repoPath) done(null, repo) }) } function removeRepos (done) { - async.each(repos, (repo, cb) => { + each(repos, (repo, cb) => { rimraf(repo, cb) }, done) } diff --git a/test/utils.js b/test/utils.js index cbe24956..6ffee229 100644 --- a/test/utils.js +++ b/test/utils.js @@ -1,6 +1,8 @@ 'use strict' -const async = require('async') +const each = require('async/each') +const eachSeries = require('async/eachSeries') +const map = require('async/map') const _ = require('lodash') const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -10,8 +12,7 @@ const Bitswap = require('../src') const libp2p = require('libp2p-ipfs') const os = require('os') const Repo = require('ipfs-repo') -const bs = require('abstract-blob-store') -// const bs = require('fs-blob-store') +const Store = require('interface-pull-blob-store/lib/reference') exports.mockNetwork = (calls, done) => { done = done || (() => {}) @@ -28,13 +29,13 @@ exports.mockNetwork = (calls, done) => { return { connectTo (p, cb) { - async.setImmediate(() => { + setImmediate(() => { connects.push(p) cb() }) }, sendMessage (p, msg, cb) { - async.setImmediate(() => { + setImmediate(() => { messages.push([p, msg]) cb() finish() @@ -46,9 +47,9 @@ exports.mockNetwork = (calls, done) => { } exports.createMockNet = (repo, count, cb) => { - async.map(_.range(count), (i, cb) => repo.create(`repo-${i}`, (err, res) => { + map(_.range(count), (i, cb) => repo.create(`repo-${i}`, (err, res) => { if (err) return cb(err) - cb(null, res.datastore) + cb(null, res.blockstore) }), (err, stores) => { if (err) return cb(err) @@ -58,7 +59,7 @@ exports.createMockNet = (repo, count, cb) => { const networks = _.range(count).map((i) => { return { connectTo (id, cb) { - const done = (err) => async.setImmediate(() => cb(err)) + const done = (err) => setImmediate(() => cb(err)) if (!_.includes(hexIds, id.toHexString())) { return done(new Error('unkown peer')) } @@ -128,11 +129,11 @@ exports.genBitswapNetwork = (n, callback) => { const tmpDir = os.tmpdir() netArray.forEach((net, i) => { const repoPath = tmpDir + '/' + net.peerInfo.id.toB58String() - net.repo = new Repo(repoPath, { stores: bs }) + net.repo = new Repo(repoPath, { stores: Store }) }) // start every libp2pNode - async.each(netArray, (net, cb) => { + each(netArray, (net, cb) => { net.libp2p.start(cb) }, (err) => { if (err) { @@ -144,15 +145,15 @@ exports.genBitswapNetwork = (n, callback) => { // create every BitSwap function createBitswaps () { netArray.forEach((net) => { - net.bitswap = new Bitswap(net.peerInfo, net.libp2p, net.repo.datastore, net.peerBook) + net.bitswap = new Bitswap(net.peerInfo, net.libp2p, net.repo.blockstore, net.peerBook) }) establishLinks() } // connect all the nodes between each other function establishLinks () { - async.eachSeries(netArray, (from, cbI) => { - async.eachSeries(netArray, (to, cbJ) => { + eachSeries(netArray, (from, cbI) => { + eachSeries(netArray, (to, cbJ) => { if (from.peerInfo.id.toB58String() === to.peerInfo.id.toB58String()) { return cbJ()