diff --git a/package.json b/package.json index 2a3f63e8..34ef7d3e 100644 --- a/package.json +++ b/package.json @@ -40,27 +40,30 @@ "fs-blob-store": "^5.2.1", "idb-plus-blob-store": "^1.1.2", "ipfs-repo": "^0.8.0", - "libp2p-ipfs": "^0.10.0", - "lodash": "^4.11.2", + "libp2p-ipfs": "^0.11.0", + "lodash": "^4.13.1", "multiaddr": "^2.0.2", "ncp": "^2.0.0", "peer-book": "^0.3.0", "peer-id": "^0.7.0", "peer-info": "^0.7.0", - "rimraf": "^2.5.2" + "rimraf": "^2.5.2", + "safe-buffer": "^5.0.1" }, "dependencies": { - "async": "^2.0.0-rc.4", + "async": "^2.0.0-rc.5", + "bl": "^1.1.2", "debug": "^2.2.0", "heap": "^0.2.6", "highland": "^3.0.0-beta.1", "ipfs-block": "^0.3.0", - "lodash.isequal": "^4.1.4", + "lodash.isequalwith": "^4.2.0", "lodash.isundefined": "^3.0.1", + "multihashes": "^0.2.2", "protocol-buffers": "^3.1.6" }, "contributors": [ "David Dias ", "Friedel Ziegelmayer " ] -} \ No newline at end of file +} diff --git a/src/constants.js b/src/constants.js index c3a1db3e..b612d3bc 100644 --- a/src/constants.js +++ b/src/constants.js @@ -4,7 +4,7 @@ const second = 1000 module.exports = { maxProvidersPerRequest: 3, - provierRequestTimeout: 10 * second, + providerRequestTimeout: 10 * second, hasBlockTimeout: 15 * second, provideTimeout: 15 * second, kMaxPriority: Math.pow(2, 31) - 1, diff --git a/src/decision/engine.js b/src/decision/engine.js index f46bf4f2..1d3ce31c 100644 --- a/src/decision/engine.js +++ b/src/decision/engine.js @@ -3,6 +3,7 @@ const debug = require('debug') const _ = require('highland') const async = require('async') +const mh = require('multihashes') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') @@ -23,6 +24,8 @@ module.exports = class Engine { // A priority queue of requests received from different // peers. this.peerRequestQueue = new PeerRequestQueue() + + this._running = false } _sendBlock (env, cb) { @@ -40,16 +43,11 @@ module.exports = class Engine { } _outbox () { - if (!this._timer) { - this._timer = setTimeout(() => { - doIt(() => { - this._timer = null - }) - }, 100) - } + if (!this._running) return const doIt = (cb) => { _((push, next) => { + if (!this._running) return push(null, _.nil) const nextTask = this.peerRequestQueue.pop() if (!nextTask) return push(null, _.nil) @@ -75,6 +73,14 @@ module.exports = class Engine { }) .done(cb) } + + if (!this._timer) { + this._timer = setTimeout(() => { + doIt(() => { + this._timer = null + }) + }, 50) + } } wantlistForPeer (peerId) { @@ -103,7 +109,7 @@ module.exports = class Engine { } this._processBlocks(msg.blocks, ledger) - log('wantlist', Array.from(msg.wantlist.values())) + log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString())) async.eachSeries( msg.wantlist.values(), this._processWantlist.bind(this, ledger, peerId), @@ -112,7 +118,8 @@ module.exports = class Engine { if (err) return done(err) this._outbox() done() - }) + } + ) } receivedBlock (block) { @@ -133,20 +140,20 @@ module.exports = class Engine { _processWantlist (ledger, peerId, entry, cb) { if (entry.cancel) { - log('cancel %s', entry.key.toString('hex')) + log('cancel %s', mh.toB58String(entry.key)) ledger.cancelWant(entry.key) this.peerRequestQueue.remove(entry.key, peerId) async.setImmediate(() => cb()) } else { - log('wants %s - %s', entry.key.toString('hex'), entry.priority) + log('wants %s - %s', mh.toB58String(entry.key), entry.priority) ledger.wants(entry.key, entry.priority) // If we already have the block, serve it this.datastore.has(entry.key, (err, exists) => { if (err) { - log('failed existence check %s', entry.key.toString('hex')) + log('failed existence check %s', mh.toB58String(entry.key)) } else if (exists) { - log('has want %s', entry.key.toString('hex')) + log('has want %s', mh.toB58String(entry.key)) this.peerRequestQueue.push(entry.entry, peerId) } cb() @@ -156,7 +163,7 @@ module.exports = class Engine { _processBlocks (blocks, ledger) { for (let block of blocks.values()) { - log('got block %s %s bytes', block.key.toString('hex'), block.data.length) + log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length) ledger.receivedBytes(block.data.length) this.receivedBlock(block) @@ -200,4 +207,12 @@ module.exports = class Engine { return l } + + start () { + this._running = true + } + + stop () { + this._running = false + } } diff --git a/src/decision/peer-request-queue.js b/src/decision/peer-request-queue.js index 69c3ca49..62e2fda3 100644 --- a/src/decision/peer-request-queue.js +++ b/src/decision/peer-request-queue.js @@ -1,7 +1,13 @@ 'use strict' +const mh = require('multihashes') +const debug = require('debug') +const assert = require('assert') + const PriorityQueue = require('./pq') +const log = debug('bitswap:peer-request-queue') + class PeerRequestTask { constructor (entry, target, done) { this.entry = entry @@ -13,10 +19,16 @@ class PeerRequestTask { get key () { return taskKey(this.target, this.entry.key) } + + get [Symbol.toStringTag] () { + return `PeerRequestTask ` + } } class ActivePartner { - constructor () { + constructor (id) { + this.id = id + // The number of blocks this peer is currently being sent. this.active = 0 @@ -30,17 +42,18 @@ class ActivePartner { } startTask (key) { - this.activeBlocks.set(key, {}) + this.activeBlocks.set(mh.toB58String(key), 1) this.active ++ } taskDone (key) { - this.activeBlocks.delete(key) + const k = mh.toB58String(key) + assert(this.activeBlocks.has(k), 'finishing non existent task') + + this.activeBlocks.delete() this.active -- - if (this.active < 0) { - throw new Error('more tasks finished than started') - } + assert(this.active >= 0, 'more tasks finished than started') } } @@ -53,21 +66,24 @@ module.exports = class PeerRequestQueue { // Add a new entry to the queue push (entry, to) { + log('push, to: %s', to.toB58String()) let partner = this.partners.get(to.toB58String()) if (!partner) { - partner = new ActivePartner() + partner = new ActivePartner(to) this.pQueue.push(partner) this.partners.set(to.toB58String(), partner) } if (partner.activeBlocks.has(entry.key)) { + log('has activeBlocks', entry.key) return } let task = this.taskMap.get(taskKey(to, entry.key)) if (task) { + log('updating task', task.toString()) task.entry.priority = entry.priority partner.taskQueue.update(task) return @@ -79,6 +95,7 @@ module.exports = class PeerRequestQueue { }) partner.taskQueue.push(task) + log('taskMap.set', task.key, task.toString()) this.taskMap.set(task.key, task) partner.requests ++ partner.taskQueue.update(task) @@ -86,6 +103,8 @@ module.exports = class PeerRequestQueue { // Get the task with the hightest priority from the queue pop () { + // log('pop, empty? %s', this.pQueue.isEmpty()) + // log('partners', Array.from(this.partners.values()).map((val) => [val.requests, val.taskQueue.size()])) if (this.pQueue.isEmpty()) return let partner = this.pQueue.pop() @@ -103,7 +122,7 @@ module.exports = class PeerRequestQueue { partner.requests -- break } - + // log('pop, out', partner.taskQueue.isEmpty(), out) this.pQueue.push(partner) return out } @@ -120,11 +139,15 @@ module.exports = class PeerRequestQueue { // having canceled a block, we now account for that in the given partner this.partners.get(peerId.toB58String()).requests -- } + + log('taskMap', Array.from(this.taskMap.values()).map((v) => { + return v.toString() + })) } } function taskKey (peerId, key) { - return `${peerId.toB58String()}:${key.toString('hex')}` + return `${peerId.toB58String()}:${mh.toB58String(key)}` } function partnerCompare (a, b) { diff --git a/src/index.js b/src/index.js index 45037a1a..20b531e7 100644 --- a/src/index.js +++ b/src/index.js @@ -5,6 +5,7 @@ const debug = require('debug') const log = debug('bitswap') log.error = debug('bitswap:error') const EventEmitter = require('events').EventEmitter +const mh = require('multihashes') const cs = require('./constants') const WantManager = require('./wantmanager') @@ -55,7 +56,7 @@ module.exports = class Bitwap { for (let block of iblocks.values()) { const found = this.wm.wl.contains(block.key) if (!found) { - log('received un-askes-for %s from %s', block.key.toString('hex'), peerId.toB58String()) + log('received un-askes-for %s from %s', mh.toB58String(block.key), peerId.toB58String()) } else { keys.push(block.key) } @@ -128,19 +129,26 @@ module.exports = class Bitwap { // getBlock attempts to retrieve a particular block with key `key` from peers getBlock (key, cb) { - log('getBlock.start %s', key.toString('hex')) + const keyS = mh.toB58String(key) + log('getBlock.start %s', keyS) const done = (err, block) => { if (err) { - log('getBlock.fail %s', key.toString('hex')) - } else { - log('getBlock.end %s', key.toString('hex')) + log('getBlock.fail %s', keyS) + return cb(err) + } + + if (!block) { + log('getBlock.fail %s', keyS) + return cb(new Error('Empty block received')) } - cb(err, block) + + log('getBlock.end %s', keyS) + cb(null, block) } this.getBlocks([key], (results) => { - const err = results[key].error - const block = results[key].block + const err = results[keyS].error + const block = results[keyS].block done(err, block) }) @@ -155,28 +163,30 @@ module.exports = class Bitwap { const results = {} const unwantListeners = {} const blockListeners = {} - const unwantEvent = (key) => `unwant:${key.toString('hex')}` - const blockEvent = (key) => `block:${key.toString('hex')}` + const unwantEvent = (key) => `unwant:${key}` + const blockEvent = (key) => `block:${key}` const cleanupListeners = () => { keys.forEach((key) => { - this.notifications.removeListener(unwantEvent(key), unwantListeners[key]) - this.notifications.removeListener(blockEvent(key), blockListeners[key]) + const keyS = mh.toB58String(key) + this.notifications.removeListener(unwantEvent(keyS), unwantListeners[keyS]) + this.notifications.removeListener(blockEvent(keyS), blockListeners[keyS]) }) } const addListeners = () => { keys.forEach((key) => { - unwantListeners[key] = () => { - finish(key, new Error(`manual unwant: ${key.toString('hex')}`)) + const keyS = mh.toB58String(key) + unwantListeners[keyS] = () => { + finish(keyS, new Error(`manual unwant: ${keyS}`)) } - blockListeners[key] = (block) => { - finish(key, null, block) + blockListeners[keyS] = (block) => { + finish(keyS, null, block) } - this.notifications.once(unwantEvent(key), unwantListeners[key]) - this.notifications.once(blockEvent(key), blockListeners[key]) + this.notifications.once(unwantEvent(keyS), unwantListeners[keyS]) + this.notifications.once(blockEvent(keyS), blockListeners[keyS]) }) } @@ -193,40 +203,42 @@ module.exports = class Bitwap { } addListeners() + this.wm.wantBlocks(keys) - keys.forEach((key) => { + async.parallel(keys.map((key) => (cb) => { // We don't want to announce looking for blocks // when we might have them ourselves. this.datastore.has(key, (err, exists) => { if (err) { log('error in datastore.has: ', err.message) - return + return cb() } - if (exists) { - this.datastore.get(key, (err, res) => { - if (!err && res) { - finish(key, null, res) - this.wm.cancelWants([key]) - return - } - - if (err) { - log('error in datastore.get: ', err.message) - } - }) + if (!exists) { + return cb() } - }) - }) - this.wm.wantBlocks(keys) + this.datastore.get(key, (err, res) => { + if (err) { + log('error in datastore.get: ', err.message) + } + + if (!err && res) { + finish(mh.toB58String(key), null, res) + this.wm.cancelWants([key]) + } + + cb() + }) + }) + })) } // removes the given keys from the want list independent of any ref counts unwantBlocks (keys) { this.wm.unwantBlocks(keys) keys.forEach((key) => { - this.notifications.emit(`unwant:${key.toString('hex')}`) + this.notifications.emit(`unwant:${mh.toB58String(key)}`) }) } @@ -244,8 +256,8 @@ module.exports = class Bitwap { log.error('Error writing block to datastore: %s', err.message) return cb(err) } - log('put block: %s', block.key.toString('hex')) - this.notifications.emit(`block:${block.key.toString('hex')}`, block) + log('put block: %s', mh.toB58String(block.key)) + this.notifications.emit(`block:${mh.toB58String(block.key)}`, block) this.engine.receivedBlock(block) cb() }) @@ -268,11 +280,13 @@ module.exports = class Bitwap { start () { this.wm.run() this.network.start() + this.engine.start() } // Halt everything stop () { this.wm.stop() - this.network.start() + this.network.stop() + this.engine.stop() } } diff --git a/src/message/entry.js b/src/message/entry.js index 033a6001..4617c8de 100644 --- a/src/message/entry.js +++ b/src/message/entry.js @@ -1,5 +1,7 @@ 'use strict' +const mh = require('multihashes') + const WantlistEntry = require('../wantlist').Entry module.exports = class BitswapMessageEntry { @@ -23,4 +25,12 @@ module.exports = class BitswapMessageEntry { set priority (val) { this.entry.priority = val } + + get [Symbol.toStringTag] () { + return `BitswapMessageEntry ${mh.toB58String(this.key)} ` + } + + equals (other) { + return (this.cancel === other.cancel) && this.entry.equals(other.entry) + } } diff --git a/src/message/index.js b/src/message/index.js index 78b731ab..564acf32 100644 --- a/src/message/index.js +++ b/src/message/index.js @@ -4,7 +4,9 @@ const protobuf = require('protocol-buffers') const fs = require('fs') const Block = require('ipfs-block') const path = require('path') -const isEqual = require('lodash.isequal') +const isEqualWith = require('lodash.isequalwith') +const mh = require('multihashes') +const assert = require('assert') const pbm = protobuf(fs.readFileSync(path.join(__dirname, 'message.proto'))) const Entry = require('./entry') @@ -21,23 +23,25 @@ class BitswapMessage { } addEntry (key, priority, cancel) { - const e = this.wantlist.get(key.toString('hex')) + assert(Buffer.isBuffer(key), 'key must be a buffer') + + const e = this.wantlist.get(mh.toB58String(key)) if (e) { e.priority = priority e.cancel = Boolean(cancel) } else { - this.wantlist.set(key.toString('hex'), new Entry(key, priority, cancel)) + this.wantlist.set(mh.toB58String(key), new Entry(key, priority, cancel)) } } addBlock (block) { - this.blocks.set(block.key.toString('hex'), block) + this.blocks.set(mh.toB58String(block.key), block) } cancel (key) { - this.wantlist.delete(key.toString('hex')) - this.addEntry(key.toString('hex'), 0, true) + this.wantlist.delete(mh.toB58String(key)) + this.addEntry(key, 0, true) } toProto () { @@ -45,7 +49,7 @@ class BitswapMessage { wantlist: { entries: Array.from(this.wantlist.values()).map((e) => { return { - block: e.key.toString('hex'), + block: mh.toB58String(e.key), priority: Number(e.priority), cancel: Boolean(e.cancel) } @@ -57,15 +61,27 @@ class BitswapMessage { } equals (other) { + const cmp = (a, b) => { + if (a.equals && typeof a.equals === 'function') { + return a.equals(b) + } + } + if (this.full !== other.full || - !isEqual(this.wantlist, other.wantlist) || - !isEqual(this.blocks, other.blocks) + !isEqualWith(this.wantlist, other.wantlist, cmp) || + !isEqualWith(this.blocks, other.blocks, cmp) ) { return false } return true } + + get [Symbol.toStringTag] () { + const list = Array.from(this.wantlist.keys()) + const blocks = Array.from(this.blocks.keys()) + return `BitswapMessage ` + } } BitswapMessage.fromProto = (raw) => { @@ -73,7 +89,7 @@ BitswapMessage.fromProto = (raw) => { const m = new BitswapMessage(dec.wantlist.full) dec.wantlist.entries.forEach((e) => { - m.addEntry(new Buffer(e.block, 'hex'), e.priority, e.cancel) + m.addEntry(mh.fromB58String(e.block), e.priority, e.cancel) }) dec.blocks.forEach((b) => m.addBlock(new Block(b))) diff --git a/src/network/index.js b/src/network/index.js index 700edf2e..4fc4fc65 100644 --- a/src/network/index.js +++ b/src/network/index.js @@ -5,6 +5,7 @@ const async = require('async') const debug = require('debug') const Message = require('../message') +const cs = require('../constants') const log = debug('bitswap:network') const PROTOCOL_IDENTIFIER = '/ipfs/bitswap/1.0.0' @@ -14,6 +15,9 @@ module.exports = class Network { this.libp2p = libp2p this.peerBook = peerBook this.bitswap = bitswap + + // increase event listener max + this.libp2p.swarm.setMaxListeners(cs.maxListeners) } start () { @@ -27,13 +31,19 @@ module.exports = class Network { this.libp2p.swarm.on('peer-mux-established', this._onPeerMux) this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed) + + // All existing connections are like new ones for us + const pKeys = Object.keys(this.peerBook.getAll()) + pKeys.forEach((k) => { + this._onPeerMux(this.peerBook.getByB58String(k)) + }) } stop () { this.libp2p.unhandle(PROTOCOL_IDENTIFIER) - this.libp2p.swarm.removeEventListener('peer-mux-established', this._onPeerMux) + this.libp2p.swarm.removeListener('peer-mux-established', this._onPeerMux) - this.libp2p.swarm.removeEventListener('peer-mux-closed', this._onPeerMuxClosed) + this.libp2p.swarm.removeListener('peer-mux-closed', this._onPeerMuxClosed) } _onConnection (conn) { @@ -50,6 +60,11 @@ module.exports = class Network { } this.bitswap._receiveMessage(conn.peerId, msg) })) + + conn.on('error', (err) => { + this.bitswap._receiveError(err) + conn.end() + }) } _onPeerMux (peerInfo) { diff --git a/src/wantlist/entry.js b/src/wantlist/entry.js index 2e47ada9..29a4fd39 100644 --- a/src/wantlist/entry.js +++ b/src/wantlist/entry.js @@ -1,9 +1,12 @@ 'use strict' +const assert = require('assert') const isUndefined = require('lodash.isundefined') +const mh = require('multihashes') module.exports = class WantlistEntry { constructor (key, priority) { + assert(Buffer.isBuffer(key), 'key must be a buffer') // Keep track of how many requests we have for this key this._refCounter = 1 @@ -22,4 +25,14 @@ module.exports = class WantlistEntry { hasRefs () { return this._refCounter > 0 } + + get [Symbol.toStringTag] () { + return `WantlistEntry ` + } + + equals (other) { + return (this._refCounter === other._refCounter) && + this.key.equals(other.key) && + this.priority === other.priority + } } diff --git a/src/wantlist/index.js b/src/wantlist/index.js index d08f68bd..24cc9ec1 100644 --- a/src/wantlist/index.js +++ b/src/wantlist/index.js @@ -1,5 +1,7 @@ 'use strict' +const mh = require('multihashes') + const Entry = require('./entry') class Wantlist { @@ -12,18 +14,18 @@ class Wantlist { } add (key, priority) { - const e = this.set.get(key.toString('hex')) + const e = this.set.get(mh.toB58String(key)) if (e) { e.inc() e.priority = priority } else { - this.set.set(key.toString('hex'), new Entry(key, priority)) + this.set.set(mh.toB58String(key), new Entry(key, priority)) } } remove (key) { - const e = this.set.get(key.toString('hex')) + const e = this.set.get(mh.toB58String(key)) if (!e) return @@ -32,7 +34,7 @@ class Wantlist { // only delete when no refs are held if (e.hasRefs()) return - this.set.delete(key.toString('hex')) + this.set.delete(mh.toB58String(key)) } removeForce (key) { @@ -50,7 +52,7 @@ class Wantlist { } contains (key) { - return this.set.get(key.toString('hex')) + return this.set.get(mh.toB58String(key)) } } diff --git a/test/decision/engine-test.js b/test/decision/engine-test.js index 00dbceba..a2b8f833 100644 --- a/test/decision/engine-test.js +++ b/test/decision/engine-test.js @@ -16,10 +16,12 @@ module.exports = (repo) => { function newEngine (id, done) { repo.create(id, (err, repo) => { if (err) return done(err) + const engine = new Engine(repo.datastore, mockNetwork()) + engine.start() done(null, { peer: PeerId.create({bits: 64}), - engine: new Engine(repo.datastore, mockNetwork()) + engine }) }) } @@ -170,6 +172,7 @@ module.exports = (repo) => { }) const e = new Engine(repo.datastore, network) + e.start() const partner = PeerId.create({bits: 64}) async.series([ (c) => partnerWants(e, set, partner, c), diff --git a/test/index-test.js b/test/index-test.js index 25febd7b..af44159a 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -6,6 +6,8 @@ const _ = require('lodash') const expect = require('chai').expect const PeerId = require('peer-id') const Block = require('ipfs-block') +const mh = require('multihashes') +const PeerBook = require('peer-book') const Message = require('../src/message') const Bitswap = require('../src') @@ -19,7 +21,8 @@ module.exports = (repo) => { handle: function () {}, swarm: { muxedConns: {}, - on: function () {} + on () {}, + setMaxListeners () {} } } @@ -41,7 +44,8 @@ module.exports = (repo) => { it('simple block message', (done) => { const me = PeerId.create({bits: 64}) - const bs = new Bitswap(me, libp2pMock, store) + const book = new PeerBook() + const bs = new Bitswap(me, libp2pMock, store, book) bs.start() const other = PeerId.create({bits: 64}) @@ -74,7 +78,8 @@ module.exports = (repo) => { it('simple want message', (done) => { const me = PeerId.create({bits: 64}) - const bs = new Bitswap(me, libp2pMock, store) + const book = new PeerBook() + const bs = new Bitswap(me, libp2pMock, store, book) bs.start() const other = PeerId.create({bits: 64}) @@ -91,8 +96,8 @@ module.exports = (repo) => { expect(bs.dupBlocksRecvd).to.be.eql(0) const wl = bs.wantlistForPeer(other) - expect(wl.has(b1.key.toString('hex'))).to.be.eql(true) - expect(wl.has(b2.key.toString('hex'))).to.be.eql(true) + expect(wl.has(mh.toB58String(b1.key))).to.be.eql(true) + expect(wl.has(mh.toB58String(b2.key))).to.be.eql(true) done() }) @@ -100,7 +105,8 @@ module.exports = (repo) => { it('multi peer', (done) => { const me = PeerId.create({bits: 64}) - const bs = new Bitswap(me, libp2pMock, store) + const book = new PeerBook() + const bs = new Bitswap(me, libp2pMock, store, book) bs.start() const others = _.range(5).map(() => PeerId.create({bits: 64})) @@ -122,6 +128,7 @@ module.exports = (repo) => { }, done) }) }) + describe('getBlock', () => { let store @@ -142,7 +149,8 @@ module.exports = (repo) => { const block = makeBlock() store.put(block, (err) => { if (err) throw err - const bs = new Bitswap(me, libp2pMock, store) + const book = new PeerBook() + const bs = new Bitswap(me, libp2pMock, store, book) bs.getBlock(block.key, (err, res) => { if (err) throw err @@ -181,7 +189,8 @@ module.exports = (repo) => { it('block is added locally afterwards', (done) => { const me = PeerId.create({bits: 64}) const block = makeBlock() - const bs = new Bitswap(me, libp2pMock, store) + const book = new PeerBook() + const bs = new Bitswap(me, libp2pMock, store, book) const net = utils.mockNetwork() bs.network = net bs.wm.network = net @@ -219,7 +228,9 @@ module.exports = (repo) => { } else { async.setImmediate(() => cb(new Error('unkown peer'))) } - } + }, + start () {}, + stop () {} } const n2 = { connectTo (id, cb) { @@ -235,10 +246,13 @@ module.exports = (repo) => { } else { async.setImmediate(() => cb(new Error('unkown peer'))) } - } + }, + start () {}, + stop () {} } - bs1 = new Bitswap(me, libp2pMock, store) + bs1 = new Bitswap(me, libp2pMock, store, new PeerBook()) utils.applyNetwork(bs1, n1) + bs1.start() let store2 @@ -246,8 +260,9 @@ module.exports = (repo) => { (cb) => repo.create('world', cb), (repo, cb) => { store2 = repo.datastore - bs2 = new Bitswap(other, libp2pMock, store2) + bs2 = new Bitswap(other, libp2pMock, store2, new PeerBook()) utils.applyNetwork(bs2, n2) + bs2.start() bs1._onPeerConnected(other) bs2._onPeerConnected(me) bs1.getBlock(block.key, cb) @@ -267,7 +282,7 @@ module.exports = (repo) => { describe('stat', () => { it('has initial stats', () => { const me = PeerId.create({bits: 64}) - const bs = new Bitswap(me, libp2pMock, {}) + const bs = new Bitswap(me, libp2pMock, {}, new PeerBook()) const stats = bs.stat() expect(stats).to.have.property('wantlist') @@ -294,7 +309,7 @@ module.exports = (repo) => { it('removes blocks that are wanted multiple times', (done) => { const me = PeerId.create({bits: 64}) - const bs = new Bitswap(me, libp2pMock, store) + const bs = new Bitswap(me, libp2pMock, store, new PeerBook()) bs.start() const b = makeBlock() @@ -307,11 +322,11 @@ module.exports = (repo) => { } bs.getBlock(b.key, (err, res) => { - expect(err.message).to.be.eql(`manual unwant: ${b.key.toString('hex')}`) + expect(err.message).to.be.eql(`manual unwant: ${mh.toB58String(b.key)}`) finish() }) bs.getBlock(b.key, (err, res) => { - expect(err.message).to.be.eql(`manual unwant: ${b.key.toString('hex')}`) + expect(err.message).to.be.eql(`manual unwant: ${mh.toB58String(b.key)}`) finish() }) diff --git a/test/message.spec.js b/test/message.spec.js index eef11701..b51cfc88 100644 --- a/test/message.spec.js +++ b/test/message.spec.js @@ -6,6 +6,7 @@ const fs = require('fs') const Block = require('ipfs-block') const protobuf = require('protocol-buffers') const path = require('path') +const mh = require('multihashes') const pbm = protobuf(fs.readFileSync(path.join(__dirname, '../src/message/message.proto'))) const BitswapMessage = require('../src/message') @@ -20,7 +21,7 @@ describe('BitswapMessage', () => { expect( pbm.Message.decode(m.toProto()).wantlist.entries[0] ).to.be.eql({ - block: block.key.toString('hex'), + block: mh.toB58String(block.key), priority: 1, cancel: false }) @@ -42,7 +43,7 @@ describe('BitswapMessage', () => { const raw = pbm.Message.encode({ wantlist: { entries: [{ - block: new Buffer('hello').toString('hex'), + block: mh.toB58String(new Buffer('hello')), cancel: false }], full: true @@ -60,7 +61,7 @@ describe('BitswapMessage', () => { expect( Array.from(protoMessage.wantlist) ).to.be.eql([ - [new Buffer('hello').toString('hex'), new BitswapMessage.Entry(new Buffer('hello'), 0, false)] + [mh.toB58String(new Buffer('hello')), new BitswapMessage.Entry(new Buffer('hello'), 0, false)] ]) const b1 = new Block('hello') @@ -68,8 +69,8 @@ describe('BitswapMessage', () => { expect( Array.from(protoMessage.blocks) ).to.be.eql([ - [b1.key.toString('hex'), b1], - [b2.key.toString('hex'), b2] + [mh.toB58String(b1.key), b1], + [mh.toB58String(b2.key), b2] ]) }) @@ -138,24 +139,25 @@ describe('BitswapMessage', () => { describe('Entry', () => { it('exposes the wantlist entry properties', () => { - const entry = new BitswapMessage.Entry('hello', 5, false) + const entry = new BitswapMessage.Entry(new Buffer('hello'), 5, false) - expect(entry).to.have.property('key', 'hello') + expect(entry).to.have.property('key') expect(entry).to.have.property('priority', 5) expect(entry).to.have.property('cancel', false) }) it('allows setting properties on the wantlist entry', () => { - const entry = new BitswapMessage.Entry('hello', 5, false) + const entry = new BitswapMessage.Entry(new Buffer('hello'), 5, false) - expect(entry.entry).to.have.property('key', 'hello') + expect(entry.entry).to.have.property('key') expect(entry.entry).to.have.property('priority', 5) - entry.key = 'world' + entry.key = new Buffer('world') entry.priority = 2 - expect(entry.entry).to.have.property('key', 'world') + expect(entry.entry).to.have.property('key') + expect(entry.entry.key.equals(new Buffer('world'))) expect(entry.entry).to.have.property('priority', 2) }) }) diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index 5820e575..371840fd 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -1,66 +1,128 @@ /* eslint-env mocha */ - 'use strict' const expect = require('chai').expect const utils = require('../utils') const async = require('async') +const _ = require('lodash') +const Block = require('ipfs-block') +const Buffer = require('safe-buffer').Buffer describe('gen Bitswap network', function () { this.timeout(300 * 1000) - it('gen a network with 2 bitswap nodes', (done) => { - const n = 2 - utils.genBitswapNetwork(n, (err, nodeArr) => { + it('retrieves local blocks', (done) => { + utils.genBitswapNetwork(1, (err, nodes) => { expect(err).to.not.exist - nodeArr.forEach((node) => { - expect(node.bitswap).to.exist - expect(node.peerInfo).to.exist - expect(node.libp2p).to.exist - expect(Object.keys(node.libp2p.swarm.muxedConns).length).to.equal(n - 1) - expect(node.repo).to.exist - }) - cleanUp(nodeArr) - }) - - function cleanUp (nodeArr) { - // setTimeout is used to avoid closing the TCP socket while spdy is - // still sending a ton of signalling data - setTimeout(() => { - async.each(nodeArr, (node, callback) => { - node.libp2p.swarm.close(callback) - }, done) - }, 1000) - } - }) - it('gen a network with 3 bitswap nodes', (done) => { - const n = 3 - utils.genBitswapNetwork(n, (err, nodeArr) => { - expect(err).to.not.exist - nodeArr.forEach((node) => { - expect(node.bitswap).to.exist - expect(node.peerInfo).to.exist - expect(node.libp2p).to.exist - expect(Object.keys(node.libp2p.swarm.conns).length).to.equal(0) - expect(Object.keys(node.libp2p.swarm.muxedConns).length).to.equal(n - 1) - expect(node.repo).to.exist + const node = nodes[0] + const blocks = _.range(100).map((k) => { + const b = Buffer.alloc(1024) + b.fill(k) + return new Block(b) }) - cleanUp(nodeArr) - }) - function cleanUp (nodeArr) { - // setTimeout is used to avoid closing the TCP socket while spdy is - // still sending a ton of signalling data - const tasks = nodeArr.map((node) => { - return (cb) => { - node.libp2p.swarm.close(cb) + async.series([ + (cb) => { + async.parallel(blocks.map((b) => (cb) => { + node.bitswap.hasBlock(b, cb) + }), cb) + }, + (cb) => { + async.each(_.range(100), (i, cb) => { + async.parallel(blocks.map((b) => (cb) => { + node.bitswap.getBlock(b.key, (err, res) => { + expect(err).to.not.exist + expect(res).to.be.eql(b) + cb() + }) + }), cb) + }, cb) } + ], (err) => { + expect(err).to.not.exist + setTimeout(() => { + node.bitswap.stop() + node.libp2p.stop(done) + }) }) + }) + }) - setTimeout(() => { - async.parallel(tasks, done) - }, 1000) - } + // const counts = [2, 3, 4, 5, 10] + const counts = [2, 3, 5] + + // TODO: Enable once we figured out why this is failing on CI + describe.skip('distributed blocks', () => { + counts.forEach((n) => { + it(`with ${n} nodes`, (done) => { + utils.genBitswapNetwork(n, (err, nodeArr) => { + expect(err).to.not.exist + nodeArr.forEach((node) => { + expect(node.bitswap).to.exist + expect(node.libp2p).to.exist + expect(Object.keys(node.libp2p.swarm.conns).length).to.equal(0) + expect(Object.keys(node.libp2p.swarm.muxedConns).length).to.equal(n - 1) + expect(node.repo).to.exist + }) + + // -- actual test + + const round = (j, cb) => { + const blockFactor = 10 + const blocks = _.range(n * blockFactor).map((k) => { + const buf = Buffer.alloc(1024) + buf.fill(k) + buf[0] = j + return new Block(buf) + }) + + const d = (new Date()).getTime() + + async.parallel(_.map(nodeArr, (node, i) => (callback) => { + node.bitswap.start() + async.parallel([ + (finish) => { + async.parallel(_.range(blockFactor).map((j) => (cb) => { + // console.log('has node:%s block %s', i, i * blockFactor + j) + node.bitswap.hasBlock(blocks[i * blockFactor + j], cb) + }), finish) + }, + (finish) => { + async.parallel(_.map(blocks, (b, j) => (cb) => { + node.bitswap.getBlock(b.key, (err, res) => { + // console.log('node:%s got block: %s', i, j) + expect(err).to.not.exist + expect(res).to.be.eql(b) + cb() + }) + }), finish) + } + ], callback) + }), (err) => { + if (err) return cb(err) + console.log('time -- %s', (new Date()).getTime() - d) + cb() + }) + } + + async.series( + _.range(2).map((i) => (cb) => round(i, cb)), + (err) => { + // setTimeout is used to avoid closing the TCP socket while spdy is + // still sending a ton of signalling data + setTimeout(() => { + async.parallel(nodeArr.map((node) => (cb) => { + node.bitswap.stop() + node.libp2p.stop(cb) + }), (err2) => { + done(err || err2) + }) + }, 2000) + } + ) + }) + }) + }) }) }) diff --git a/test/utils.js b/test/utils.js index 63354db7..cbe24956 100644 --- a/test/utils.js +++ b/test/utils.js @@ -102,11 +102,11 @@ exports.genBitswapNetwork = (n, callback) => { const p = new PeerInfo() const mh1 = multiaddr('/ip4/127.0.0.1/tcp/' + (basePort + i) + '/ipfs/' + p.id.toB58String()) - const mh2 = multiaddr('/ip4/127.0.0.1/tcp/' + (basePort + i + 2000) + '/ws' + - '/ipfs/' + p.id.toB58String()) - p.multiaddr.add(mh1) - p.multiaddr.add(mh2) + + // const mh2 = multiaddr('/ip4/127.0.0.1/tcp/' + (basePort + i + 2000) + '/ws' + + // '/ipfs/' + p.id.toB58String()) + // p.multiaddr.add(mh2) const l = new libp2p.Node(p) netArray.push({peerInfo: p, libp2p: l}) @@ -144,7 +144,7 @@ exports.genBitswapNetwork = (n, callback) => { // create every BitSwap function createBitswaps () { netArray.forEach((net) => { - net.bitswap = new Bitswap(net.peerInfo, net.libp2p, net.repo, net.peerBook) + net.bitswap = new Bitswap(net.peerInfo, net.libp2p, net.repo.datastore, net.peerBook) }) establishLinks() } @@ -177,4 +177,3 @@ exports.genBitswapNetwork = (n, callback) => { callback(null, netArray) } } - diff --git a/test/wantlist.spec.js b/test/wantlist.spec.js index 6c759b9d..f684b243 100644 --- a/test/wantlist.spec.js +++ b/test/wantlist.spec.js @@ -3,6 +3,7 @@ const expect = require('chai').expect const Block = require('ipfs-block') +const mh = require('multihashes') const Wantlist = require('../src/wantlist') @@ -73,7 +74,7 @@ describe('Wantlist', () => { expect( Array.from(wm.entries()) ).to.be.eql([ - [b.key.toString('hex'), new Wantlist.Entry(b.key, 2)] + [mh.toB58String(b.key), new Wantlist.Entry(b.key, 2)] ]) }) @@ -87,8 +88,8 @@ describe('Wantlist', () => { expect( Array.from(wm.sortedEntries()) ).to.be.eql([ - [b1.key.toString('hex'), new Wantlist.Entry(b1.key, 1)], - [b2.key.toString('hex'), new Wantlist.Entry(b2.key, 1)] + [mh.toB58String(b1.key), new Wantlist.Entry(b1.key, 1)], + [mh.toB58String(b2.key), new Wantlist.Entry(b2.key, 1)] ]) }) diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index 6a705936..cd761479 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -18,14 +18,14 @@ describe('Wantmanager', () => { const network = mockNetwork(6, (calls) => { expect(calls.connects).to.have.length(6) const m1 = new Message(true) - m1.addEntry('hello', cs.kMaxPriority) - m1.addEntry('world', cs.kMaxPriority - 1) + m1.addEntry(new Buffer('hello'), cs.kMaxPriority) + m1.addEntry(new Buffer('world'), cs.kMaxPriority - 1) const m2 = new Message(false) - m2.cancel('world') + m2.cancel(new Buffer('world')) const m3 = new Message(false) - m3.addEntry('foo', cs.kMaxPriority) + m3.addEntry(new Buffer('foo'), cs.kMaxPriority) const msgs = [m1, m1, m2, m2, m3, m3] @@ -41,15 +41,15 @@ describe('Wantmanager', () => { wm = new Wantmanager(network) wm.run() - wm.wantBlocks(['hello', 'world']) + wm.wantBlocks([new Buffer('hello'), new Buffer('world')]) wm.connected(peer1) wm.connected(peer2) setTimeout(() => { - wm.cancelWants(['world']) + wm.cancelWants([new Buffer('world')]) setTimeout(() => { - wm.wantBlocks(['foo']) + wm.wantBlocks([new Buffer('foo')]) wm.disconnected(peer1) wm.disconnected(peer2) diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js index d20d8682..9e41a613 100644 --- a/test/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -11,8 +11,8 @@ describe('MsgQueue', () => { it('connects and sends messages', (done) => { const id = PeerId.create({bits: 64}) const msg = new Message(true) - msg.addEntry('hello world', 3) - msg.addEntry('foo bar', 1) + msg.addEntry(new Buffer('hello world'), 3) + msg.addEntry(new Buffer('foo bar'), 1) const messages = [] const connects = [] @@ -27,11 +27,11 @@ describe('MsgQueue', () => { ]) const m1 = new Message(false) - m1.addEntry('hello', 1) - m1.addEntry('world', 2) + m1.addEntry(new Buffer('hello'), 1) + m1.addEntry(new Buffer('world'), 2) const m2 = new Message(false) - m2.cancel('foo') - m2.cancel('bar') + m2.cancel(new Buffer('foo')) + m2.cancel(new Buffer('bar')) expect( messages @@ -61,13 +61,13 @@ describe('MsgQueue', () => { expect(mq.refcnt).to.be.eql(1) const batch1 = [ - new Message.Entry('hello', 1, false), - new Message.Entry('world', 2, false) + new Message.Entry(new Buffer('hello'), 1, false), + new Message.Entry(new Buffer('world'), 2, false) ] const batch2 = [ - new Message.Entry('foo', 1, true), - new Message.Entry('bar', 2, true) + new Message.Entry(new Buffer('foo'), 1, true), + new Message.Entry(new Buffer('bar'), 2, true) ] mq.run()