Skip to content

Improve bucket index loader to handle edge cases #3717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* `cortex_alertmanager_sync_configs_failed_total`
* `cortex_alertmanager_tenants_discovered`
* `cortex_alertmanager_tenants_owned`
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625 #3711
* [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625 #3711 #3715
* [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625
* `cortex_bucket_index_loads_total`
* `cortex_bucket_index_load_failures_total`
Expand Down
32 changes: 15 additions & 17 deletions pkg/storage/tsdb/bucketindex/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
// (eg. corrupted bucket index or not existing).
l.cacheIndex(userID, nil, err)

l.loadFailures.Inc()
if errors.Is(err, ErrIndexNotFound) {
level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID)
} else {
// We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just
// started to remote write and its blocks haven't uploaded to storage yet).
l.loadFailures.Inc()
level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err)
}

Expand Down Expand Up @@ -166,12 +168,17 @@ func (l *Loader) checkCachedIndexesToUpdateAndDelete() (toUpdate, toDelete []str
defer l.indexesMx.RUnlock()

for userID, entry := range l.indexes {
// Given ErrIndexNotFound is a legit case and assuming UpdateOnErrorInterval is lower than
// UpdateOnStaleInterval, we don't consider ErrIndexNotFound as an error with regards to the
// refresh interval and so it will updated once stale.
isError := entry.err != nil && !errors.Is(entry.err, ErrIndexNotFound)

switch {
case now.Sub(entry.getRequestedAt()) >= l.cfg.IdleTimeout:
toDelete = append(toDelete, userID)
case entry.err != nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnErrorInterval:
case isError && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnErrorInterval:
toUpdate = append(toUpdate, userID)
case entry.err == nil && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnStaleInterval:
case !isError && now.Sub(entry.getUpdatedAt()) >= l.cfg.UpdateOnStaleInterval:
toUpdate = append(toUpdate, userID)
}
}
Expand All @@ -186,29 +193,20 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) {
l.loadAttempts.Inc()
startTime := time.Now()
idx, err := ReadIndex(readCtx, l.bkt, userID, l.logger)

if errors.Is(err, ErrIndexNotFound) {
level.Info(l.logger).Log("msg", "unloaded bucket index", "user", userID, "reason", "not found during periodic check")

// Remove from cache.
l.indexesMx.Lock()
delete(l.indexes, userID)
l.indexesMx.Unlock()

return
}
if err != nil {
if err != nil && !errors.Is(err, ErrIndexNotFound) {
l.loadFailures.Inc()
level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err)
return
}

l.loadDuration.Observe(time.Since(startTime).Seconds())

// Cache it.
// We cache it either it was successfully refreshed or wasn't found. An use case for caching the ErrIndexNotFound
// is when a tenant has rules configured but hasn't started remote writing yet. Rules will be evaluated and
// bucket index loaded by the ruler.
l.indexesMx.Lock()
l.indexes[userID].index = idx
l.indexes[userID].err = nil
l.indexes[userID].err = err
l.indexes[userID].setUpdatedAt(startTime)
l.indexesMx.Unlock()
}
Expand Down
159 changes: 157 additions & 2 deletions pkg/storage/tsdb/bucketindex/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,13 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
})

// Write a corrupted index.
require.NoError(t, bkt.Upload(ctx, path.Join("user-1", IndexCompressedFilename), strings.NewReader("invalid!}")))

// Request the index multiple times.
for i := 0; i < 10; i++ {
_, err := loader.GetIndex(ctx, "user-1")
require.Equal(t, ErrIndexNotFound, err)
require.Equal(t, ErrIndexCorrupted, err)
}

// Ensure metrics have been updated accordingly.
Expand All @@ -121,6 +124,42 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) {
))
}

func TestLoader_GetIndex_ShouldCacheIndexNotFoundError(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

// Create the loader.
loader := NewLoader(prepareLoaderConfig(), bkt, log.NewNopLogger(), reg)
require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
})

// Request the index multiple times.
for i := 0; i < 10; i++ {
_, err := loader.GetIndex(ctx, "user-1")
require.Equal(t, ErrIndexNotFound, err)
}

