diff --git a/db/backend.go b/db/backend.go index a6c17cc..b338bde 100644 --- a/db/backend.go +++ b/db/backend.go @@ -1,45 +1,61 @@ package db import ( + "fmt" "io/ioutil" "os" "strings" - "github.com/go-distributed/xtree/db/recordio" + dblog "github.com/go-distributed/xtree/db/log" + "github.com/go-distributed/xtree/db/message" "github.com/go-distributed/xtree/third-party/github.com/google/btree" ) type backend struct { - bt *btree.BTree - cache *cache - rev int - fc recordio.Fetcher - ap recordio.Appender + bt *btree.BTree + cache *cache + rev int + dblog *dblog.DBLog + config *DBConfig } func newBackend() *backend { - bt := btree.New(10) - - // temporary file IO to test in-disk values - writeFile, err := ioutil.TempFile("", "backend") + dataDir, err := ioutil.TempDir("", "backend") if err != nil { - panic("can't create temp file") + panic("can't create temp dir") + } + + config := &DBConfig{ + DataDir: dataDir, + } - readFile, err := os.Open(writeFile.Name()) + b, err := newBackendWithConfig(config) if err != nil { - panic("can't open temp file") + panic("can't create backend") } + return b +} - return &backend{ - bt: bt, - cache: newCache(), - fc: recordio.NewFetcher(readFile), - ap: recordio.NewAppender(writeFile), +func newBackendWithConfig(config *DBConfig) (b *backend, err error) { + bt := btree.New(10) + b = &backend{ + bt: bt, + cache: newCache(), + config: config, + } + haveLog := dblog.Exist(config.DataDir) + switch haveLog { + case false: + fmt.Println("didn't have log file. Init...") + err = b.init(config) + case true: + fmt.Println("had log file. 
Restore...") + err = b.restore(config) + } + return } func (b *backend) getData(offset int64) []byte { - rec, err := b.fc.Fetch(offset) + rec, err := b.dblog.GetRecord(offset) if err != nil { panic("unimplemented") } @@ -87,7 +103,10 @@ func (b *backend) Put(rev int, path Path, data []byte) { } b.rev++ - offset, err := b.ap.Append(recordio.Record{data}) + offset, err := b.dblog.Append(&message.Record{ + Key: path.p, + Data: data, + }) if err != nil { panic("unimplemented") } @@ -96,8 +115,8 @@ func (b *backend) Put(rev int, path Path, data []byte) { } // one-level listing -func (b *backend) Ls(pathname string) []Path { - result := make([]Path, 0) +func (b *backend) Ls(pathname string) (paths []Path) { + paths = make([]Path, 0) pivot := newPathForLs(pathname) b.bt.AscendGreaterOrEqual(pivot, func(treeItem btree.Item) bool { @@ -106,8 +125,36 @@ func (b *backend) Ls(pathname string) []Path { p.level != pivot.level { return false } - result = append(result, *p) + paths = append(paths, *p) return true }) - return result + + return +} + +// init() creates a new log file +func (b *backend) init(config *DBConfig) (err error) { + b.dblog, err = dblog.Create(config.DataDir) + return +} + +// restore() restores database from the log file. 
+func (b *backend) restore(config *DBConfig) (err error) { + rev := 0 + return dblog.Reuse(config.DataDir, + func(l *dblog.DBLog) { + b.dblog = l + }, + func(r *message.Record) (err error) { + rev++ + p := newPath(r.Key) + b.Put(rev, *p, r.Data) + return + }) +} + +// clean up resource after testing +func (b *backend) testableCleanupResource() (err error) { + b.dblog.Close() + return os.RemoveAll(b.config.DataDir) } diff --git a/db/backend_test.go b/db/backend_test.go index dfff190..07e3a18 100644 --- a/db/backend_test.go +++ b/db/backend_test.go @@ -17,6 +17,7 @@ func TestPut(t *testing.T) { } b := newBackend() + defer b.testableCleanupResource() for i, tt := range tests { b.Put(tt.rev, tt.path, tt.data) v := b.Get(tt.rev, tt.path) @@ -27,6 +28,7 @@ func TestPut(t *testing.T) { t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data) } } + } func TestPutOnExistingPath(t *testing.T) { @@ -40,6 +42,7 @@ func TestPutOnExistingPath(t *testing.T) { } b := newBackend() + defer b.testableCleanupResource() for i, tt := range tests { b.Put(2*i+1, tt.path, tt.data1) v := b.Get(2*i+1, tt.path) @@ -65,6 +68,8 @@ func TestPutOnExistingPath(t *testing.T) { func TestGetMVCC(t *testing.T) { b := newBackend() + defer b.testableCleanupResource() + b.Put(1, *newPath("/a"), []byte("1")) b.Put(2, *newPath("/b"), []byte("2")) b.Put(3, *newPath("/a"), []byte("3")) @@ -99,12 +104,15 @@ func TestGetMVCC(t *testing.T) { } func TestLs(t *testing.T) { - back := newBackend() d := []byte("somedata") - back.Put(1, *newPath("/a"), d) - back.Put(2, *newPath("/a/b"), d) - back.Put(3, *newPath("/a/c"), d) - back.Put(4, *newPath("/b"), d) + + b := newBackend() + defer b.testableCleanupResource() + + b.Put(1, *newPath("/a"), d) + b.Put(2, *newPath("/a/b"), d) + b.Put(3, *newPath("/a/c"), d) + b.Put(4, *newPath("/b"), d) tests := []struct { p string @@ -118,7 +126,7 @@ func TestLs(t *testing.T) { {"/c", []string{}}, } for i, tt := range tests { - ps := back.Ls(tt.p) + ps := b.Ls(tt.p) if len(ps) != 
len(tt.wps) { t.Fatalf("#%d: len(ps) = %d, want %d", i, len(ps), len(tt.wps)) } @@ -130,9 +138,44 @@ func TestLs(t *testing.T) { } } +func TestRestore(t *testing.T) { + tests := []struct { + rev int + path Path + data []byte + }{ + {1, *newPath("/foo/bar"), []byte("somedata")}, + {2, *newPath("/bar/foo"), []byte("datasome")}, + } + + b := newBackend() + for _, tt := range tests { + // append records to the log + b.Put(tt.rev, tt.path, tt.data) + } + b.dblog.Close() + + // simulate restoring log in another backend + b2, err := newBackendWithConfig(b.config) + defer b2.testableCleanupResource() + if err != nil { + t.Errorf("newBackendWithConfig failed: %v", err) + } + for i, tt := range tests { + v := b2.Get(tt.rev, tt.path) + if v.rev != tt.rev { + t.Errorf("#%d: rev = %d, want %d", i, v.rev, tt.rev) + } + if !reflect.DeepEqual(v.data, tt.data) { + t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data) + } + } +} + func BenchmarkPut(b *testing.B) { b.StopTimer() back := newBackend() + defer back.testableCleanupResource() d := []byte("somedata") path := make([]Path, b.N) for i := range path { @@ -148,6 +191,8 @@ func BenchmarkPut(b *testing.B) { func BenchmarkGetWithCache(b *testing.B) { b.StopTimer() back := newBackend() + defer back.testableCleanupResource() + d := []byte("somedata") path := make([]Path, b.N) for i := range path { @@ -168,6 +213,7 @@ func BenchmarkGetWithCache(b *testing.B) { func BenchmarkGetWithOutCache(b *testing.B) { b.StopTimer() back := newBackend() + defer back.testableCleanupResource() back.cache = nil d := []byte("somedata") path := make([]Path, b.N) diff --git a/db/config.go b/db/config.go new file mode 100644 index 0000000..b76e42f --- /dev/null +++ b/db/config.go @@ -0,0 +1,5 @@ +package db + +type DBConfig struct { + DataDir string +} diff --git a/db/db.go b/db/db.go index 582d737..e3ea960 100644 --- a/db/db.go +++ b/db/db.go @@ -23,7 +23,7 @@ type DB interface { // Otherwise, it lists recursively all paths. 
// // if count is >= 0, it is the number of paths we want in the list. - // if count is -1, it means any. + // if count is -1, it means all. // // if it failed, an error is returned. Ls(rev int, path string, recursive bool, count int) ([]Path, error) diff --git a/db/log/decoder.go b/db/log/decoder.go new file mode 100644 index 0000000..0178e27 --- /dev/null +++ b/db/log/decoder.go @@ -0,0 +1,34 @@ +package log + +import ( + "bufio" + "encoding/binary" + "io" + + "github.com/go-distributed/xtree/db/message" +) + +type decoder struct { + br *bufio.Reader +} + +func newDecoder(r io.Reader) *decoder { + return &decoder{bufio.NewReader(r)} +} + +func (d *decoder) decode(r *message.Record) (err error) { + var l int64 + if l, err = readInt64(d.br); err != nil { + return + } + data := make([]byte, l) + if _, err = io.ReadFull(d.br, data); err != nil { + return + } + return r.Unmarshal(data) +} + +func readInt64(r io.Reader) (n int64, err error) { + err = binary.Read(r, binary.LittleEndian, &n) + return +} diff --git a/db/log/encoder.go b/db/log/encoder.go new file mode 100644 index 0000000..a4c42e9 --- /dev/null +++ b/db/log/encoder.go @@ -0,0 +1,37 @@ +package log + +import ( + "bufio" + "encoding/binary" + "io" + + "github.com/go-distributed/xtree/db/message" +) + +type encoder struct { + bw *bufio.Writer +} + +func newEncoder(w io.Writer) *encoder { + return &encoder{bufio.NewWriter(w)} +} + +func (e *encoder) encode(r *message.Record) (err error) { + var data []byte + if data, err = r.Marshal(); err != nil { + return + } + if err = writeInt64(e.bw, int64(len(data))); err != nil { + return + } + _, err = e.bw.Write(data) + return +} + +func (e *encoder) flush() error { + return e.bw.Flush() +} + +func writeInt64(w io.Writer, n int64) error { + return binary.Write(w, binary.LittleEndian, n) +} diff --git a/db/log/encoder_test.go b/db/log/encoder_test.go new file mode 100644 index 0000000..a83cc57 --- /dev/null +++ 
b/db/log/encoder_test.go @@ -0,0 +1,46 @@ +package log + +import ( + "bytes" + "reflect" + "testing" + + "github.com/go-distributed/xtree/db/message" +) + +func TestEncoderDecoder(t *testing.T) { + tests := []struct { + rec *message.Record + }{ + {&message.Record{ + Key: "/test", + Data: []byte("some data"), + }}, + } + + for i, tt := range tests { + var err error + eBuf := new(bytes.Buffer) + encoder := newEncoder(eBuf) + + if err = encoder.encode(tt.rec); err != nil { + t.Fatalf("#%d: cannot encode, err: %v", i, err) + } + if err = encoder.flush(); err != nil { + t.Fatalf("#%d: cannot flush encode, err: %v", i, err) + } + + rec := new(message.Record) + dBuf := bytes.NewBuffer(eBuf.Bytes()) + decoder := newDecoder(dBuf) + + if err = decoder.decode(rec); err != nil { + t.Fatalf("#%d: cannot decode, err: %v", i, err) + } + + if !reflect.DeepEqual(tt.rec, rec) { + t.Fatalf("#%d: records are not the same, want: %v, get: %v", + i, tt.rec, rec) + } + } +} diff --git a/db/log/log.go b/db/log/log.go new file mode 100644 index 0000000..5046f72 --- /dev/null +++ b/db/log/log.go @@ -0,0 +1,109 @@ +package log + +import ( + "fmt" + "io" + "os" + "path" + + "github.com/go-distributed/xtree/db/message" +) + +const ( + logFilename = "records.log" +) + +type DBLog struct { + writeFile, readFile *os.File + encoder *encoder +} + +func Create(dataDir string) (l *DBLog, err error) { + l, err = newDBLog(path.Join(dataDir, logFilename), true) + return +} + +func newDBLog(logPath string, needCreate bool) (l *DBLog, err error) { + var writeFile, readFile *os.File + flag := os.O_WRONLY | os.O_APPEND | os.O_SYNC + if needCreate { + flag |= os.O_CREATE + } + if writeFile, err = os.OpenFile(logPath, flag, 0600); err != nil { + return + } + if !needCreate { + writeFile.Seek(0, os.SEEK_END) + } + if readFile, err = os.Open(logPath); err != nil { + writeFile.Close() + return + } + l = &DBLog{ + writeFile: writeFile, + readFile: readFile, + encoder: 
newEncoder(writeFile), + } + return +} + +func Reuse(dataDir string, + setLog func(*DBLog), + replayRecord func(*message.Record) error) (err error) { + var l *DBLog + if l, err = newDBLog(path.Join(dataDir, logFilename), + false); err != nil { + return + } + setLog(l) + decoder := newDecoder(l.readFile) + // TODO: parallel? + for { + r := new(message.Record) + if err = decoder.decode(r); err != nil { + if err == io.EOF { + return nil + } + return + } + ret, _ := l.readFile.Seek(0, os.SEEK_CUR) + fmt.Printf("%v %#v\n", ret, r) + if err = replayRecord(r); err != nil { + return + } + } +} + +func Exist(dataDir string) bool { + p := path.Join(dataDir, logFilename) + _, err := os.Stat(p) + return err == nil +} + +func (l *DBLog) GetRecord(offset int64) (r *message.Record, err error) { + if _, err = l.readFile.Seek(offset, 0); err != nil { + return + } + decoder := newDecoder(l.readFile) + r = new(message.Record) + err = decoder.decode(r) + return +} + +func (l *DBLog) Append(r *message.Record) (offset int64, err error) { + if offset, err = l.writeFile.Seek(0, os.SEEK_CUR); err != nil { + return + } + if err = l.encoder.encode(r); err != nil { + return + } + err = l.encoder.flush() + return offset, err +} + +func (l *DBLog) Close() (err error) { + if err = l.readFile.Close(); err != nil { + return + } + return l.writeFile.Close() +} diff --git a/db/log/log_test.go b/db/log/log_test.go new file mode 100644 index 0000000..7ab2918 --- /dev/null +++ b/db/log/log_test.go @@ -0,0 +1,60 @@ +package log + +import ( + "io/ioutil" + "os" + "reflect" + "testing" + + "github.com/go-distributed/xtree/db/message" +) + +func TestAppendAndGetRecord(t *testing.T) { + var err error + var l *DBLog + var dataDir string + + if dataDir, err = ioutil.TempDir("", "logtest"); err != nil { + t.Errorf("ioutil.TempDir failed: %v", err) + } + + defer os.RemoveAll(dataDir) + + if l, err = Create(dataDir); err != nil { + t.Errorf("Create failed: %v", err) + } + + tests := []struct { 
+ offset int64 + record *message.Record + }{ + {-1, &message.Record{ + Key: "/test", + Data: []byte("some data"), + }}, + {-1, &message.Record{ + Key: "/test2", + Data: []byte("some other data"), + }}, + } + + for i, tt := range tests { + tests[i].offset, err = l.Append(tt.record) + if err != nil { + t.Errorf("#%d: Append failed: %v", i, err) + } + } + + for i, tt := range tests { + var r *message.Record + if r, err = l.GetRecord(tt.offset); err != nil { + t.Errorf("#%d: GetRecord failed: %v", i, err) + } + + if !reflect.DeepEqual(tt.record, r) { + t.Errorf("#%d: records not the same, want: %v, get %v", + i, tt.record, r) + } + + } +} diff --git a/db/message/record.go b/db/message/record.go new file mode 100644 index 0000000..2b53136 --- /dev/null +++ b/db/message/record.go @@ -0,0 +1,16 @@ +package message + +import "encoding/json" + +type Record struct { + Key string + Data []byte +} + +func (r *Record) Marshal() ([]byte, error) { + return json.Marshal(r) +} + +func (r *Record) Unmarshal(data []byte) error { + return json.Unmarshal(data, r) +} diff --git a/db/message/record_test.go b/db/message/record_test.go new file mode 100644 index 0000000..62d95a3 --- /dev/null +++ b/db/message/record_test.go @@ -0,0 +1,35 @@ +package message + +import ( + "reflect" + "testing" +) + +func TestLogRecord(t *testing.T) { + tests := []struct { + rec *Record + }{ + {&Record{ + Key: "/test", + Data: []byte("some data"), + }}, + } + + for i, tt := range tests { + var data []byte + var err error + if data, err = tt.rec.Marshal(); err != nil { + t.Fatalf("#%d: cannot marshal, err: %v", i, err) + } + + rec := &Record{} + if err = rec.Unmarshal(data); err != nil { + t.Fatalf("#%d: cannot unmarshal, err: %v", i, err) + } + + if !reflect.DeepEqual(tt.rec, rec) { + t.Fatalf("#%d: records are not the same, want: %v, get: %v", + i, tt.rec, rec) + } + } +} diff --git a/db/recordio/appender.go b/db/recordio/appender.go deleted file mode 100644 index 05bd537..0000000 --- 
a/db/recordio/appender.go +++ /dev/null @@ -1,30 +0,0 @@ -package recordio - -import "io" - -type Appender interface { - Append(Record) (int64, error) -} - -type appender struct { - w io.WriteSeeker -} - -func NewAppender(w io.WriteSeeker) Appender { - return &appender{w} -} - -// Not thread-safe -func (ap *appender) Append(r Record) (offset int64, err error) { - offset, err = ap.w.Seek(0, 2) - if err != nil { - return -1, err - } - - err = (&r).encodeTo(ap.w) - if err != nil { - return -1, err - } - - return offset, nil -} diff --git a/db/recordio/fetcher.go b/db/recordio/fetcher.go deleted file mode 100644 index 930dc83..0000000 --- a/db/recordio/fetcher.go +++ /dev/null @@ -1,30 +0,0 @@ -package recordio - -import "io" - -type Fetcher interface { - Fetch(offset int64) (Record, error) -} - -type fetcher struct { - r io.ReadSeeker -} - -func NewFetcher(r io.ReadSeeker) Fetcher { - return &fetcher{r} -} - -func (fc *fetcher) Fetch(offset int64) (Record, error) { - _, err := fc.r.Seek(offset, 0) - if err != nil { - return Record{}, err - } - - r := Record{} - err = (&r).decodeFrom(fc.r) - if err != nil { - return Record{}, err - } - - return r, nil -} diff --git a/db/recordio/io_test.go b/db/recordio/io_test.go deleted file mode 100644 index 9ef9ae9..0000000 --- a/db/recordio/io_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package recordio - -import ( - "io/ioutil" - "os" - "reflect" - "testing" -) - -func TestFetch(t *testing.T) { - writeFile, err := ioutil.TempFile("", "testfetch") - if err != nil { - t.Error("can't create temp file") - } - defer os.Remove(writeFile.Name()) - defer writeFile.Close() - - readFile, err := os.Open(writeFile.Name()) - if err != nil { - t.Error("can't open temp file") - } - defer readFile.Close() - - ap := NewAppender(writeFile) - fc := NewFetcher(readFile) - - tests := []struct { - offset int64 - record Record - }{ - {-1, Record{[]byte("someData")}}, - {-1, Record{[]byte("someOtherData")}}, - } - - for i, tt := range tests { - offset, err 
:= ap.Append(tt.record) - if err != nil { - t.Errorf("#%d: Append failed: %s", i, err.Error()) - } - tests[i].offset = offset - } - - for i, tt := range tests { - recRead, err := fc.Fetch(tt.offset) - if err != nil { - t.Errorf("#%d: Fetch failed: %s", i, err.Error()) - } - - if !reflect.DeepEqual(recRead, tt.record) { - t.Errorf("#%d: records not the same, want: %v, get %v", - i, tt.offset, recRead) - } - - } -} diff --git a/db/recordio/record.go b/db/recordio/record.go deleted file mode 100644 index 33ca3fb..0000000 --- a/db/recordio/record.go +++ /dev/null @@ -1,46 +0,0 @@ -package recordio - -import ( - "encoding/binary" - "io" -) - -const ( - sizeOfLength = 4 -) - -type Record struct { - Data []byte -} - -func (r *Record) encodeTo(wr io.Writer) error { - // Write length - lBuf := make([]byte, sizeOfLength) - binary.LittleEndian.PutUint32(lBuf, uint32(len(r.Data))) - if _, err := wr.Write(lBuf); err != nil { - return err - } - // Write data - if _, err := wr.Write(r.Data); err != nil { - return err - } - - return nil -} - -func (r *Record) decodeFrom(rd io.Reader) error { - var length uint32 - // Read length - err := binary.Read(rd, binary.LittleEndian, &length) - if err != nil { - return err - } - // Read data - r.Data = make([]byte, length) - _, err = io.ReadFull(rd, r.Data) - if err != nil { - return err - } - - return nil -} diff --git a/db/recordio/record_test.go b/db/recordio/record_test.go deleted file mode 100644 index c8073d8..0000000 --- a/db/recordio/record_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package recordio - -import ( - "bytes" - "reflect" - "testing" -) - -func TestEncodeDecode(t *testing.T) { - tests := []struct { - data []byte - }{ - {[]byte("someData")}, - {[]byte("someOtherData")}, - } - - for i, tt := range tests { - buf := new(bytes.Buffer) - recordToWrite := &Record{tt.data} - - if err := recordToWrite.encodeTo(buf); err != nil { - t.Fatalf("#%d: cannot encode, err: %s", i, err) - } - recordToRead := new(Record) - if err := 
recordToRead.decodeFrom(buf); err != nil { - t.Fatalf("#%d: cannot decode, err: %s", i, err) - } - if !reflect.DeepEqual(recordToRead, recordToWrite) { - t.Fatalf("#%d: records are not the same, want: %v, get: %v", - i, recordToWrite, recordToRead) - } - } -}