diff --git a/CHANGELOG.md b/CHANGELOG.md index ee4e4d005a..41f58ee041 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734 * [BUGFIX] Distributor: Shuffle-Sharding with IngestionTenantShardSize == 0, default sharding strategy should be used #5189 * [BUGFIX] Cortex: Fix GRPC stream clients not honoring overrides for call options. #5797 +* [BUGFIX] Ring DDB: Fix lifecycle for ring counting unhealthy pods as healthy. #5838 ## 1.16.0 2023-11-20 diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index 3ae0299f07..75fef517b3 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -189,7 +189,12 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou } if len(putRequests) > 0 || len(deleteRequests) > 0 { - return c.kv.Batch(ctx, putRequests, deleteRequests) + err = c.kv.Batch(ctx, putRequests, deleteRequests) + if err != nil { + return err + } + c.updateStaleData(key, r, time.Now().UTC()) + return nil } if len(putRequests) == 0 && len(deleteRequests) == 0 { diff --git a/pkg/ring/kv/dynamodb/client_test.go b/pkg/ring/kv/dynamodb/client_test.go index 9a72dbeb93..666e2cf311 100644 --- a/pkg/ring/kv/dynamodb/client_test.go +++ b/pkg/ring/kv/dynamodb/client_test.go @@ -228,6 +228,40 @@ func Test_WatchKey_UpdateStale(t *testing.T) { }) } +func Test_CAS_UpdateStale(t *testing.T) { + ddbMock := NewDynamodbClientMock() + codecMock := &CodecMock{} + descMock := &DescMock{} + descMockResult := &DescMock{} + startTime := time.Now().UTC().Add(-time.Millisecond) + + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) + expectedUpdatedKeys := []string{"t1", "t2"} + expectedUpdated := map[string][]byte{ + expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), + expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]), + } + expectedBatch := map[dynamodbKey][]byte{ + {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]), + {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]), + } + + ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() + descMock.On("Clone").Return(descMock).Once() + descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once() + codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once() + + err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { + return descMockResult, true, nil + }) + + require.NoError(t, err) + require.Equal(t, descMockResult, c.staleData[key].data) + require.True(t, startTime.Before(c.staleData[key].timestamp)) +} + func Test_WatchPrefix(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{}