diff --git a/CHANGELOG.md b/CHANGELOG.md
index 92627b43d5..00ff7a81c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## master / unreleased
 
 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
+* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, the tracker didn't update the number of clusters per user correctly. #4336
 
 ## 1.10.0-rc.0 / 2021-06-28
 
diff --git a/pkg/distributor/ha_tracker.go b/pkg/distributor/ha_tracker.go
index e8c2dded3a..51d1b5f370 100644
--- a/pkg/distributor/ha_tracker.go
+++ b/pkg/distributor/ha_tracker.go
@@ -107,8 +107,8 @@ type haTracker struct {
 	limits              haTrackerLimits
 
 	electedLock sync.RWMutex
-	elected     map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
-	clusters    map[string]int         // Number of clusters with elected replicas that a single user has. Key = user.
+	elected     map[string]ReplicaDesc         // Replicas we are accepting samples from. Key = "user/cluster".
+	clusters    map[string]map[string]struct{} // Known clusters with elected replicas per user. First key = user, second key = cluster name.
 
 	electedReplicaChanges   *prometheus.CounterVec
 	electedReplicaTimestamp *prometheus.GaugeVec
@@ -135,7 +135,7 @@ func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
 		updateTimeoutJitter: jitter,
 		limits:              limits,
 		elected:             map[string]ReplicaDesc{},
-		clusters:            map[string]int{},
+		clusters:            map[string]map[string]struct{}{},
 
 		electedReplicaChanges: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "cortex_ha_tracker_elected_replica_changes_total",
@@ -226,6 +226,14 @@ func (c *haTracker) loop(ctx context.Context) error {
 			delete(c.elected, key)
 			c.electedReplicaChanges.DeleteLabelValues(user, cluster)
 			c.electedReplicaTimestamp.DeleteLabelValues(user, cluster)
+
+			userClusters := c.clusters[user]
+			if userClusters != nil {
+				delete(userClusters, cluster)
+				if len(userClusters) == 0 {
+					delete(c.clusters, user)
+				}
+			}
 			return true
 		}
 
@@ -234,7 +242,10 @@ func (c *haTracker) loop(ctx context.Context) error {
 			c.electedReplicaChanges.WithLabelValues(user, cluster).Inc()
 		}
 		if !exists {
-			c.clusters[user]++
+			if c.clusters[user] == nil {
+				c.clusters[user] = map[string]struct{}{}
+			}
+			c.clusters[user][cluster] = struct{}{}
 		}
 		c.elected[key] = *replica
 		c.electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
@@ -359,7 +370,7 @@ func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica s
 
 	c.electedLock.RLock()
 	entry, ok := c.elected[key]
-	clusters := c.clusters[userID]
+	clusters := len(c.clusters[userID])
 	c.electedLock.RUnlock()
 
 	if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go
index dbfdc33bd8..705e34ce9b 100644
--- a/pkg/distributor/ha_tracker_test.go
+++ b/pkg/distributor/ha_tracker_test.go
@@ -552,6 +552,28 @@ func TestHAClustersLimit(t *testing.T) {
 
 	assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b2", now))
 	waitForClustersUpdate(t, 2, t1, userID)
+
+	// Mark cluster "a" for deletion (it was last updated 5 seconds ago).
+	// We use seconds timestamp resolution here to avoid cleaning up 'b' (in the KV store, we only store seconds).
+	t1.cleanupOldReplicas(context.Background(), time.Unix(now.Unix(), 0))
+	waitForClustersUpdate(t, 1, t1, userID)
+
+	// Now adding cluster "c" works.
+	assert.NoError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now))
+	waitForClustersUpdate(t, 2, t1, userID)
+
+	// But yet another cluster doesn't.
+	assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), "too many HA clusters (limit: 2)")
+
+	now = now.Add(5 * time.Second)
+
+	// Clean up all replicas.
+	t1.cleanupOldReplicas(context.Background(), now)
+	waitForClustersUpdate(t, 0, t1, userID)
+
+	// Now "a" works again.
+	assert.NoError(t, t1.checkReplica(context.Background(), userID, "a", "a1", now))
+	waitForClustersUpdate(t, 1, t1, userID)
 }
 
 func waitForClustersUpdate(t *testing.T, expected int, tr *haTracker, userID string) {
@@ -560,7 +582,7 @@ func waitForClustersUpdate(t *testing.T, expected int, tr *haTracker, userID str
 		tr.electedLock.RLock()
 		defer tr.electedLock.RUnlock()
 
-		return tr.clusters[userID]
+		return len(tr.clusters[userID])
 	})
 }
 
@@ -686,26 +708,31 @@ func TestCheckReplicaCleanup(t *testing.T) {
 
 	// Replica is not marked for deletion yet.
 	checkReplicaDeletionState(t, time.Second, c, userID, cluster, true, true, false)
+	checkUserClusters(t, time.Second, c, userID, 1)
 
 	// This will mark replica for deletion (with time.Now())
 	c.cleanupOldReplicas(ctx, now.Add(1*time.Second))
 
 	// Verify marking for deletion.
 	checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, true, true)
+	checkUserClusters(t, time.Second, c, userID, 0)
 
 	// This will "revive" the replica.
 	now = time.Now()
 	err = c.checkReplica(context.Background(), userID, cluster, replica, now)
 	assert.NoError(t, err)
 	checkReplicaTimestamp(t, time.Second, c, userID, cluster, replica, now) // This also checks that entry is not marked for deletion.
+	checkUserClusters(t, time.Second, c, userID, 1)
 
 	// This will mark replica for deletion again (with new time.Now())
 	c.cleanupOldReplicas(ctx, now.Add(1*time.Second))
 	checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, true, true)
+	checkUserClusters(t, time.Second, c, userID, 0)
 
 	// Delete entry marked for deletion completely.
 	c.cleanupOldReplicas(ctx, time.Now().Add(5*time.Second))
 	checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, false, false)
+	checkUserClusters(t, time.Second, c, userID, 0)
 
 	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
 		# HELP cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total Number of elected replicas marked for deletion.
@@ -725,6 +752,21 @@ func TestCheckReplicaCleanup(t *testing.T) {
 	))
 }
 
+func checkUserClusters(t *testing.T, duration time.Duration, c *haTracker, user string, expectedClusters int) {
+	t.Helper()
+	test.Poll(t, duration, nil, func() interface{} {
+		c.electedLock.RLock()
+		cl := len(c.clusters[user])
+		c.electedLock.RUnlock()
+
+		if cl != expectedClusters {
+			return fmt.Errorf("expected clusters: %d, got %d", expectedClusters, cl)
+		}
+
+		return nil
+	})
+}
+
 func checkReplicaDeletionState(t *testing.T, duration time.Duration, c *haTracker, user, cluster string, expectedExistsInMemory, expectedExistsInKV, expectedMarkedForDeletion bool) {
 	key := fmt.Sprintf("%s/%s", user, cluster)
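
The core of the change above is replacing the per-user cluster count with a per-user set of cluster names, and removing entries from that set when the cleanup loop deletes an elected replica, so the "too many HA clusters" limit check keeps seeing an accurate count. The following is a minimal standalone sketch of that map-of-sets bookkeeping; the names (`clusterSet`, `add`, `remove`) are illustrative and not part of the Cortex code.

```go
// A standalone sketch (not the Cortex implementation) of the per-user cluster
// bookkeeping used above: a set of cluster names per user, so the cleanup path
// can remove individual clusters and the limit check can use the set's length.
package main

import "fmt"

// clusterSet maps user -> set of cluster names with an elected replica.
type clusterSet map[string]map[string]struct{}

// add records that a cluster has an elected replica for the given user.
func (s clusterSet) add(user, cluster string) {
	if s[user] == nil {
		s[user] = map[string]struct{}{}
	}
	s[user][cluster] = struct{}{}
}

// remove forgets a cluster after its elected replica was cleaned up,
// dropping the user entry entirely once its set becomes empty.
func (s clusterSet) remove(user, cluster string) {
	userClusters := s[user]
	if userClusters == nil {
		return
	}
	delete(userClusters, cluster)
	if len(userClusters) == 0 {
		delete(s, user)
	}
}

func main() {
	s := clusterSet{}
	s.add("user1", "a")
	s.add("user1", "b")
	s.remove("user1", "a")
	fmt.Println(len(s["user1"])) // 1: only "b" still counts toward the per-user limit
}
```

The test changes above exercise exactly this behaviour: once a cluster's elected replica is cleaned up it no longer counts toward the limit, and the same cluster can be admitted again later.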