Skip to content

Stronger faster better #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,30 @@
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.8.0",
"libp2p-ipfs": "^0.10.0",
"lodash": "^4.11.2",
"libp2p-ipfs": "^0.11.0",
"lodash": "^4.13.1",
"multiaddr": "^2.0.2",
"ncp": "^2.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.0",
"rimraf": "^2.5.2"
"rimraf": "^2.5.2",
"safe-buffer": "^5.0.1"
},
"dependencies": {
"async": "^2.0.0-rc.4",
"async": "^2.0.0-rc.5",
"bl": "^1.1.2",
"debug": "^2.2.0",
"heap": "^0.2.6",
"highland": "^3.0.0-beta.1",
"ipfs-block": "^0.3.0",
"lodash.isequal": "^4.1.4",
"lodash.isequalwith": "^4.2.0",
"lodash.isundefined": "^3.0.1",
"multihashes": "^0.2.2",
"protocol-buffers": "^3.1.6"
},
"contributors": [
"David Dias <[email protected]>",
"Friedel Ziegelmayer <[email protected]>"
]
}
}
2 changes: 1 addition & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const second = 1000

module.exports = {
maxProvidersPerRequest: 3,
provierRequestTimeout: 10 * second,
providerRequestTimeout: 10 * second,
hasBlockTimeout: 15 * second,
provideTimeout: 15 * second,
kMaxPriority: Math.pow(2, 31) - 1,
Expand Down
43 changes: 29 additions & 14 deletions src/decision/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const debug = require('debug')
const _ = require('highland')
const async = require('async')
const mh = require('multihashes')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
Expand All @@ -23,6 +24,8 @@ module.exports = class Engine {
// A priority queue of requests received from different
// peers.
this.peerRequestQueue = new PeerRequestQueue()

this._running = false
}

_sendBlock (env, cb) {
Expand All @@ -40,16 +43,11 @@ module.exports = class Engine {
}

_outbox () {
if (!this._timer) {
this._timer = setTimeout(() => {
doIt(() => {
this._timer = null
})
}, 100)
}
if (!this._running) return

const doIt = (cb) => {
_((push, next) => {
if (!this._running) return push(null, _.nil)
const nextTask = this.peerRequestQueue.pop()

if (!nextTask) return push(null, _.nil)
Expand All @@ -75,6 +73,14 @@ module.exports = class Engine {
})
.done(cb)
}

if (!this._timer) {
this._timer = setTimeout(() => {
doIt(() => {
this._timer = null
})
}, 50)
}
}

wantlistForPeer (peerId) {
Expand Down Expand Up @@ -103,7 +109,7 @@ module.exports = class Engine {
}

this._processBlocks(msg.blocks, ledger)
log('wantlist', Array.from(msg.wantlist.values()))
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))
async.eachSeries(
msg.wantlist.values(),
this._processWantlist.bind(this, ledger, peerId),
Expand All @@ -112,7 +118,8 @@ module.exports = class Engine {
if (err) return done(err)
this._outbox()
done()
})
}
)
}

