Skip to content

Remove remaining support for denormalised tokens in the ring. #2034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 28, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ instructions below to upgrade your Postgres.
* [CHANGE] Overrides mechanism has been renamed to "runtime config", and is now separate from limits. Runtime config is simply a file that is reloaded by Cortex every couple of seconds. Limits and now also multi KV use this mechanism.<br />New arguments were introduced: `-runtime-config.file` (defaults to empty) and `-runtime-config.reload-period` (defaults to 10 seconds), which replace previously used `-limits.per-user-override-config` and `-limits.per-user-override-period` options. Old options are still used if `-runtime-config.file` is not specified. This change is also reflected in YAML configuration, where old `limits.per_tenant_override_config` and `limits.per_tenant_override_period` fields are replaced with `runtime_config.file` and `runtime_config.period` respectively. #1749
* [CHANGE] Cortex now rejects data with duplicate labels. Previously, such data was accepted, with duplicate labels removed with only one value left. #1964
* [CHANGE] Changed the default value for `-distributor.ha-tracker.prefix` from `collectors/` to `ha-tracker/` in order to not clash with other keys (ie. ring) stored in the same key-value store. #1940
* [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034
* [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726
* [FEATURE] Added flag `debug.mutex-profile-fraction` to enable mutex profiling #1969
* [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ It also talks to a KVStore and has its own copies of the same flags used by the

Deprecated. New ingesters always write "normalised" tokens to the ring. Normalised tokens consume less memory to encode and decode; as the ring is unmarshalled regularly, this significantly reduces memory usage of anything that watches the ring.

Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and later will always *write* normalised tokens, although it can still *read* denormalised tokens written by older ingesters.
Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and above always write normalised tokens.

It's perfectly OK to have a mix of ingesters running denormalised (<= 0.4.0) and normalised tokens (either by using `-ingester.normalise-tokens` in Cortex <= 0.4.0, or Cortex 0.5.0+) during upgrades.
Cortex 0.6.0 is the last version that can *read* denormalised tokens. Starting with Cortex 0.7.0 only normalised tokens are supported, and ingesters writing denormalised tokens to the ring (running Cortex 0.4.0 or earlier with `-ingester.normalise-tokens=false`) are ignored by distributors. Such ingesters should either switch to using normalised tokens, or be upgraded to Cortex 0.5.0 or later.

- `-ingester.chunk-encoding`

Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
sort.Strings(ingesterIDs)

ingesters := []interface{}{}
tokens, owned := countTokens(r.ringDesc)
tokens, owned := countTokens(r.ringDesc, r.ringTokens)
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
timestamp := time.Unix(ing.Timestamp, 0)
Expand Down
103 changes: 7 additions & 96 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,92 +43,12 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
return lifecyclerConfig
}

// checkDenormalisedLeaving reports whether d is a ring *Desc holding exactly
// one ingester — the one named by id — that is LEAVING, carries no
// normalised (per-ingester) tokens, and owns exactly one denormalised
// token at the ring level.
func checkDenormalisedLeaving(d interface{}, id string) bool {
	desc, ok := d.(*Desc)
	if !ok {
		return false
	}
	if len(desc.Ingesters) != 1 || len(desc.Tokens) != 1 {
		return false
	}
	// Missing key yields the zero-value IngesterDesc, which fails the
	// LEAVING check below, matching the original short-circuit behaviour.
	ing := desc.Ingesters[id]
	return ing.State == LEAVING && len(ing.Tokens) == 0
}

// checkNormalised reports whether d is a ring *Desc holding exactly one
// ingester — the one named by id — that is ACTIVE, carries exactly one
// normalised (per-ingester) token, and where the ring-level denormalised
// token list is empty.
func checkNormalised(d interface{}, id string) bool {
	desc, ok := d.(*Desc)
	if !ok {
		return false
	}
	// Missing key yields the zero-value IngesterDesc, which fails the
	// ACTIVE check, matching the original short-circuit behaviour.
	ing := desc.Ingesters[id]
	return len(desc.Ingesters) == 1 &&
		ing.State == ACTIVE &&
		len(ing.Tokens) == 1 &&
		len(desc.Tokens) == 0
}

func TestRingNormaliseMigration(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())

r, err := New(ringConfig, "ingester", IngesterRingKey)
require.NoError(t, err)
defer r.Stop()

// Add an 'ingester' with denormalised tokens.
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")

// Since code to insert ingester with denormalised tokens into ring was removed,
// instead of running lifecycler, we do it manually here.
token := uint32(0)
err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
require.Nil(t, in)
r := NewDesc()
tks := GenerateTokens(lifecyclerConfig1.NumTokens, nil)
r.Ingesters[lifecyclerConfig1.ID] = IngesterDesc{
Addr: lifecyclerConfig1.Addr,
Timestamp: time.Now().Unix(),
State: LEAVING, // expected by second ingester`
}
for _, t := range tks {
r.Tokens = append(r.Tokens, TokenDesc{
Token: t,
Ingester: lifecyclerConfig1.ID,
})
}
token = tks[0]
return r, true, nil
})
require.NoError(t, err)

// Check this ingester joined, is active, and has one token.
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
require.NoError(t, err)
return checkDenormalisedLeaving(d, "ing1")
})

// Add a second ingester with normalised tokens.
var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2")
lifecyclerConfig2.JoinAfter = 100 * time.Second

l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true)
require.NoError(t, err)
l2.Start()

// Since there is nothing that would make l2 to claim tokens from l1 (normally done on transfer)
// we do it manually.
require.NoError(t, l2.ClaimTokensFor(context.Background(), "ing1"))
require.NoError(t, l2.ChangeState(context.Background(), ACTIVE))

// Check the new ingester joined, has the same token, and is active.
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
require.NoError(t, err)

if desc, ok := d.(*Desc); ok {
// lifecycler for ingester 1 isn't running, so we need to delete it manually
// (to make checkNormalised happy)
delete(desc.Ingesters, lifecyclerConfig1.ID)
}
return checkNormalised(d, "ing2") &&
d.(*Desc).Ingesters["ing2"].Tokens[0] == token
})
len(desc.Ingesters[id].Tokens) == 1
}

func TestLifecycler_HealthyInstancesCount(t *testing.T) {
Expand Down Expand Up @@ -381,8 +301,7 @@ func TestTokensOnDisk(t *testing.T) {
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == 512 &&
len(desc.Tokens) == 0
len(desc.Ingesters["ing1"].Tokens) == 512
})

l1.Shutdown()
Expand All @@ -406,8 +325,7 @@ func TestTokensOnDisk(t *testing.T) {
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == ACTIVE &&
len(desc.Ingesters["ing2"].Tokens) == 512 &&
len(desc.Tokens) == 0
len(desc.Ingesters["ing2"].Tokens) == 512
})

// Check for same tokens.
Expand Down Expand Up @@ -441,15 +359,8 @@ func TestJoinInLeavingState(t *testing.T) {
State: LEAVING,
Tokens: []uint32{1, 4},
},
},
Tokens: []TokenDesc{
{
Ingester: "ing2",
Token: 2,
},
{
Ingester: "ing2",
Token: 3,
"ing2": {
Tokens: []uint32{2, 3},
},
},
}
Expand All @@ -468,9 +379,9 @@ func TestJoinInLeavingState(t *testing.T) {
require.NoError(t, err)
desc, ok := d.(*Desc)
return ok &&
len(desc.Ingesters) == 1 &&
len(desc.Ingesters) == 2 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == cfg.NumTokens &&
len(desc.Tokens) == 2
len(desc.Ingesters["ing2"].Tokens) == 2
})
}
Loading