From 367ab0729e23cc074d471cbefb01a9d8077cf0f0 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 20 Jan 2021 10:17:41 +0100 Subject: [PATCH 1/2] Improve bucket index loader to handle edge cases Signed-off-by: Marco Pracucci --- pkg/storage/tsdb/bucketindex/loader.go | 32 ++-- pkg/storage/tsdb/bucketindex/loader_test.go | 159 +++++++++++++++++++- 2 files changed, 172 insertions(+), 19 deletions(-) diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index e4a0c2b1f5..127171c72c 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -110,10 +110,12 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { // (eg. corrupted bucket index or not existing). l.cacheIndex(userID, nil, err) - l.loadFailures.Inc() if errors.Is(err, ErrIndexNotFound) { level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) } else { + // We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just + // started to remote write and its blocks haven't been uploaded to storage yet). + l.loadFailures.Inc() level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err) } @@ -166,12 +168,17 @@ func (l *Loader) checkCachedIndexesToUpdateAndDelete() (toUpdate, toDelete []str defer l.indexesMx.RUnlock() for userID, entry := range l.indexes { + // Given ErrIndexNotFound is a legit case and assuming UpdateOnErrorInterval is lower than + // UpdateOnStaleInterval, we don't consider ErrIndexNotFound as an error with regard to the + // refresh interval and so it will be updated once stale. 
+ isError := entry.err != nil && !errors.Is(entry.err, ErrIndexNotFound) + switch { case now.Sub(entry.getRequestedAt()) >= l.cfg.IdleTimeout: toDelete = append(toDelete, userID) - case entry.err != nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnErrorInterval: + case isError && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnErrorInterval: toUpdate = append(toUpdate, userID) - case entry.err == nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnStaleInterval: + case !isError && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnStaleInterval: toUpdate = append(toUpdate, userID) } } @@ -186,18 +193,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) { l.loadAttempts.Inc() startTime := time.Now() idx, err := ReadIndex(readCtx, l.bkt, userID, l.logger) - - if errors.Is(err, ErrIndexNotFound) { - level.Info(l.logger).Log("msg", "unloaded bucket index", "user", userID, "reason", "not found during periodic check") - - // Remove from cache. - l.indexesMx.Lock() - delete(l.indexes, userID) - l.indexesMx.Unlock() - - return - } - if err != nil { + if err != nil && !errors.Is(err, ErrIndexNotFound) { l.loadFailures.Inc() level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err) return @@ -205,10 +201,12 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) { l.loadDuration.Observe(time.Since(startTime).Seconds()) - // Cache it. + // We cache it whether it was successfully refreshed or wasn't found. A use case for caching the ErrIndexNotFound + // is when a tenant has rules configured but hasn't started remote writing yet. Rules will be evaluated and + // bucket index loaded by the ruler. 
l.indexesMx.Lock() l.indexes[userID].index = idx - l.indexes[userID].err = nil + l.indexes[userID].err = err l.indexes[userID].setUpdatedAt(startTime) l.indexesMx.Unlock() } diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index 15d8d45686..16de397869 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -97,10 +97,13 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) + // Write a corrupted index. + require.NoError(t, bkt.Upload(ctx, path.Join("user-1", IndexCompressedFilename), strings.NewReader("invalid!}"))) + // Request the index multiple times. for i := 0; i < 10; i++ { _, err := loader.GetIndex(ctx, "user-1") - require.Equal(t, ErrIndexNotFound, err) + require.Equal(t, ErrIndexCorrupted, err) } // Ensure metrics have been updated accordingly. @@ -121,6 +124,42 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) { )) } +func TestLoader_GetIndex_ShouldCacheIndexNotFoundError(t *testing.T) { + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Create the loader. + loader := NewLoader(prepareLoaderConfig(), bkt, log.NewNopLogger(), reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) + }) + + // Request the index multiple times. + for i := 0; i < 10; i++ { + _, err := loader.GetIndex(ctx, "user-1") + require.Equal(t, ErrIndexNotFound, err) + } + + // Ensure metrics have been updated accordingly. + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures. 
+ # TYPE cortex_bucket_index_load_failures_total counter + cortex_bucket_index_load_failures_total 0 + # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. + # TYPE cortex_bucket_index_loaded gauge + cortex_bucket_index_loaded 0 + # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts. + # TYPE cortex_bucket_index_loads_total counter + cortex_bucket_index_loads_total 1 + `), + "cortex_bucket_index_loads_total", + "cortex_bucket_index_load_failures_total", + "cortex_bucket_index_loaded", + )) +} + func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() @@ -191,6 +230,9 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) reg := prometheus.NewPedanticRegistry() bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + // Write a corrupted index. + require.NoError(t, bkt.Upload(ctx, path.Join("user-1", IndexCompressedFilename), strings.NewReader("invalid!}"))) + // Create the loader. cfg := LoaderConfig{ CheckInterval: time.Second, @@ -205,6 +247,59 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) + _, err := loader.GetIndex(ctx, "user-1") + assert.Equal(t, ErrIndexCorrupted, err) + + // Upload the bucket index. + idx := &Index{ + Version: IndexVersion1, + Blocks: Blocks{ + {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20}, + }, + BlockDeletionMarks: nil, + UpdatedAt: time.Now().Unix(), + } + require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx)) + + // Wait until the index has been updated in background. 
+ test.Poll(t, 3*time.Second, nil, func() interface{} { + _, err := loader.GetIndex(ctx, "user-1") + return err + }) + + actualIdx, err := loader.GetIndex(ctx, "user-1") + require.NoError(t, err) + assert.Equal(t, idx, actualIdx) + + // Ensure metrics have been updated accordingly. + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. + # TYPE cortex_bucket_index_loaded gauge + cortex_bucket_index_loaded 1 + `), + "cortex_bucket_index_loaded", + )) +} + +func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousIndexNotFound(t *testing.T) { + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Create the loader. + cfg := LoaderConfig{ + CheckInterval: time.Second, + UpdateOnStaleInterval: time.Second, + UpdateOnErrorInterval: time.Hour, // Intentionally high to not hit it. + IdleTimeout: time.Hour, // Intentionally high to not hit it. 
+ } + + loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) + }) + _, err := loader.GetIndex(ctx, "user-1") assert.Equal(t, ErrIndexNotFound, err) @@ -239,7 +334,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) )) } -func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) { +func TestLoader_ShouldNotCacheCriticalErrorOnBackgroundUpdates(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) @@ -295,6 +390,66 @@ func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) { )) } +func TestLoader_ShouldCacheIndexNotFoundOnBackgroundUpdates(t *testing.T) { + ctx := context.Background() + reg := prometheus.NewPedanticRegistry() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Create a bucket index. + idx := &Index{ + Version: IndexVersion1, + Blocks: Blocks{ + {ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20}, + }, + BlockDeletionMarks: nil, + UpdatedAt: time.Now().Unix(), + } + require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx)) + + // Create the loader. + cfg := LoaderConfig{ + CheckInterval: time.Second, + UpdateOnStaleInterval: time.Second, + UpdateOnErrorInterval: time.Second, + IdleTimeout: time.Hour, // Intentionally high to not hit it. + } + + loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) + }) + + actualIdx, err := loader.GetIndex(ctx, "user-1") + require.NoError(t, err) + assert.Equal(t, idx, actualIdx) + + // Delete the bucket index. + require.NoError(t, DeleteIndex(ctx, bkt, "user-1")) + + // Wait until the next index load attempt occurs. 
+ prevLoads := testutil.ToFloat64(loader.loadAttempts) + test.Poll(t, 3*time.Second, true, func() interface{} { + return testutil.ToFloat64(loader.loadAttempts) > prevLoads + }) + + // We expect the bucket index is not considered loaded because of the error. + assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. + # TYPE cortex_bucket_index_loaded gauge + cortex_bucket_index_loaded 0 + `), + "cortex_bucket_index_loaded", + )) + + // Try to get the index again. We expect no load attempt because the error has been cached. + prevLoads = testutil.ToFloat64(loader.loadAttempts) + actualIdx, err = loader.GetIndex(ctx, "user-1") + assert.Equal(t, ErrIndexNotFound, err) + assert.Nil(t, actualIdx) + assert.Equal(t, prevLoads, testutil.ToFloat64(loader.loadAttempts)) +} + func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T) { ctx := context.Background() reg := prometheus.NewPedanticRegistry() From f40b6961a0de6ef07ce1eb32a9a49bc99485e380 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 20 Jan 2021 10:18:35 +0100 Subject: [PATCH 2/2] Updated CHANGELOG Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa53d9a5ab..6f28d75f02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ * `cortex_alertmanager_sync_configs_failed_total` * `cortex_alertmanager_tenants_discovered` * `cortex_alertmanager_tenants_owned` -* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. 
#3553 #3555 #3561 #3583 #3625 #3711 +* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625 #3711 #3715 * [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625 * `cortex_bucket_index_loads_total` * `cortex_bucket_index_load_failures_total`