Incrementally transfer chunks per token to improve handover #1764
Conversation
Force-pushed from 3db8e47 to 0737ed0
There are multiple test failures and reported race conditions. Would be nice to fix those.
Force-pushed from f67be16 to b7b9ee6
Comments after reviewing first commit (thanks for splitting your work into logical steps!)
Force-pushed from 5370565 to 03b4dc9
Another round of comments. I still need to better understand and review the real meat of the PR (like the entire incremental_transfer.go).
My initial impression is that this is a very complex piece of code that will be hard to find and fix bugs in :-(
Force-pushed from 875afd7 to cbc9007
Another pass, this time mostly around TokenChecker.
Force-pushed from ce1e47e to a364ceb
I've addressed most of the review feedback so far. I want to rebase against latest and fix the merge conflicts before I continue addressing feedback. This may take a little bit of time; both the TSDB blocks and the gossip are going to change bits and pieces about the current implementation.
Force-pushed from d9009f2 to dd5ac36
Yet another round of comments.
The main issue I have is with the requirement that we need to move all the replicated data around, not just the primary data for a token. This is further complicated by adjacent tokens belonging to the same ingester, dealing with unhealthy ingesters, and token states.
Adjacent tokens belonging to the same ingester is, I find, the easier half of the problem. The main complexities in dealing with the ring arise when tokens belonging to the same ingester are near, but not next to, each other. For example, when dealing with a ring …
Another issue with simplifying how we deal with the ring is the risk of introducing minor differences between what the distributor does and what the ingesters do when moving data around. If that were to happen, incrementally joining and leaving would stop working properly: spillover may be introduced if the ingester doesn't request data from the proper ingester, and chunks may go untransferred and be flushed before their capacity is reached.
I don't think we can loosen the requirement of moving all replicated data around. If we didn't do this, every time an ingester leaves, we would lose data.
Can you elaborate on how we would lose data? In the complete rollout scenario, we can use the same mechanism as we do today. Perhaps the solution could be to warn the admin about not having too many ingesters leave at once?
We would lose data because each ingester holds data for replicationFactor total ingesters, including itself. My fallback is to flush anything that didn't get transferred, but we generally have queriers configured to only query the store for stuff that isn't in memory anymore. That means there will be some period where an ingester gets a query for some data that it should be a replica of, but it never received that data during the transfer.
My understanding is that queriers ask all ingesters, so any ingester with data will reply.
I may be slightly wrong, but I think queriers ask all ingesters and stop once they receive responses from a quorum of those ingesters. If none of the quorum had any data, then the query results would show no data. Again, I'm unsure, but I believe this is how it works.
You may be right, I haven't checked the code, although why would it need a quorum? Either it gets data from somewhere, or not. It cannot possibly get different data from different ingesters (perhaps incomplete?). I need to check the code.
}

// Target ingester may not have had any streams to send.
if seriesReceived == 0 {
This if seriesReceived == 0 check seems unnecessary, as you're returning nil regardless.
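To illustrate the point, a tiny standalone sketch; receiveSeries below is a hypothetical stand-in for the PR's receive loop, not its actual code:

package main

import "fmt"

// receiveSeries is a hypothetical stand-in for the transfer receive loop.
// The target ingester may legitimately have had nothing to send, so the
// function returns nil either way and no special seriesReceived == 0 branch
// is needed.
func receiveSeries(series []string) (int, error) {
    seriesReceived := 0
    for range series {
        // ... store the received series locally ...
        seriesReceived++
    }
    return seriesReceived, nil
}

func main() {
    n, err := receiveSeries(nil) // an empty transfer is not an error
    fmt.Println(n, err)
}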
// when ranges should be unblocked: we should continue to reject writes for as long as we may
// receive them. When the joining token has been completely inserted into the ring, it will
// be safe to remove the blocks.
i.BlockRanges(req.Ranges)
Shouldn't the block occur before copying user states, in the unlikely case a new series is appended during i.userStates.cp()?
This shouldn't happen because the new token isn't in the ring yet, but it's better to be safe here; I'll move the block before the copy.
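As a rough illustration of the agreed ordering, here is a minimal, self-contained sketch using hypothetical stand-in types (not the real Ingester or userStates code): block the ranges first, then take the copy.

package main

import (
    "fmt"
    "sync"
)

// ingester is a hypothetical, pared-down stand-in for the real type.
type ingester struct {
    mtx           sync.Mutex
    blockedRanges [][2]uint32
    userStates    map[string][]string
}

// BlockRanges starts rejecting writes for the given token ranges.
func (i *ingester) BlockRanges(ranges [][2]uint32) {
    i.mtx.Lock()
    defer i.mtx.Unlock()
    i.blockedRanges = append(i.blockedRanges, ranges...)
}

// copyUserStates snapshots the in-memory series per user.
func (i *ingester) copyUserStates() map[string][]string {
    i.mtx.Lock()
    defer i.mtx.Unlock()
    cp := make(map[string][]string, len(i.userStates))
    for user, series := range i.userStates {
        cp[user] = append([]string(nil), series...)
    }
    return cp
}

func main() {
    i := &ingester{userStates: map[string][]string{"user1": {"series_a"}}}
    ranges := [][2]uint32{{100, 200}}

    // Block the ranges *before* taking the snapshot, so no new series can be
    // appended between the copy and the block and then be silently dropped.
    i.BlockRanges(ranges)
    snapshot := i.copyUserStates()

    fmt.Println("blocked:", i.blockedRanges, "snapshot:", snapshot)
}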
}

// UnblockRanges manually removes blocks for the provided ranges.
func (i *Ingester) UnblockRanges(ctx context.Context, in *client.UnblockRangesRequest) (*client.UnblockRangesResponse, error) {
This should probably be _ context.Context since the arg is unused.
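For reference, a minimal standalone sketch of the suggested signature, using hypothetical stand-in request/response types rather than the real client package:

package main

import (
    "context"
    "fmt"
)

// Hypothetical stand-ins for the real client.UnblockRanges request/response types.
type UnblockRangesRequest struct{}
type UnblockRangesResponse struct{}

// UnblockRanges never reads its context, so naming the parameter "_" makes the
// unused argument explicit while keeping the gRPC-style signature.
func UnblockRanges(_ context.Context, in *UnblockRangesRequest) (*UnblockRangesResponse, error) {
    _ = in // the real method would act on the requested ranges here
    return &UnblockRangesResponse{}, nil
}

func main() {
    resp, err := UnblockRanges(context.Background(), &UnblockRangesRequest{})
    fmt.Println(resp, err)
}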
Force-pushed from 3b81d32 to 3522f08
// TransferChunksSubset accepts chunks from a client and moves them into the local Ingester.
func (i *Ingester) TransferChunksSubset(stream client.Ingester_TransferChunksSubsetServer) error {
i.userStatesMtx.Lock()
This blocks ingestion (https://github.com/cortexproject/cortex/blob/master/pkg/ingester/ingester.go#L329-L335). Can we not take a lock during the entire transfer?
My understanding is that the lock is on the joining ingester so there wouldn't be any data to ingest anyway, right?
return err
}

userStatesCopy := i.userStates.cp()
Yeah, there is the race condition that a State is empty and hence removed on userStates.gc(), but we use the State here and add data to it. That way the data is lost.
@@ -654,3 +760,22 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable)
}
}

func (i *Ingester) unexpectedStreamsHandler(tokens []uint32) {
If we are taking a function as an argument just to log the bad tokens, let's not do it? It makes things harder to read, as I need to figure out which unexpectedStreamsHandler function was passed and what it does.
would passing the unexpected series metric to the token checker be acceptable instead? I don't want to remove the logging and metric completely.
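If that approach were taken, it might look roughly like the sketch below; the TokenChecker shape and the metric name are hypothetical, not the PR's actual code:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// TokenChecker is a hypothetical, pared-down stand-in that takes the metric
// directly instead of a caller-supplied handler function.
type TokenChecker struct {
    expected         map[uint32]bool
    unexpectedSeries prometheus.Counter
}

func NewTokenChecker(expected []uint32, unexpectedSeries prometheus.Counter) *TokenChecker {
    m := make(map[uint32]bool, len(expected))
    for _, t := range expected {
        m[t] = true
    }
    return &TokenChecker{expected: m, unexpectedSeries: unexpectedSeries}
}

// CheckToken increments the metric for tokens outside the expected set rather
// than invoking an injected logging callback.
func (tc *TokenChecker) CheckToken(token uint32) bool {
    if tc.expected[token] {
        return true
    }
    tc.unexpectedSeries.Inc()
    return false
}

func main() {
    unexpected := prometheus.NewCounter(prometheus.CounterOpts{
        Name: "ingester_unexpected_series_total", // hypothetical metric name
        Help: "Series pushed with tokens outside the ingester's expected ranges.",
    })
    prometheus.MustRegister(unexpected)

    tc := NewTokenChecker([]uint32{10, 20, 30}, unexpected)
    fmt.Println(tc.CheckToken(10), tc.CheckToken(99)) // true false
}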
pkg/ingester/ingester.go (Outdated)
if err != nil {
state = nil // don't want to unlock the fp if there is an error
return err
}

if sstate == seriesCreated && i.cfg.CheckOnCreate {
TODO(gouthamve): Check if this entire check and metric can be skipped.
pkg/ingester/ingester.go (Outdated)
@@ -283,15 +351,15 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.

for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
- err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source)
+ err := i.append(ctx, userID, ts.Token, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source)
I'm curious what will happen if there is a change to the hashing scheme and the tokens change?
My understanding is that one of two things will happen (a sketch of the second case follows this list):
- The tokens change, sharding to new ingesters. The old ingesters don't receive appends anymore and their data eventually gets flushed as underutilized chunks.
- The tokens change, sharding to the same ingesters (coincidentally). The ingesters update the token in the existing memory series and log a warning.
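A minimal sketch of that second case, assuming the behaviour described above; memorySeries here is a hypothetical stand-in, not the PR's actual type:

package main

import (
    "fmt"
    "log"
)

// memorySeries is a hypothetical stand-in for an ingester's in-memory series.
type memorySeries struct {
    name  string
    token uint32
}

// append mirrors the behaviour described above: if the incoming shard token
// differs from the stored one (e.g. because the hashing scheme changed), keep
// the series, adopt the new token, and log a warning.
func (s *memorySeries) append(token uint32, value float64) {
    if s.token != token {
        log.Printf("warning: series %s pushed with token %d, expected %d; updating stored token", s.name, token, s.token)
        s.token = token
    }
    _ = value // the real code would append the sample to the series' chunks here
}

func main() {
    s := &memorySeries{name: "series_a", token: 12345}
    s.append(12345, 1.0) // same token: no warning
    s.append(67890, 2.0) // token changed: warn and adopt the new token
    fmt.Println("current token:", s.token)
}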
// is increased for the key. Dead ingesters will be filtered later by
// replication_strategy.go. Filtering later means that we can calculate
// a healthiness quorum.
if !ingester.IsHealthyState(op) {
This is not the exact logic we had before. For a Read operation, we previously considered a PENDING ingester not valid, while now IsHealthyState() considers a PENDING ingester as OK. However, don't change the logic in IsHealthyState() too easily, because I've tried to do the same and received valuable feedback here.
Right, I'll change this back to its original logic but add another check for ingester.Incremental, allowing every non-PENDING state to be valid.
This check was also initially in IsHealthyState but got lost in a rebase. I'll have to add it back in on Monday.
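A rough sketch of that intent, with simplified hypothetical types; the actual ring code and its pre-existing read rule differ, and the non-incremental branch below is an assumption:

package main

import "fmt"

// Simplified, hypothetical stand-ins for the ring's state and descriptor types.
type IngesterState int

const (
    PENDING IngesterState = iota
    JOINING
    ACTIVE
    LEAVING
)

type IngesterDesc struct {
    State       IngesterState
    Incremental bool
}

// isUsableForRead sketches the intent described in the thread: keep the
// pre-existing read rule (assumed here to accept only ACTIVE and LEAVING),
// but for ingesters doing incremental transfers accept every non-PENDING
// state, since a JOINING ingester may already hold data for some token ranges.
func (i IngesterDesc) isUsableForRead() bool {
    if i.Incremental {
        return i.State != PENDING
    }
    return i.State == ACTIVE || i.State == LEAVING
}

func main() {
    fmt.Println(IngesterDesc{State: JOINING, Incremental: true}.isUsableForRead())  // true
    fmt.Println(IngesterDesc{State: JOINING, Incremental: false}.isUsableForRead()) // false
}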
Force-pushed from d41e927 to 2df440d
Force-pushed from 2711ba9 to 73817fd
This commit introduces several features to comprise a new "incremental chunk transfer" feature:
- pkg/ring: incremental transfer management
Introduces managing incremental transfers between ingesters when a lifecycler joins a ring and when it leaves the ring. The implementation of the IncrementalTransferer interface will be done in a future commit. The LifecyclerConfig has been updated with JoinIncrementalTransfer and LeaveIncrementalTransfer, available as join_incremental_transfer and leave_incremental_transfer using the YAML config, and join-incremental-transfer and leave-incremental-transfer using command line flags. When JoinIncrementalTransfer is used, the lifecycler will join the ring immediately. Tokens will be inserted into the ring one by one, first into the JOINING state and then the ACTIVE state after requesting chunks in token ranges they should have data for from neighboring ingesters in the ring. When LeaveIncrementalTransfer is used, the lifecycler will incrementally move tokens in LEAVING state after sending ranges to neighboring ingesters that should now have data. Enabling LeaveIncrementalTransfer will disable the handoff process, and flushing non-transferred data always happens at the end.
- pkg/distributor: push shard token to ingesters
This modifies the ingesters to be aware of the shard token used by the distributors to send traffic to ingesters. This is a requirement for incremental transfers, where the shard token is used to determine which memory series need to be moved. This assumes that all distributors are using the same sharding mechanism and always use the same token for a specific series. If the memory series is appended to with a different token from the one it was created with, a warning will be logged and the new token will be used.
- pkg/ingester: implement IncrementalTransferer interface
This implements the IncrementalTransferer interface used by lifecyclers to move memory series around the ring as ingesters join and leave.
- pkg/ring: add TokenChecker
This introduces a TokenChecker component which runs in the background to support reporting metrics on unexpected tokens pushed to ingesters. It supports checking on an interval, checking when a new stream is pushed, checking when an existing stream is appended to, and checking when a stream is transferred.
Signed-off-by: Robert Fratto <[email protected]>
Signed-off-by: Robert Fratto <[email protected]>
Signed-off-by: Robert Fratto <[email protected]>
Signed-off-by: Robert Fratto <[email protected]>
Signed-off-by: Robert Fratto <[email protected]>
Signed-off-by: Robert Fratto <[email protected]>
When rolling out code using incremental transfers, the warning messages can be annoying. This commit reduces them to debug-level log lines or logs the warning only if the token checking flag is enabled.
Signed-off-by: Robert Fratto <[email protected]>
This was a problem caused by the refactoring; moving the check inside the previous if statement (which only happens once per transfer) is equivalent to what was happening before.
Signed-off-by: Robert Fratto <[email protected]>
Signed-off-by: Robert Fratto <[email protected]>
Signed-off-by: Robert Fratto <[email protected]>
Force-pushed from 55a8a51 to 80249b4
Signed-off-by: Robert Fratto <[email protected]>
Unfortunately, I think I need to close this PR and take another shot at this. I've rebased it to fix conflicts so many times now that I've lost confidence in its correctness, and I understand that its size has been a pain point for everyone all around. Rather than one giant PR, I'm going to start work on the first of a smaller set of PRs to eventually build up to this feature. Hopefully starting fresh will allow me to reduce the code complexity introduced here, although I still intend to be copying and pasting at least some of the existing code. Huge thank you to everyone who took time tackling this beast, and I hope the second attempt is a smoother experience for everyone 🙂
Design document: https://docs.google.com/document/d/1y2TdfEQ9ZKh6CpBVB4o6BYjCr-plNRL9jGD6fJ9bMW0/edit#
This PR introduces two incremental chunk transfer processes, utilized by the lifecycler to reduce spillover and enable dynamic scaling of ingesters. The incremental transfer process takes precedence over the old handover mechanism.
To migrate a cluster to use incremental transfers, two rollouts must be done (a config sketch follows this list):
1. -ingester.leave-incremental-transfer=true
2. -ingester.join-incremental-transfer=true
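For a sense of the configuration surface, here is a hypothetical, pared-down sketch of registering the two options; the real LifecyclerConfig, flag wiring, and defaults may differ:

package main

import (
    "flag"
    "fmt"
)

// LifecyclerConfig here is an illustrative stand-in exposing only the two new
// options; it is not the real Cortex type.
type LifecyclerConfig struct {
    JoinIncrementalTransfer  bool
    LeaveIncrementalTransfer bool
}

// RegisterFlags wires the options to the flag names used in this PR's description.
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
    f.BoolVar(&cfg.JoinIncrementalTransfer, "ingester.join-incremental-transfer", false,
        "Request chunks for owned token ranges from neighbors when joining the ring.")
    f.BoolVar(&cfg.LeaveIncrementalTransfer, "ingester.leave-incremental-transfer", false,
        "Send chunks for owned token ranges to neighbors when leaving the ring (disables handoff).")
}

func main() {
    var cfg LifecyclerConfig
    fs := flag.NewFlagSet("example", flag.ExitOnError)
    cfg.RegisterFlags(fs)
    _ = fs.Parse([]string{"-ingester.leave-incremental-transfer=true"})
    fmt.Printf("%+v\n", cfg)
}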
I recognize this is a large PR and I have attempted (to the best of my ability) to split it into smaller, independent commits. It's not perfect, but hopefully the commits I have make it easier to review.
Fixes #1277.
/cc @gouthamve @pstibrany @tomwilkie