Unify the replication calculation between query and push. #681
Conversation
Force-pushed from 4ee5628 to df2f219
(found a bug with this, don't merge just yet)
After some more work, I realised all this replication logic really belongs to the ring, as does the replication factor itself. This reduces coupling quite a lot IMO, and the ring interface now becomes "tell me who to talk to and how many errors to tolerate", with all the liveness filtering and quorum calculations happening in the ring. Going to deploy to our dev env and see how this works.
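For illustration, the interface shape described here might look something like the following sketch (the names, including ReplicationSet, are hypothetical stand-ins, not the PR's actual identifiers):

```go
package ring

// IngesterDesc stands in for the real Cortex type.
type IngesterDesc struct {
	Addr  string
	State string // "ACTIVE", "LEAVING", "PENDING", "JOINING"
}

// ReplicationSet is what "tell me who to talk to and how many errors to
// tolerate" could look like as a return type: the caller fans out to
// Ingesters and gives up once more than MaxErrors of them have failed.
type ReplicationSet struct {
	Ingesters []*IngesterDesc
	MaxErrors int
}
```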
Have put this through a couple of rolling upgrades and it works, I'd say it's ready for review. @bboreham fancy this one?
I found some nits.
I find this code rather confusing, will need to come back to it later.
pkg/ring/ring.go
Outdated
// size of the replica set for read, but we can read from Leaving ingesters,
// so don't skip it in this case.
// NB ingester will be filterer later (by distributor/replication_strategy).
spelling "filterer"
pkg/ring/ring.go
Outdated
// want to write the extra replica somewhere. So we increase the size of the
// set of replicas for the key. This means we have to also increase the
// We do not want to Write to Ingesters that are not ACTIVE, because they are
// about to go away, but we do want to write the extra replica somewhere.
Nit: could be pending or joining as well as leaving, so "about to go away" is not a full explanation.
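As a sketch of the mechanism the quoted comment describes (illustrative code, not the PR's actual loop; pick and the string states are my stand-ins): a non-ACTIVE ingester keeps its slot on the write path, but the wanted count grows so the extra replica lands on a later ACTIVE ingester.

```go
package ring

// Minimal stand-in so the sketch compiles on its own.
type IngesterDesc struct {
	Addr  string
	State string // "ACTIVE", "LEAVING", "PENDING", "JOINING"
}

// pick walks the candidate ingesters for one key in ring order. A
// non-ACTIVE ingester is kept (per the reviewer's point, it may be
// PENDING or JOINING, not just about to go away), but n is bumped so an
// extra replica is written elsewhere; the ineligible entry is filtered
// out afterwards by the replication strategy.
func pick(candidates []*IngesterDesc, replicationFactor int) []*IngesterDesc {
	n := replicationFactor
	var replicas []*IngesterDesc
	for i := 0; i < len(candidates) && len(replicas) < n; i++ {
		if candidates[i].State != "ACTIVE" {
			n++ // extend the replica set past the replication factor
		}
		replicas = append(replicas, candidates[i])
	}
	return replicas
}
```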
pkg/distributor/distributor_test.go
Outdated
@@ -24,24 +24,33 @@ type mockRing struct {
	heartbeatTimeout time.Duration
Do we still need this?
pkg/ring/replication_strategy.go
Outdated
) {
	// We need a response from a quorum of ingesters, which is n/2 + 1. In the
	// case of a node joining/leaving, the actual replica set might be bigger
	// than the replication factor, so we need to account for this.
I don't think it is clear what "account for this" means or refers to.
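To make "account for this" concrete, here is the arithmetic under the PR's apparent scheme (a sketch, not the PR's code): quorum is computed from the size of the replica set actually built, which during a join/leave can exceed the replication factor.

```go
package main

import "fmt"

func main() {
	// n is the size of the replica set actually built from the ring; during
	// a join/leave it can exceed the configured replication factor of 3.
	for _, n := range []int{3, 4} {
		minSuccess := n/2 + 1
		maxFailure := n - minSuccess
		fmt.Printf("replicas=%d quorum=%d tolerable failures=%d\n", n, minSuccess, maxFailure)
	}
	// Output:
	// replicas=3 quorum=2 tolerable failures=1
	// replicas=4 quorum=3 tolerable failures=1
}
```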
pkg/ring/replication_strategy.go
Outdated
"time" | ||
) | ||
|
||
func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( |
please explain what the overall purpose of this function is, and what the ingesters parameter should contain
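For illustration only, the kind of header comment being requested could look like this (the wording, stub types, and error text are mine, not the PR's):

```go
package ring

import "fmt"

// Minimal stand-ins so the sketch compiles on its own.
type IngesterDesc struct{ State string }

type Ring struct{}

func (r *Ring) IsHealthy(ing *IngesterDesc) bool { return ing.State == "ACTIVE" }

// replicationStrategy decides, given the replica set of ingesters selected
// from the ring for one key, which of them we will actually talk to and how
// many errors we can tolerate while still reaching quorum.
//
// ingesters should contain the full replica set for the key, in ring order,
// including entries that are not ACTIVE; dead or ineligible ingesters are
// filtered out here rather than by the caller.
func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) (
	liveIngesters []*IngesterDesc, maxFailure int, err error,
) {
	minSuccess := len(ingesters)/2 + 1 // quorum over the actual replica set
	for _, ing := range ingesters {
		if r.IsHealthy(ing) {
			liveIngesters = append(liveIngesters, ing)
		}
	}
	if len(liveIngesters) < minSuccess {
		return nil, 0, fmt.Errorf("at least %d live ingesters required, got %d",
			minSuccess, len(liveIngesters))
	}
	return liveIngesters, len(liveIngesters) - minSuccess, nil
}
```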
Thanks for the feedback; this is quite a complicated and important aspect of Cortex so a second brain is appreciated. I've kept …
AAMOI does this make it easier to do #467 (comment)?
Yes! We can reevaluate writing out the extra replica when we hit a non-active ingester, given our transfer protocol. I'd prefer to fix up the transfers so they do the right thing though - instead of streaming to the newly joining ingester, they should really calculate where their data should live once their tokens are gone, and send it there (similarly, joining nodes should 'steal' data from nodes based on the tokens they pick). This will enable us to much more easily scale up and down, and in this world the current logic in the ring is correct. But that's more work - I'll get to it this year I suspect.
Friendly ping?
@bboreham is not available this week. I'm not likely to get to this before then (although will try).
It looks like the behaviour will change - we will no longer read from LEAVING ingesters. Am I mistaken?
Also a couple more nits.
pkg/ring/ring.go
Outdated
// The order of the result matches the order of the input.
func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]*IngesterDesc, error) {
// BatchGet returns ReplicationFactor (or more) ingesters which form the replicas
// for the given key. The order of the result matches the order of the input.
spelling: "key" should be "keys"
pkg/ring/ring.go
Outdated
@@ -84,6 +94,10 @@ type Ring struct {

// New creates a new Ring
func New(cfg Config) (*Ring, error) {
	if 0 > cfg.ReplicationFactor {
		return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
Error message does not match test. Suspect you meant 0 >=
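The presumably intended guard, as a sketch consistent with the error message:

```go
package ring

import "fmt"

// 0 >= cfg.ReplicationFactor (i.e. rf <= 0) matches the "greater than zero"
// message; the committed 0 > rf only rejects negative values and lets 0 through.
func validateReplicationFactor(rf int) error {
	if rf <= 0 {
		return fmt.Errorf("ReplicationFactor must be greater than zero: %d", rf)
	}
	return nil
}
```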
pkg/ring/replication_strategy.go
Outdated
// IsHealthy checks whether an ingester appears to be alive and heartbeating
func (r *Ring) IsHealthy(ingester *IngesterDesc) bool {
	if ingester.State != ACTIVE {
Looks like this means you no longer read from LEAVING ingesters - was that intended?
It wasn't, good catch.
// size of the replica set for read, but we can read from Leaving ingesters,
// so don't skip it in this case.
// NB dead ingester will be filtered later (by replication_strategy.go).
dead and not-ACTIVE ingesters are filtered by IsHealthy(), which doesn't really match the logic here. Or I am missing the subtlety.
Thanks for the review, I've reintroduced the old behaviour.
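One way the old behaviour could be reintroduced while keeping the check centralised, judging by the IsHealthy(ingester, Write) calls visible in the later hunks (the exact rule here is my assumption, not the PR's verbatim code):

```go
package ring

import "time"

type Operation int

const (
	Read Operation = iota
	Write
)

type IngesterDesc struct {
	State     string // "ACTIVE", "LEAVING", ...
	Timestamp int64  // last heartbeat, Unix seconds
}

type Ring struct{ heartbeatTimeout time.Duration }

// IsHealthy checks whether an ingester appears to be alive and heartbeating.
// Reads may still use LEAVING ingesters (they hold data until their transfer
// completes); writes must only go to ACTIVE ones.
func (r *Ring) IsHealthy(ing *IngesterDesc, op Operation) bool {
	if op == Write && ing.State != "ACTIVE" {
		return false
	}
	if op == Read && ing.State != "ACTIVE" && ing.State != "LEAVING" {
		return false
	}
	return time.Since(time.Unix(ing.Timestamp, 0)) <= r.heartbeatTimeout
}
```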
pkg/ring/ring.go
Outdated
@@ -293,7 +323,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
		LEAVING.String(): 0,
	}
	for _, ingester := range r.ringDesc.Ingesters {
		if !r.IsHealthy(ingester) {
		if !r.IsHealthy(ingester, Write) {
Now you're never going to report LEAVING ingesters
pkg/ring/http.go
Outdated
@@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	ing := r.ringDesc.Ingesters[id]
	timestamp := time.Unix(ing.Timestamp, 0)
	state := ing.State.String()
	if !r.IsHealthy(ing) {
	if !r.IsHealthy(ing, Write) {
probably don't want this to report unhealthy on LEAVING either
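For the two reporting call sites, one possible fix (an assumption; the PR may have chosen differently) is a heartbeat-only check, since the metrics and the HTTP page display state separately and a LEAVING ingester should not be painted unhealthy just because writes avoid it:

```go
package ring

import "time"

type IngesterDesc struct{ Timestamp int64 } // last heartbeat, Unix seconds

type Ring struct{ heartbeatTimeout time.Duration }

// isAlive is a heartbeat-only check for the reporting paths: it ignores
// state entirely, so LEAVING ingesters still report as healthy while
// they are heartbeating.
func (r *Ring) isAlive(ing *IngesterDesc) bool {
	return time.Since(time.Unix(ing.Timestamp, 0)) <= r.heartbeatTimeout
}
```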
@tomwilkie I think it's close; do you want to rebase and consider the reporting aspects?
…ategy is up to date.
@tomwilkie I have rebased and added a suggested fix at https://github.com/weaveworks/cortex/compare/604-unify-replication-calcs; PTAL
Force-pushed from 92048cd to ace81ba
Your branch looks good, Bryan; I've pulled it in here. Thanks for doing that.
Part of #604, fixes #279
The logic for doing replication was subtly different on the query side and on the push side - push used the number of ingesters returned to calculate quorum, and query used the replication factor. Using the replication factor to calculate quorum will occasionally give the wrong answer when ingesters are joining and leaving. What's more, there were a bunch of checks on the push path that weren't on the query path.
This PR unifies the logic between these two paths, effectively adding a bunch of checks on the query path and dealing with the case where you have fewer ingesters than the replication factor (by returning an appropriate error).
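A worked example of the discrepancy (numbers illustrative): with a replication factor of 3 and one ingester joining or leaving, the replica set grows to 4; quorum derived from the replication factor would accept 2 of 4 responses, while quorum over the actual set correctly requires 3.

```go
package main

import "fmt"

func quorum(n int) int { return n/2 + 1 }

func main() {
	const rf = 3  // configured replication factor
	const set = 4 // actual replica set, extended during a join/leave
	fmt.Println("quorum from RF (old query path): ", quorum(rf))  // 2 - too weak
	fmt.Println("quorum from actual set (unified):", quorum(set)) // 3 - correct
}
```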