distributor: don't return 5xx on a single ingester outage #4388
Conversation
Force-pushed 08ef41b to c606386
Just leaving this as a draft for now to see if it makes sense... @pracucci @bboreham @pstibrany @tomwilkie any thoughts? Thanks
Force-pushed 64a8407 to a294a99
Force-pushed 7266c6c to 996fc93
Let's keep talking in the issue until we reach a consensus on the logic. I left a comment explaining why I'm not sure returning 4xx on the first 4xx received is correct:
Force-pushed 625957f to 2c3ccce
I couldn't really follow the central logic. I suspect it does work, but it does so because of an intricate interplay. Can the code explain itself better or be less intricate?
pkg/ring/batch.go (outdated)
verifyIfShouldReturn := func() {
	if sampleTrackers[i].remaining.Dec() == 0 {
I would pass i as a parameter, to avoid any problem with closure binding.
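For readers unfamiliar with the pitfall being guarded against, here is a minimal self-contained illustration of loop-variable capture versus passing the index as a parameter (note that Go 1.22 later changed loop-variable scoping, which this sketch calls out in comments):

```go
package main

import "fmt"

func main() {
	// Shared loop variable captured by closure: before Go 1.22 all three
	// closures observe the final value of i (3); Go 1.22+ scopes i per
	// iteration, which is why defensive code like the review suggests
	// used to be common.
	captured := make([]func(), 0, 3)
	for i := 0; i < 3; i++ {
		captured = append(captured, func() { fmt.Println("captured:", i) })
	}

	// Passing i as a parameter binds the value immediately on any Go
	// version - the shape suggested for verifyIfShouldReturn(i).
	bound := make([]func(), 0, 3)
	for i := 0; i < 3; i++ {
		mk := func(i int) func() { return func() { fmt.Println("bound:", i) } }
		bound = append(bound, mk(i))
	}

	for _, f := range captured {
		f() // may print 3, 3, 3 on pre-1.22 toolchains
	}
	for _, f := range bound {
		f() // always prints 0, 1, 2
	}
}
```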
pkg/ring/batch.go (outdated)
	if b.rpcsFailed.Inc() == 1 {
		b.err <- err
	}
} else {
	if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) {
		verifyIfShouldReturn()
This can call b.rpcsFailed.Inc(). Why are we incrementing the Failed number on the no-error branch?
We only do that when there are no remaining ingesters to call anymore - so even though the last ingester succeeded, we need to return an error (in other words, there are cases where we return an error even though the ingester call succeeded).
Let's consider the following scenario:
First request: 4xx
Second request: 5xx (no quorum yet)
Third request: 2xx (in this case we reached maxFailures for neither 4xx nor 5xx)
In this particular scenario, even though we have 2 errors (4xx + 5xx), we have not returned any error yet (as we had no quorum). What verifyIfShouldReturn does is basically check whether there are any remaining calls to make for this particular sample and, if not, return an error (picking either the 4xx or the 5xx). It's important to note that this function is called only if we did not reach the minSuccess needed to return a success.
PS: b.rpcsFailed.Inc() is here only to make sure we return just one error for the whole batch.
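To make that flow concrete, here is a condensed, runnable sketch of the scenario just described. The names echo the snippets quoted in this review, but the struct layout and helper are simplified assumptions, not the merged code (in particular, the real code buckets errors by 4xx/5xx family and checks maxFailures per family):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// itemTracker is a simplified stand-in for the per-sample state in
// pkg/ring/batch.go.
type itemTracker struct {
	minSuccess int32
	succeeded  atomic.Int32
	remaining  atomic.Int32
	firstErr   error // single-goroutine demo; the real code uses atomics
}

// onResult returns (done, err): done=true means the batch can answer the rpc.
// The key branch is the last one: the call itself succeeded, but no success
// quorum is reachable anymore, so we surface one of the recorded errors.
func onResult(s *itemTracker, err error) (bool, error) {
	if err != nil {
		if s.firstErr == nil {
			s.firstErr = err
		}
		if s.remaining.Add(-1) == 0 {
			return true, s.firstErr
		}
		return false, nil
	}
	if s.succeeded.Add(1) >= s.minSuccess {
		return true, nil // quorum of successes reached
	}
	if s.remaining.Add(-1) == 0 {
		return true, s.firstErr // success, but quorum never reached
	}
	return false, nil
}

func main() {
	s := &itemTracker{minSuccess: 2}
	s.remaining.Store(3)
	fmt.Println(onResult(s, errors.New("4xx Throttling")))          // false <nil>
	fmt.Println(onResult(s, errors.New("5xx InternalServerError"))) // false <nil>
	fmt.Println(onResult(s, nil))                                   // true 4xx Throttling
}
```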
pkg/distributor/distributor_test.go (outdated)
// Using 489 just to make sure we are not hitting the &limits
// Simulating 2 4xx and 1 5xx -> Should return 4xx
ingesters[0].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
Is this a typo for 429, or am I missing some context?
it's a typo :D
pkg/distributor/distributor_test.go (outdated)
ingesters[1].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
ingesters[2].failResp.Store(httpgrpc.Errorf(489, "Throttling"))

for i := 0; i < 1000; i++ {
Why 1000?
I tried many different implementations and many of them failed due to race conditions... so this test runs multiple iterations just to make sure we don't have a race condition problem.
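For the record, a schematic of that pattern - repeat a concurrency-sensitive body many times so a rare interleaving has a chance to surface. This is an illustration, not the actual test, and the loop bound is arbitrary:

```go
package distributor_test

import (
	"sync"
	"testing"
)

// TestPushQuorumRepeated is a schematic stand-in: a wrong result chosen under
// one interleaving may only show up in a small fraction of runs, so the body
// is repeated many times. Run with `go test -race` for stronger detection.
func TestPushQuorumRepeated(t *testing.T) {
	const iterations = 1000 // arbitrary; trades runtime for detection odds
	for i := 0; i < iterations; i++ {
		var wg sync.WaitGroup
		errs := make(chan error, 3)
		for j := 0; j < 3; j++ { // three concurrent "ingester" replies
			wg.Add(1)
			go func() {
				defer wg.Done()
				errs <- nil // stand-in for recording one push result
			}()
		}
		wg.Wait()
		close(errs)
		for err := range errs {
			if err != nil {
				t.Fatalf("iteration %d: %v", i, err)
			}
		}
	}
}
```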
pkg/distributor/distributor_test.go (outdated)
ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))

for i := 0; i < 10000; i++ {
Why 10000?
Ditto
pkg/distributor/distributor_test.go (outdated)
require.NoError(t, err)

// Give time to the ring get updated with the KV value
time.Sleep(5 * time.Second)
This is very long compared to sleeps in other tests - is there another way to ensure propagation?
Yeah... probably I can...
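One option is to poll for the expected ring state with a deadline instead of always sleeping. Sketched below using Cortex's polling test helper - assuming test.Poll in pkg/util/test with this signature, as I remember it from the codebase at the time, and a hypothetical healthyInstances accessor:

```go
package distributor_test

import (
	"testing"
	"time"

	"github.com/cortexproject/cortex/pkg/util/test"
)

// healthyInstances is a hypothetical accessor standing in for however the
// real test reads the ring state (e.g. via r.KVClient / the ring desc).
func healthyInstances() int { return 3 }

// TestRingPropagation sketches the shape of the fix: poll with a 5s deadline
// rather than always sleeping 5s, so the happy path returns in milliseconds.
func TestRingPropagation(t *testing.T) {
	test.Poll(t, 5*time.Second, 3, func() interface{} {
		return healthyInstances()
	})
}
```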
pkg/distributor/distributor_test.go (outdated)
ingesters[1].happy.Store(true)
ingesters[2].happy.Store(true)

err := r.KVClient.CAS(context.Background(), ring.IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
Why are we only doing a CAS on one ingester?
CAS receives the whole ring here (which in this case has 3 ingesters):
r := in.(*ring.Desc)
And I'm just changing the state of ingester2 to "LEFT" -> so after this operation the ring will still have 3 ingesters, but ingester2 will be in the LEFT state.
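Sketched out, that operation looks roughly like this (based on the ring.Desc / kv.Client API as used elsewhere in the test; the helper name and ingester ID are illustrative):

```go
package distributor_test

import (
	"context"
	"testing"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/ring/kv"
	"github.com/stretchr/testify/require"
)

// markIngesterLeft mirrors the CAS explained above: the callback receives the
// whole ring.Desc, we flip one instance's state, and write the full
// descriptor back - so the ring still contains all three ingesters, with the
// chosen one now LEFT.
func markIngesterLeft(t *testing.T, kvClient kv.Client, id string) {
	err := kvClient.CAS(context.Background(), ring.IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
		desc := in.(*ring.Desc)
		instance := desc.Ingesters[id]
		instance.State = ring.LEFT
		desc.Ingesters[id] = instance
		return desc, true, nil
	})
	require.NoError(t, err)
}
```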
pkg/ring/batch.go (outdated)
@@ -118,15 +134,28 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
// The use of atomic increments here guarantees only a single sendSamples
// goroutine will write to either channel.
for i := range sampleTrackers {
	verifyIfShouldReturn := func() {
verify to me suggests a function without side-effects; this one has three.
Yeah... I agree that this logic is hard to understand... let me try to rewrite it another way.
Force-pushed 4399ab1 to 49478f9
I converted to draft as it seems that the … cc @bboreham
Force-pushed 0d6754f to a7efd0d
Hi... I was going to make this change in DSKit, but as this code is back in Cortex I rebased it here (I can still do it there if someone thinks it's worthwhile). @bboreham do you think the code is a little clearer with the comments now?
Yes it does look better. I am down to complaining about comments now :-)
Take this comment, which you didn't change:
// If we succeed, decrement each sample's pending count by one.
I think this sentence should have been removed long ago. The following sentence is more true:
// If we reach the required number of successful puts on this sample, then
// decrement the number of pending samples by one.
And it is still true, but you have added a similar comment on line 155. Maybe we don't need this one any more.
// If we successfully push all samples to min success instances,
// wake up the waiting rpc so it can return early.
This is still true; perhaps it could move to line 155.
// Similarly, track the number of errors, and if it exceeds maxFailures
// shortcut the waiting rpc.
This bit is more complicated now: the comment on line 144 is closer.
// The use of atomic increments here guarantees only a single sendSamples
// goroutine will write to either channel.
Still true, but you now have more atomics, so perhaps clarify that it is specifically rpcsFailed and rpcsPending that are referred to.
Separately, in the test I would like a comment explaining the 1000 and 10000 "magic numbers".
pkg/ring/batch.go (outdated)
	b.err <- err
// We count the error by error family (4xx and 5xx)
errCount := sampleTrackers[i].recordError(err)
// We should return an error if we reach the maxFailure (quorum) on a give error family OR
s/give/given/
Updated! :D
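For readers following along, a self-contained sketch of the per-family counting that recordError performs. The struct fields here are assumptions modeled on the snippets above, not the merged code; httpgrpc.HTTPResponseFromError and httpgrpc.Errorf are the real weaveworks/common helpers:

```go
package main

import (
	"fmt"

	"github.com/weaveworks/common/httpgrpc"
	"go.uber.org/atomic"
)

// itemTracker fields are assumptions for this sketch.
type itemTracker struct {
	failed4xx atomic.Int32
	failed5xx atomic.Int32
}

// recordError buckets an error into the 4xx or 5xx family and returns the
// new count for that family; the caller compares it against maxFailures.
func (i *itemTracker) recordError(err error) int32 {
	if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && resp.Code/100 == 4 {
		return i.failed4xx.Inc()
	}
	return i.failed5xx.Inc()
}

func main() {
	tr := &itemTracker{}
	fmt.Println(tr.recordError(httpgrpc.Errorf(429, "Throttling")))          // 1 (4xx family)
	fmt.Println(tr.recordError(httpgrpc.Errorf(500, "InternalServerError"))) // 1 (5xx family)
	fmt.Println(tr.recordError(httpgrpc.Errorf(429, "Throttling")))          // 2 (4xx family)
}
```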
Force-pushed dab8dfb to 8a2b76d
Approved!
Thanks a lot @bboreham :D
What this PR does:
Fixes cases where a single ingester outage causes 5xx errors.
Which issue(s) this PR fixes:
Fixes #4381
Checklist
CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]