Skip to content

Commit 6b0d61c

Browse files
authored
Update cache on CAS for DDB (#5838)
* Update cache on CAS for DDB Signed-off-by: Daniel Deluiggi <[email protected]> * Update CHANGELOG Signed-off-by: Daniel Deluiggi <[email protected]> * Remove id from mock Signed-off-by: Daniel Deluiggi <[email protected]> --------- Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent a5fa68d commit 6b0d61c

File tree

3 files changed

+41
-1
lines changed

3 files changed

+41
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
3636
* [BUGFIX] Distributor: Shuffle-Sharding with IngestionTenantShardSize == 0, default sharding strategy should be used #5189
3737
* [BUGFIX] Cortex: Fix GRPC stream clients not honoring overrides for call options. #5797
38+
* [BUGFIX] Ring DDB: Fix lifecycle for ring counting unhealthy pods as healthy. #5838
3839

3940

4041
## 1.16.0 2023-11-20

pkg/ring/kv/dynamodb/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,12 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
189189
}
190190

191191
if len(putRequests) > 0 || len(deleteRequests) > 0 {
192-
return c.kv.Batch(ctx, putRequests, deleteRequests)
192+
err = c.kv.Batch(ctx, putRequests, deleteRequests)
193+
if err != nil {
194+
return err
195+
}
196+
c.updateStaleData(key, r, time.Now().UTC())
197+
return nil
193198
}
194199

195200
if len(putRequests) == 0 && len(deleteRequests) == 0 {

pkg/ring/kv/dynamodb/client_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,40 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
228228
})
229229
}
230230

231+
func Test_CAS_UpdateStale(t *testing.T) {
232+
ddbMock := NewDynamodbClientMock()
233+
codecMock := &CodecMock{}
234+
descMock := &DescMock{}
235+
descMockResult := &DescMock{}
236+
startTime := time.Now().UTC().Add(-time.Millisecond)
237+
238+
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
239+
expectedUpdatedKeys := []string{"t1", "t2"}
240+
expectedUpdated := map[string][]byte{
241+
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
242+
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
243+
}
244+
expectedBatch := map[dynamodbKey][]byte{
245+
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
246+
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
247+
}
248+
249+
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
250+
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
251+
descMock.On("Clone").Return(descMock).Once()
252+
descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once()
253+
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
254+
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
255+
256+
err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
257+
return descMockResult, true, nil
258+
})
259+
260+
require.NoError(t, err)
261+
require.Equal(t, descMockResult, c.staleData[key].data)
262+
require.True(t, startTime.Before(c.staleData[key].timestamp))
263+
}
264+
231265
func Test_WatchPrefix(t *testing.T) {
232266
ddbMock := NewDynamodbClientMock()
233267
codecMock := &CodecMock{}

0 commit comments

Comments
 (0)