Skip to content

Commit 51d8c6d

Browse files
committed
implement trickledag for faster unixfs operations
1 parent 69333e4 commit 51d8c6d

File tree

2 files changed

+154
-0
lines changed

2 files changed

+154
-0
lines changed

importer/importer_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,32 @@ func TestBuilderConsistency(t *testing.T) {
9696
}
9797
}
9898

99+
func TestTrickleBuilderConsistency(t *testing.T) {
100+
nbytes := 100000
101+
buf := new(bytes.Buffer)
102+
io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes))
103+
should := dup(buf.Bytes())
104+
dagserv := merkledag.Mock(t)
105+
nd, err := BuildTrickleDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter)
106+
if err != nil {
107+
t.Fatal(err)
108+
}
109+
r, err := uio.NewDagReader(context.Background(), nd, dagserv)
110+
if err != nil {
111+
t.Fatal(err)
112+
}
113+
114+
out, err := ioutil.ReadAll(r)
115+
if err != nil {
116+
t.Fatal(err)
117+
}
118+
119+
err = arrComp(out, should)
120+
if err != nil {
121+
t.Fatal(err)
122+
}
123+
}
124+
99125
func arrComp(a, b []byte) error {
100126
if len(a) != len(b) {
101127
return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
@@ -220,6 +246,43 @@ func TestSeekingBasic(t *testing.T) {
220246
}
221247
}
222248

249+
func TestTrickleSeekingBasic(t *testing.T) {
250+
nbytes := int64(10 * 1024)
251+
should := make([]byte, nbytes)
252+
u.NewTimeSeededRand().Read(should)
253+
254+
read := bytes.NewReader(should)
255+
dnp := getDagservAndPinner(t)
256+
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
257+
if err != nil {
258+
t.Fatal(err)
259+
}
260+
261+
rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds)
262+
if err != nil {
263+
t.Fatal(err)
264+
}
265+
266+
start := int64(4000)
267+
n, err := rs.Seek(start, os.SEEK_SET)
268+
if err != nil {
269+
t.Fatal(err)
270+
}
271+
if n != start {
272+
t.Fatal("Failed to seek to correct offset")
273+
}
274+
275+
out, err := ioutil.ReadAll(rs)
276+
if err != nil {
277+
t.Fatal(err)
278+
}
279+
280+
err = arrComp(out, should[start:])
281+
if err != nil {
282+
t.Fatal(err)
283+
}
284+
}
285+
223286
func TestSeekToBegin(t *testing.T) {
224287
nbytes := int64(10 * 1024)
225288
should := make([]byte, nbytes)

importer/trickledag.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package importer
2+
3+
import (
4+
"io"
5+
6+
"github.com/jbenet/go-ipfs/importer/chunk"
7+
dag "github.com/jbenet/go-ipfs/merkledag"
8+
"github.com/jbenet/go-ipfs/pin"
9+
)
10+
11+
// layerRepeat specifies how many times to append a child tree of a
12+
// given depth. Higher values increase the width of a given node, which
13+
// improves seek speeds.
14+
const layerRepeat = 4
15+
16+
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
17+
// Start the splitter
18+
blkch := spl.Split(r)
19+
20+
// Create our builder helper
21+
db := &dagBuilderHelper{
22+
dserv: ds,
23+
mp: mp,
24+
in: blkch,
25+
maxlinks: DefaultLinksPerBlock,
26+
indrSize: defaultIndirectBlockDataSize(),
27+
}
28+
29+
root := newUnixfsNode()
30+
err := db.fillNodeRec(root, 1)
31+
if err != nil {
32+
return nil, err
33+
}
34+
for level := 1; !db.done(); level++ {
35+
for i := 0; i < layerRepeat && !db.done(); i++ {
36+
next := newUnixfsNode()
37+
err := db.fillTrickleRec(next, level)
38+
if err != nil {
39+
return nil, err
40+
}
41+
err = root.addChild(next, db)
42+
if err != nil {
43+
return nil, err
44+
}
45+
}
46+
}
47+
48+
rootnode, err := root.getDagNode()
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
rootkey, err := ds.Add(rootnode)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
if mp != nil {
59+
mp.PinWithMode(rootkey, pin.Recursive)
60+
err := mp.Flush()
61+
if err != nil {
62+
return nil, err
63+
}
64+
}
65+
66+
return root.getDagNode()
67+
}
68+
69+
func (db *dagBuilderHelper) fillTrickleRec(node *unixfsNode, depth int) error {
70+
// Always do this, even in the base case
71+
err := db.fillNodeRec(node, 1)
72+
if err != nil {
73+
return err
74+
}
75+
76+
for i := 1; i < depth && !db.done(); i++ {
77+
for j := 0; j < layerRepeat; j++ {
78+
next := newUnixfsNode()
79+
err := db.fillTrickleRec(next, i)
80+
if err != nil {
81+
return err
82+
}
83+
84+
err = node.addChild(next, db)
85+
if err != nil {
86+
return err
87+
}
88+
}
89+
}
90+
return nil
91+
}

0 commit comments

Comments
 (0)