This repository was archived by the owner on Aug 12, 2020. It is now read-only.

[WIP] Pull streams #64

Merged · 4 commits · Sep 8, 2016
1 change: 1 addition & 0 deletions .travis.yml
@@ -3,6 +3,7 @@ language: node_js
node_js:
- 4
- 5
- stable
Contributor: Add Node.js 5 tests too


# Make sure we have new NPM.
before_install:
58 changes: 26 additions & 32 deletions package.json
@@ -2,17 +2,17 @@
"name": "ipfs-unixfs-engine",
"version": "0.10.2",
"description": "JavaScript implementation of the unixfs Engine used by IPFS",
"main": "src/index.js",
"main": "lib/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"lint": "aegir-lint",
"build": "aegir-build",
"test": "aegir-test",
"test:node": "aegir-test node",
"test:browser": "aegir-test browser",
"release": "aegir-release",
"release-minor": "aegir-release --type minor",
"release-major": "aegir-release --type major",
"test": "PHANTOM=off aegir-test",
"test:node": "aegir-test --env node",
"test:browser": "PHANTOM=off aegir-test --env browser",
"release": "PHANTOM=off aegir-release",
"release-minor": "PHANTOM=off aegir-release --type minor",
"release-major": "PHANTOM=off aegir-release --type major",
"coverage": "aegir-coverage",
"coverage-publish": "aegir-coverage publish"
},
@@ -22,7 +22,7 @@
],
"repository": {
"type": "git",
"url": "git+https://github.com/ipfs/js-ipfs-data-importing.git"
"url": "git+https://github.com/ipfs/js-ipfs-unixfs-engine.git"
},
"keywords": [
"IPFS"
@@ -34,37 +34,31 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-unixfs-engineg#readme",
"devDependencies": {
"aegir": "^6.0.1",
"async": "^2.0.1",
"block-stream2": "^1.1.0",
"aegir": "^8.0.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"concat-stream": "^1.5.1",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.7.5",
"fs-pull-blob-store": "^0.3.0",
"idb-pull-blob-store": "^0.4.0",
"ipfs-block-service": "^0.5.0",
"ipfs-repo": "^0.9.0",
"ncp": "^2.0.0",
"pre-commit": "^1.1.2",
"pre-commit": "^1.1.3",
"pull-zip": "^2.0.0",
"raw-loader": "^0.5.1",
"rimraf": "^2.5.1",
"streamifier": "^0.1.1",
"ipfs-block-service": "^0.3.0",
"string-to-stream": "^1.0.1"
"rimraf": "^2.5.4",
"run-series": "^1.1.4"
},
"dependencies": {
"block-stream2": "^1.1.0",
"bs58": "^3.0.0",
"debug": "^2.2.0",
"field-trip": "0.0.3",
"ipfs-merkle-dag": "^0.6.1",
"ipfs-unixfs": "^0.1.0",
"ipfs-merkle-dag": "^0.7.0",
"ipfs-unixfs": "^0.1.4",
"is-ipfs": "^0.2.0",
"isstream": "^0.1.2",
"multihashes": "^0.2.2",
"readable-stream": "^1.1.13",
"run-series": "^1.1.4",
"streamifier": "^0.1.1",
"through2": "^2.0.0"
"pull-block": "^1.0.2",
"pull-pushable": "^2.0.1",
"pull-stream": "^3.4.5",
"pull-traverse": "^1.0.3",
"pull-write": "^1.1.0",
"run-parallel": "^1.1.6"
},
"contributors": [
"David Dias <[email protected]>",
@@ -75,4 +69,4 @@
"greenkeeperio-bot <[email protected]>",
"nginnever <[email protected]>"
]
}
}
4 changes: 2 additions & 2 deletions src/chunker-fixed-size.js
@@ -1,7 +1,7 @@
'use strict'

const chunker = require('block-stream2')
const block = require('pull-block')

exports = module.exports = function (size) {
return chunker({ size: size, zeroPadding: false })
return block(size, {zeroPadding: false})
}
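
As context for the swap from block-stream2 to pull-block: a minimal sketch of driving the new chunker from a pull-stream pipeline. The input buffer and chunk size are illustrative only.

```js
const pull = require('pull-stream')
const fixedSize = require('./chunker-fixed-size')

pull(
  // Source: a single 11-byte buffer
  pull.values([Buffer.from('hello world')]),
  // Through: re-chunk into fixed 4-byte blocks; with zeroPadding
  // disabled the final block keeps its natural length
  fixedSize(4),
  // Sink: gather the resulting chunks
  pull.collect((err, chunks) => {
    if (err) throw err
    console.log(chunks.map((c) => c.toString()))
    // => [ 'hell', 'o wo', 'rld' ]
  })
)
```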
14 changes: 0 additions & 14 deletions src/clean-multihash.js

This file was deleted.

145 changes: 30 additions & 115 deletions src/exporter.js
@@ -1,124 +1,39 @@
'use strict'

const debug = require('debug')
const log = debug('unixfs')
log.err = debug('unixfs:error')
const isIPFS = require('is-ipfs')
const UnixFS = require('ipfs-unixfs')
const series = require('run-series')
const Readable = require('readable-stream').Readable
const pathj = require('path')
const util = require('util')
const fieldtrip = require('field-trip')
const cleanMultihash = require('./clean-multihash')
const traverse = require('pull-traverse')
Contributor: Slick, I didn't know about pull-traverse!

const pull = require('pull-stream')

exports = module.exports = Exporter
const util = require('./util')
const switchType = util.switchType
const cleanMultihash = util.cleanMultihash

util.inherits(Exporter, Readable)
const dirExporter = require('./exporters/dir')
const fileExporter = require('./exporters/file')

function Exporter (hash, dagService, options) {
if (!(this instanceof Exporter)) {
return new Exporter(hash, dagService, options)
}

// Sanitize hash
if (!isIPFS.multihash(hash)) {
throw new Error('not valid multihash')
}
module.exports = (hash, dagService, options) => {
hash = cleanMultihash(hash)

Readable.call(this, { objectMode: true })

this.options = options || {}

this._read = (n) => {}

let fileExporter = (node, name, done) => {
if (!done) {
throw new Error('done must be set')
}

const contentRS = new Readable()
contentRS._read = () => {}

// Logic to export a single (possibly chunked) unixfs file.
if (node.links.length === 0) {
const unmarshaledData = UnixFS.unmarshal(node.data)
contentRS.push(unmarshaledData.data)
contentRS.push(null)
this.push({ content: contentRS, path: name })
done()
} else {
const array = node.links.map((link) => {
return (cb) => {
dagService.get(link.hash, (err, res) => {
if (err) {
return cb(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
contentRS.push(unmarshaledData.data)
cb()
})
}
})
series(array, (err) => {
if (err) {
return contentRS.emit('error', err)
}
contentRS.push(null)
})
this.push({ content: contentRS, path: name })
done()
}
}

// Logic to export a unixfs directory.
let dirExporter = (node, name, add, done) => {
if (!add) {
throw new Error('add must be set')
}
if (!done) {
throw new Error('done must be set')
}

this.push({content: null, path: name})

// Directory has links
if (node.links.length > 0) {
node.links.forEach((link) => {
add({ path: pathj.join(name, link.name), hash: link.hash })
})
}
done()
}

// Traverse the DAG asynchronously
fieldtrip([{path: hash, hash: hash}], visit.bind(this), (err) => {
if (err) {
return this.emit('error', err)
}
this.push(null)
})

// Visit function: called once per node in the exported graph
function visit (item, add, done) {
dagService.get(item.hash, (err, node) => {
if (err) {
return this.emit('error', err)
}

const data = UnixFS.unmarshal(node.data)
const type = data.type

if (type === 'directory') {
dirExporter(node, item.path, add, done)
}

if (type === 'file') {
fileExporter(node, item.path, done)
}
})
options = options || {}

function visitor (item) {
return pull(
dagService.getStream(item.hash),
pull.map((node) => switchType(
node,
() => dirExporter(node, item.path, dagService),
() => fileExporter(node, item.path, dagService)
)),
pull.flatten()
)
}

return this
// Traverse the DAG
return pull(
dagService.getStream(hash),
pull.map((node) => switchType(
node,
() => traverse.widthFirst({path: hash, hash}, visitor),
() => fileExporter(node, hash, dagService)
)),
pull.flatten()
)
}
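
Picking up the reviewer's note about pull-traverse above: a minimal, self-contained sketch of the widthFirst helper the exporter now builds on. The in-memory tree is hypothetical; the visitor returns a pull source of each node's children, and widthFirst emits the start node followed by its descendants level by level.

```js
const pull = require('pull-stream')
const traverse = require('pull-traverse')

// Hypothetical tree used only for illustration
const tree = {
  value: 'root',
  children: [
    {value: 'a', children: []},
    {value: 'b', children: []}
  ]
}

pull(
  // Breadth-first walk: the callback maps a node to a source of its children
  traverse.widthFirst(tree, (node) => pull.values(node.children)),
  pull.collect((err, nodes) => {
    if (err) throw err
    console.log(nodes.map((n) => n.value)) // => [ 'root', 'a', 'b' ]
  })
)
```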
28 changes: 28 additions & 0 deletions src/exporters/dir.js
@@ -0,0 +1,28 @@
'use strict'

const path = require('path')
const pull = require('pull-stream')

const fileExporter = require('./file')
const switchType = require('../util').switchType

// Logic to export a unixfs directory.
module.exports = (node, name, dagService) => {
return pull(
pull.values(node.links),
pull.map((link) => ({
path: path.join(name, link.name),
hash: link.hash
})),
pull.map((item) => pull(
dagService.getStream(item.hash),
pull.map((n) => switchType(
n,
() => pull.values([item]),
() => fileExporter(n, item.path, dagService)
)),
pull.flatten()
)),
pull.flatten()
)
}
30 changes: 30 additions & 0 deletions src/exporters/file.js
@@ -0,0 +1,30 @@
'use strict'

const UnixFS = require('ipfs-unixfs')
const pull = require('pull-stream')

function extractContent (node) {
return UnixFS.unmarshal(node.data).data
}

// Logic to export a single (possibly chunked) unixfs file.
module.exports = (node, name, ds) => {
let content

if (node.links.length === 0) {
const c = extractContent(node)
content = pull.values([c])
} else {
content = pull(
pull.values(node.links),
pull.map((link) => ds.getStream(link.hash)),
pull.flatten(),
pull.map(extractContent)
)
}

return pull.values([{
content: content,
path: name
}])
}
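
With both exporters in place, the whole export becomes an ordinary pull source. A sketch of consuming it, assuming a `dagService` (e.g. an ipfs-merkle-dag DAGService wired to a repo, as in the tests) and a valid multihash string are already in scope — both are placeholders, not defined by this diff.

```js
const pull = require('pull-stream')
const exporter = require('./src/exporter')

pull(
  exporter(hash, dagService), // `hash` and `dagService` are assumed to exist
  pull.collect((err, entries) => {
    if (err) throw err
    // File entries carry a `content` pull source of data buffers;
    // directory entries carry `path` and `hash`
    entries.forEach((entry) => console.log(entry.path))
  })
)
```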