
Fix bug with counting of clusters for deleted elected replicas. #4336

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from the KV store, the tracker didn't update the number of clusters per user correctly. #4336

## 1.10.0-rc.0 / 2021-06-28

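The bug this PR fixes, per the changelog entry above: the tracker kept a plain per-user count of clusters with elected replicas, and that count was not updated correctly when the cleanup loop deleted obsolete elected replicas from the KV store, so stale clusters kept counting against the per-user limit. The diff below replaces the counter with a per-user set of cluster names. Here is a minimal, standalone sketch of that bookkeeping pattern (illustrative names, not Cortex code): deletions are reflected automatically because the count is always the size of the set.

```go
// Minimal sketch (not Cortex code) of the bookkeeping change in this PR:
// instead of a per-user counter that the cleanup path did not update
// correctly, keep a per-user set of cluster names, so the count is always
// len(set).
package main

import "fmt"

type clusterTracker struct {
	// user -> set of clusters that currently have an elected replica.
	clusters map[string]map[string]struct{}
}

func newClusterTracker() *clusterTracker {
	return &clusterTracker{clusters: map[string]map[string]struct{}{}}
}

// electedReplicaStored records that a cluster has an elected replica.
func (t *clusterTracker) electedReplicaStored(user, cluster string) {
	if t.clusters[user] == nil {
		t.clusters[user] = map[string]struct{}{}
	}
	t.clusters[user][cluster] = struct{}{}
}

// electedReplicaDeleted mirrors the cleanup path: remove the cluster from the
// user's set, and drop the user entry entirely once the set is empty.
func (t *clusterTracker) electedReplicaDeleted(user, cluster string) {
	if set := t.clusters[user]; set != nil {
		delete(set, cluster)
		if len(set) == 0 {
			delete(t.clusters, user)
		}
	}
}

func (t *clusterTracker) clusterCount(user string) int {
	return len(t.clusters[user])
}

func main() {
	tr := newClusterTracker()
	tr.electedReplicaStored("user1", "a")
	tr.electedReplicaStored("user1", "b")
	tr.electedReplicaDeleted("user1", "a")
	fmt.Println(tr.clusterCount("user1")) // 1: the deletion is reflected in the count.
}
```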
21 changes: 16 additions & 5 deletions pkg/distributor/ha_tracker.go
@@ -107,8 +107,8 @@ type haTracker struct {
limits haTrackerLimits

electedLock sync.RWMutex
elected map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
clusters map[string]int // Number of clusters with elected replicas that a single user has. Key = user.
elected map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
clusters map[string]map[string]struct{} // Known clusters with elected replicas per user. First key = user, second key = cluster name.

electedReplicaChanges *prometheus.CounterVec
electedReplicaTimestamp *prometheus.GaugeVec
@@ -135,7 +135,7 @@ func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
updateTimeoutJitter: jitter,
limits: limits,
elected: map[string]ReplicaDesc{},
clusters: map[string]int{},
clusters: map[string]map[string]struct{}{},

electedReplicaChanges: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ha_tracker_elected_replica_changes_total",
@@ -226,6 +226,14 @@ func (c *haTracker) loop(ctx context.Context) error {
delete(c.elected, key)
c.electedReplicaChanges.DeleteLabelValues(user, cluster)
c.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

userClusters := c.clusters[user]
if userClusters != nil {
delete(userClusters, cluster)
if len(userClusters) == 0 {
delete(c.clusters, user)
}
}
return true
}

@@ -234,7 +242,10 @@ func (c *haTracker) loop(ctx context.Context) error {
c.electedReplicaChanges.WithLabelValues(user, cluster).Inc()
}
if !exists {
c.clusters[user]++
if c.clusters[user] == nil {
c.clusters[user] = map[string]struct{}{}
}
c.clusters[user][cluster] = struct{}{}
}
c.elected[key] = *replica
c.electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
@@ -359,7 +370,7 @@ func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica s

c.electedLock.RLock()
entry, ok := c.elected[key]
clusters := c.clusters[userID]
clusters := len(c.clusters[userID])
c.electedLock.RUnlock()

if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
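Downstream of this change, checkReplica now derives the per-user count with len(c.clusters[userID]) while holding electedLock, so once the cleanup loop removes a cluster from the set, new clusters are accepted again instead of tripping the "too many HA clusters" error exercised by the test below. A rough sketch of that limit check, with hypothetical names (checkClusterLimit and maxClusters stand in for the haTrackerLimits plumbing), under the assumption that a cluster which already has an elected replica is always accepted:

```go
// Rough sketch with hypothetical names, not the actual Cortex implementation:
// how a limit check can consume the set-based count kept by the tracker.
package main

import "fmt"

func checkClusterLimit(clusters map[string]map[string]struct{}, user, cluster string, maxClusters int) error {
	userClusters := clusters[user]
	if _, known := userClusters[cluster]; known {
		// Assumed behaviour: a cluster that already has an elected replica is accepted.
		return nil
	}
	if maxClusters > 0 && len(userClusters) >= maxClusters {
		return fmt.Errorf("too many HA clusters (limit: %d)", maxClusters)
	}
	return nil
}

func main() {
	clusters := map[string]map[string]struct{}{
		"user1": {"a": {}, "b": {}},
	}
	// A third cluster is rejected while two are still tracked.
	fmt.Println(checkClusterLimit(clusters, "user1", "c", 2)) // too many HA clusters (limit: 2)
}
```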
44 changes: 43 additions & 1 deletion pkg/distributor/ha_tracker_test.go
@@ -552,6 +552,28 @@ func TestHAClustersLimit(t *testing.T) {

assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b2", now))
waitForClustersUpdate(t, 2, t1, userID)

// Mark cluster "a" for deletion (it was last updated 5 seconds ago)
// We use seconds timestamp resolution here to avoid cleaning up 'b'. (In the KV store, we only store seconds.)
t1.cleanupOldReplicas(context.Background(), time.Unix(now.Unix(), 0))
waitForClustersUpdate(t, 1, t1, userID)

// Now adding cluster "c" works.
assert.NoError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now))
waitForClustersUpdate(t, 2, t1, userID)

// But yet another cluster doesn't.
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), "too many HA clusters (limit: 2)")

now = now.Add(5 * time.Second)

// Clean up all replicas.
t1.cleanupOldReplicas(context.Background(), now)
waitForClustersUpdate(t, 0, t1, userID)

// Now "a" works again.
assert.NoError(t, t1.checkReplica(context.Background(), userID, "a", "a1", now))
waitForClustersUpdate(t, 1, t1, userID)
}

func waitForClustersUpdate(t *testing.T, expected int, tr *haTracker, userID string) {
@@ -560,7 +582,7 @@ func waitForClustersUpdate(t *testing.T, expected int, tr *haTracker, userID str
tr.electedLock.RLock()
defer tr.electedLock.RUnlock()

return tr.clusters[userID]
return len(tr.clusters[userID])
})
}

@@ -686,26 +708,31 @@ func TestCheckReplicaCleanup(t *testing.T) {

// Replica is not marked for deletion yet.
checkReplicaDeletionState(t, time.Second, c, userID, cluster, true, true, false)
checkUserClusters(t, time.Second, c, userID, 1)

// This will mark replica for deletion (with time.Now())
c.cleanupOldReplicas(ctx, now.Add(1*time.Second))

// Verify marking for deletion.
checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, true, true)
checkUserClusters(t, time.Second, c, userID, 0)

// This will "revive" the replica.
now = time.Now()
err = c.checkReplica(context.Background(), userID, cluster, replica, now)
assert.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, userID, cluster, replica, now) // This also checks that entry is not marked for deletion.
checkUserClusters(t, time.Second, c, userID, 1)

// This will mark replica for deletion again (with new time.Now())
c.cleanupOldReplicas(ctx, now.Add(1*time.Second))
checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, true, true)
checkUserClusters(t, time.Second, c, userID, 0)

// Delete entry marked for deletion completely.
c.cleanupOldReplicas(ctx, time.Now().Add(5*time.Second))
checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, false, false)
checkUserClusters(t, time.Second, c, userID, 0)

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total Number of elected replicas marked for deletion.
@@ -725,6 +752,21 @@
))
}

func checkUserClusters(t *testing.T, duration time.Duration, c *haTracker, user string, expectedClusters int) {
t.Helper()
test.Poll(t, duration, nil, func() interface{} {
c.electedLock.RLock()
cl := len(c.clusters[user])
c.electedLock.RUnlock()

if cl != expectedClusters {
return fmt.Errorf("expected clusters: %d, got %d", expectedClusters, cl)
}

return nil
})
}

func checkReplicaDeletionState(t *testing.T, duration time.Duration, c *haTracker, user, cluster string, expectedExistsInMemory, expectedExistsInKV, expectedMarkedForDeletion bool) {
key := fmt.Sprintf("%s/%s", user, cluster)
