Skip to content

Commit 92048cd

Browse files
committed
Reintroduce old behaviour, of reading from LEAVING ingesters.
1 parent c649934 commit 92048cd

File tree

4 files changed

+12
-9
lines changed

4 files changed

+12
-9
lines changed

pkg/ring/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
102102
ing := r.ringDesc.Ingesters[id]
103103
timestamp := time.Unix(ing.Timestamp, 0)
104104
state := ing.State.String()
105-
if !r.IsHealthy(ing) {
105+
if !r.IsHealthy(ing, Write) {
106106
state = unhealthy
107107
}
108108

pkg/ring/replication_strategy.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
// tolerate.
1111
// - Filters out dead ingesters so the one doesn't even try to write to them.
1212
// - Checks there is enough ingesters for an operation to succeed.
13-
func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) (
13+
func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) (
1414
liveIngesters []*IngesterDesc, maxFailure int, err error,
1515
) {
1616
// We need a response from a quorum of ingesters, which is n/2 + 1. In the
@@ -28,7 +28,7 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) (
2828
// will cause the whole write to fail.
2929
liveIngesters = make([]*IngesterDesc, 0, len(ingesters))
3030
for _, ingester := range ingesters {
31-
if r.IsHealthy(ingester) {
31+
if r.IsHealthy(ingester, op) {
3232
liveIngesters = append(liveIngesters, ingester)
3333
} else {
3434
maxFailure--
@@ -47,8 +47,10 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) (
4747
}
4848

4949
// IsHealthy checks whether an ingester appears to be alive and heartbeating
50-
func (r *Ring) IsHealthy(ingester *IngesterDesc) bool {
51-
if ingester.State != ACTIVE {
50+
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
51+
if op == Write && ingester.State != ACTIVE {
52+
return false
53+
} else if op == Read && ingester.State == JOINING {
5254
return false
5355
}
5456
return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.cfg.HeartbeatTimeout

pkg/ring/replication_strategy_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
func TestReplicationStrategy(t *testing.T) {
1313
for i, tc := range []struct {
1414
RF, LiveIngesters, DeadIngesters int
15+
op Operation // Will default to READ
1516
ExpectedMaxFailure int
1617
ExpectedError string
1718
}{
@@ -90,7 +91,7 @@ func TestReplicationStrategy(t *testing.T) {
9091
require.NoError(t, err)
9192

9293
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
93-
liveIngesters, maxFailure, err := r.replicationStrategy(ingesters)
94+
liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, tc.op)
9495
if tc.ExpectedError == "" {
9596
assert.NoError(t, err)
9697
assert.Equal(t, tc.LiveIngesters, len(liveIngesters))

pkg/ring/ring.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) {
227227
ingesters = append(ingesters, ingester)
228228
}
229229

230-
liveIngesters, maxFailure, err := r.replicationStrategy(ingesters)
230+
liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, op)
231231
if err != nil {
232232
return ReplicationSet{}, err
233233
}
@@ -251,7 +251,7 @@ func (r *Ring) GetAll() (ReplicationSet, error) {
251251
maxErrors := r.cfg.ReplicationFactor / 2
252252

253253
for _, ingester := range r.ringDesc.Ingesters {
254-
if !r.IsHealthy(ingester) {
254+
if !r.IsHealthy(ingester, Read) {
255255
maxErrors--
256256
continue
257257
}
@@ -323,7 +323,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
323323
LEAVING.String(): 0,
324324
}
325325
for _, ingester := range r.ringDesc.Ingesters {
326-
if !r.IsHealthy(ingester) {
326+
if !r.IsHealthy(ingester, Write) {
327327
byState[unhealthy]++
328328
} else {
329329
byState[ingester.State.String()]++

0 commit comments

Comments
 (0)