Skip to content

Commit 5f51b93

Browse files
committed
Remove chunk channels
License: MIT Signed-off-by: rht <[email protected]>
1 parent 1bbc472 commit 5f51b93

File tree

5 files changed

+23
-53
lines changed

5 files changed

+23
-53
lines changed

importer/balanced/balanced_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,12 @@ import (
2222
// TODO: extract these tests and more as a generic layout test suite
2323

2424
func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) {
25-
// Start the splitter
26-
blkch, errs := chunk.Chan(spl)
27-
2825
dbp := h.DagBuilderParams{
2926
Dagserv: ds,
3027
Maxlinks: h.DefaultLinksPerBlock,
3128
}
3229

33-
return BalancedLayout(dbp.New(blkch, errs))
30+
return BalancedLayout(dbp.New(spl))
3431
}
3532

3633
func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.Node, []byte) {

importer/helpers/dagbuilder.go

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package helpers
22

33
import (
4+
"github.com/ipfs/go-ipfs/importer/chunk"
45
dag "github.com/ipfs/go-ipfs/merkledag"
56
)
67

78
// DagBuilderHelper wraps together a bunch of objects needed to
89
// efficiently create unixfs dag trees
910
type DagBuilderHelper struct {
1011
dserv dag.DAGService
11-
in <-chan []byte
12-
errs <-chan error
12+
spl chunk.Splitter
1313
recvdErr error
1414
nextData []byte // the next item to return.
1515
maxlinks int
@@ -24,45 +24,35 @@ type DagBuilderParams struct {
2424
Dagserv dag.DAGService
2525
}
2626

27-
// Generate a new DagBuilderHelper from the given params, using 'in' as a
28-
// data source
29-
func (dbp *DagBuilderParams) New(in <-chan []byte, errs <-chan error) *DagBuilderHelper {
27+
// Generate a new DagBuilderHelper from the given params, whose data source comes
28+
// from the chunks object
29+
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
3030
return &DagBuilderHelper{
3131
dserv: dbp.Dagserv,
32-
in: in,
33-
errs: errs,
32+
spl: spl,
3433
maxlinks: dbp.Maxlinks,
3534
batch: dbp.Dagserv.Batch(),
3635
}
3736
}
3837

39-
// prepareNext consumes the next item from the channel and puts it
38+
// prepareNext consumes the next item from the splitter and puts it
4039
// in the nextData field. it is idempotent-- if nextData is full
4140
// it will do nothing.
42-
//
43-
// i realized that building the dag becomes _a lot_ easier if we can
44-
// "peek" the "are done yet?" (i.e. not consume it from the channel)
4541
func (db *DagBuilderHelper) prepareNext() {
46-
if db.in == nil {
47-
// if our input is nil, there is "nothing to do". we're done.
48-
// as if there was no data at all. (a sort of zero-value)
49-
return
50-
}
51-
52-
// if we already have data waiting to be consumed, we're ready.
42+
// if we already have data waiting to be consumed, we're ready
5343
if db.nextData != nil {
5444
return
5545
}
5646

57-
// if it's closed, nextData will be correctly set to nil, signaling
58-
// that we're done consuming from the channel.
59-
db.nextData = <-db.in
47+
// TODO: handle err (which wasn't handled either when the splitter was channel-based)
48+
db.nextData, _ = db.spl.NextBytes()
6049
}
6150

6251
// Done returns whether or not we're done consuming the incoming data.
6352
func (db *DagBuilderHelper) Done() bool {
6453
// ensure we have an accurate perspective on data
6554
// as `done` this may be called before `next`.
55+
//db.prepareNext() // idempotent
6656
db.prepareNext() // idempotent
6757
return db.nextData == nil
6858
}

importer/importer.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,19 @@ func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.Node, error) {
3939
}
4040

4141
func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) {
42-
// Start the splitter
43-
blkch, errch := chunk.Chan(spl)
44-
4542
dbp := h.DagBuilderParams{
4643
Dagserv: ds,
4744
Maxlinks: h.DefaultLinksPerBlock,
4845
}
4946

50-
return bal.BalancedLayout(dbp.New(blkch, errch))
47+
return bal.BalancedLayout(dbp.New(spl))
5148
}
5249

5350
func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) {
54-
// Start the splitter
55-
blkch, errch := chunk.Chan(spl)
56-
5751
dbp := h.DagBuilderParams{
5852
Dagserv: ds,
5953
Maxlinks: h.DefaultLinksPerBlock,
6054
}
6155

62-
return trickle.TrickleLayout(dbp.New(blkch, errch))
56+
return trickle.TrickleLayout(dbp.New(spl))
6357
}