// Ensure metrics have been updated accordingly.
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
# TYPE cortex_bucket_index_load_failures_total counter
cortex_bucket_index_load_failures_total 0
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
# TYPE cortex_bucket_index_loaded gauge
cortex_bucket_index_loaded 0
# HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
# TYPE cortex_bucket_index_loads_total counter
cortex_bucket_index_loads_total 1
`),
"cortex_bucket_index_loads_total",
"cortex_bucket_index_load_failures_total",
"cortex_bucket_index_loaded",
))
}

func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -191,6 +230,9 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T)
reg := prometheus.NewPedanticRegistry()
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

// Write a corrupted index.
require.NoError(t, bkt.Upload(ctx, path.Join("user-1", IndexCompressedFilename), strings.NewReader("invalid!}")))

// Create the loader.
cfg := LoaderConfig{
CheckInterval: time.Second,
Expand All @@ -205,6 +247,59 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T)
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
})

_, err := loader.GetIndex(ctx, "user-1")
assert.Equal(t, ErrIndexCorrupted, err)

// Upload the bucket index.
idx := &Index{
Version: IndexVersion1,
Blocks: Blocks{
{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
},
BlockDeletionMarks: nil,
UpdatedAt: time.Now().Unix(),
}
require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))

// Wait until the index has been updated in background.
test.Poll(t, 3*time.Second, nil, func() interface{} {
_, err := loader.GetIndex(ctx, "user-1")
return err
})

actualIdx, err := loader.GetIndex(ctx, "user-1")
require.NoError(t, err)
assert.Equal(t, idx, actualIdx)

// Ensure metrics have been updated accordingly.
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
# TYPE cortex_bucket_index_loaded gauge
cortex_bucket_index_loaded 1
`),
"cortex_bucket_index_loaded",
))
}

func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousIndexNotFound(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

// Create the loader.
cfg := LoaderConfig{
CheckInterval: time.Second,
UpdateOnStaleInterval: time.Second,
UpdateOnErrorInterval: time.Hour, // Intentionally high to not hit it.
IdleTimeout: time.Hour, // Intentionally high to not hit it.
}

loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
})

_, err := loader.GetIndex(ctx, "user-1")
assert.Equal(t, ErrIndexNotFound, err)

Expand Down Expand Up @@ -239,7 +334,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T)
))
}

func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) {
func TestLoader_ShouldNotCacheCriticalErrorOnBackgroundUpdates(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
Expand Down Expand Up @@ -295,6 +390,66 @@ func TestLoader_ShouldNotCacheErrorOnBackgroundUpdates(t *testing.T) {
))
}

func TestLoader_ShouldCacheIndexNotFoundOnBackgroundUpdates(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

// Create a bucket index.
idx := &Index{
Version: IndexVersion1,
Blocks: Blocks{
{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
},
BlockDeletionMarks: nil,
UpdatedAt: time.Now().Unix(),
}
require.NoError(t, WriteIndex(ctx, bkt, "user-1", idx))

// Create the loader.
cfg := LoaderConfig{
CheckInterval: time.Second,
UpdateOnStaleInterval: time.Second,
UpdateOnErrorInterval: time.Second,
IdleTimeout: time.Hour, // Intentionally high to not hit it.
}

loader := NewLoader(cfg, bkt, log.NewNopLogger(), reg)
require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
})

actualIdx, err := loader.GetIndex(ctx, "user-1")
require.NoError(t, err)
assert.Equal(t, idx, actualIdx)

// Delete the bucket index.
require.NoError(t, DeleteIndex(ctx, bkt, "user-1"))

// Wait until the next index load attempt occurs.
prevLoads := testutil.ToFloat64(loader.loadAttempts)
test.Poll(t, 3*time.Second, true, func() interface{} {
return testutil.ToFloat64(loader.loadAttempts) > prevLoads
})

// We expect the bucket index is not considered loaded because of the error.
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
# TYPE cortex_bucket_index_loaded gauge
cortex_bucket_index_loaded 0
`),
"cortex_bucket_index_loaded",
))

// Try to get the index again. We expect no load attempt because the error has been cached.
prevLoads = testutil.ToFloat64(loader.loadAttempts)
actualIdx, err = loader.GetIndex(ctx, "user-1")
assert.Equal(t, ErrIndexNotFound, err)
assert.Nil(t, actualIdx)
assert.Equal(t, prevLoads, testutil.ToFloat64(loader.loadAttempts))
}

func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
Expand Down