diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 274c1ee7b4f..a86efa24a9c 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -21,7 +21,9 @@ import ( var log = logging.Logger("blockstore") // BlockPrefix namespaces blockstore datastores -var BlockPrefix = ds.NewKey("blocks") +const DefaultPrefix = "/blocks" + +var blockPrefix = ds.NewKey(DefaultPrefix) var ValueTypeMismatch = errors.New("the retrieved value is not a Block") var ErrHashMismatch = errors.New("block in storage has different hash than requested") @@ -71,20 +73,23 @@ type gcBlockstore struct { } func NewBlockstore(d ds.Batching) *blockstore { + return NewBlockstoreWPrefix(d, DefaultPrefix) +} + +func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore { var dsb ds.Batching - dd := dsns.Wrap(d, BlockPrefix) + prefixKey := ds.NewKey(prefix) + dd := dsns.Wrap(d, prefixKey) dsb = dd return &blockstore{ datastore: dsb, + prefix: prefixKey, } } type blockstore struct { datastore ds.Batching - - lk sync.RWMutex - gcreq int32 - gcreqlk sync.Mutex + prefix ds.Key rehash bool } @@ -175,7 +180,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) // KeysOnly, because that would be _a lot_ of data. q := dsq.Query{KeysOnly: true} // datastore/namespace does *NOT* fix up Query.Prefix - q.Prefix = BlockPrefix.String() + q.Prefix = bs.prefix.String() res, err := bs.datastore.Query(q) if err != nil { return nil, err diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index abe8a1a72d5..22c15d0004b 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -170,7 +170,7 @@ func TestAllKeysRespectsContext(t *testing.T) { default: } - e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()} + e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()} resultChan <- dsq.Result{Entry: e} // let it go. close(resultChan) <-done // should be done now. 
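The hunk above introduces `DefaultPrefix` and `NewBlockstoreWPrefix`, which let two blockstores share one datastore under different key prefixes. Below is a minimal usage sketch, not part of the patch: the import paths are illustrative (this tree actually vendors go-datastore through gx paths), the literal `"/filestore"` prefix is a placeholder (core/builder.go passes `fsrepo.FilestoreMount`), and `main` is just a stand-in driver.

```go
package main

import (
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"

	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
)

func main() {
	// One shared datastore backing both blockstores.
	mds := dssync.MutexWrap(ds.NewMapDatastore())

	// Default blockstore: keys are namespaced under DefaultPrefix ("/blocks").
	normal := bstore.NewBlockstore(mds)

	// Filestore-backed blockstore: same datastore, a separate prefix,
	// so its keys never collide with the normal block keys.
	filestoreBlocks := bstore.NewBlockstoreWPrefix(mds, "/filestore")

	_, _ = normal, filestoreBlocks
}
```

In core/builder.go this is wired up only when the repo exposes a filestore mount, so nodes without the filestore keep the old single-prefix layout.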
@@ -190,7 +190,7 @@ func TestValueTypeMismatch(t *testing.T) { block := blocks.NewBlock([]byte("some data")) datastore := ds.NewMapDatastore() - k := BlockPrefix.Child(dshelp.CidToDsKey(block.Cid())) + k := blockPrefix.Child(dshelp.CidToDsKey(block.Cid())) datastore.Put(k, "data that isn't a block!") blockstore := NewBlockstore(ds_sync.MutexWrap(datastore)) diff --git a/core/builder.go b/core/builder.go index baef82ed06d..e494d3ae22b 100644 --- a/core/builder.go +++ b/core/builder.go @@ -16,6 +16,7 @@ import ( pin "github.com/ipfs/go-ipfs/pin" repo "github.com/ipfs/go-ipfs/repo" cfg "github.com/ipfs/go-ipfs/repo/config" + fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" context "context" retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore" @@ -26,6 +27,9 @@ import ( ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" dsync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync" ci "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto" + + "github.com/ipfs/go-ipfs/filestore" + "github.com/ipfs/go-ipfs/filestore/support" ) type BuildCfg struct { @@ -184,7 +188,14 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { return err } - n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker()) + var mbs bstore.Blockstore = cbs + + if n.Repo.DirectMount(fsrepo.FilestoreMount) != nil { + fs := bstore.NewBlockstoreWPrefix(n.Repo.Datastore(), fsrepo.FilestoreMount) + mbs = filestore_support.NewMultiBlockstore(cbs, fs) + } + + n.Blockstore = bstore.NewGCBlockstore(mbs, bstore.NewGCLocker()) rcfg, err := n.Repo.Config() if err != nil { @@ -206,9 +217,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { n.Blocks = bserv.New(n.Blockstore, n.Exchange) n.DAG = dag.NewDAGService(n.Blocks) + if fs, ok := n.Repo.DirectMount(fsrepo.FilestoreMount).(*filestore.Datastore); ok { + n.DAG = filestore_support.NewDAGService(fs, n.DAG) + } internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore))) n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag) + if err != nil { // TODO: we should move towards only running 'NewPinner' explicity on // node init instead of implicitly here as a result of the pinner keys diff --git a/core/commands/add.go b/core/commands/add.go index 52613ca4cf1..c2870400bf6 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -1,13 +1,18 @@ package commands import ( + "errors" "fmt" "io" "github.com/ipfs/go-ipfs/core/coreunix" + "github.com/ipfs/go-ipfs/filestore" + "github.com/ipfs/go-ipfs/filestore/support" + "github.com/ipfs/go-ipfs/repo/fsrepo" "gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb" - blockservice "github.com/ipfs/go-ipfs/blockservice" + //bs "github.com/ipfs/go-ipfs/blocks/blockstore" + bserv "github.com/ipfs/go-ipfs/blockservice" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" core "github.com/ipfs/go-ipfs/core" @@ -33,6 +38,7 @@ const ( chunkerOptionName = "chunker" pinOptionName = "pin" rawLeavesOptionName = "raw-leaves" + noCopyName = "no-copy" ) var AddCmd = &cmds.Command{ @@ -80,6 +86,7 @@ You can now refer to the added file in a gateway, 
like so: cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."), cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true), cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"), + cmds.BoolOption(noCopyName, "Don't copy file contents. (experimental)"), }, PreRun: func(req cmds.Request) error { if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet { @@ -138,6 +145,7 @@ You can now refer to the added file in a gateway, like so: chunker, _, _ := req.Option(chunkerOptionName).String() dopin, _, _ := req.Option(pinOptionName).Bool() rawblks, _, _ := req.Option(rawLeavesOptionName).Bool() + nocopy, _, _ := req.Option(noCopyName).Bool() if hash { nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{ @@ -152,18 +160,33 @@ You can now refer to the added file in a gateway, like so: n = nilnode } - dserv := n.DAG + exchange := n.Exchange local, _, _ := req.Option("local").Bool() if local { - offlineexch := offline.Exchange(n.Blockstore) - bserv := blockservice.New(n.Blockstore, offlineexch) - dserv = dag.NewDAGService(bserv) + exchange = offline.Exchange(n.Blockstore) } outChan := make(chan interface{}, 8) res.SetOutput((<-chan interface{})(outChan)) - fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dserv) + var fileAdder *coreunix.Adder + if nocopy { + fs, ok := n.Repo.DirectMount(fsrepo.FilestoreMount).(*filestore.Datastore) + if !ok { + res.SetError(errors.New("filestore not enabled"), cmds.ErrNormal) + return + } + blockstore := filestore_support.NewBlockstore(n.Blockstore, fs) + blockService := bserv.NewWriteThrough(blockstore, exchange) + dagService := dag.NewDAGService(blockService) + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, blockstore, dagService) + } else if exchange != n.Exchange { + blockService := bserv.New(n.Blockstore, exchange) + dagService := dag.NewDAGService(blockService) + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dagService) + } else { + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, n.DAG) + } if err != nil { res.SetError(err, cmds.ErrNormal) return diff --git a/core/core.go b/core/core.go index 231b91ac6b0..60341f1005d 100644 --- a/core/core.go +++ b/core/core.go @@ -120,7 +120,7 @@ type IpfsNode struct { proc goprocess.Process ctx context.Context - mode mode + mode mode localModeSet bool } diff --git a/filestore/README.md b/filestore/README.md new file mode 100644 index 00000000000..35df61fded3 --- /dev/null +++ b/filestore/README.md @@ -0,0 +1,76 @@
+# Notes on the Filestore
+
+The filestore is a work-in-progress datastore that stores the unixfs
+data component of blocks in files on the filesystem instead of in the
+block itself. The main use of the datastore is to add content to IPFS
+without duplicating the content in the IPFS datastore.
+
+The filestore is developed on Debian (GNU/Linux). It has had limited
+testing on Windows and should work on MacOS X and other Unix-like
+systems.
+
+## Adding Files
+
+To add a file to IPFS without copying, use `add --no-copy`; to add a
+directory, use `add -r --no-copy`. (Throughout this document all
+commands are assumed to start with `ipfs`, so `filestore add` really
+means `ipfs filestore add`.) For example, to add the file `hello.txt`
+use:
+```
+  ipfs filestore add "`pwd`"/hello.txt
+```
+
+Paths stored in the filestore must be absolute.
+
+By default, the contents of the file are always verified by
+recomputing the hash.
+The setting `Filestore.Verify` can be used to
+change this to never recompute the hash (not recommended) or to only
+recompute the hash when the modification time has changed.
+
+Adding files to the filestore will generally be faster than adding
+blocks normally, as less data is copied around. Retrieving blocks from
+the filestore takes about the same time when the hash is not
+recomputed; when it is, retrieval is slower.
+
+## About filestore entries
+
+Each entry in the filestore is uniquely referred to by combining
+(1) the hash of the block, (2) the path to the file, and (3) the
+offset within the file, using the following syntax:
+```
+  <hash>/<filepath>//<offset>
+```
+for example:
+```
+  QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH//somedir/hello.txt//0
+```
+
+In the case that there is only one entry for a hash, the entry is
+stored using just the hash. If there is more than one entry for a
+hash (for example, if adding two files with identical content), then one
+entry will be stored using just the hash and the others will be stored
+using the full key. If the backing file changes or becomes
+inaccessible for the default entry (the one with just the hash), the
+other entries are tried until a valid entry is found. Once a valid
+entry is found, that entry will become the default.
+
+When listing the contents of the filestore, entries that are stored
+using just the hash are displayed as
+```
+  <hash> /<filepath>//<offset>
+```
+with a space between the <hash> and the <filepath>.
+
+It is always possible to refer to a specific entry in the filestore
+using the full key, regardless of how it is stored.
+
+## Controlling when blocks are verified
+
+The config variable `Filestore.Verify` can be used to customize when
+blocks from the filestore are verified. The default value `Always`
+will always verify blocks. A value of `IfChanged` will verify a
+block if the modification time of the backing file has changed. This
+value works well in most cases, but can miss some changes, especially
+if the filesystem only tracks file modification times with a
+resolution of one second (HFS+, used by OS X) or coarser (FAT32). A
+value of `Never` never checks blocks.
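For illustration, the `Filestore.Verify` semantics described above reduce to a single decision about when a block read from the filestore should be re-hashed. The sketch below is a standalone, hedged restatement of that rule: `shouldVerify` is a hypothetical helper written for this note (the real check lives inline in `GetData` in `filestore/datastore.go`), and the modification times stand for the float-seconds values produced by `FromTime`.

```go
package main

import "fmt"

type VerifyWhen int

const (
	VerifyNever VerifyWhen = iota
	VerifyIfChanged
	VerifyAlways
)

// shouldVerify mirrors the documented Filestore.Verify semantics:
// Always re-hashes every read, IfChanged re-hashes only when the backing
// file's modification time differs from the one recorded for the block,
// and Never skips re-hashing entirely.
func shouldVerify(mode VerifyWhen, storedModTime, currentModTime float64) bool {
	switch mode {
	case VerifyAlways:
		return true
	case VerifyIfChanged:
		return currentModTime != storedModTime
	default: // VerifyNever
		return false
	}
}

func main() {
	fmt.Println(shouldVerify(VerifyIfChanged, 1.0, 2.0)) // true: file changed
	fmt.Println(shouldVerify(VerifyIfChanged, 1.0, 1.0)) // false: unchanged
	fmt.Println(shouldVerify(VerifyNever, 1.0, 2.0))     // false: never verify
}
```

As the README notes, relying on `IfChanged` trades safety for speed on filesystems with coarse timestamp resolution, since an edit within the same timestamp tick goes undetected.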
diff --git a/filestore/dataobj.go b/filestore/dataobj.go new file mode 100644 index 00000000000..cf504a99eed --- /dev/null +++ b/filestore/dataobj.go @@ -0,0 +1,136 @@ +package filestore + +import ( + "fmt" + pb "github.com/ipfs/go-ipfs/filestore/pb" + "math" + "time" +) + +const ( + // If NoBlockData is true the Data is missing the Block data + // as that is provided by the underlying file + NoBlockData = 1 + // If WholeFile is true the Data object represents a complete + // file and Size is the size of the file + WholeFile = 2 + // If the node represents an a file but is not a leaf + // If WholeFile is also true than it is the file's root node + Internal = 4 + // If the block was determined to no longer be valid + Invalid = 8 +) + +type DataObj struct { + Flags uint64 + // The path to the file that holds the data for the object, an + // empty string if there is no underlying file + FilePath string + Offset uint64 + Size uint64 + ModTime float64 + Data []byte +} + +func (d *DataObj) NoBlockData() bool { return d.Flags&NoBlockData != 0 } +func (d *DataObj) HaveBlockData() bool { return !d.NoBlockData() } + +func (d *DataObj) WholeFile() bool { return d.Flags&WholeFile != 0 } + +func (d *DataObj) Internal() bool { return d.Flags&Internal != 0 } + +func (d *DataObj) Invalid() bool { return d.Flags&Invalid != 0 } + +func (d *DataObj) SetInvalid(val bool) { + if val { + d.Flags |= Invalid + } else { + d.Flags &^= Invalid + } +} + +func FromTime(t time.Time) float64 { + res := float64(t.Unix()) + if res > 0 { + res += float64(t.Nanosecond()) / 1000000000.0 + } + return res +} + +func ToTime(t float64) time.Time { + sec, frac := math.Modf(t) + return time.Unix(int64(sec), int64(frac*1000000000.0)) +} + +func (d *DataObj) KeyStr(key Key, asKey bool) string { + if key.FilePath == "" { + res := key.Format() + if asKey { + res += "/" + } else { + res += " /" + } + res += d.FilePath + res += "//" + res += fmt.Sprintf("%d", d.Offset) + return res + } else { + return key.Format() + } +} + +func (d *DataObj) Marshal() ([]byte, error) { + pd := new(pb.DataObj) + + pd.Flags = &d.Flags + + if d.FilePath != "" { + pd.FilePath = &d.FilePath + } + if d.Offset != 0 { + pd.Offset = &d.Offset + } + if d.Size != 0 { + pd.Size_ = &d.Size + } + if d.Data != nil { + pd.Data = d.Data + } + + if d.ModTime != 0.0 { + pd.Modtime = &d.ModTime + } + + return pd.Marshal() +} + +func (d *DataObj) Unmarshal(data []byte) error { + pd := new(pb.DataObj) + err := pd.Unmarshal(data) + if err != nil { + panic(err) + } + + if pd.Flags != nil { + d.Flags = *pd.Flags + } + + if pd.FilePath != nil { + d.FilePath = *pd.FilePath + } + if pd.Offset != nil { + d.Offset = *pd.Offset + } + if pd.Size_ != nil { + d.Size = *pd.Size_ + } + if pd.Data != nil { + d.Data = pd.Data + } + + if pd.Modtime != nil { + d.ModTime = *pd.Modtime + } + + return nil +} diff --git a/filestore/datastore.go b/filestore/datastore.go new file mode 100644 index 00000000000..693449135bf --- /dev/null +++ b/filestore/datastore.go @@ -0,0 +1,412 @@ +package filestore + +import ( + //"runtime/debug" + //"bytes" + "errors" + "io" + "os" + "path/filepath" + "sync" + + "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb" + "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt" + "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/util" + ds 
"gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" + "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" + dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" +) + +var log = logging.Logger("filestore") +var Logger = log + +type VerifyWhen int + +const ( + VerifyNever VerifyWhen = iota + VerifyIfChanged + VerifyAlways +) + +type Datastore struct { + db dbwrap + verify VerifyWhen + + // updateLock should be held whenever updating the database. It + // is designed to only be held for a very short period of time and + // should not be held when doing potentially expensive operations + // such as computing a hash or any sort of I/O. + updateLock sync.Mutex +} + +func (d *Datastore) DB() dbwrap { + return d.db +} + +func Init(path string) error { + db, err := leveldb.OpenFile(path, &opt.Options{ + Compression: opt.NoCompression, + }) + if err != nil { + return err + } + db.Close() + return nil +} + +func New(path string, verify VerifyWhen, noCompression bool) (*Datastore, error) { + dbOpts := &opt.Options{ErrorIfMissing: true} + if noCompression { + dbOpts.Compression = opt.NoCompression + } + db, err := leveldb.OpenFile(path, dbOpts) + if err != nil { + return nil, err + } + ds := &Datastore{db: dbwrap{dbread{db}, db}, verify: verify} + return ds, nil +} + +func (d *Datastore) Put(key ds.Key, value interface{}) error { + dataObj, ok := value.(*DataObj) + if !ok { + return ds.ErrInvalidType + } + + if dataObj.FilePath == "" && dataObj.Size == 0 { + // special case to handle empty files + d.updateLock.Lock() + defer d.updateLock.Unlock() + return d.db.Put(HashToKey(key.String()), dataObj) + } + + // Make sure the filename is an absolute path + if !filepath.IsAbs(dataObj.FilePath) { + return errors.New("datastore put: non-absolute filename: " + dataObj.FilePath) + } + + // Make sure we can read the file as a sanity check + file, err := os.Open(dataObj.FilePath) + if err != nil { + return err + } + defer file.Close() + + // See if we have the whole file in the block + if dataObj.Offset == 0 && !dataObj.WholeFile() { + // Get the file size + info, err := file.Stat() + if err != nil { + return err + } + if dataObj.Size == uint64(info.Size()) { + dataObj.Flags |= WholeFile + } + } + + d.updateLock.Lock() + defer d.updateLock.Unlock() + + hash := HashToKey(key.String()) + have, err := d.db.Has(hash) + if err != nil { + return err + } + if !have { + // First the easy case, the hash doesn't exist yet so just + // insert the data object as is + return d.db.Put(hash, dataObj) + } + + // okay so the hash exists, see if we already have this DataObj + dbKey := NewDbKey(key.String(), dataObj.FilePath, int64(dataObj.Offset), nil) + foundKey, _, err := d.GetDirect(dbKey) + if err != nil && err != ds.ErrNotFound { + return err + } + + if err == nil { + // the DataObj already exists so just replace it + return d.db.Put(foundKey, dataObj) + } + + // the DataObj does not exist so insert it using the full key to + // avoid overwritting the existing entry + return d.db.Put(dbKey, dataObj) +} + +func (d *Datastore) Get(dsKey ds.Key) (value interface{}, err error) { + hash := HashToKey(dsKey.String()) + + // we need a consistent view of the database so take a snapshot + ss0, err := d.db.db.GetSnapshot() + if err != nil { + return nil, err + } + defer ss0.Release() + ss := dbread{ss0} + + val, err := ss.GetHash(hash.Bytes) + if err == leveldb.ErrNotFound { + return nil, ds.ErrNotFound + } else if err != nil { + return nil, err + } + data, err := 
GetData(d, hash, val, d.verify) + if err == nil { + return data, nil + } + + //println("GET TRYING ALTERNATIVES") + + // See if we have any other DataObj's for the same hash that are + // valid + itr := ss.GetAlternatives(hash.Bytes) + for itr.Next() { + key := itr.Key() + val, err := itr.Value() + if err != nil { + return nil, err + } + data, err = GetData(d, key, val, d.verify) + if err == nil { + // we found one + d.updateGood(hash, key, val) + return data, nil + } + if err != InvalidBlock { + return nil, err + } + } + + return nil, err +} + +func (d *Datastore) updateGood(hash *DbKey, key *DbKey, dataObj *DataObj) { + d.updateLock.Lock() + defer d.updateLock.Unlock() + bad, err := d.db.GetHash(hash.Bytes) + if err != nil { + log.Warningf("%s: updateGood: %s", key, err) + } + badKey := NewDbKey(hash.Hash, bad.FilePath, int64(bad.Offset), nil) + good, err := d.db.Get(key) + if err != nil { + log.Warningf("%s: updateGood: %s", key, err) + } + // use batching as this needs to be done in a single atomic + // operation, to avoid problems with partial failures + batch := NewBatch() + batch.Put(hash, good) + batch.Put(badKey, bad) + batch.Delete(key.Bytes) + err = d.db.Write(batch) + if err != nil { + log.Warningf("%s: updateGood: %s", key, err) + } +} + +// Get the key as a DataObj. To handle multiple DataObj per Hash a +// block can be retrieved by either by just the hash or the hash +// combined with filename and offset. +// +// In addition to the date GteDirect will return the key the block was +// found under. +func (d *Datastore) GetDirect(key *DbKey) (*DbKey, *DataObj, error) { + if string(key.Bytes) != key.String() { + panic(string(key.Bytes) + " != " + key.String()) + } + val, err := d.db.Get(key) + if err != leveldb.ErrNotFound { // includes the case when err == nil + return key, val, err + } + + if key.FilePath == "" { + return nil, nil, ds.ErrNotFound + } + + hash := HashToKey(key.Hash) + return d.getIndirect(hash, key) +} + +// We have a key with filename and offset that was not found directly. +// Check to see it it was stored just using the hash. 
+func (d *Datastore) getIndirect(hash *DbKey, key *DbKey) (*DbKey, *DataObj, error) { + val, err := d.db.GetHash(hash.Bytes) + if err == leveldb.ErrNotFound { + return nil, nil, ds.ErrNotFound + } else if err != nil { + return nil, nil, err + } + + if key.FilePath != val.FilePath || uint64(key.Offset) != val.Offset { + return nil, nil, ds.ErrNotFound + } + + return hash, val, nil +} + +type KeyVal struct { + Key *DbKey + Val *DataObj +} + +type IsPinned int + +const ( + NotPinned = 1 + MaybePinned = 2 +) + +func (d *Datastore) Update(key *DbKey, val *DataObj) { + if key.FilePath == "" { + key = NewDbKey(key.Hash, val.FilePath, int64(val.Offset), nil) + } + d.updateLock.Lock() + defer d.updateLock.Unlock() + foundKey, _, err := d.GetDirect(key) + if err != nil { + return + } + d.db.Put(foundKey, val) +} + +var InvalidBlock = errors.New("filestore: block verification failed") + +// Get the orignal data out of the DataObj +func GetData(d *Datastore, key *DbKey, val *DataObj, verify VerifyWhen) ([]byte, error) { + if val == nil { + return nil, errors.New("Nil DataObj") + } + + // If there is no data to get from a backing file then there + // is nothing more to do so just return the block data + if val.HaveBlockData() { + return val.Data, nil + } + + invalid := val.Invalid() + + // Open the file and seek to the correct position + file, err := os.Open(val.FilePath) + if err != nil { + return nil, err + } + defer file.Close() + _, err = file.Seek(int64(val.Offset), 0) + if err != nil { + return nil, err + } + + // Reconstruct the original block, if we get an EOF + // than the file shrunk and the block is invalid + data, _, err := Reconstruct(val.Data, file, val.Size) + reconstructOk := true + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } else if err != nil { + log.Debugf("invalid block: %s: %s\n", MHash(key), err.Error()) + reconstructOk = false + invalid = true + } + + if verify == VerifyNever { + if invalid { + return nil, InvalidBlock + } else { + return data, nil + } + } + + // get the new modtime + fileInfo, err := file.Stat() + if err != nil { + return nil, err + } + modtime := FromTime(fileInfo.ModTime()) + + // Verify the block contents if required + if reconstructOk && (verify == VerifyAlways || modtime != val.ModTime) { + log.Debugf("verifying block %s\n", MHash(key)) + origKey, _ := key.Cid() + newKey, _ := origKey.Prefix().Sum(data) + invalid = !origKey.Equals(newKey) + } + + // Update the block if the metadata has changed + if invalid != val.Invalid() || modtime != val.ModTime { + log.Debugf("updating block %s\n", MHash(key)) + newVal := *val + newVal.SetInvalid(invalid) + newVal.ModTime = modtime + // ignore errors as they are nonfatal + d.Update(key, &newVal) + } + + // Finally return the result + if invalid { + log.Debugf("invalid block %s\n", MHash(key)) + return nil, InvalidBlock + } else { + return data, nil + } +} + +func (d *Datastore) Has(key ds.Key) (exists bool, err error) { + // FIXME: This is too simple + return d.db.HasHash(key.Bytes()) +} + +func (d *Datastore) Delete(key ds.Key) error { + //d.updateLock.Lock() + //defer d.updateLock.Unlock() + //return d.db.Delete(key.Bytes()) + return errors.New("Deleting filestore blocks via Delete() method is unsupported.") +} + +func (d *Datastore) Query(q query.Query) (query.Results, error) { + if (q.Prefix != "" && q.Prefix != "/") || + len(q.Filters) > 0 || + len(q.Orders) > 0 || + q.Limit > 0 || + q.Offset > 0 || + !q.KeysOnly { + // TODO this is overly simplistic, but the only caller is 
+ // `ipfs refs local` for now, and this gets us moving. + return nil, errors.New("filestore only supports listing all keys in random order") + } + qrb := dsq.NewResultBuilder(q) + qrb.Process.Go(func(worker goprocess.Process) { + var rnge *util.Range + i := d.db.db.NewIterator(rnge, nil) + defer i.Release() + for i.Next() { + k := ds.NewKey(string(i.Key())).String() + e := dsq.Entry{Key: k} + select { + case qrb.Output <- dsq.Result{Entry: e}: // we sent it out + case <-worker.Closing(): // client told us to end early. + break + } + } + if err := i.Error(); err != nil { + select { + case qrb.Output <- dsq.Result{Error: err}: // client read our error + case <-worker.Closing(): // client told us to end. + return + } + } + }) + go qrb.Process.CloseAfterChildren() + return qrb.Results(), nil +} + +func (d *Datastore) Close() error { + return d.db.db.Close() +} + +func (d *Datastore) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(d), nil +} diff --git a/filestore/dbwrap.go b/filestore/dbwrap.go new file mode 100644 index 00000000000..dbc7edb4b69 --- /dev/null +++ b/filestore/dbwrap.go @@ -0,0 +1,176 @@ +package filestore + +import ( + "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb" + "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/iterator" + "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt" + "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/util" +) + +type readops interface { + Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) + Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) + NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator +} + +type dbread struct { + db readops +} + +type dbwrap struct { + dbread + db *leveldb.DB +} + +func Decode(bytes []byte) (*DataObj, error) { + val := new(DataObj) + err := val.Unmarshal(bytes) + if err != nil { + return nil, err + } + return val, nil +} + +func (w dbread) GetHash(key []byte) (*DataObj, error) { + val, err := w.db.Get(key, nil) + if err != nil { + return nil, err + } + return Decode(val) +} + +func (w dbread) Get(key *DbKey) (*DataObj, error) { + if key.FilePath == "" { + return w.GetHash(key.Bytes) + } + val, err := w.db.Get(key.Bytes, nil) + if err != nil { + return nil, err + } + dataObj, err := Decode(val) + if err != nil { + return nil, err + } + dataObj.FilePath = key.FilePath + dataObj.Offset = uint64(key.Offset) + return dataObj, err +} + +func (d dbread) GetAlternatives(key []byte) *Iterator { + start := make([]byte, 0, len(key)+1) + start = append(start, key...) + start = append(start, byte('/')) + stop := make([]byte, 0, len(key)+1) + stop = append(stop, key...) + stop = append(stop, byte('/')+1) + return &Iterator{iter: d.db.NewIterator(&util.Range{start, stop}, nil)} +} + +func (w dbread) HasHash(key []byte) (bool, error) { + return w.db.Has(key, nil) +} + +func (w dbread) Has(key *DbKey) (bool, error) { + return w.db.Has(key.Bytes, nil) +} + +func marshal(key *DbKey, val *DataObj) ([]byte, error) { + if key.FilePath != "" { + val.FilePath = "" + val.Offset = 0 + } + return val.Marshal() +} + +// Put might modify `val`, it is not safe to use the objected pointed +// by `val` after this call. 
+func (w dbwrap) Put(key *DbKey, val *DataObj) error { + data, err := marshal(key, val) + if err != nil { + return err + } + return w.db.Put(key.Bytes, data, nil) +} + +func (w dbwrap) Delete(key []byte) error { + return w.db.Delete(key, nil) +} + +func (w dbwrap) Write(b dbbatch) error { + return w.db.Write(b.batch, nil) +} + +type dbbatch struct { + batch *leveldb.Batch +} + +func NewBatch() dbbatch { + return dbbatch{new(leveldb.Batch)} +} + +// Put might modify `val`, it is not safe to use the objected pointed +// by `val` after this call. +func (b dbbatch) Put(key *DbKey, val *DataObj) error { + data, err := marshal(key, val) + if err != nil { + return err + } + b.batch.Put(key.Bytes, data) + return nil +} + +func (b dbbatch) Delete(key []byte) { + b.batch.Delete(key) +} + +type Iterator struct { + key *DbKey + value *DataObj + iter iterator.Iterator +} + +func (d dbread) NewIterator() *Iterator { + return &Iterator{iter: d.db.NewIterator(nil, nil)} +} + +func (itr *Iterator) Next() bool { + itr.key = nil + itr.value = nil + return itr.iter.Next() +} + +func (itr *Iterator) Key() *DbKey { + if itr.key == nil { + bytes := itr.iter.Key() + itr.key = &DbKey{ + Key: ParseDsKey(string(bytes)), + Bytes: bytes, + } + } + return itr.key +} + +func (itr *Iterator) Value() (*DataObj, error) { + if itr.value != nil { + return itr.value, nil + } + bytes := itr.iter.Value() + if bytes == nil { + return nil, nil + } + var err error + itr.value, err = Decode(bytes) + if err != nil { + return nil, err + } + key := itr.Key() + if key.FilePath != "" { + itr.value.FilePath = key.FilePath + itr.value.Offset = uint64(key.Offset) + } + return itr.value, nil +} + +func (itr *Iterator) Release() { + itr.iter.Release() +} diff --git a/filestore/key.go b/filestore/key.go new file mode 100644 index 00000000000..1e493f39340 --- /dev/null +++ b/filestore/key.go @@ -0,0 +1,187 @@ +package filestore + +import ( + "bytes" + "fmt" + "strconv" + "strings" + + dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" + base32 "gx/ipfs/Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj/base32" +) + +type Key struct { + Hash string + FilePath string // empty string if not given + Offset int64 // -1 if not given +} + +func ParseKey(str string) (*DbKey, error) { + idx := strings.Index(str, "/") + var key *DbKey + if idx == -1 { + idx = len(str) + } + if idx != 0 { // we have a Hash + mhash := str[:idx] + c, err := cid.Decode(mhash) + if err != nil { + return nil, err + } + key = CidToKey(c) + } else { + key = &DbKey{} + } + if idx == len(str) { // we just have a hash + return key, nil + } + str = str[idx+1:] + parseRest(&key.Key, str) + key.Bytes = key.Key.Bytes() + return key, nil +} + +func ParseDsKey(str string) Key { + idx := strings.Index(str[1:], "/") + 1 + if idx == 0 { + return Key{str, "", -1} + } + key := Key{Hash: str[:idx]} + str = str[idx+1:] + parseRest(&key, str) + return key +} + +func parseRest(key *Key, str string) { + filename := strings.Trim(str, "0123456789") + if len(filename) <= 2 || filename[len(filename)-2:] != "//" || len(str) == len(filename) { + key.FilePath = filename + key.Offset = -1 + return + } + offsetStr := str[len(filename):] + key.FilePath = filename[:len(filename)-2] + key.Offset, _ = strconv.ParseInt(offsetStr, 10, 64) +} + +func (k Key) String() string { + str := k.Hash + if k.FilePath == "" { + return str + } + str += "/" + str += k.FilePath + if k.Offset == -1 { + return str + } + str += "//" + str += 
strconv.FormatInt(k.Offset, 10) + return str +} + +func (k Key) Bytes() []byte { + if k.FilePath == "" { + return []byte(k.Hash) + } + buf := bytes.NewBuffer(nil) + if k.Offset == -1 { + fmt.Fprintf(buf, "%s/%s", k.Hash, k.FilePath) + } else { + fmt.Fprintf(buf, "%s/%s//%d", k.Hash, k.FilePath, k.Offset) + } + return buf.Bytes() +} + +func (k Key) Cid() (*cid.Cid, error) { + binary, err := base32.RawStdEncoding.DecodeString(k.Hash[1:]) + if err != nil { + return nil, err + } + return cid.Cast(binary) +} + +type DbKey struct { + Key + Bytes []byte + cid *cid.Cid +} + +func ParseDbKey(key string) *DbKey { + return &DbKey{ + Key: ParseDsKey(key), + Bytes: []byte(key), + } +} + +func NewDbKey(hash string, filePath string, offset int64, cid *cid.Cid) *DbKey { + key := &DbKey{Key: Key{hash, filePath, offset}, cid: cid} + key.Bytes = key.Key.Bytes() + return key +} + +func KeyToKey(key Key) *DbKey { + return &DbKey{key, key.Bytes(), nil} +} + +func HashToKey(hash string) *DbKey { + return NewDbKey(hash, "", -1, nil) +} + +func CidToKey(c *cid.Cid) *DbKey { + return NewDbKey(dshelp.CidToDsKey(c).String(), "", -1, c) +} + +func (k *DbKey) HashOnly() *DbKey { + if k.cid != nil { + return CidToKey(k.cid) + } else { + return HashToKey(k.Hash) + } +} + +func (k *DbKey) Cid() (*cid.Cid, error) { + if k.cid == nil { + var err error + k.cid, err = k.Key.Cid() + if err != nil { + return nil, err + } + } + return k.cid, nil +} + +type havecid interface { + Cid() (*cid.Cid, error) +} + +func MHash(k havecid) string { + key, err := k.Cid() + if err != nil { + return "??????????????????????????????????????????????" + } + return key.String() +} + +func (k Key) Format() string { + if k.FilePath == "" { + return MHash(k) + } + return Key{MHash(k), k.FilePath, k.Offset}.String() +} + +func (k *DbKey) Format() string { + mhash := "" + if k.Hash != "" { + mhash = MHash(k) + } + if k.FilePath == "" { + return mhash + } + return Key{mhash, k.FilePath, k.Offset}.String() +} + +func (k *DbKey) MakeFull(dataObj *DataObj) *DbKey { + newKey := Key{k.Hash, dataObj.FilePath, int64(dataObj.Offset)} + return &DbKey{newKey, newKey.Bytes(), k.cid} +} diff --git a/filestore/key_test.go b/filestore/key_test.go new file mode 100644 index 00000000000..41f1687b6af --- /dev/null +++ b/filestore/key_test.go @@ -0,0 +1,48 @@ +package filestore + +import ( + "testing" +) + +func testParse(t *testing.T, str string, expect Key) { + res, err := ParseKey(str) + if err != nil { + t.Errorf("%s", err) + } + if res.Key != expect { + t.Errorf("parse failed on: %s: %#v != %#v", str, expect, res.Key) + } + if str != res.Format() { + t.Errorf("Format() format failed %s != %s", str, res.Format()) + } +} + +func testDsParse(t *testing.T, str string, expect Key) { + res := ParseDsKey(str) + if res != expect { + t.Errorf("parse failed on: %s", str) + } + if str != res.String() { + t.Errorf("String() format failed %s != %s", str, res.String()) + } + if str != string(res.Bytes()) { + t.Errorf("Bytes() format failed %s != %s", str, res.String()) + } +} + +func TestKey(t *testing.T) { + qmHash := "/CIQPJLLZXHBPDKSP325GP7BLB6J3WNGKMDZJWZRGANTAN22QKXDNY6Y" + zdHash := "/AFZBEIGD4KVH2JPQABBLQGN44DZVK5F3WWBEFEUDWFZ2ANB3PLOXSHWTDY" + testParse(t, "QmeomcMd37LRxkYn69XKiTpGEiJWRgUNEaxADx6ssfUJhp", Key{qmHash, "", -1}) + testParse(t, "zdvgqEbdrK4PzARFB7twNKangqFF3mgWeuJJAtMUwdDwFq7Pj", Key{zdHash, "", -1}) + testParse(t, "QmeomcMd37LRxkYn69XKiTpGEiJWRgUNEaxADx6ssfUJhp/dir/file", Key{qmHash, "dir/file", -1}) + testParse(t, 
"QmeomcMd37LRxkYn69XKiTpGEiJWRgUNEaxADx6ssfUJhp//dir/file", Key{qmHash, "/dir/file", -1}) + testParse(t, "QmeomcMd37LRxkYn69XKiTpGEiJWRgUNEaxADx6ssfUJhp//dir/file//23", Key{qmHash, "/dir/file", 23}) + testParse(t, "//just/a/file", Key{"", "/just/a/file", -1}) + testParse(t, "/just/a/file", Key{"", "just/a/file", -1}) + + testDsParse(t, "/ED65SD", Key{"/ED65SD", "", -1}) + testDsParse(t, "/ED65SD//some/file", Key{"/ED65SD", "/some/file", -1}) + testDsParse(t, "/ED65SD//some/file//34", Key{"/ED65SD", "/some/file", 34}) + testDsParse(t, "/ED65SD/c:/some/file//34", Key{"/ED65SD", "c:/some/file", 34}) +} diff --git a/filestore/pb/Makefile b/filestore/pb/Makefile new file mode 100644 index 00000000000..4b6a1d37569 --- /dev/null +++ b/filestore/pb/Makefile @@ -0,0 +1,10 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --gofast_out=. $< + +clean: + rm *.pb.go diff --git a/filestore/pb/dataobj.pb.go b/filestore/pb/dataobj.pb.go new file mode 100644 index 00000000000..fe7f685473a --- /dev/null +++ b/filestore/pb/dataobj.pb.go @@ -0,0 +1,499 @@ +// Code generated by protoc-gen-gogo. +// source: dataobj.proto +// DO NOT EDIT! + +/* + Package datastore_pb is a generated protocol buffer package. + + It is generated from these files: + dataobj.proto + + It has these top-level messages: + DataObj +*/ +package datastore_pb + +import proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type DataObj struct { + FilePath *string `protobuf:"bytes,1,opt,name=FilePath" json:"FilePath,omitempty"` + Offset *uint64 `protobuf:"varint,2,opt,name=Offset" json:"Offset,omitempty"` + Size_ *uint64 `protobuf:"varint,3,opt,name=Size" json:"Size,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"` + Flags *uint64 `protobuf:"varint,8,opt,name=Flags" json:"Flags,omitempty"` + Modtime *float64 `protobuf:"fixed64,9,opt,name=Modtime" json:"Modtime,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *DataObj) Reset() { *m = DataObj{} } +func (m *DataObj) String() string { return proto.CompactTextString(m) } +func (*DataObj) ProtoMessage() {} + +func (m *DataObj) GetFilePath() string { + if m != nil && m.FilePath != nil { + return *m.FilePath + } + return "" +} + +func (m *DataObj) GetOffset() uint64 { + if m != nil && m.Offset != nil { + return *m.Offset + } + return 0 +} + +func (m *DataObj) GetSize_() uint64 { + if m != nil && m.Size_ != nil { + return *m.Size_ + } + return 0 +} + +func (m *DataObj) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *DataObj) GetFlags() uint64 { + if m != nil && m.Flags != nil { + return *m.Flags + } + return 0 +} + +func (m *DataObj) GetModtime() float64 { + if m != nil && m.Modtime != nil { + return *m.Modtime + } + return 0 +} + +func init() { + proto.RegisterType((*DataObj)(nil), "datastore.pb.DataObj") +} +func (m *DataObj) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *DataObj) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.FilePath != nil { + data[i] = 0xa + i++ + i = encodeVarintDataobj(data, i, uint64(len(*m.FilePath))) + i += copy(data[i:], *m.FilePath) + } + if 
m.Offset != nil { + data[i] = 0x10 + i++ + i = encodeVarintDataobj(data, i, uint64(*m.Offset)) + } + if m.Size_ != nil { + data[i] = 0x18 + i++ + i = encodeVarintDataobj(data, i, uint64(*m.Size_)) + } + if m.Data != nil { + data[i] = 0x22 + i++ + i = encodeVarintDataobj(data, i, uint64(len(m.Data))) + i += copy(data[i:], m.Data) + } + if m.Flags != nil { + data[i] = 0x40 + i++ + i = encodeVarintDataobj(data, i, uint64(*m.Flags)) + } + if m.Modtime != nil { + data[i] = 0x49 + i++ + i = encodeFixed64Dataobj(data, i, uint64(math.Float64bits(*m.Modtime))) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeFixed64Dataobj(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Dataobj(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintDataobj(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *DataObj) Size() (n int) { + var l int + _ = l + if m.FilePath != nil { + l = len(*m.FilePath) + n += 1 + l + sovDataobj(uint64(l)) + } + if m.Offset != nil { + n += 1 + sovDataobj(uint64(*m.Offset)) + } + if m.Size_ != nil { + n += 1 + sovDataobj(uint64(*m.Size_)) + } + if m.Data != nil { + l = len(m.Data) + n += 1 + l + sovDataobj(uint64(l)) + } + if m.Flags != nil { + n += 1 + sovDataobj(uint64(*m.Flags)) + } + if m.Modtime != nil { + n += 9 + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovDataobj(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozDataobj(x uint64) (n int) { + return sovDataobj(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *DataObj) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DataObj: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DataObj: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field FilePath", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDataobj + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(data[iNdEx:postIndex]) + m.FilePath = &s + iNdEx = postIndex + case 2: + if wireType != 0 { + return 
fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Offset = &v + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Size_", wireType) + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Size_ = &v + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDataobj + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append([]byte{}, data[iNdEx:postIndex]...) + iNdEx = postIndex + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Flags", wireType) + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Flags = &v + case 9: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field Modtime", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + iNdEx += 8 + v = uint64(data[iNdEx-8]) + v |= uint64(data[iNdEx-7]) << 8 + v |= uint64(data[iNdEx-6]) << 16 + v |= uint64(data[iNdEx-5]) << 24 + v |= uint64(data[iNdEx-4]) << 32 + v |= uint64(data[iNdEx-3]) << 40 + v |= uint64(data[iNdEx-2]) << 48 + v |= uint64(data[iNdEx-1]) << 56 + v2 := float64(math.Float64frombits(v)) + m.Modtime = &v2 + default: + iNdEx = preIndex + skippy, err := skipDataobj(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDataobj + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) 
+ iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipDataobj(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthDataobj + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipDataobj(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthDataobj = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowDataobj = fmt.Errorf("proto: integer overflow") +) diff --git a/filestore/pb/dataobj.proto b/filestore/pb/dataobj.proto new file mode 100644 index 00000000000..f2d9e1033de --- /dev/null +++ b/filestore/pb/dataobj.proto @@ -0,0 +1,14 @@ +package datastore.pb; + +message DataObj { + optional string FilePath = 1; + optional uint64 Offset = 2; + optional uint64 Size = 3; + optional bytes Data = 4; + + // fields 5 - 7 where used in the dev. 
version, best not to resuse them + + optional uint64 Flags = 8; + optional double Modtime = 9; +} + diff --git a/filestore/reconstruct.go b/filestore/reconstruct.go new file mode 100644 index 00000000000..79584f1440e --- /dev/null +++ b/filestore/reconstruct.go @@ -0,0 +1,362 @@ +package filestore + +import ( + //"bytes" + //"encoding/hex" + "errors" + "fmt" + "io" + + dag_pb "github.com/ipfs/go-ipfs/merkledag/pb" + fs_pb "github.com/ipfs/go-ipfs/unixfs/pb" + proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" +) + +type UnixFSInfo struct { + Type fs_pb.Data_DataType + Data []byte + FileSize uint64 +} + +const useFastReconstruct = true + +func Reconstruct(data []byte, in io.Reader, blockDataSize uint64) ([]byte, *UnixFSInfo, error) { + // if blockDataSize == 0 { + // res1, fsinfo1, err1 := reconstruct(data, nil) + // if err1 != nil { + // return res1, fsinfo1, err1 + // } + // _ = fsinfo1 + // res2, fsinfo2, err2 := reconstructDirect(data, nil, 0) + // _ = fsinfo2 + // if err2 != nil { + // panic(err2) + // } + // if !bytes.Equal(res1, res2) { + // println("res1") + // print(hex.Dump(res1)) + // println("res2") + // print(hex.Dump(res2)) + // panic("Result not equal!") + // } + // return res2, fsinfo2, err2 + // } + if data == nil { // we have a raw node + blockData, err := readFromFile(in, blockDataSize) + if err != nil { + println(err) + } + return blockData, nil, err + } else if useFastReconstruct { + return reconstructDirect(data, in, blockDataSize) + } else { + blockData, err := readFromFile(in, blockDataSize) + if err != nil { + return nil, nil, err + } + return reconstruct(data, blockData) + } +} + +func readFromFile(in io.Reader, blockDataSize uint64) ([]byte, error) { + var blockData []byte + if blockDataSize > 0 { + blockData = make([]byte, blockDataSize) + _, err := io.ReadFull(in, blockData) + if err != nil { + return nil, err + } + } + return blockData, nil +} + +func reconstruct(data []byte, blockData []byte) ([]byte, *UnixFSInfo, error) { + // Decode data to merkledag protobuffer + var pbn dag_pb.PBNode + err := pbn.Unmarshal(data) + if err != nil { + panic(err) + } + + // Decode node's data to unixfs protobuffer + fs_pbn := new(fs_pb.Data) + err = proto.Unmarshal(pbn.Data, fs_pbn) + if err != nil { + panic(err) + } + + // gather some data about the unixfs object + fsinfo := &UnixFSInfo{Type: *fs_pbn.Type, Data: fs_pbn.Data} + if fs_pbn.Filesize != nil { + fsinfo.FileSize = *fs_pbn.Filesize + } + + // if we won't be replasing anything no need to reencode, just + // return the original data + if fs_pbn.Data == nil && blockData == nil { + return data, fsinfo, nil + } + + fs_pbn.Data = blockData + + // Reencode unixfs protobuffer + pbn.Data, err = proto.Marshal(fs_pbn) + if err != nil { + panic(err) + } + + // Reencode merkledag protobuffer + encoded, err := pbn.Marshal() + if err != nil { + return nil, fsinfo, err + } + return encoded, fsinfo, nil +} + +type header struct { + id int32 + // An "id" of 0 indicates a message we don't care about the + // value. As we don't care about the value multiple + // fields may be concatenated into one. + wire int32 + // "wire" is the Protocol Buffer wire format + val uint64 + // The exact meaning of "val" depends on the wire format: + // if a varint (wire format 0) then val is the value of the + // variable int; if length-delimited (wire format 2) + // then val is the payload size; otherwise, val is unused. 
+} + +type field struct { + header + offset int + // "offset" is the offset from the start of the buffer that + // contains the protocol key-value pair corresponding to the + // field, the end of the field is the same as the offset of + // the next field. An dummy field is added at the end that + // contains the final offset (i.e. the length of the buffer) + // to avoid special cases. +} + +type fields struct { + byts []byte + flds []field +} + +func (f fields) data(i int) []byte { + return f.byts[f.flds[i].offset:f.flds[i+1].offset] +} + +func (f fields) size(i int) int { + return f.flds[i+1].offset - f.flds[i].offset +} + +func (f fields) field(i int) field { + return f.flds[i] +} + +func (f fields) fields() []field { + return f.flds[0 : len(f.flds)-1] +} + +// only valid for the length-delimited (2) wire format +func (f fields) payload(i int) []byte { + return f.byts[f.flds[i+1].offset-int(f.flds[i].val) : f.flds[i+1].offset] +} + +const ( + unixfsTypeField = 1 + unixfsDataField = 2 + unixfsFilesizeField = 3 +) + +// An implementation of reconstruct that avoids expensive +// intermertaint data structures and unnecessary copying of data by +// reading the protocol buffer messages directly. +func reconstructDirect(data []byte, blockData io.Reader, blockDataSize uint64) ([]byte, *UnixFSInfo, error) { + dag, err := decodePB(data, func(typ int32) bool { + return typ == 1 + }) + var fs fields + if err != nil { + return nil, nil, err + } + dagSz := 0 + for i, fld := range dag.fields() { + if fld.id == 1 { + fs, err = decodePB(dag.payload(i), func(typ int32) bool { + return typ == unixfsTypeField || typ == unixfsDataField || typ == unixfsFilesizeField + }) + if err != nil { + return nil, nil, err + } + } else { + dagSz += dag.size(i) + } + } + + fsinfo := new(UnixFSInfo) + if len(fs.fields()) == 0 { + return nil, nil, errors.New("no UnixFS data") + } + if fs.field(0).id != unixfsTypeField { + return nil, nil, errors.New("unexpected field order") + } else { + fsinfo.Type = fs_pb.Data_DataType(fs.field(0).val) + } + fsSz := 0 + for i, fld := range fs.fields() { + if fld.id == unixfsDataField { + if i != 1 { + return nil, nil, errors.New("unexpected field order") + } + continue + } + if fld.id == unixfsFilesizeField { + fsinfo.FileSize = fld.val + } + fsSz += fs.size(i) + } + if len(fs.fields()) >= 2 && fs.field(1).id == unixfsDataField { + fsinfo.Data = fs.payload(1) + } else if blockDataSize == 0 { + // if we won't be replasing anything no need to + // reencode, just return the original data + return data, fsinfo, nil + } + if blockDataSize > 0 { + fsSz += 1 /* header */ + sizeVarint(blockDataSize) + int(blockDataSize) + } + dagSz += 1 /* header */ + sizeVarint(uint64(fsSz)) + fsSz + + // now reencode + + out := make([]byte, 0, dagSz) + + for i, fld := range dag.fields() { + if fld.id == 1 { + out = append(out, dag.data(i)[0]) + out = append(out, proto.EncodeVarint(uint64(fsSz))...) + out, err = reconstructUnixfs(out, fs, blockData, blockDataSize) + if err != nil { + return nil, fsinfo, err + } + } else { + out = append(out, dag.data(i)...) + } + } + + if dagSz != len(out) { + return nil, nil, fmt.Errorf("verification Failed: computed-size(%d) != actual-size(%d)", dagSz, len(out)) + } + return out, fsinfo, nil +} + +func reconstructUnixfs(out []byte, fs fields, blockData io.Reader, blockDataSize uint64) ([]byte, error) { + // copy first field + out = append(out, fs.data(0)...) 
+ + // insert Data field + if blockDataSize > 0 { + out = append(out, byte((unixfsDataField<<3)|2)) + out = append(out, proto.EncodeVarint(blockDataSize)...) + + origLen := len(out) + out = out[:origLen+int(blockDataSize)] + _, err := io.ReadFull(blockData, out[origLen:]) + if err != nil { + return out, err + } + } + + // copy rest of protocol buffer + sz := len(fs.fields()) + for i := 1; i < sz; i += 1 { + if fs.field(i).id == unixfsDataField { + continue + } + out = append(out, fs.data(i)...) + } + + return out, nil +} + +func decodePB(data []byte, keep func(int32) bool) (fields, error) { + res := make([]field, 0, 6) + offset := 0 + for offset < len(data) { + hdr, newOffset, err := getField(data, offset) + if err != nil { + return fields{}, err + } + if !keep(hdr.id) { + if len(res) > 1 && res[len(res)-1].id == 0 { + // nothing to do + // field will get merged into previous field + } else { + // set the header id to 0 to indicate + // we don't care about the value + res = append(res, field{offset: offset}) + } + } else { + res = append(res, field{hdr, offset}) + } + offset = newOffset + } + if offset != len(data) { + return fields{}, fmt.Errorf("protocol buffer sanity check failed") + } + // insert dummy field with the final offset + res = append(res, field{offset: offset}) + return fields{data, res}, nil +} + +func getField(data []byte, offset0 int) (hdr header, offset int, err error) { + offset = offset0 + hdrVal, varintSz := proto.DecodeVarint(data[offset:]) + if varintSz == 0 { + err = io.ErrUnexpectedEOF + return + } + offset += varintSz + hdr.id = int32(hdrVal) >> 3 + hdr.wire = int32(hdrVal) & 0x07 + switch hdr.wire { + case 0: // Variant + hdr.val, varintSz = proto.DecodeVarint(data[offset:]) + if varintSz == 0 { + err = io.ErrUnexpectedEOF + return + } + offset += varintSz + case 1: // 64 bit + offset += 8 + case 2: // Length-delimited + hdr.val, varintSz = proto.DecodeVarint(data[offset:]) + if varintSz == 0 { + err = io.ErrUnexpectedEOF + return + } + offset += varintSz + int(hdr.val) + case 5: // 32 bit + offset += 4 + default: + err = errors.New("unhandled wire type") + return + } + return +} + +// Note: this is copy and pasted from proto/encode.go, newer versions +// have this function exported. Once upgraded the exported function +// should be used instead. +func sizeVarint(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} diff --git a/filestore/support/blockstore.go b/filestore/support/blockstore.go new file mode 100644 index 00000000000..b3a7f30e168 --- /dev/null +++ b/filestore/support/blockstore.go @@ -0,0 +1,152 @@ +package filestore_support + +import ( + "fmt" + b "github.com/ipfs/go-ipfs/blocks" + BS "github.com/ipfs/go-ipfs/blocks/blockstore" + . 
"github.com/ipfs/go-ipfs/filestore" + dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" + pi "github.com/ipfs/go-ipfs/thirdparty/posinfo" + fs_pb "github.com/ipfs/go-ipfs/unixfs/pb" + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" + "path/filepath" +) + +type blockstore struct { + BS.GCBlockstore + filestore *Datastore +} + +func NewBlockstore(b BS.GCBlockstore, fs *Datastore) BS.GCBlockstore { + return &blockstore{b, fs} +} + +func (bs *blockstore) Put(block b.Block) error { + k := dshelp.CidToDsKey(block.Cid()) + + data, err := bs.prepareBlock(k, block) + if err != nil { + return err + } else if data == nil { + return bs.GCBlockstore.Put(block) + } + return bs.filestore.Put(k, data) +} + +func (bs *blockstore) PutMany(blocks []b.Block) error { + var nonFilestore []b.Block + + t, err := bs.filestore.Batch() + if err != nil { + return err + } + + for _, b := range blocks { + k := dshelp.CidToDsKey(b.Cid()) + data, err := bs.prepareBlock(k, b) + if err != nil { + return err + } else if data == nil { + nonFilestore = append(nonFilestore, b) + continue + } + + err = t.Put(k, data) + if err != nil { + return err + } + } + + err = t.Commit() + if err != nil { + return err + } + + if len(nonFilestore) > 0 { + err := bs.GCBlockstore.PutMany(nonFilestore) + if err != nil { + return err + } + return nil + } else { + return nil + } +} + +func (bs *blockstore) prepareBlock(k ds.Key, block b.Block) (*DataObj, error) { + altData, fsInfo, err := decode(block) + if err != nil { + return nil, err + } + + var fileSize uint64 + if fsInfo == nil { + fileSize = uint64(len(block.RawData())) + } else { + fileSize = fsInfo.FileSize + } + + if fsInfo != nil && fsInfo.Type != fs_pb.Data_Raw && fsInfo.Type != fs_pb.Data_File { + // If the node does not contain file data store using + // the normal datastore and not the filestore. 
+ return nil, nil + } else if fileSize == 0 { + // Special case for empty files as the block doesn't + // have any file information associated with it + return &DataObj{ + FilePath: "", + Offset: 0, + Size: 0, + ModTime: 0, + Flags: Internal | WholeFile, + Data: block.RawData(), + }, nil + } else { + fsn, ok := block.(*pi.FilestoreNode) + if !ok { + // Hack to get around the fact that Adder.PinRoot() might + // readd the same hash, but as a ProtoNode instead of a + // FilestoreNode, see: + // https://github.com/ipfs/go-ipfs/pull/3258#issuecomment-259027672 + have, _ := bs.filestore.DB().HasHash(k.Bytes()) + if have { + return nil, nil + } + return nil, fmt.Errorf("%s: no file information for block", block.Cid()) + } + posInfo := fsn.PosInfo + if posInfo.Stat == nil { + return nil, fmt.Errorf("%s: %s: no stat information for file", block.Cid(), posInfo.FullPath) + } + d := &DataObj{ + FilePath: filepath.Clean(posInfo.FullPath), + Offset: posInfo.Offset, + Size: uint64(fileSize), + ModTime: FromTime(posInfo.Stat.ModTime()), + } + if fsInfo == nil { + d.Flags |= NoBlockData + d.Data = nil + } else if len(fsInfo.Data) == 0 { + d.Flags |= Internal + d.Data = block.RawData() + } else { + d.Flags |= NoBlockData + d.Data = altData + } + return d, nil + } + +} + +func decode(block b.Block) ([]byte, *UnixFSInfo, error) { + switch block.Cid().Type() { + case cid.Protobuf: + return Reconstruct(block.RawData(), nil, 0) + case cid.Raw: + return nil, nil, nil + default: + return nil, nil, fmt.Errorf("unsupported block type") + } +} diff --git a/filestore/support/dagservice.go b/filestore/support/dagservice.go new file mode 100644 index 00000000000..903c87b77af --- /dev/null +++ b/filestore/support/dagservice.go @@ -0,0 +1,51 @@ +package filestore_support + +import ( + "context" + + . 
"github.com/ipfs/go-ipfs/filestore" + + dag "github.com/ipfs/go-ipfs/merkledag" + dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" + node "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node" + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" +) + +func NewDAGService(fs *Datastore, ds dag.DAGService) dag.DAGService { + return &dagService{fs, ds} +} + +type dagService struct { + fs *Datastore + dag.DAGService +} + +func GetLinks(dataObj *DataObj) ([]*node.Link, error) { + if !dataObj.Internal() { + return nil, nil + } + res, err := dag.DecodeProtobuf(dataObj.Data) + if err != nil { + return nil, err + } + return res.Links(), nil +} + +func (ds *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) { + dsKey := dshelp.CidToDsKey(c) + key := NewDbKey(dsKey.String(), "", -1, nil) + _, dataObj, err := ds.fs.GetDirect(key) + if err != nil { + return ds.DAGService.GetLinks(ctx, c) + } + return GetLinks(dataObj) +} + +func (ds *dagService) GetOfflineLinkService() dag.LinkService { + ds2 := ds.DAGService.GetOfflineLinkService() + if ds != ds2 { + return NewDAGService(ds.fs, ds.DAGService) + } else { + return ds2 + } +} diff --git a/filestore/support/multi.go b/filestore/support/multi.go new file mode 100644 index 00000000000..f056b63f942 --- /dev/null +++ b/filestore/support/multi.go @@ -0,0 +1,110 @@ +package filestore_support + +// A very simple multi-blockstore +// Put will only go to the first store + +import ( + //"errors" + "context" + + blocks "github.com/ipfs/go-ipfs/blocks" + bls "github.com/ipfs/go-ipfs/blocks/blockstore" + + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" + dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" +) + +func NewMultiBlockstore(stores ...bls.Blockstore) *multiblockstore { + return &multiblockstore{ + stores: stores, + } +} + +type multiblockstore struct { + stores []bls.Blockstore +} + +func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error { + // FIXME: Delete from all stores + return bs.stores[0].DeleteBlock(key) +} + +func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) { + var firstErr error + for _, b := range bs.stores { + have, err := b.Has(c) + if have && err == nil { + return have, nil + } + if err != nil && firstErr == nil { + firstErr = err + } + } + return false, firstErr +} + +func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) { + var firstErr error + for _, b := range bs.stores { + blk, err := b.Get(c) + if err == nil { + return blk, nil + } + if firstErr == nil || firstErr == bls.ErrNotFound { + firstErr = err + } + } + return nil, firstErr +} + +func (bs *multiblockstore) Put(blk blocks.Block) error { + // First call Has() to make sure the block doesn't exist in any of + // the sub-blockstores, otherwise we could end with data being + // duplicated in two blockstores. + exists, err := bs.Has(blk.Cid()) + if err == nil && exists { + return nil // already stored + } + return bs.stores[0].Put(blk) +} + +func (bs *multiblockstore) PutMany(blks []blocks.Block) error { + stilladd := make([]blocks.Block, 0, len(blks)) + // First call Has() to make sure the block doesn't exist in any of + // the sub-blockstores, otherwise we could end with data being + // duplicated in two blockstores. 
+ for _, blk := range blks { + exists, err := bs.Has(blk.Cid()) + if err == nil && exists { + continue // already stored + } + stilladd = append(stilladd, blk) + } + if len(stilladd) == 0 { + return nil + } + return bs.stores[0].PutMany(stilladd) +} + +func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { + //return bs.stores[0].Blocks.AllKeysChan(ctx) + //return nil, errors.New("Unimplemented") + in := make([]<-chan *cid.Cid, 0, len(bs.stores)) + for _, b := range bs.stores { + ch, err := b.AllKeysChan(ctx) + if err != nil { + return nil, err + } + in = append(in, ch) + } + out := make(chan *cid.Cid, dsq.KeysOnlyBufSize) + go func() { + defer close(out) + for _, in0 := range in { + for key := range in0 { + out <- key + } + } + }() + return out, nil +} diff --git a/repo/config/config.go b/repo/config/config.go index 898cf56a472..c9b54fefecb 100644 --- a/repo/config/config.go +++ b/repo/config/config.go @@ -19,6 +19,7 @@ var log = logging.Logger("config") type Config struct { Identity Identity // local node's peer identity Datastore Datastore // local node's storage + Filestore Filestore // local node's filestore Addresses Addresses // local node's addresses Mounts Mounts // local node's mount points Discovery Discovery // local node's discovery mechanisms diff --git a/repo/config/datastore.go b/repo/config/datastore.go index 2b861a113cf..de8d1df7f5e 100644 --- a/repo/config/datastore.go +++ b/repo/config/datastore.go @@ -40,3 +40,9 @@ type S3Datastore struct { func DataStorePath(configroot string) (string, error) { return Path(configroot, DefaultDataStoreDirectory) } + +type Filestore struct { + Verify string // one of "always", "ifchanged", "never" + APIServerSidePaths bool + NoDBCompression bool +} diff --git a/repo/fsrepo/defaultds.go b/repo/fsrepo/defaultds.go index ed8fbafe702..b7463255ba2 100644 --- a/repo/fsrepo/defaultds.go +++ b/repo/fsrepo/defaultds.go @@ -2,13 +2,18 @@ package fsrepo import ( "fmt" + "os" "path" + "strings" repo "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" "github.com/ipfs/go-ipfs/thirdparty/dir" + + filestore "github.com/ipfs/go-ipfs/filestore" "gx/ipfs/QmU4VzzKNLJXJ72SedXBQKyf5Jo8W89iWpbWQjHn9qef8N/go-ds-flatfs" levelds "gx/ipfs/QmUHmMGmcwCrjHQHcYhBnqGCSWs5pBSMbGZmfwavETR1gg/go-ds-leveldb" + //multi "github.com/ipfs/go-ipfs/repo/multi" ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" mount "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/syncmount" @@ -18,9 +23,17 @@ import ( const ( leveldbDirectory = "datastore" flatfsDirectory = "blocks" + fileStoreDir = "filestore-db" + fileStoreDataDir = "filestore-data" ) -func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { +const ( + RootMount = "/" + CacheMount = "/blocks" // needs to be the same as blockstore.DefaultPrefix + FilestoreMount = "/filestore" +) + +func openDefaultDatastore(r *FSRepo) (repo.Datastore, []Mount, error) { leveldbPath := path.Join(r.path, leveldbDirectory) // save leveldb reference so it can be neatly closed afterward @@ -28,7 +41,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { Compression: ldbopts.NoCompression, }) if err != nil { - return nil, fmt.Errorf("unable to open leveldb datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open leveldb datastore: %v", err) } 
syncfs := !r.config.Datastore.NoSync @@ -36,7 +49,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { // by the Qm prefix. Leaving us with 9 bits, or 512 way sharding blocksDS, err := flatfs.New(path.Join(r.path, flatfsDirectory), 5, syncfs) if err != nil { - return nil, fmt.Errorf("unable to open flatfs datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open flatfs datastore: %v", err) } // Add our PeerID to metrics paths to keep them unique @@ -51,18 +64,38 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { prefix := "fsrepo." + id + ".datastore." metricsBlocks := measure.New(prefix+"blocks", blocksDS) metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS) - mountDS := mount.New([]mount.Mount{ - { - Prefix: ds.NewKey("/blocks"), - Datastore: metricsBlocks, - }, - { - Prefix: ds.NewKey("/"), - Datastore: metricsLevelDB, - }, + + var mounts []mount.Mount + var directMounts []Mount + + mounts = append(mounts, mount.Mount{ + Prefix: ds.NewKey(CacheMount), + Datastore: metricsBlocks, }) + directMounts = append(directMounts, Mount{CacheMount, blocksDS}) + + InitFilestore(r.path) + fileStore, err := r.newFilestore() + if err != nil { + return nil, nil, err + } + if fileStore != nil { + mounts = append(mounts, mount.Mount{ + Prefix: ds.NewKey(FilestoreMount), + Datastore: fileStore, + }) + directMounts = append(directMounts, Mount{FilestoreMount, fileStore}) + } - return mountDS, nil + mounts = append(mounts, mount.Mount{ + Prefix: ds.NewKey(RootMount), + Datastore: metricsLevelDB, + }) + directMounts = append(directMounts, Mount{RootMount, leveldbDS}) + + mountDS := mount.New(mounts) + + return mountDS, directMounts, nil } func initDefaultDatastore(repoPath string, conf *config.Config) error { @@ -79,3 +112,28 @@ func initDefaultDatastore(repoPath string, conf *config.Config) error { } return nil } + +func InitFilestore(repoPath string) error { + fileStorePath := path.Join(repoPath, fileStoreDir) + return filestore.Init(fileStorePath) +} + +// will return nil, nil if the filestore is not enabled +func (r *FSRepo) newFilestore() (*filestore.Datastore, error) { + fileStorePath := path.Join(r.path, fileStoreDir) + if _, err := os.Stat(fileStorePath); os.IsNotExist(err) { + return nil, nil + } + verify := filestore.VerifyAlways + switch strings.ToLower(r.config.Filestore.Verify) { + case "never": + verify = filestore.VerifyNever + case "ifchanged", "if changed": + verify = filestore.VerifyIfChanged + case "", "always": + verify = filestore.VerifyAlways + default: + return nil, fmt.Errorf("invalid value for Filestore.Verify: %s", r.config.Filestore.Verify) + } + return filestore.New(fileStorePath, verify, r.config.Filestore.NoDBCompression) +} diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 03ac313e63b..847ee7777c6 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -20,6 +20,7 @@ import ( dir "github.com/ipfs/go-ipfs/thirdparty/dir" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" util "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" "gx/ipfs/QmeqtHtxGfcsfXiou7wqHJARWPKUTUcPdtSfSYYHp48dtQ/go-ds-measure" ) @@ -93,6 +94,12 @@ type FSRepo struct { lockfile io.Closer config *config.Config ds repo.Datastore + mounts []Mount +} + +type Mount struct { + prefix string + dstore ds.Datastore } var _ repo.Repo = (*FSRepo)(nil) @@ -331,11 +338,12 @@ func (r *FSRepo) openConfig() error { func (r 
*FSRepo) openDatastore() error { switch r.config.Datastore.Type { case "default", "leveldb", "": - d, err := openDefaultDatastore(r) + d, m, err := openDefaultDatastore(r) if err != nil { return err } r.ds = d + r.mounts = m default: return fmt.Errorf("unknown datastore type: %s", r.config.Datastore.Type) } @@ -557,6 +565,27 @@ func (r *FSRepo) Datastore() repo.Datastore { return d } +func (r *FSRepo) DirectMount(prefix string) ds.Datastore { + packageLock.Lock() + defer packageLock.Unlock() + for _, m := range r.mounts { + if prefix == m.prefix { + return m.dstore + } + } + return nil +} + +func (r *FSRepo) Mounts() []string { + packageLock.Lock() + mounts := make([]string, 0, len(r.mounts)) + for _, m := range r.mounts { + mounts = append(mounts, m.prefix) + } + packageLock.Unlock() + return mounts +} + // GetStorageUsage computes the storage space taken by the repo in bytes func (r *FSRepo) GetStorageUsage() (uint64, error) { pth, err := config.PathRoot() diff --git a/repo/mock.go b/repo/mock.go index 8190a0bda1b..f68e078cfbc 100644 --- a/repo/mock.go +++ b/repo/mock.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/ipfs/go-ipfs/repo/config" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" ) var errTODO = errors.New("TODO: mock repo") @@ -33,6 +34,18 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) { func (m *Mock) Datastore() Datastore { return m.D } +func (m *Mock) DirectMount(prefix string) ds.Datastore { + if prefix == "/" { + return m.D + } else { + return nil + } +} + +func (m *Mock) Mounts() []string { + return []string{"/"} +} + func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil } func (m *Mock) Close() error { return errTODO } diff --git a/repo/repo.go b/repo/repo.go index d95af0446dd..633ff57114b 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -22,6 +22,14 @@ type Repo interface { Datastore() Datastore GetStorageUsage() (uint64, error) + // DirectMount provides direct access to a datastore mounted + // under prefix in order to perform low-level operations. The + // datastore returned is guaranteed not be a proxy (such as a + // go-datastore/measure) normal operations should go through + // Datastore() + DirectMount(prefix string) ds.Datastore + Mounts() []string + // SetAPIAddr sets the API address in the repo. SetAPIAddr(addr string) error diff --git a/test/sharness/lib/test-filestore-lib.sh b/test/sharness/lib/test-filestore-lib.sh new file mode 100644 index 00000000000..6786ea21ee0 --- /dev/null +++ b/test/sharness/lib/test-filestore-lib.sh @@ -0,0 +1,203 @@ +client_err() { + printf "$@\n\nUse 'ipfs add --help' for information about this command\n" +} + + +test_enable_filestore() { + test_expect_success "enable filestore" ' + ipfs filestore enable + ' +} + +test_add_cat_file() { + cmd=$1 + dir=$2 + HASH=$3 # QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH + + test_expect_success "ipfs $cmd succeeds" ' + echo "Hello Worlds!" >mountdir/hello.txt && + ipfs $cmd "$dir"/mountdir/hello.txt >actual + ' + + test_expect_success "ipfs $cmd output looks good" ' + echo "added $HASH hello.txt" >expected && + test_cmp expected actual + ' + + test_expect_success "ipfs cat succeeds" ' + ipfs cat "$HASH" >actual + ' + + test_expect_success "ipfs cat output looks good" ' + echo "Hello Worlds!" 
>expected && + test_cmp expected actual + ' +} + +test_add_empty_file() { + cmd=$1 + dir=$2 + + EMPTY_HASH="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH" + + test_expect_success "ipfs $cmd on empty file succeeds" ' + ipfs block rm -f $EMPTY_HASH && + cat /dev/null >mountdir/empty.txt && + ipfs $cmd "$dir"/mountdir/empty.txt >actual + ' + + test_expect_success "ipfs $cmd on empty file output looks good" ' + echo "added $EMPTY_HASH empty.txt" >expected && + test_cmp expected actual + ' + + test_expect_success "ipfs cat on empty file succeeds" ' + ipfs cat "$EMPTY_HASH" >actual + ' + + test_expect_success "ipfs cat on empty file output looks good" ' + cat /dev/null >expected && + test_cmp expected actual + ' +} + +test_post_add() { + cmd=$1 + dir=$2 + + test_expect_success "fail after file move" ' + mv mountdir/hello.txt mountdir/hello2.txt + test_must_fail ipfs cat "$HASH" >/dev/null + ' + + test_expect_success "okay again after moving back" ' + mv mountdir/hello2.txt mountdir/hello.txt && + ipfs cat "$HASH" >/dev/null + ' + + test_expect_success "fail after file move" ' + mv mountdir/hello.txt mountdir/hello2.txt + test_must_fail ipfs cat "$HASH" >/dev/null + ' + + test_expect_success "okay after re-adding under new name" ' + ipfs $cmd "$dir"/mountdir/hello2.txt 2> add.output && + ipfs cat "$HASH" >/dev/null + ' + + test_expect_success "restore state" ' + mv mountdir/hello2.txt mountdir/hello.txt && + ipfs $cmd "$dir"/mountdir/hello.txt 2> add.output && + ipfs cat "$HASH" >/dev/null + ' + + test_expect_success "fail after file change" ' + # note: filesize shrinks + echo "hello world!" >mountdir/hello.txt && + test_must_fail ipfs cat "$HASH" >cat.output + ' + + test_expect_success "fail after file change, same size" ' + # note: filesize does not change + echo "HELLO WORLDS!" 
>mountdir/hello.txt && + test_must_fail ipfs cat "$HASH" >cat.output + ' +} + +test_add_cat_5MB() { + cmd=$1 + dir=$2 + HASH=$3 # "QmSr7FqYkxYWGoSfy8ZiaMWQ5vosb18DQGCzjwEQnVHkTb" + + test_expect_success "generate 5MB file using go-random" ' + random 5242880 41 >mountdir/bigfile + ' + + test_expect_success "sha1 of the file looks ok" ' + echo "11145620fb92eb5a49c9986b5c6844efda37e471660e" >sha1_expected && + multihash -a=sha1 -e=hex mountdir/bigfile >sha1_actual && + test_cmp sha1_expected sha1_actual + ' + + test_expect_success "'ipfs $cmd bigfile' succeeds" ' + ipfs $cmd "$dir"/mountdir/bigfile >actual + ' + + test_expect_success "'ipfs $cmd bigfile' output looks good" ' + echo "added $HASH bigfile" >expected && + test_cmp expected actual + ' + + test_expect_success "'ipfs cat' succeeds" ' + ipfs cat "$HASH" >actual + ' + + test_expect_success "'ipfs cat' output looks good" ' + test_cmp mountdir/bigfile actual + ' +} + +test_add_cat_200MB() { + cmd=$1 + dir=$2 + HASH=$3 #"QmVbVLFLbz72tRSw3HMBh6ABKbRVavMQLoh2BzQ4dUSAYL" + + test_expect_success "generate 200MB file using go-random" ' + random 209715200 41 >mountdir/hugefile + ' + + test_expect_success "sha1 of the file looks ok" ' + echo "11146a3985bff32699f1874517ad0585bbd280efc1de" >sha1_expected && + multihash -a=sha1 -e=hex mountdir/hugefile >sha1_actual && + test_cmp sha1_expected sha1_actual + ' + + test_expect_success "'ipfs add hugefile' succeeds" ' + ipfs $cmd "$dir"/mountdir/hugefile >actual + ' + + test_expect_success "'ipfs add hugefile' output looks good" ' + echo "added $HASH hugefile" >expected && + test_cmp expected actual + ' + + test_expect_success "'ipfs cat' succeeds" ' + ipfs cat "$HASH" >actual + ' + + test_expect_success "'ipfs cat' output looks good" ' + test_cmp mountdir/hugefile actual + ' + + test_expect_success "fail after file rm" ' + rm mountdir/hugefile actual && + test_must_fail ipfs cat "$HASH" >/dev/null + ' +} + +test_add_mulpl_files() { + cmd=$1 + + test_expect_success "generate directory with several files" ' + mkdir adir && + echo "file1" > adir/file1 && + echo "file2" > adir/file2 && + echo "file3" > adir/file3 + ' + + dir="`pwd`"/adir + test_expect_success "add files by listing them all on command line" ' + ipfs $cmd "$dir"/file1 "$dir"/file2 "$dir"/file3 > add-expect + ' + + test_expect_success "all files added" ' + grep file1 add-expect && + grep file2 add-expect && + grep file3 add-expect + ' + + test_expect_success "cleanup" ' + rm -r adir + ' +} + diff --git a/test/sharness/t0260-filestore.sh b/test/sharness/t0260-filestore.sh new file mode 100755 index 00000000000..c76a08e1271 --- /dev/null +++ b/test/sharness/t0260-filestore.sh @@ -0,0 +1,44 @@ +#!/bin/sh +# +# Copyright (c) 2014 Christian Couder +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="Test filestore" + +. lib/test-filestore-lib.sh +. 
lib/test-lib.sh + +test_init_ipfs + +test_add_cat_file "add --no-copy" "`pwd`" "QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH" + +test_post_add "add --no-copy" "`pwd`" + +test_add_cat_file "add --no-copy --raw-leaves" "`pwd`" "zdvgqC4vX1j7higiYBR1HApkcjVMAFHwJyPL8jnKK6sVMqd1v" + +test_post_add "add --no-copy --raw-leaves" "`pwd`" + +test_add_empty_file "add --no-copy" "`pwd`" + +test_add_cat_5MB "add --no-copy" "`pwd`" "QmSr7FqYkxYWGoSfy8ZiaMWQ5vosb18DQGCzjwEQnVHkTb" + +test_add_cat_5MB "add --no-copy --raw-leaves" "`pwd`" "QmefsDaD3YVphd86mxjJfPLceKv8by98aB6J6sJxK13xS2" + +test_add_mulpl_files "add --no-copy" + +test_expect_success "fail after file move" ' + mv mountdir/bigfile mountdir/bigfile2 + test_must_fail ipfs cat "$HASH" >/dev/null +' + +test_expect_success "must use absolute path" ' + echo "some content" > somefile && + test_must_fail ipfs add --no-copy somefile +' + +test_add_cat_200MB "add --no-copy" "`pwd`" "QmVbVLFLbz72tRSw3HMBh6ABKbRVavMQLoh2BzQ4dUSAYL" + +test_add_cat_200MB "add --no-copy --raw-leaves" "`pwd`" "QmYJWknpk2HUjVCkTDFMcTtxEJB4XbUpFRYW4BCAEfDN6t" + +test_done diff --git a/test/sharness/t0262-filestore-config.sh b/test/sharness/t0262-filestore-config.sh new file mode 100755 index 00000000000..b18ab2e065d --- /dev/null +++ b/test/sharness/t0262-filestore-config.sh @@ -0,0 +1,48 @@ +#!/bin/sh +# +# Copyright (c) 2014 Christian Couder +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="Test filestore" + +. lib/test-filestore-lib.sh +. lib/test-lib.sh + +test_init_ipfs + +test_add_cat_file "add --no-copy" "`pwd`" "QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH" + +export IPFS_LOGGING=debug +export IPFS_LOGGING_FMT=nocolor + +HASH="QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH" + +test_expect_success "file always checked" ' + ipfs config Filestore.Verify always 2> log && + ipfs cat "$HASH" 2> log && + grep -q "verifying block $HASH" log && + ! grep -q "updating block $HASH" log +' + +test_expect_success "file checked after change" ' + ipfs config Filestore.Verify ifchanged 2> log && + sleep 2 && # to accommodate systems without sub-second mod-times + echo "HELLO WORLDS!" >mountdir/hello.txt && + test_must_fail ipfs cat "$HASH" 2> log && + grep -q "verifying block $HASH" log && + grep -q "updating block $HASH" log +' + +test_expect_success "file never checked" ' + echo "Hello Worlds!" >mountdir/hello.txt && + ipfs add "$dir"/mountdir/hello.txt >actual 2> log && + ipfs config Filestore.Verify never 2> log && + echo "HELLO Worlds!" >mountdir/hello.txt && + ( ipfs cat "$HASH" || true ) 2> log && + grep -q "BlockService GetBlock" log && # Make sure we are still logging + ! grep -q "verifying block $HASH" log && + ! grep -q "updating block $HASH" log +' + +test_done
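
Editor's note: the multi-blockstore added in filestore/support/multi.go above routes all writes to the first store and lets reads fall through the stores in order. Below is a minimal standalone sketch of that policy only; the map-based "store" type, key strings, and function names here are stand-ins for illustration and are not the real Blockstore interfaces or package API.

// multiblockstore_sketch.go
//
// Simplified illustration of the read/write policy of the multi-blockstore:
// writes always land in the first (cache) store, reads fall through the
// stores in order, and a block already present in any store is not written
// again. A store here is just a map keyed by a CID-like string.
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("block not found")

// store is a stand-in for a single blockstore.
type store map[string][]byte

type multi struct {
	stores []store // stores[0] receives all new writes
}

func (m *multi) Has(key string) bool {
	for _, s := range m.stores {
		if _, ok := s[key]; ok {
			return true
		}
	}
	return false
}

func (m *multi) Get(key string) ([]byte, error) {
	for _, s := range m.stores {
		if data, ok := s[key]; ok {
			return data, nil
		}
	}
	return nil, errNotFound
}

func (m *multi) Put(key string, data []byte) {
	if m.Has(key) {
		return // already stored in some store; avoid duplicating the data
	}
	m.stores[0][key] = data
}

func main() {
	cache, filestore := store{}, store{"Qmfile": []byte("backed by a file")}
	m := &multi{stores: []store{cache, filestore}}

	m.Put("Qmfile", []byte("would duplicate")) // skipped: already in the filestore
	m.Put("Qmnew", []byte("new block"))        // lands in the cache store

	for _, k := range []string{"Qmfile", "Qmnew"} {
		data, _ := m.Get(k)
		fmt.Printf("%s -> %q\n", k, data)
	}
}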