importer/trickle/trickle_test.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,12 @@ import (
2121
)
2222

2323
func buildTestDag(ds merkledag.DAGService, spl chunk.Splitter) (*merkledag.Node, error) {
24-
// Start the splitter
25-
blkch, errs := chunk.Chan(spl)
26-
2724
dbp := h.DagBuilderParams{
2825
Dagserv: ds,
2926
Maxlinks: h.DefaultLinksPerBlock,
3027
}
3128

32-
nd, err := TrickleLayout(dbp.New(blkch, errs))
29+
nd, err := TrickleLayout(dbp.New(spl))
3330
if err != nil {
3431
return nil, err
3532
}
@@ -441,10 +438,9 @@ func TestAppend(t *testing.T) {
441438
}
442439

443440
r := bytes.NewReader(should[nbytes/2:])
444-
blks, errs := chunk.Chan(chunk.NewSizeSplitter(r, 500))
445441

446442
ctx := context.Background()
447-
nnode, err := TrickleAppend(ctx, nd, dbp.New(blks, errs))
443+
nnode, err := TrickleAppend(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500)))
448444
if err != nil {
449445
t.Fatal(err)
450446
}
@@ -494,9 +490,8 @@ func TestMultipleAppends(t *testing.T) {
494490

495491
ctx := context.Background()
496492
for i := 0; i < len(should); i++ {
497-
blks, errs := chunk.Chan(spl(bytes.NewReader(should[i : i+1])))
498493

499-
nnode, err := TrickleAppend(ctx, nd, dbp.New(blks, errs))
494+
nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(should[i:i+1]))))
500495
if err != nil {
501496
t.Fatal(err)
502497
}
@@ -538,17 +533,13 @@ func TestAppendSingleBytesToEmpty(t *testing.T) {
538533

539534
spl := chunk.SizeSplitterGen(500)
540535

541-
blks, errs := chunk.Chan(spl(bytes.NewReader(data[:1])))
542-
543536
ctx := context.Background()
544-
nnode, err := TrickleAppend(ctx, nd, dbp.New(blks, errs))
537+
nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(data[:1]))))
545538
if err != nil {
546539
t.Fatal(err)
547540
}
548541

549-
blks, errs = chunk.Chan(spl(bytes.NewReader(data[1:])))
550-
551-
nnode, err = TrickleAppend(ctx, nnode, dbp.New(blks, errs))
542+
nnode, err = TrickleAppend(ctx, nnode, dbp.New(spl(bytes.NewReader(data[1:]))))
552543
if err != nil {
553544
t.Fatal(err)
554545
}

unixfs/mod/dagmodifier.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,7 @@ func (zr zeroReader) Read(b []byte) (int, error) {
103103
func (dm *DagModifier) expandSparse(size int64) error {
104104
r := io.LimitReader(zeroReader{}, size)
105105
spl := chunk.NewSizeSplitter(r, 4096)
106-
blks, errs := chunk.Chan(spl)
107-
nnode, err := dm.appendData(dm.curNode, blks, errs)
106+
nnode, err := dm.appendData(dm.curNode, spl)
108107
if err != nil {
109108
return err
110109
}
@@ -191,8 +190,7 @@ func (dm *DagModifier) Sync() error {
191190

192191
// need to write past end of current dag
193192
if !done {
194-
blks, errs := chunk.Chan(dm.splitter(dm.wrBuf))
195-
nd, err = dm.appendData(dm.curNode, blks, errs)
193+
nd, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
196194
if err != nil {
197195
return err
198196
}
@@ -286,13 +284,13 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader)
286284
}
287285

288286
// appendData appends the blocks from the given chan to the end of this dag
289-
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte, errs <-chan error) (*mdag.Node, error) {
287+
func (dm *DagModifier) appendData(node *mdag.Node, spl chunk.Splitter) (*mdag.Node, error) {
290288
dbp := &help.DagBuilderParams{
291289
Dagserv: dm.dagserv,
292290
Maxlinks: help.DefaultLinksPerBlock,
293291
}
294292

295-
return trickle.TrickleAppend(dm.ctx, node, dbp.New(blks, errs))
293+
return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
296294
}
297295

298296
// Read data from this dag starting at the current offset

0 commit comments

Comments
 (0)