receivedBlock (block) {
Expand All @@ -133,20 +140,20 @@ module.exports = class Engine {

_processWantlist (ledger, peerId, entry, cb) {
if (entry.cancel) {
log('cancel %s', entry.key.toString('hex'))
log('cancel %s', mh.toB58String(entry.key))
ledger.cancelWant(entry.key)
this.peerRequestQueue.remove(entry.key, peerId)
async.setImmediate(() => cb())
} else {
log('wants %s - %s', entry.key.toString('hex'), entry.priority)
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
ledger.wants(entry.key, entry.priority)

// If we already have the block, serve it
this.datastore.has(entry.key, (err, exists) => {
if (err) {
log('failed existence check %s', entry.key.toString('hex'))
log('failed existence check %s', mh.toB58String(entry.key))
} else if (exists) {
log('has want %s', entry.key.toString('hex'))
log('has want %s', mh.toB58String(entry.key))
this.peerRequestQueue.push(entry.entry, peerId)
}
cb()
Expand All @@ -156,7 +163,7 @@ module.exports = class Engine {

_processBlocks (blocks, ledger) {
for (let block of blocks.values()) {
log('got block %s %s bytes', block.key.toString('hex'), block.data.length)
log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(block)
Expand Down Expand Up @@ -200,4 +207,12 @@ module.exports = class Engine {

return l
}

start () {
this._running = true
}

stop () {
this._running = false
}
}
41 changes: 32 additions & 9 deletions src/decision/peer-request-queue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
'use strict'

const mh = require('multihashes')
const debug = require('debug')
const assert = require('assert')

const PriorityQueue = require('./pq')

const log = debug('bitswap:peer-request-queue')

class PeerRequestTask {
constructor (entry, target, done) {
this.entry = entry
Expand All @@ -13,10 +19,16 @@ class PeerRequestTask {
get key () {
return taskKey(this.target, this.entry.key)
}

get [Symbol.toStringTag] () {
return `PeerRequestTask <target: ${this.target.toB58String()}, entry: ${this.entry.toString()}>`
}
}

class ActivePartner {
constructor () {
constructor (id) {
this.id = id

// The number of blocks this peer is currently being sent.
this.active = 0

Expand All @@ -30,17 +42,18 @@ class ActivePartner {
}

startTask (key) {
this.activeBlocks.set(key, {})
this.activeBlocks.set(mh.toB58String(key), 1)
this.active ++
}

taskDone (key) {
this.activeBlocks.delete(key)
const k = mh.toB58String(key)
assert(this.activeBlocks.has(k), 'finishing non existent task')

this.activeBlocks.delete()
this.active --

if (this.active < 0) {
throw new Error('more tasks finished than started')
}
assert(this.active >= 0, 'more tasks finished than started')
}
}

Expand All @@ -53,21 +66,24 @@ module.exports = class PeerRequestQueue {

// Add a new entry to the queue
push (entry, to) {
log('push, to: %s', to.toB58String())
let partner = this.partners.get(to.toB58String())

if (!partner) {
partner = new ActivePartner()
partner = new ActivePartner(to)
this.pQueue.push(partner)
this.partners.set(to.toB58String(), partner)
}

if (partner.activeBlocks.has(entry.key)) {
log('has activeBlocks', entry.key)
return
}

let task = this.taskMap.get(taskKey(to, entry.key))

if (task) {
log('updating task', task.toString())
task.entry.priority = entry.priority
partner.taskQueue.update(task)
return
Expand All @@ -79,13 +95,16 @@ module.exports = class PeerRequestQueue {
})

partner.taskQueue.push(task)
log('taskMap.set', task.key, task.toString())
this.taskMap.set(task.key, task)
partner.requests ++
partner.taskQueue.update(task)
}

// Get the task with the hightest priority from the queue
pop () {
// log('pop, empty? %s', this.pQueue.isEmpty())
// log('partners', Array.from(this.partners.values()).map((val) => [val.requests, val.taskQueue.size()]))
if (this.pQueue.isEmpty()) return

let partner = this.pQueue.pop()
Expand All @@ -103,7 +122,7 @@ module.exports = class PeerRequestQueue {
partner.requests --
break
}

// log('pop, out', partner.taskQueue.isEmpty(), out)
this.pQueue.push(partner)
return out
}
Expand All @@ -120,11 +139,15 @@ module.exports = class PeerRequestQueue {
// having canceled a block, we now account for that in the given partner
this.partners.get(peerId.toB58String()).requests --
}

log('taskMap', Array.from(this.taskMap.values()).map((v) => {
return v.toString()
}))
}
}

function taskKey (peerId, key) {
return `${peerId.toB58String()}:${key.toString('hex')}`
return `${peerId.toB58String()}:${mh.toB58String(key)}`
}

function partnerCompare (a, b) {
Expand Down
Loading