Merged

Changes from 1 commit
15 changes: 12 additions & 3 deletions chunk.go
@@ -76,9 +76,17 @@ func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
return nil
}

type dummyCloser struct {
Owner: dummyCloser ==> noopCompressor ?

Contributor Author: Done.

bytes.Buffer
}

func (c *dummyCloser) Close() error {
return nil
}

func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
compressed := new(bytes.Buffer)
Owner:
I think the function should be somehow like the following:

func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
	var compressed bytes.Buffer
	var compressor io.WriteCloser

	switch compressorIndex {
	case NoCompression:
		compressor = &noopCompressor{&compressed}
	case Snappy:
		compressor = snappy.NewBufferedWriter(&compressed)
	case Gzip:
		compressor = gzip.NewWriter(&compressed)
	default:
		return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
	}

	defer compressor.Close()
	if _, e := io.Copy(compressor, src); e != nil {
		return nil, fmt.Errorf("Failed to compress chunk data: %v", e)
	}

	return &compressed, nil
}

The point is that compressed is still of type bytes.Buffer, and we just create a noopCompressor which works like snappy.Writer and gzip.Writer.

Contributor Author:
Much better, thanks! Done.
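A minimal sketch of the noopCompressor type that the suggestion above assumes — an illustration, not code from this diff. Here the buffer is wrapped by pointer so the NoCompression path writes into the same buffer that compressData returns; this commit's dummyCloser instead embeds bytes.Buffer by value and returns &compressed.Buffer.

package recordio

import "bytes"

// noopCompressor adapts a *bytes.Buffer to io.WriteCloser so it can stand in
// for snappy.Writer and gzip.Writer: Write goes straight to the underlying
// buffer and Close has nothing to flush.
type noopCompressor struct {
	*bytes.Buffer
}

func (c noopCompressor) Close() error {
	return nil
}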

var compressor io.Writer
compressed := &dummyCloser{}
var compressor io.WriteCloser

switch compressorIndex {
case NoCompression:
@@ -94,8 +102,9 @@ func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
if _, e := io.Copy(compressor, src); e != nil {
return nil, fmt.Errorf("Failed to compress chunk data: %v", e)
}
compressor.Close()

return compressed, nil
return &compressed.Buffer, nil
}

// parse the specified chunk from r.
7 changes: 5 additions & 2 deletions header.go
@@ -7,8 +7,6 @@ import (
)

const (
magicNumber uint32 = 0x01020304

// NoCompression means writing raw chunk data into files.
// With other choices, chunks are compressed before written.
NoCompression = iota
@@ -21,6 +19,11 @@ const (
Gzip
)

const (
Owner: Can we merge this const group with the previous one?

Contributor Author: Done.

magicNumber uint32 = 0x01020304
defaultCompressor = Snappy
)

// Header is the metadata of Chunk.
type Header struct {
checkSum uint32
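For the const-group merge requested in the thread above, one way the combined block in header.go might look after the fix (a sketch; the follow-up commit is not part of this diff):

const (
	// NoCompression means writing raw chunk data into files.
	// With other choices, chunks are compressed before written.
	NoCompression = iota
	Snappy
	Gzip

	magicNumber       uint32 = 0x01020304
	defaultCompressor        = Snappy
)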
27 changes: 25 additions & 2 deletions reader.go
@@ -1,12 +1,16 @@
package recordio

import "io"
import (
"fmt"
"io"
)

// Index consists of offsets and sizes of the consecutive chunks in a RecordIO file.
type Index struct {
chunkOffsets []int64
chunkLens []uint32
numRecords int // the number of all records in a file.
numRecords int // the number of all records in a file.
chunkRecords []int // the number of records in chunks.
}

// LoadIndex scans the file and parse chunkOffsets, chunkLens, and len.
@@ -19,11 +23,15 @@ func LoadIndex(r io.ReadSeeker) (*Index, error) {
for {
hdr, e = parseHeader(r)
if e != nil {
if e != io.EOF {
fmt.Println("parse err:", e)
Owner: Is this snippet for debugging? Should we remove it?

Contributor Author: Yes, done.

}
break
}

f.chunkOffsets = append(f.chunkOffsets, offset)
f.chunkLens = append(f.chunkLens, hdr.numRecords)
f.chunkRecords = append(f.chunkRecords, int(hdr.numRecords))
f.numRecords += int(hdr.numRecords)

offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent)
@@ -43,6 +51,21 @@ func (r *Index) NumRecords() int {
return r.numRecords
}

// NumChunks returns the total number of chunks in a RecordIO file.
func (r *Index) NumChunks() int {
return len(r.chunkLens)
}

// ChunkIndex returns the Index of the i-th Chunk.
func (r *Index) ChunkIndex(i int) *Index {
idx := &Index{}
idx.chunkOffsets = []int64{r.chunkOffsets[i]}
idx.chunkLens = []uint32{r.chunkLens[i]}
idx.chunkRecords = []int{r.chunkRecords[i]}
idx.numRecords = idx.chunkRecords[0]
return idx
}

// Locate returns the index of chunk that contains the given record,
// and the record index within the chunk. It returns (-1, -1) if the
// record is out of range.
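To make the new NumChunks and ChunkIndex methods concrete, here is a small usage sketch in the spirit of TestChunkIndex added below; the package name, ScanByChunk, and process are hypothetical, not part of the PR.

package chunkscan

import (
	"bytes"

	"github.com/wangkuiyi/recordio"
)

// ScanByChunk walks a RecordIO file one chunk at a time: ChunkIndex builds a
// single-chunk Index, and a Scanner created from that Index yields only the
// records of that chunk.
func ScanByChunk(data []byte, process func([]byte)) error {
	idx, err := recordio.LoadIndex(bytes.NewReader(data))
	if err != nil {
		return err
	}
	for i := 0; i < idx.NumChunks(); i++ {
		s := recordio.NewScanner(bytes.NewReader(data), idx.ChunkIndex(i), -1, -1)
		for s.Scan() {
			process(s.Record())
		}
	}
	return nil
}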
90 changes: 90 additions & 0 deletions recordio_internal_test.go
@@ -0,0 +1,90 @@
package recordio

import (
"bytes"
"testing"
"unsafe"

"github.com/stretchr/testify/assert"
)

func TestChunkHead(t *testing.T) {
assert := assert.New(t)

c := &Header{
checkSum: 123,
compressor: 456,
compressedSize: 789,
}

var buf bytes.Buffer
_, e := c.write(&buf)
assert.Nil(e)

cc, e := parseHeader(&buf)
assert.Nil(e)
assert.Equal(c, cc)
}

func TestWriteAndRead(t *testing.T) {
assert := assert.New(t)

data := []string{
"12345",
"1234",
"12"}

var buf bytes.Buffer
w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.

n, e := w.Write([]byte(data[0])) // not exceed chunk size.
assert.Nil(e)
assert.Equal(5, n)

n, e = w.Write([]byte(data[1])) // not exceed chunk size.
assert.Nil(e)
assert.Equal(4, n)

n, e = w.Write([]byte(data[2])) // exceeds chunk size, dump and create a new chunk.
assert.Nil(e)
assert.Equal(n, 2)

assert.Nil(w.Close()) // flush the second chunk.
assert.Nil(w.Writer)

n, e = w.Write([]byte("anything")) // not effective after close.
assert.NotNil(e)
assert.Equal(n, 0)

idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
assert.Nil(e)
assert.Equal([]uint32{2, 1}, idx.chunkLens)
assert.Equal(
[]int64{0,
int64(4 + // magic number
unsafe.Sizeof(Header{}) +
5 + // first record
4 + // second record
2*4)}, // two record lengths
idx.chunkOffsets)

s := NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
i := 0
for s.Scan() {
assert.Equal(data[i], string(s.Record()))
i++
}
}

func TestWriteEmptyFile(t *testing.T) {
assert := assert.New(t)

var buf bytes.Buffer
w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
assert.Nil(w.Close())
assert.Equal(0, buf.Len())

idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
assert.Nil(e)
assert.Equal(0, idx.NumRecords())
}
131 changes: 61 additions & 70 deletions recordio_test.go
@@ -1,90 +1,81 @@
package recordio
package recordio_test

import (
"bytes"
"reflect"
"testing"
"unsafe"

"github.com/stretchr/testify/assert"
"github.com/wangkuiyi/recordio"
)

func TestChunkHead(t *testing.T) {
assert := assert.New(t)

c := &Header{
checkSum: 123,
compressor: 456,
compressedSize: 789,
}

func TestWriteRead(t *testing.T) {
const total = 1000
var buf bytes.Buffer
_, e := c.write(&buf)
assert.Nil(e)

cc, e := parseHeader(&buf)
assert.Nil(e)
assert.Equal(c, cc)
}

func TestWriteAndRead(t *testing.T) {
assert := assert.New(t)

data := []string{
"12345",
"1234",
"12"}

var buf bytes.Buffer
w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.

n, e := w.Write([]byte(data[0])) // not exceed chunk size.
assert.Nil(e)
assert.Equal(5, n)

n, e = w.Write([]byte(data[1])) // not exceed chunk size.
assert.Nil(e)
assert.Equal(4, n)

n, e = w.Write([]byte(data[2])) // exceeds chunk size, dump and create a new chunk.
assert.Nil(e)
assert.Equal(n, 2)

assert.Nil(w.Close()) // flush the second chunk.
assert.Nil(w.Writer)
w := recordio.NewWriter(&buf, 0, -1)
for i := 0; i < total; i++ {
_, err := w.Write(make([]byte, i))
if err != nil {
t.Fatal(err)
}
}
w.Close()

n, e = w.Write([]byte("anything")) // not effective after close.
assert.NotNil(e)
assert.Equal(n, 0)
idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
if err != nil {
t.Fatal(err)
}

idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
assert.Nil(e)
assert.Equal([]uint32{2, 1}, idx.chunkLens)
assert.Equal(
[]int64{0,
int64(4 + // magic number
unsafe.Sizeof(Header{}) +
5 + // first record
4 + // second record
2*4)}, // two record lengths
idx.chunkOffsets)
if idx.NumRecords() != total {
t.Fatal("num record does not match:", idx.NumRecords(), total)
}

s := NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
i := 0
for s.Scan() {
assert.Equal(data[i], string(s.Record()))
if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
}
i++
}
}

func TestWriteEmptyFile(t *testing.T) {
assert := assert.New(t)
if i != total {
t.Fatal("total count not match:", i, total)
}
}

func TestChunkIndex(t *testing.T) {
const total = 1000
var buf bytes.Buffer
w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
assert.Nil(w.Close())
assert.Equal(0, buf.Len())
w := recordio.NewWriter(&buf, 0, -1)
for i := 0; i < total; i++ {
_, err := w.Write(make([]byte, i))
if err != nil {
t.Fatal(err)
}
}
w.Close()

idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
if err != nil {
t.Fatal(err)
}

idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
assert.Nil(e)
assert.Equal(0, idx.NumRecords())
if idx.NumChunks() != total {
t.Fatal("unexpected chunk num:", idx.NumChunks(), total)
}

for i := 0; i < total; i++ {
newIdx := idx.ChunkIndex(i)
s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), newIdx, -1, -1)
j := 0
for s.Scan() {
if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
}
j++
}
if j != 1 {
t.Fatal("unexpected record per chunk:", j)
}
}
}
7 changes: 6 additions & 1 deletion writer.go
@@ -21,9 +21,14 @@ type Writer struct {
// using the deflate algorithm given compression level. Note that
// level 0 means no compression and -1 means default compression.
func NewWriter(w io.Writer, maxChunkSize, compressor int) *Writer {
if maxChunkSize <= 0 {
if maxChunkSize < 0 {
maxChunkSize = defaultMaxChunkSize
}

if compressor == -1 {
Owner: == -1 ==> < 0 ?

Contributor Author: Done.

compressor = defaultCompressor
}

return &Writer{
Writer: w,
chunk: &Chunk{},
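Finally, a short usage sketch of the adjusted defaults, mirroring the tests in this PR (the record contents are placeholders): maxChunkSize 0 is now taken literally instead of being replaced by defaultMaxChunkSize, so every record lands in its own chunk, and a negative compressor falls back to defaultCompressor, which this PR sets to Snappy.

package main

import (
	"bytes"
	"fmt"

	"github.com/wangkuiyi/recordio"
)

func main() {
	var buf bytes.Buffer

	// maxChunkSize 0 is no longer overridden; compressor -1 selects the
	// default compressor (Snappy after this PR).
	w := recordio.NewWriter(&buf, 0, -1)
	for _, rec := range []string{"alpha", "beta", "gamma"} {
		if _, err := w.Write([]byte(rec)); err != nil {
			fmt.Println("write:", err)
			return
		}
	}
	w.Close()

	idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
	if err != nil {
		fmt.Println("load index:", err)
		return
	}
	// With maxChunkSize 0, expect one chunk per record, as in TestChunkIndex.
	fmt.Println(idx.NumChunks(), "chunks,", idx.NumRecords(), "records")
}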