From 8111524927c5c6869dede6adc9cccfe5c74d2dea Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2019 12:20:37 +0100 Subject: [PATCH 01/10] Do not allow to query PENDING ingesters (consider them unhealthy) Signed-off-by: Marco Pracucci --- pkg/ring/model.go | 2 +- pkg/ring/model_test.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 43ae3140ff..b2900d4bbd 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -184,7 +184,7 @@ func (d *Desc) TokensFor(id string) (tokens, other Tokens) { func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool { if op == Write && i.State != ACTIVE { return false - } else if op == Read && i.State == JOINING { + } else if op == Read && i.State != ACTIVE && i.State != LEAVING { return false } return time.Now().Sub(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 97e954ad06..8b13f9a4f5 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -40,6 +40,18 @@ func TestIngesterDesc_IsHealthy(t *testing.T) { writeExpected: false, readExpected: true, }, + "PENDING ingester with last keepalive newer than timeout": { + ingester: &IngesterDesc{State: PENDING, Timestamp: time.Now().Add(-30 * time.Second).Unix()}, + timeout: time.Minute, + writeExpected: false, + readExpected: false, + }, + "LEFT ingester with last keepalive newer than timeout": { + ingester: &IngesterDesc{State: LEFT, Timestamp: time.Now().Add(-30 * time.Second).Unix()}, + timeout: time.Minute, + writeExpected: false, + readExpected: false, + }, } for testName, testData := range tests { From 14b2e8c4c87392d4afd33adfa7e3b482624cb0a6 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2019 12:48:56 +0100 Subject: [PATCH 02/10] Do not create TSDB if not exist when querying an ingester Signed-off-by: Marco Pracucci --- pkg/ingester/ingester.go | 2 +- pkg/ingester/ingester_v2.go | 22 ++--- pkg/ingester/ingester_v2_test.go | 144 ++++++++++++++++++++++++++++++- 3 files changed, 155 insertions(+), 13 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2cbb5899d7..eee34b3892 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -191,7 +191,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c } if cfg.TSDBEnabled { - return NewV2(cfg, clientConfig, limits, chunkStore, registerer) + return NewV2(cfg, clientConfig, limits, registerer) } i := &Ingester{ diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 5eebd7d0cc..6c2a9e5cbb 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -33,7 +33,7 @@ type TSDBState struct { } // NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage -func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) { +func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) { bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "cortex", util.Logger) if err != nil { return nil, err @@ -44,7 +44,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, clientConfig: clientConfig, metrics: newIngesterMetrics(registerer), limits: limits, - chunkStore: chunkStore, + chunkStore: nil, quit: make(chan struct{}), TSDBState: TSDBState{ @@ -151,9 +151,9 @@ func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*clie i.metrics.queries.Inc() - db, err := i.getOrCreateTSDB(userID) - if err != nil { - return nil, fmt.Errorf("failed to find/create user db: %v", err) + db := i.getTSDB(userID) + if db == nil { + return &client.QueryResponse{}, nil } q, err := db.Querier(int64(from), int64(through)) @@ -220,9 +220,9 @@ func (i *Ingester) v2LabelValues(ctx old_ctx.Context, req *client.LabelValuesReq return nil, err } - db, err := i.getOrCreateTSDB(userID) - if err != nil { - return nil, fmt.Errorf("failed to find/create user db: %v", err) + db := i.getTSDB(userID) + if db == nil { + return &client.LabelValuesResponse{}, nil } through := time.Now() @@ -249,9 +249,9 @@ func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesReque return nil, err } - db, err := i.getOrCreateTSDB(userID) - if err != nil { - return nil, fmt.Errorf("failed to find/create user db: %v", err) + db := i.getTSDB(userID) + if db == nil { + return &client.LabelNamesResponse{}, nil } through := time.Now() diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 3f476810f9..1f021ada50 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1,6 +1,7 @@ package ingester import ( + "fmt" "io/ioutil" "math" "net/http" @@ -388,6 +389,147 @@ func Test_Ingester_v2Query(t *testing.T) { }) } } +func TestIngester_v2Query_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil) + require.NoError(t, err) + defer i.Shutdown() + defer cleanup() + + // Mock request + userID := "test" + ctx := user.InjectOrgID(context.Background(), userID) + req := &client.QueryRequest{} + + res, err := i.v2Query(ctx, req) + require.NoError(t, err) + assert.Equal(t, &client.QueryResponse{}, res) + + // Check if the TSDB has been created + _, tsdbCreated := i.TSDBState.dbs[userID] + assert.False(t, tsdbCreated) +} + +func TestIngester_v2LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil) + require.NoError(t, err) + defer i.Shutdown() + defer cleanup() + + // Mock request + userID := "test" + ctx := user.InjectOrgID(context.Background(), userID) + req := &client.LabelValuesRequest{} + + res, err := i.v2LabelValues(ctx, req) + require.NoError(t, err) + assert.Equal(t, &client.LabelValuesResponse{}, res) + + // Check if the TSDB has been created + _, tsdbCreated := i.TSDBState.dbs[userID] + assert.False(t, tsdbCreated) +} + +func TestIngester_v2LabelNames_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil) + require.NoError(t, err) + defer i.Shutdown() + defer cleanup() + + // Mock request + userID := "test" + ctx := user.InjectOrgID(context.Background(), userID) + req := &client.LabelNamesRequest{} + + res, err := i.v2LabelNames(ctx, req) + require.NoError(t, err) + assert.Equal(t, &client.LabelNamesResponse{}, res) + + // Check if the TSDB has been created + _, tsdbCreated := i.TSDBState.dbs[userID] + assert.False(t, tsdbCreated) +} + +func TestIngester_v2Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) { + i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil) + require.NoError(t, err) + defer i.Shutdown() + defer cleanup() + require.Equal(t, ring.PENDING, i.lifecycler.GetState()) + + // Mock request + userID := "test" + ctx := user.InjectOrgID(context.Background(), userID) + req := &client.WriteRequest{} + + res, err := i.v2Push(ctx, req) + assert.Equal(t, fmt.Errorf(errTSDBCreateIncompatibleState, "PENDING"), err) + assert.Nil(t, res) + + // Check if the TSDB has been created + _, tsdbCreated := i.TSDBState.dbs[userID] + assert.False(t, tsdbCreated) +} + +func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNotActive(t *testing.T) { + tests := map[string]struct { + state ring.IngesterState + expectedErr error + }{ + "not allow to create TSDB if in PENDING state": { + state: ring.PENDING, + expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.PENDING), + }, + "not allow to create TSDB if in JOINING state": { + state: ring.JOINING, + expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.JOINING), + }, + "not allow to create TSDB if in LEAVING state": { + state: ring.LEAVING, + expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.LEAVING), + }, + "allow to create TSDB if in ACTIVE state": { + state: ring.ACTIVE, + expectedErr: nil, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 60 * time.Second + + i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil) + require.NoError(t, err) + defer i.Shutdown() + defer cleanup() + + // Switch ingester state to the expected one in the test + if i.lifecycler.GetState() != testData.state { + var stateChain []ring.IngesterState + + if testData.state == ring.LEAVING { + stateChain = []ring.IngesterState{ring.ACTIVE, ring.LEAVING} + } else { + stateChain = []ring.IngesterState{testData.state} + } + + for _, s := range stateChain { + err = i.lifecycler.ChangeState(context.Background(), s) + require.NoError(t, err) + } + } + + db, err := i.getOrCreateTSDB("test", false) + assert.Equal(t, testData.expectedErr, err) + + if testData.expectedErr != nil { + assert.Nil(t, db) + } else { + assert.NotNil(t, db) + } + }) + } +} func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse) { samples := []client.Sample{ @@ -432,7 +574,7 @@ func newIngesterMockWithTSDBStorage(ingesterCfg Config, registerer prometheus.Re ingesterCfg.TSDBConfig.Backend = "s3" ingesterCfg.TSDBConfig.S3.Endpoint = "localhost" - ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil, registerer) + ingester, err := NewV2(ingesterCfg, clientCfg, overrides, registerer) if err != nil { return nil, nil, err } From a8ae594f51f3496a80978b0ffe48098984b6314f Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2019 14:38:29 +0100 Subject: [PATCH 03/10] Do not allow to create TSDB if the ingester is not in ACTIVE state Signed-off-by: Marco Pracucci --- pkg/ingester/ingester_v2.go | 107 ++++++++++++++++++++---------------- pkg/ring/lifecycler.go | 10 ++-- pkg/storage/tsdb/config.go | 8 ++- 3 files changed, 71 insertions(+), 54 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 6c2a9e5cbb..66c3183eaf 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -3,7 +3,6 @@ package ingester import ( "fmt" "net/http" - "path/filepath" "time" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -26,6 +25,10 @@ import ( old_ctx "golang.org/x/net/context" ) +const ( + errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)" +) + // TSDBState holds data structures used by the TSDB storage engine type TSDBState struct { dbs map[string]*tsdb.DB // tsdb sharded by userID @@ -281,56 +284,64 @@ func (i *Ingester) getTSDB(userID string) *tsdb.DB { func (i *Ingester) getOrCreateTSDB(userID string) (*tsdb.DB, error) { db := i.getTSDB(userID) - if db == nil { - i.userStatesMtx.Lock() - defer i.userStatesMtx.Unlock() - - // Check again for DB in the event it was created in-between locks - var ok bool - db, ok = i.TSDBState.dbs[userID] - if !ok { - - udir := i.userDir(userID) - - // Create a new user database - var err error - db, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{ - RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond), - BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(), - NoLockfile: true, - }) - if err != nil { - return nil, err - } + if db != nil { + return db, nil + } - // Thanos shipper requires at least 1 external label to be set. For this reason, - // we set the tenant ID as external label and we'll filter it out when reading - // the series from the storage. - l := lbls.Labels{ - { - Name: cortex_tsdb.TenantIDExternalLabel, - Value: userID, - }, - } + i.userStatesMtx.Lock() + defer i.userStatesMtx.Unlock() - // Create a new shipper for this database - s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() lbls.Labels { return l }, metadata.ReceiveSource) - i.done.Add(1) - go func() { - defer i.done.Done() - runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error { - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(util.Logger).Log("err", err, "uploaded", uploaded) - } - return nil - }) - }() - - i.TSDBState.dbs[userID] = db - } + // Check again for DB in the event it was created in-between locks + var ok bool + db, ok = i.TSDBState.dbs[userID] + if ok { + return db, nil + } + + // We're ready to create the TSDB, however we must be sure that the ingester + // is in the ACTIVE state, otherwise it may conflict with the transfer in/out. + // The TSDB is created when the first series is pushed and this shouldn't happen + // to a non-ACTIVE ingester, however we want to protect from any bug, cause we + // may have data loss or TSDB WAL corruption if the TSDB is created before/during + // a transfer in occurs. + ingesterState := i.lifecycler.GetState() + if ingesterState != ring.ACTIVE { + return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState) } + udir := i.cfg.TSDBConfig.BlocksDir(userID) + + // Create a new user database + var err error + db, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{ + RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond), + BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(), + NoLockfile: true, + }) + if err != nil { + return nil, err + } + + // Create a new shipper for this database + l := lbls.Labels{ + { + Name: "user", + Value: userID, + }, + } + s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() lbls.Labels { return l }, metadata.ReceiveSource) + i.done.Add(1) + go func() { + defer i.done.Done() + runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error { + if uploaded, err := s.Sync(context.Background()); err != nil { + level.Warn(util.Logger).Log("err", err, "uploaded", uploaded) + } + return nil + }) + }() + + i.TSDBState.dbs[userID] = db + return db, nil } - -func (i *Ingester) userDir(userID string) string { return filepath.Join(i.cfg.TSDBConfig.Dir, userID) } diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index c1e7172295..59cd1c143e 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -125,7 +125,7 @@ type Lifecycler struct { // We need to remember the ingester state just in case consul goes away and comes // back empty. And it changes during lifecycle of ingester. - stateMtx sync.Mutex + stateMtx sync.RWMutex state IngesterState tokens Tokens @@ -228,8 +228,8 @@ func (i *Lifecycler) CheckReady(ctx context.Context) error { // GetState returns the state of this ingester. func (i *Lifecycler) GetState() IngesterState { - i.stateMtx.Lock() - defer i.stateMtx.Unlock() + i.stateMtx.RLock() + defer i.stateMtx.RUnlock() return i.state } @@ -249,8 +249,8 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error } func (i *Lifecycler) getTokens() Tokens { - i.stateMtx.Lock() - defer i.stateMtx.Unlock() + i.stateMtx.RLock() + defer i.stateMtx.RUnlock() return i.tokens } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 7634cb2fb0..beb126689e 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -3,6 +3,7 @@ package tsdb import ( "errors" "flag" + "path/filepath" "strings" "time" @@ -104,7 +105,6 @@ type BucketStoreConfig struct { // RegisterFlags registers the BucketStore flags func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { - f.StringVar(&cfg.SyncDir, "experimental.tsdb.bucket-store.sync-dir", "tsdb-sync", "Directory to place synced tsdb indicies.") f.Uint64Var(&cfg.IndexCacheSizeBytes, "experimental.tsdb.bucket-store.index-cache-size-bytes", uint64(250*units.Mebibyte), "Size of index cache in bytes per tenant.") f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size of chunk pool in bytes per tenant.") @@ -112,3 +112,9 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 20, "Max number of concurrent queries to the storage per tenant.") f.IntVar(&cfg.BlockSyncConcurrency, "experimental.tsdb.bucket-store.block-sync-concurrency", 20, "Number of Go routines to use when syncing blocks from object storage per tenant.") } + +// BlocksDir returns the directory path where TSDB blocks and wal should be +// stored by the ingester +func (cfg *Config) BlocksDir(userID string) string { + return filepath.Join(cfg.Dir, userID) +} From be939fbcab4b344cec5de816e37eed8f08b3e9a8 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2019 15:11:52 +0100 Subject: [PATCH 04/10] Fixed TSDB ingester after rebase Signed-off-by: Marco Pracucci --- pkg/ingester/ingester_v2.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 66c3183eaf..d5e5911e12 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -325,7 +325,7 @@ func (i *Ingester) getOrCreateTSDB(userID string) (*tsdb.DB, error) { // Create a new shipper for this database l := lbls.Labels{ { - Name: "user", + Name: cortex_tsdb.TenantIDExternalLabel, Value: userID, }, } From 9feaff7a2f2929d54b13248908de7c16c23ef45f Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2019 15:13:37 +0100 Subject: [PATCH 05/10] Restored comments in TSDB ingester after rebase Signed-off-by: Marco Pracucci --- pkg/ingester/ingester_v2.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index d5e5911e12..d52f9969ca 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -322,13 +322,17 @@ func (i *Ingester) getOrCreateTSDB(userID string) (*tsdb.DB, error) { return nil, err } - // Create a new shipper for this database + // Thanos shipper requires at least 1 external label to be set. For this reason, + // we set the tenant ID as external label and we'll filter it out when reading + // the series from the storage. l := lbls.Labels{ { Name: cortex_tsdb.TenantIDExternalLabel, Value: userID, }, } + + // Create a new shipper for this database s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() lbls.Labels { return l }, metadata.ReceiveSource) i.done.Add(1) go func() { From 428bd2cef82fcf3051ddd9771b45df7718bb01dd Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2019 15:42:32 +0100 Subject: [PATCH 06/10] Fixed loading of TSDBs right after transfer while ingester is still in JOINING state Signed-off-by: Marco Pracucci --- pkg/ingester/ingester_v2.go | 12 +++++++----- pkg/ingester/transfer.go | 31 ++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index d52f9969ca..a056c8185c 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -80,7 +80,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien return nil, fmt.Errorf("no user id") } - db, err := i.getOrCreateTSDB(userID) + db, err := i.getOrCreateTSDB(userID, false) if err != nil { return nil, err } @@ -282,7 +282,7 @@ func (i *Ingester) getTSDB(userID string) *tsdb.DB { return db } -func (i *Ingester) getOrCreateTSDB(userID string) (*tsdb.DB, error) { +func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) { db := i.getTSDB(userID) if db != nil { return db, nil @@ -304,9 +304,11 @@ func (i *Ingester) getOrCreateTSDB(userID string) (*tsdb.DB, error) { // to a non-ACTIVE ingester, however we want to protect from any bug, cause we // may have data loss or TSDB WAL corruption if the TSDB is created before/during // a transfer in occurs. - ingesterState := i.lifecycler.GetState() - if ingesterState != ring.ACTIVE { - return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState) + if !force { + ingesterState := i.lifecycler.GetState() + if ingesterState != ring.ACTIVE { + return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState) + } } udir := i.cfg.TSDBConfig.BlocksDir(userID) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 54df1d6ade..9b41c843bc 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -303,7 +303,36 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error level.Info(util.Logger).Log("msg", "Total xfer", "from_ingester", fromIngesterID, "files", filesXfer, "bytes", bytesXfer) // Move the tmpdir to the final location - return os.Rename(tmpDir, i.cfg.TSDBConfig.Dir) + err = os.Rename(tmpDir, i.cfg.TSDBConfig.Dir) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("unable to move received TSDB blocks from %s to %s", tmpDir, i.cfg.TSDBConfig.Dir)) + } + + // At this point all TSDBs have been received, so we can proceed loading TSDBs in memory. + // This is required because of two reasons: + // 1. No WAL replay performance penalty once the ingester switches to ACTIVE state + // 2. If a query is received on user X, for which the TSDB has been transferred, before + // the first series is ingested, if we don't open the TSDB the query will return an + // empty result (because the TSDB is opened only on first push or transfer) + userIDs, err := ioutil.ReadDir(i.cfg.TSDBConfig.Dir) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("unable to list TSDB users in %s", i.cfg.TSDBConfig.Dir)) + } + + for _, user := range userIDs { + userID := user.Name() + + level.Info(util.Logger).Log("msg", fmt.Sprintf("Loading TSDB for user %s", userID)) + _, err = i.getOrCreateTSDB(userID, true) + + if err != nil { + level.Error(util.Logger).Log("msg", fmt.Sprintf("Unable to load TSDB for user %s", userID), "err", err) + } else { + level.Info(util.Logger).Log("msg", fmt.Sprintf("Loaded TSDB for user %s", userID)) + } + } + + return nil } if err := i.transfer(stream.Context(), xfer); err != nil { From 3a926c77ec25b95c29d20d7100d4c72eb74e98d5 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 14 Nov 2019 15:45:20 +0100 Subject: [PATCH 07/10] Compacted if statement in getOrCreateTSDB() Signed-off-by: Marco Pracucci --- pkg/ingester/ingester_v2.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index a056c8185c..c9ff24bcf8 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -304,11 +304,8 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) // to a non-ACTIVE ingester, however we want to protect from any bug, cause we // may have data loss or TSDB WAL corruption if the TSDB is created before/during // a transfer in occurs. - if !force { - ingesterState := i.lifecycler.GetState() - if ingesterState != ring.ACTIVE { - return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState) - } + if ingesterState := i.lifecycler.GetState(); !force && ingesterState != ring.ACTIVE { + return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState) } udir := i.cfg.TSDBConfig.BlocksDir(userID) From a34967014e9e82cdfd380f0e58d169396eabe1bf Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 2 Dec 2019 10:24:11 +0100 Subject: [PATCH 08/10] Rolled back controversial change to IngesterDesc.IsHealthy() which will require a dedicated discussion Signed-off-by: Marco Pracucci --- pkg/ring/model.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index b2900d4bbd..43ae3140ff 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -184,7 +184,7 @@ func (d *Desc) TokensFor(id string) (tokens, other Tokens) { func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) bool { if op == Write && i.State != ACTIVE { return false - } else if op == Read && i.State != ACTIVE && i.State != LEAVING { + } else if op == Read && i.State == JOINING { return false } return time.Now().Sub(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout From bc4e8b153652245afaf5f9a27ee6283adb7dac11 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 2 Dec 2019 10:26:07 +0100 Subject: [PATCH 09/10] Rollback tests added to IngesterDesc.IsHealthy() Signed-off-by: Marco Pracucci --- pkg/ring/model_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 8b13f9a4f5..97e954ad06 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -40,18 +40,6 @@ func TestIngesterDesc_IsHealthy(t *testing.T) { writeExpected: false, readExpected: true, }, - "PENDING ingester with last keepalive newer than timeout": { - ingester: &IngesterDesc{State: PENDING, Timestamp: time.Now().Add(-30 * time.Second).Unix()}, - timeout: time.Minute, - writeExpected: false, - readExpected: false, - }, - "LEFT ingester with last keepalive newer than timeout": { - ingester: &IngesterDesc{State: LEFT, Timestamp: time.Now().Add(-30 * time.Second).Unix()}, - timeout: time.Minute, - writeExpected: false, - readExpected: false, - }, } for testName, testData := range tests { From 772ed2587d8024530ff2c86e2e38410ecbec428e Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 4 Dec 2019 11:00:56 +0100 Subject: [PATCH 10/10] Fixed ingester tests after rebasing Signed-off-by: Marco Pracucci --- pkg/ingester/ingester_v2_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 1f021ada50..78d6b1a4d2 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -139,13 +139,21 @@ func TestIngester_v2Push(t *testing.T) { registry := prometheus.NewRegistry() // Create a mocked ingester - i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), registry) + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 + + i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, registry) require.NoError(t, err) defer i.Shutdown() defer cleanup() ctx := user.InjectOrgID(context.Background(), "test") + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + // Push timeseries for idx, req := range testData.reqs { _, err := i.v2Push(ctx, req)