51 changes: 26 additions & 25 deletions api/client/utils_test.go
@@ -36,31 +36,32 @@ func defaultServerConfig(t *testing.T) server.Config {

grpcMaddr := util.MustParseAddr(grpcHostAddress)
conf := server.Config{
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
FFSRetrievalNextEventTimeout: time.Hour,
Contributor Author:
A new config attribute to set a timeout for retrievals that might get stuck. If we don't receive any data or event within this duration while doing the retrieval, we fail.

Unfortunately, there are situations/bugs in Lotus in which a retrieval can get stuck, so this is a safety net.
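As a rough, standalone sketch of that safety net (illustrative names, not the project's API; only the select/time.After pattern mirrors the actual change):

```go
package main

import (
	"fmt"
	"time"
)

// event is a simplified stand-in for the retrieval events the deals module emits.
type event struct{ Err string }

// watchRetrieval consumes events until the channel closes, failing if no event
// arrives within timeout.
func watchRetrieval(events <-chan event, timeout time.Duration) error {
	for {
		select {
		case <-time.After(timeout):
			// Nothing received for a full window: assume the retrieval is stuck.
			return fmt.Errorf("no retrieval event received in %s", timeout)
		case e, ok := <-events:
			if !ok {
				return nil // channel closed: the producer finished
			}
			if e.Err != "" {
				return fmt.Errorf("retrieval errored: %s", e.Err)
			}
			// Note: time.After restarts on each loop iteration, so the timeout
			// bounds the gap between consecutive events, not the total run.
		}
	}
}

func main() {
	events := make(chan event, 1)
	events <- event{}
	close(events)
	fmt.Println(watchRetrieval(events, time.Second)) // <nil>
}
```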

DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
}
return conf
}
29 changes: 15 additions & 14 deletions api/server/server.go
@@ -126,19 +126,20 @@ type Config struct {
MongoURI string
MongoDB string

FFSAdminToken string
FFSUseMasterAddr bool
FFSDealFinalityTimeout time.Duration
FFSMinimumPieceSize uint64
FFSMaxParallelDealPreparing int
FFSGCAutomaticGCInterval time.Duration
FFSGCStageGracePeriod time.Duration
SchedMaxParallel int
MinerSelector string
MinerSelectorParams string
DealWatchPollDuration time.Duration
AutocreateMasterAddr bool
WalletInitialFunds big.Int
FFSAdminToken string
FFSUseMasterAddr bool
FFSDealFinalityTimeout time.Duration
FFSMinimumPieceSize uint64
FFSRetrievalNextEventTimeout time.Duration
FFSMaxParallelDealPreparing int
FFSGCAutomaticGCInterval time.Duration
FFSGCStageGracePeriod time.Duration
SchedMaxParallel int
MinerSelector string
MinerSelectorParams string
DealWatchPollDuration time.Duration
AutocreateMasterAddr bool
WalletInitialFunds big.Int

AskIndexQueryAskTimeout time.Duration
AskindexMaxParallel int
@@ -261,7 +262,7 @@ func NewServer(conf Config) (*Server, error) {
if conf.Devnet {
conf.FFSMinimumPieceSize = 0
}
cs := filcold.New(ms, dm, wm, ipfs, chain, l, lsm, conf.FFSMinimumPieceSize, conf.FFSMaxParallelDealPreparing)
cs := filcold.New(ms, dm, wm, ipfs, chain, l, lsm, conf.FFSMinimumPieceSize, conf.FFSMaxParallelDealPreparing, conf.FFSRetrievalNextEventTimeout)
hs, err := coreipfs.New(txndstr.Wrap(ds, "ffs/coreipfs"), ipfs, l)
if err != nil {
return nil, fmt.Errorf("creating coreipfs: %s", err)
27 changes: 15 additions & 12 deletions cmd/powd/main.go
@@ -130,6 +130,7 @@ func configFromFlags() (server.Config, error) {
ffsSchedMaxParallel := config.GetInt("ffsschedmaxparallel")
ffsDealWatchFinalityTimeout := time.Minute * time.Duration(config.GetInt("ffsdealfinalitytimeout"))
ffsMinimumPieceSize := config.GetUint64("ffsminimumpiecesize")
ffsRetrievalNextEventTimeout := config.GetDuration("ffsretrievalnexteventtimeout")
ffsMaxParallelDealPreparing := config.GetInt("ffsmaxparalleldealpreparing")
ffsGCInterval := time.Minute * time.Duration(config.GetInt("ffsgcinterval"))
ffsGCStagedGracePeriod := time.Minute * time.Duration(config.GetInt("ffsgcstagedgraceperiod"))
@@ -166,18 +167,19 @@ func configFromFlags() (server.Config, error) {
MongoURI: mongoURI,
MongoDB: mongoDB,

FFSAdminToken: ffsAdminToken,
FFSUseMasterAddr: ffsUseMasterAddr,
FFSDealFinalityTimeout: ffsDealWatchFinalityTimeout,
FFSMinimumPieceSize: ffsMinimumPieceSize,
FFSMaxParallelDealPreparing: ffsMaxParallelDealPreparing,
FFSGCAutomaticGCInterval: ffsGCInterval,
FFSGCStageGracePeriod: ffsGCStagedGracePeriod,
AutocreateMasterAddr: autocreateMasterAddr,
MinerSelector: minerSelector,
MinerSelectorParams: minerSelectorParams,
SchedMaxParallel: ffsSchedMaxParallel,
DealWatchPollDuration: dealWatchPollDuration,
FFSAdminToken: ffsAdminToken,
FFSUseMasterAddr: ffsUseMasterAddr,
FFSDealFinalityTimeout: ffsDealWatchFinalityTimeout,
FFSMinimumPieceSize: ffsMinimumPieceSize,
FFSRetrievalNextEventTimeout: ffsRetrievalNextEventTimeout,
FFSMaxParallelDealPreparing: ffsMaxParallelDealPreparing,
FFSGCAutomaticGCInterval: ffsGCInterval,
FFSGCStageGracePeriod: ffsGCStagedGracePeriod,
AutocreateMasterAddr: autocreateMasterAddr,
MinerSelector: minerSelector,
MinerSelectorParams: minerSelectorParams,
SchedMaxParallel: ffsSchedMaxParallel,
DealWatchPollDuration: dealWatchPollDuration,

AskIndexQueryAskTimeout: askIndexQueryAskTimeout,
AskIndexRefreshInterval: askIndexRefreshInterval,
@@ -387,6 +389,7 @@ func setupFlags() error {
pflag.String("ffsminerselector", "reputation", "Miner selector to be used by FFS: 'sr2', 'reputation'.")
pflag.String("ffsminerselectorparams", "", "Miner selector configuration parameter, depends on --ffsminerselector.")
pflag.String("ffsminimumpiecesize", "67108864", "Minimum piece size in bytes allowed to be stored in Filecoin.")
pflag.Duration("ffsretrievalnexteventtimeout", time.Hour, "Maximum amount of time to wait for the next retrieval event before erroring it.")
Contributor Author:
It would be great to change all duration flags to .Duration in the future.
Created #803

Member:
Yea, I've been using that lately, quite nice.
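For context, a minimal sketch of the difference the two are discussing (assuming spf13/pflag plus a viper-backed config, which the `config.GetDuration` call above suggests):

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	// Old style: a string flag whose unit lives only in the help text and
	// must be multiplied by hand after parsing.
	pflag.String("ffsdealfinalitytimeout", "4320", "Deadline in minutes ...")

	// New style: the unit travels with the value ("30m", "1h30m", "90s"),
	// and parsing/validation happens in the flag layer.
	pflag.Duration("ffsretrievalnexteventtimeout", time.Hour, "Max wait for the next retrieval event.")

	pflag.Parse()
	if err := viper.BindPFlags(pflag.CommandLine); err != nil {
		panic(err)
	}
	fmt.Println(viper.GetDuration("ffsretrievalnexteventtimeout")) // 1h0m0s by default
}
```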

pflag.String("ffsschedmaxparallel", "1000", "Maximum amount of Jobs executed in parallel.")
pflag.String("ffsdealfinalitytimeout", "4320", "Deadline in minutes in which a deal must prove liveness changing status before considered abandoned.")
pflag.String("ffsmaxparalleldealpreparing", "2", "Max parallel deal preparing tasks.")
2 changes: 1 addition & 1 deletion deals/module/records.go
@@ -411,7 +411,7 @@ func (m *Module) recordRetrieval(addr string, offer api.QueryOffer, bytesReceive
RootCid: offer.Root,
Size: offer.Size,
MinPrice: offer.MinPrice.Uint64(),
Miner: offer.Miner.String(),
Miner: offer.MinerPeer.Address.String(),
Contributor Author:
Solves a situation in which miners claim that the retrieval will be done from a worker address rather than their owner address. This might confuse the user: the deal was made with miner X, but the retrieval appears to be made from miner Y (and miner Y hasn't stored data in the network and is mostly empty).
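In isolation, and on my reading of lotus's QueryOffer (where `Miner` carries the address the miner reports in its query response, while `MinerPeer` identifies the miner the offer was queried from — an assumption, not confirmed by this diff), the swap looks like:

```go
// Before: the address the miner reports, which can be a worker address.
//	Miner: offer.Miner.String(),

// After: the retrieval peer's address plus its libp2p peer ID, matching
// the miner the user actually made the deal with.
Miner:       offer.MinerPeer.Address.String(),
MinerPeerID: offer.MinerPeer.ID.String(),
```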

MinerPeerID: offer.MinerPeer.ID.String(),
PaymentInterval: offer.PaymentInterval,
PaymentIntervalIncrease: offer.PaymentIntervalIncrease,
8 changes: 4 additions & 4 deletions deals/module/retrieve.go
@@ -123,7 +123,7 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
break Loop
}
if e.Err != "" {
log.Infof("in progress retrieval errored: %s", err)
log.Infof("in progress retrieval errored: %s", e.Err)
Contributor Author:
Wrong error variable.

errMsg = e.Err
}
if dtStart.IsZero() && e.Event == retrievalmarket.ClientEventBlocksReceived {
@@ -146,23 +146,23 @@
// payment channel creation. This isn't ideal, but
// it's better than missing the data.
// We WARN just to signal this might be happening.
if dtStart.IsZero() {
if dtStart.IsZero() && errMsg == "" {
dtStart = retrievalStartTime
log.Warnf("retrieval data-transfer start fallback to retrieval start")
}
// This is a fallback to not receiving an expected
// event in the retrieval. We just fallback to Now(),
// which should always be pretty close to the real
// event. We WARN just to signal this is happening.
if dtEnd.IsZero() {
if dtEnd.IsZero() && errMsg == "" {
dtEnd = time.Now()
log.Warnf("retrieval data-transfer end fallback to retrieval end")
}
m.recordRetrieval(waddr, o, bytesReceived, dtStart, dtEnd, errMsg)
}
}()

return o.Miner.String(), out, nil
return o.MinerPeer.Address.String(), out, nil
}

func getRetrievalOffers(ctx context.Context, lapi *apistruct.FullNodeStruct, payloadCid cid.Cid, pieceCid *cid.Cid, miners []string) []api.QueryOffer {
12 changes: 7 additions & 5 deletions ffs/coreipfs/coreipfs.go
@@ -57,13 +57,12 @@ func New(ds datastore.TxnDatastore, ipfs iface.CoreAPI, l ffs.JobLogger) (*CoreI

// Stage adds the data of io.Reader in the storage, and creates a stage-pin on the resulting cid.
func (ci *CoreIpfs) Stage(ctx context.Context, iid ffs.APIID, r io.Reader) (cid.Cid, error) {
ci.lock.Lock()
defer ci.lock.Unlock()

p, err := ci.ipfs.Unixfs().Add(ctx, ipfsfiles.NewReaderFile(r), options.Unixfs.Pin(true))
if err != nil {
return cid.Undef, fmt.Errorf("adding data to ipfs: %s", err)
}
ci.lock.Lock()
defer ci.lock.Unlock()

if err := ci.ps.AddStaged(iid, p.Cid()); err != nil {
return cid.Undef, fmt.Errorf("saving new pin in pinstore: %s", err)
@@ -72,8 +71,11 @@ func (ci *CoreIpfs) Stage(ctx context.Context, iid ffs.APIID, r io.Reader) (cid.
return p.Cid(), nil
}

// StageCid stage-pin a Cid.
// StageCid pulls the Cid data and stage-pins it.
Contributor Author:
We're changing the meaning of StageCid to do something similar to what Stage does.
Before, it only tracked the Cid as "stage-pinned" (the temporary pin we create to allow multitenancy).
Now it also pins the data in the go-ipfs node, instead of assuming that's already true.
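Condensed, the before/after looks roughly like this (the pre-change body and the code after the lock are simplified assumptions modeled on Stage above; only the Pin().Add call is verbatim from the diff):

```go
// Before (sketch): assume the data is already local and only record the
// stage-pin in the pinstore.
//
//	func (ci *CoreIpfs) StageCid(ctx context.Context, iid ffs.APIID, c cid.Cid) error {
//		ci.lock.Lock()
//		defer ci.lock.Unlock()
//		return ci.ps.AddStaged(iid, c)
//	}

// After: actively pull and recursively pin the DAG in the go-ipfs node,
// then record the stage-pin.
func (ci *CoreIpfs) StageCid(ctx context.Context, iid ffs.APIID, c cid.Cid) error {
	if err := ci.ipfs.Pin().Add(ctx, path.IpfsPath(c), options.Pin.Recursive(true)); err != nil {
		return fmt.Errorf("adding data to ipfs: %s", err)
	}
	ci.lock.Lock()
	defer ci.lock.Unlock()
	return ci.ps.AddStaged(iid, c)
}
```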

func (ci *CoreIpfs) StageCid(ctx context.Context, iid ffs.APIID, c cid.Cid) error {
if err := ci.ipfs.Pin().Add(ctx, path.IpfsPath(c), options.Pin.Recursive(true)); err != nil {
return fmt.Errorf("adding data to ipfs: %s", err)
}
ci.lock.Lock()
defer ci.lock.Unlock()

@@ -263,7 +265,7 @@ Loop:

// Skip Cids that are excluded.
if _, ok := excludeMap[stagedPin.Cid]; ok {
log.Infof("skipping staged cid %s since it's in exclusion list", stagedPin)
log.Infof("skipping staged cid %s since it's in exclusion list", stagedPin.Cid)
continue Loop
}
// A Cid is only safe to GC if all existing stage-pin are older than
85 changes: 53 additions & 32 deletions ffs/filcold/filcold.go
@@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
iface "github.com/ipfs/interface-go-ipfs-core"
Expand All @@ -35,15 +36,16 @@ var (
// FilCold is a ColdStorage implementation which saves data in the Filecoin network.
// It assumes the underlying Filecoin client has access to an IPFS node where data is stored.
type FilCold struct {
ms ffs.MinerSelector
dm *dealsModule.Module
wm wallet.Module
ipfs iface.CoreAPI
chain FilChain
l ffs.JobLogger
lsm *lotus.SyncMonitor
minPieceSize uint64
semaphDealPrep chan struct{}
ms ffs.MinerSelector
dm *dealsModule.Module
wm wallet.Module
ipfs iface.CoreAPI
chain FilChain
l ffs.JobLogger
lsm *lotus.SyncMonitor
minPieceSize uint64
retrNextEventTimeout time.Duration
semaphDealPrep chan struct{}
}

var _ ffs.ColdStorage = (*FilCold)(nil)
Expand All @@ -54,17 +56,18 @@ type FilChain interface {
}

// New returns a new FilCold instance.
func New(ms ffs.MinerSelector, dm *dealsModule.Module, wm wallet.Module, ipfs iface.CoreAPI, chain FilChain, l ffs.JobLogger, lsm *lotus.SyncMonitor, minPieceSize uint64, maxParallelDealPreparing int) *FilCold {
func New(ms ffs.MinerSelector, dm *dealsModule.Module, wm wallet.Module, ipfs iface.CoreAPI, chain FilChain, l ffs.JobLogger, lsm *lotus.SyncMonitor, minPieceSize uint64, maxParallelDealPreparing int, retrievalNextEventTimeout time.Duration) *FilCold {
return &FilCold{
ms: ms,
dm: dm,
wm: wm,
ipfs: ipfs,
chain: chain,
l: l,
lsm: lsm,
minPieceSize: minPieceSize,
semaphDealPrep: make(chan struct{}, maxParallelDealPreparing),
ms: ms,
dm: dm,
wm: wm,
ipfs: ipfs,
chain: chain,
l: l,
lsm: lsm,
minPieceSize: minPieceSize,
retrNextEventTimeout: retrievalNextEventTimeout,
semaphDealPrep: make(chan struct{}, maxParallelDealPreparing),
}
}

@@ -76,21 +79,39 @@ func (fc *FilCold) Fetch(ctx context.Context, pyCid cid.Cid, piCid *cid.Cid, wad
return ffs.FetchInfo{}, fmt.Errorf("fetching from deal module: %s", err)
}
fc.l.Log(ctx, "Fetching from %s...", miner)
var fundsSpent uint64
var lastMsg string
for e := range events {
if e.Err != "" {
return ffs.FetchInfo{}, fmt.Errorf("event error in retrieval progress: %s", e.Err)
}
strEvent := retrievalmarket.ClientEvents[e.Event]
strDealStatus := retrievalmarket.DealStatuses[e.Status]
fundsSpent = e.FundsSpent.Uint64()
newMsg := fmt.Sprintf("Received %s, total spent: %sFIL (%s/%s)", humanize.IBytes(e.BytesReceived), util.AttoFilToFil(fundsSpent), strEvent, strDealStatus)
if newMsg != lastMsg {
fc.l.Log(ctx, newMsg)
lastMsg = newMsg

var (
fundsSpent uint64
lastMsg string
lastEvent marketevents.RetrievalEvent
)
Loop:
for {
select {
case <-time.After(fc.retrNextEventTimeout):
return ffs.FetchInfo{}, fmt.Errorf("didn't receive events for %d minutes", int64(fc.retrNextEventTimeout.Minutes()))
case e, ok := <-events:
if !ok {
break Loop
}
if e.Err != "" {
return ffs.FetchInfo{}, fmt.Errorf("event error in retrieval progress: %s", e.Err)
}
strEvent := retrievalmarket.ClientEvents[e.Event]
strDealStatus := retrievalmarket.DealStatuses[e.Status]
fundsSpent = e.FundsSpent.Uint64()
newMsg := fmt.Sprintf("Received %s, total spent: %sFIL (%s/%s)", humanize.IBytes(e.BytesReceived), util.AttoFilToFil(fundsSpent), strEvent, strDealStatus)
if newMsg != lastMsg {
fc.l.Log(ctx, newMsg)
lastMsg = newMsg
}
lastEvent = e
}
}
if lastEvent.Status != retrievalmarket.DealStatusCompleted {
return ffs.FetchInfo{}, fmt.Errorf("retrieval failed with status %s and message %s", retrievalmarket.DealStatuses[lastEvent.Status], lastMsg)
}

Comment on lines +89 to +114
Contributor Author:
TL;DR:

  • Use the timeout to fail if the retrieval gets stuck.
  • If the events channel is closed, check that the last received event is the one that means the retrieval ended successfully. If for some reason the channel gets closed in a non-final status, that's wrong and we should error.

return ffs.FetchInfo{RetrievedMiner: miner, FundsSpent: fundsSpent}, nil
}

2 changes: 1 addition & 1 deletion ffs/integrationtest/manager/manager.go
@@ -86,7 +86,7 @@ func NewCustomFFSManager(t require.TestingT, ds datastore.TxnDatastore, cb lotus
l := joblogger.New(txndstr.Wrap(ds, "ffs/joblogger"))
lsm, err := lotus.NewSyncMonitor(cb)
require.NoError(t, err)
cl := filcold.New(ms, dm, nil, ipfsClient, fchain, l, lsm, minimumPieceSize, 1)
cl := filcold.New(ms, dm, nil, ipfsClient, fchain, l, lsm, minimumPieceSize, 1, time.Hour)
hl, err := coreipfs.New(ds, ipfsClient, l)
require.NoError(t, err)
sched, err := scheduler.New(txndstr.Wrap(ds, "ffs/scheduler"), l, hl, cl, 10, time.Minute*10, nil, scheduler.GCConfig{AutoGCInterval: 0})
2 changes: 1 addition & 1 deletion ffs/interfaces.go
@@ -42,7 +42,7 @@ type HotStorage interface {
// Stage adds io.Reader and stage-pins it.
Stage(context.Context, APIID, io.Reader) (cid.Cid, error)

// StageCid stage-pins a cid.
// StageCid pulls the Cid data and stage-pins it.
StageCid(context.Context, APIID, cid.Cid) error

// Unpin unpins a Cid.
2 changes: 1 addition & 1 deletion ffs/manager/manager.go
@@ -33,7 +33,7 @@ var (
Hot: ffs.HotConfig{
Enabled: false,
Ipfs: ffs.IpfsConfig{
AddTimeout: 480, // 8 min
AddTimeout: 15 * 60, // 15min
Contributor Author:
I believe this is a better default for big-data. The user can change it anyway.

Member:
Worth updating everywhere in the db using the massive change tool?

Contributor Author:
Yep, I have that as a task in my pre-deployment steps. Using powcfg to do it :)

},
},
Cold: ffs.ColdConfig{
2 changes: 2 additions & 0 deletions ffs/scheduler/internal/sjstore/sjstore.go
@@ -210,6 +210,8 @@ func (s *Store) Enqueue(j ffs.StorageJob) error {
// GetExecutingJob returns a JobID that is currently executing for
// data with cid c in iid. If there's no such job, it returns nil.
func (s *Store) GetExecutingJob(iid ffs.APIID, c cid.Cid) *ffs.JobID {
s.lock.Lock()
defer s.lock.Unlock()
j, ok := s.executingJobs[iid][c]
if !ok {
return nil
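For context on why the read itself needs the mutex, a minimal self-contained sketch (simplified stand-in types, not project code):

```go
package main

import (
	"fmt"
	"sync"
)

// store is a simplified stand-in for sjstore.Store: Enqueue writes to
// executingJobs while GetExecutingJob reads it, so the read needs the same
// mutex — Go maps are not safe for concurrent read and write.
type store struct {
	lock          sync.Mutex
	executingJobs map[string]string
}

func (s *store) getExecutingJob(k string) (string, bool) {
	s.lock.Lock() // without this, `go test -race` flags the map read below
	defer s.lock.Unlock()
	v, ok := s.executingJobs[k]
	return v, ok
}

func (s *store) enqueue(k, v string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.executingJobs[k] = v
}

func main() {
	s := &store{executingJobs: map[string]string{}}
	s.enqueue("cid1", "job1")
	fmt.Println(s.getExecutingJob("cid1"))
}
```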