[WIP] Pull streams #25

Merged
merged 1 commit on Sep 9, 2016

2 changes: 1 addition & 1 deletion .travis.yml
@@ -2,7 +2,7 @@ sudo: false
language: node_js
node_js:
- 4
- 5
- stable
Member

Let's add Node.js 5 too


# Make sure we have new NPM.
before_install:
53 changes: 31 additions & 22 deletions API.md
@@ -6,52 +6,61 @@

- `id: PeerId`, the id of the local instance.
- `libp2p: Libp2p`, instance of the local network stack.
- `datastore: Datastore`, instance of the local database (`IpfsRepo.datastore`)
- `blockstore: Datastore`, instance of the local database (`IpfsRepo.blockstore`)

Create a new instance.
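As an illustrative aside (not part of this diff): assuming the class is the module's main export and takes the parameters in the order listed above, constructing an instance might look like the sketch below. The module name and the names `peerId`, `libp2pNode` and `repo` are placeholders, not taken from the diff.

```js
const Bitswap = require('ipfs-bitswap')

// peerId: PeerId, libp2pNode: Libp2p, repo: IpfsRepo -- assumed to exist already
const bitswap = new Bitswap(peerId, libp2pNode, repo.blockstore)
```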

### `getBlock(key, cb)`

- `key: Multihash`
- `cb: Function`
### `getStream(key)`

Fetch a single block.
- `key: Multihash|Array`

> Note: This is safeguarded so that the network is not asked
> for blocks that are in the local `datastore`.
Returns a source `pull-stream`. Values emitted are the received blocks.
Member

Is this going to return several blocks for one key? Seems that there is a need for get(key), get(keys), getStream(keys)

Member Author

Fixed, it now takes an array or a single key.


### `getBlocks(keys, cb)`

- `keys: []Multihash`
- `cb: Function`
Example:

Fetch multiple blocks. The `cb` is called with a result object of the form
```js
{
[key1]: {error: errorOrUndefined, block: blockOrUndefined},
[key2]: {error: errorOrUndefined, block: blockOrUndefined},
...
}
// Single block
pull(
bitswap.getStream(key),
pull.collect((err, blocks) => {
// blocks === [block]
})
)

// Many blocks
pull(
bitswap.getStream([key1, key2, key3]),
pull.collect((err, blocks) => {
// blocks === [block1, block2, block3]
})
)
```

Where `key<i>` is the multihash of the block.

### `unwantBlocks(keys)`
> Note: This is safeguarded so that the network is not asked
> for blocks that are in the local `datastore`.


- `keys: []Multihash`
### `unwant(keys)`

- `keys: Multihash|[]Multihash`

Cancel previously requested keys, forcefully. That means they are removed from the
wantlist regardless of how many other resources requested these keys. Callbacks
attached to `getBlock` are errored with `Error('manual unwant: key')`.
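
An illustrative sketch (not part of the diff) of how a forced `unwant` could surface to a consumer, assuming the error described above is propagated through the pull-stream returned by `getStream`:

```js
const pull = require('pull-stream')

pull(
  bitswap.getStream(key),
  pull.collect((err, blocks) => {
    // if unwant(key) ran before the block arrived, err is expected to be
    // Error('manual unwant: <key>') and no block is emitted
  })
)

// forcefully remove the key from the wantlist
bitswap.unwant(key)
```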

### `cancelWants(keys)`

- `keys: []Multihash`
- `keys: Multihash|[]Multihash`

Cancel previously requested keys.

### `putStream()`

Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block once it has been stored.
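
An illustrative sketch (not part of the diff), assuming the writable side accepts the same `IpfsBlock` values that `put(block, cb)` takes:

```js
const pull = require('pull-stream')

pull(
  pull.values([block1, block2]),
  bitswap.putStream(),
  pull.collect((err, results) => {
    // results === [{key: key1}, {key: key2}] once both blocks were stored
  })
)
```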

### `hasBlock(block, cb)`
### `put(block, cb)`

- `block: IpfsBlock`
- `cb: Function`
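
The description of `put` is cut off in this view. Assuming it behaves like the `hasBlock` it replaces (store the block, then notify peers that want it), a minimal call might look like:

```js
bitswap.put(block, (err) => {
  if (err) {
    return console.error('failed to store block', err)
  }
  // the block is now in the blockstore and announced to interested peers
})
```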
31 changes: 17 additions & 14 deletions package.json
@@ -33,40 +33,43 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"abstract-blob-store": "^3.2.0",
"aegir": "^8.0.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.8.0",
"libp2p-ipfs": "^0.12.0",
"lodash": "^4.13.1",
"fs-pull-blob-store": "^0.3.0",
"idb-pull-blob-store": "^0.4.0",
"interface-pull-blob-store": "^0.5.0",
"ipfs-repo": "^0.9.0",
"libp2p-ipfs": "^0.13.0",
"lodash": "^4.15.0",
"multiaddr": "^2.0.3",
"ncp": "^2.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.1",
"rimraf": "^2.5.2",
"rimraf": "^2.5.4",
"safe-buffer": "^5.0.1"
},
"dependencies": {
"async": "^2.0.0-rc.5",
"bl": "^1.1.2",
"async": "^2.0.1",
"debug": "^2.2.0",
"heap": "^0.2.6",
"highland": "^3.0.0-beta.1",
"ipfs-block": "^0.3.0",
"length-prefixed-stream": "^1.5.0",
"lodash.isequalwith": "^4.2.0",
"lodash.isequalwith": "^4.4.0",
"lodash.isundefined": "^3.0.1",
"multihashes": "^0.2.2",
"protocol-buffers": "^3.1.6"
"protocol-buffers": "^3.1.6",
"pull-defer": "^0.2.2",
"pull-generate": "^2.2.0",
"pull-length-prefixed": "^1.2.0",
"pull-paramap": "^1.1.6",
"pull-pushable": "^2.0.1",
"pull-stream": "^3.4.5"
},
"contributors": [
"David Dias <[email protected]>",
"Richard Littauer <[email protected]>",
"Stephen Whitmore <[email protected]>",
"dignifiedquire <[email protected]>"
]
}
}
94 changes: 53 additions & 41 deletions src/decision/engine.js
@@ -1,9 +1,9 @@
'use strict'

const debug = require('debug')
const _ = require('highland')
const async = require('async')
const mh = require('multihashes')
const pull = require('pull-stream')
const generate = require('pull-generate')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
@@ -14,8 +14,8 @@ const PeerRequestQueue = require('./peer-request-queue')
const Ledger = require('./ledger')

module.exports = class Engine {
constructor (datastore, network) {
this.datastore = datastore
constructor (blockstore, network) {
this.blockstore = blockstore
this.network = network

// A list of ledgers by their partner id
@@ -45,34 +45,43 @@ module.exports = class Engine {
_outbox () {
if (!this._running) return

const doIt = (cb) => {
_((push, next) => {
if (!this._running) return push(null, _.nil)
const nextTask = this.peerRequestQueue.pop()
const doIt = (cb) => pull(
generate(null, (state, cb) => {
log('generating', this._running)
if (!this._running) {
return cb(true)
}

if (!nextTask) return push(null, _.nil)
const nextTask = this.peerRequestQueue.pop()
log('got task', nextTask)
if (!nextTask) {
return cb(true)
}

this.datastore.get(nextTask.entry.key, (err, block) => {
if (err || !block) {
nextTask.done()
} else {
push(null, {
pull(
this.blockstore.getStream(nextTask.entry.key),
pull.collect((err, blocks) => {
log('generated', blocks)
const block = blocks[0]
if (err || !block) {
nextTask.done()
return cb(null, false)
}

cb(null, {
peer: nextTask.target,
block: block,
sent: () => {
nextTask.done()
}
})
}

next()
})
})
.flatMap((envelope) => {
return _.wrapCallback(this._sendBlock.bind(this))(envelope)
})
.done(cb)
}
})
)
}),
pull.filter(Boolean),
pull.asyncMap(this._sendBlock.bind(this)),
pull.onEnd(cb)
)

if (!this._timer) {
this._timer = setTimeout(() => {
@@ -97,40 +106,43 @@

// Handle incoming messages
messageReceived (peerId, msg, cb) {
const ledger = this._findOrCreate(peerId)

if (msg.empty) {
log('received empty message from %s', peerId.toB58String())
return cb()
}

const ledger = this._findOrCreate(peerId)

// If the message was a full wantlist clear the current one
if (msg.full) {
ledger.wantlist = new Wantlist()
}

this._processBlocks(msg.blocks, ledger)
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))
async.eachSeries(
msg.wantlist.values(),
this._processWantlist.bind(this, ledger, peerId),
(err) => {
const done = (err) => async.setImmediate(() => cb(err))
if (err) return done(err)

pull(
pull.values(Array.from(msg.wantlist.values())),
pull.asyncMap((entry, cb) => {
this._processWantlist(ledger, peerId, entry, cb)
}),
pull.onEnd((err) => {
if (err) return cb(err)
this._outbox()
done()
}
cb()
})
)
}

receivedBlock (block) {
this._processBlock(block)
receivedBlock (key) {
this._processBlock(key)
this._outbox()
}

_processBlock (block) {
_processBlock (key) {
// Check all connected peers if they want the block we received
for (let l of this.ledgerMap.values()) {
const entry = l.wantlistContains(block.key)
const entry = l.wantlistContains(key)

if (entry) {
this.peerRequestQueue.push(entry, l.partner)
@@ -143,13 +155,13 @@
log('cancel %s', mh.toB58String(entry.key))
ledger.cancelWant(entry.key)
this.peerRequestQueue.remove(entry.key, peerId)
async.setImmediate(() => cb())
setImmediate(() => cb())
} else {
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
ledger.wants(entry.key, entry.priority)

// If we already have the block, serve it
this.datastore.has(entry.key, (err, exists) => {
this.blockstore.has(entry.key, (err, exists) => {
if (err) {
log('failed existence check %s', mh.toB58String(entry.key))
} else if (exists) {
@@ -166,7 +178,7 @@
log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(block)
this.receivedBlock(block.key)
}
}
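
As an editorial aside, the `pull-generate` pattern that replaces the highland generator in `_outbox` can be shown in isolation. A minimal, self-contained sketch (unrelated to bitswap's task queue): `cb(true)` ends the stream, `cb(null, value, nextState)` emits `value` and carries `nextState` into the next call.

```js
const pull = require('pull-stream')
const generate = require('pull-generate')

pull(
  // count down from 3 and stop when the state reaches 0
  generate(3, (state, cb) => {
    if (state === 0) {
      return cb(true) // end of stream
    }
    cb(null, state, state - 1) // emit current state, continue with state - 1
  }),
  pull.collect((err, values) => {
    // values === [3, 2, 1]
  })
)
```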

6 changes: 3 additions & 3 deletions src/decision/peer-request-queue.js
@@ -152,14 +152,14 @@ function taskKey (peerId, key) {

function partnerCompare (a, b) {
// having no blocks in their wantlist means lowest priority
// having both of these checks ensures stability of the sort
// having both of these checks ensures stability of the sort
if (a.requests === 0) return false
if (b.requests === 0) return true

if (a.active === b.active) {
// sorting by taskQueue.size() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled entries in the queue resulting in a memory leak
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled entries in the queue resulting in a memory leak
return a.taskQueue.size() > b.taskQueue.size()
}
