Fixed per-series sharding token generation in the distributor #1888

Merged
41 changes: 20 additions & 21 deletions pkg/distributor/distributor.go
@@ -285,26 +285,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// Validates a single series from a write request. Will remove labels if
// any are configured to be dropped for the user ID.
// Returns the validated series with its labels/samples, and any error.
func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTimeseries, userID string, removeReplica bool) (client.PreallocTimeseries, error) {
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
if removeReplica {
removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
}

for _, labelName := range d.limits.DropLabels(userID) {
removeLabel(labelName, &ts.Labels)
}
if len(ts.Labels) == 0 {
return emptyPreallocSeries, nil
}

key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return emptyPreallocSeries, err
}

func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, userID string) (client.PreallocTimeseries, error) {
labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
return emptyPreallocSeries, err
@@ -367,12 +348,30 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
keys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
for _, ts := range req.Timeseries {
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
if removeReplica {
removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
}

for _, labelName := range d.limits.DropLabels(userID) {
removeLabel(labelName, &ts.Labels)
}

if len(ts.Labels) == 0 {
continue
}

// Generate the sharding token based on the series labels without the HA replica
// label and dropped labels (if any)
key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return nil, err
}

validatedSeries, err := d.validateSeries(key, ts, userID, removeReplica)
validatedSeries, err := d.validateSeries(ts, userID)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
if err != nil {
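
The net effect of the distributor.go change is easier to see outside the diff: the HA replica label and any configured drop-labels are now stripped before the sharding token is derived, so every replica of a series maps onto the same token. Below is a minimal, self-contained sketch of that ordering — `label`, `removeLabel`, and `tokenFor` are illustrative stand-ins rather than the distributor's actual types, and the FNV hash is only one example of a deterministic token function, not necessarily the one `tokenForLabels` uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// label is an illustrative stand-in for the distributor's label adapters.
type label struct{ Name, Value string }

// removeLabel drops every label with the given name, mirroring the role of
// the removeLabel helper called in Push above.
func removeLabel(name string, lbls *[]label) {
	out := (*lbls)[:0]
	for _, l := range *lbls {
		if l.Name != name {
			out = append(out, l)
		}
	}
	*lbls = out
}

// tokenFor stands in for tokenForLabels: any deterministic hash over the
// user ID and the remaining labels is enough for this illustration.
func tokenFor(userID string, lbls []label) uint32 {
	h := fnv.New32a()
	h.Write([]byte(userID))
	for _, l := range lbls {
		h.Write([]byte(l.Name))
		h.Write([]byte(l.Value))
	}
	return h.Sum32()
}

func main() {
	replica1 := []label{
		{"__name__", "some_metric"},
		{"cluster", "one"},
		{"__replica__", "r1"},
	}
	replica2 := []label{
		{"__name__", "some_metric"},
		{"cluster", "one"},
		{"__replica__", "r2"},
	}

	// Strip the HA replica label (and any drop-labels) first, then derive
	// the token — the order the reworked Push loop follows.
	removeLabel("__replica__", &replica1)
	removeLabel("__replica__", &replica2)

	t1 := tokenFor("user", replica1)
	t2 := tokenFor("user", replica2)

	// Both replicas of the same underlying series now shard to the same token.
	fmt.Printf("replica1 token=%#x replica2 token=%#x equal=%v\n", t1, t2, t1 == t2)
}
```

If the token were instead derived from the raw labels, the two replicas above would hash to different values, which is the inconsistency the reordering in Push avoids.
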
232 changes: 161 additions & 71 deletions pkg/distributor/distributor_test.go
@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"net/http"
"reflect"
"sort"
"strconv"
"sync"
@@ -100,7 +99,7 @@ func TestDistributorPush(t *testing.T) {
} {
for _, shardByAllLabels := range []bool{true, false} {
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v)", i, shardByAllLabels), func(t *testing.T) {
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, nil)
d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, shardByAllLabels, nil)
defer d.Stop()

request := makeWriteRequest(tc.samples)
@@ -157,7 +156,7 @@ func TestDistributorPushHAInstances(t *testing.T) {
flagext.DefaultValues(&limits)
limits.AcceptHASamples = true

d := prepare(t, 1, 1, 0, shardByAllLabels, &limits)
d, _ := prepare(t, 1, 1, 0, shardByAllLabels, &limits)
codec := codec.Proto{Factory: ProtoReplicaDescFactory}
mock := kv.PrefixClient(consul.NewInMemoryClient(codec), "prefix")

@@ -279,7 +278,7 @@ func TestDistributorPushQuery(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
d := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil)
d, _ := prepare(t, tc.numIngesters, tc.happyIngesters, 0, tc.shardByAllLabels, nil)
defer d.Stop()

request := makeWriteRequest(tc.samples)
@@ -302,78 +301,54 @@ }
}
}

func TestDistributorValidateSeriesLabelRemoval(t *testing.T) {
func TestDistributor_Push_LabelRemoval(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

type testcase struct {
series client.PreallocTimeseries
outputSeries client.PreallocTimeseries
removeReplica bool
removeLabels []string
inputSeries labels.Labels
expectedSeries labels.Labels
removeReplica bool
removeLabels []string
}

cases := []testcase{
{ // Remove both cluster and replica label.
series: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"}},
},
inputSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
outputSeries: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
},
Samples: []client.Sample{},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
},
removeReplica: true,
removeLabels: []string{"cluster"},
},
{ // Remove multiple labels and replica.
series: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
{Name: "foo", Value: "bar"},
{Name: "some", Value: "thing"}},
},
inputSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
{Name: "foo", Value: "bar"},
{Name: "some", Value: "thing"},
},
outputSeries: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
},
Samples: []client.Sample{},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
},
removeReplica: true,
removeLabels: []string{"foo", "some"},
},
{ // Don't remove any labels.
series: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"}},
},
inputSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
outputSeries: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
Samples: []client.Sample{},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
removeReplica: false,
},
@@ -384,20 +359,135 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) {
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = tc.removeLabels
d := prepare(t, 1, 1, 0, true, &limits)
limits.AcceptHASamples = tc.removeReplica

userID, err := user.ExtractOrgID(ctx)
assert.NoError(t, err)
d, ingesters := prepare(t, 1, 1, 0, true, &limits)
defer d.Stop()

key, err := d.tokenForLabels(userID, tc.series.Labels)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = d.Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
assert.Equal(t, 1, len(ingesters))
actualSeries := []*client.PreallocTimeseries{}

series, err := d.validateSeries(key, tc.series, userID, tc.removeReplica)
if !reflect.DeepEqual(series, tc.outputSeries) {
t.Fatalf("output of validate series did not match expected output:\n\texpected: %+v\n\t got: %+v", tc.outputSeries, series)
for _, ts := range ingesters[0].timeseries {
actualSeries = append(actualSeries, ts)
}

assert.Equal(t, 1, len(actualSeries))
assert.Equal(t, tc.expectedSeries, client.FromLabelAdaptersToLabels(actualSeries[0].Labels))
}
}

func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) {
tests := map[string]struct {
inputSeries labels.Labels
expectedSeries labels.Labels
expectedToken uint32
}{
"metric_1 with value_1": {
inputSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_1"},
{Name: "cluster", Value: "cluster_1"},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_1"},
{Name: "cluster", Value: "cluster_1"},
},
expectedToken: 0x58b1e325,
},
"metric_1 with value_1 and dropped label due to config": {
inputSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_1"},
{Name: "cluster", Value: "cluster_1"},
{Name: "dropped", Value: "unused"},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_1"},
{Name: "cluster", Value: "cluster_1"},
},
expectedToken: 0x58b1e325,
},
"metric_1 with value_1 and dropped HA replica label": {
inputSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_1"},
{Name: "cluster", Value: "cluster_1"},
{Name: "__replica__", Value: "replica_1"},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_1"},
{Name: "cluster", Value: "cluster_1"},
},
expectedToken: 0x58b1e325,
},
"metric_2 with value_1": {
inputSeries: labels.Labels{
{Name: "__name__", Value: "metric_2"},
{Name: "key", Value: "value_1"},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "metric_2"},
{Name: "key", Value: "value_1"},
},
expectedToken: 0xa60906f2,
},
"metric_1 with value_2": {
inputSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_2"},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "metric_1"},
{Name: "key", Value: "value_2"},
},
expectedToken: 0x18abc8a2,
},
}

var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = []string{"dropped"}
limits.AcceptHASamples = true

ctx = user.InjectOrgID(context.Background(), "user")

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
d, ingesters := prepare(t, 1, 1, 0, true, &limits)
defer d.Stop()

// Push the series to the distributor
req := mockWriteRequest(testData.inputSeries, 1, 1)
_, err := d.Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
require.Equal(t, 1, len(ingesters))
require.Equal(t, 1, len(ingesters[0].timeseries))

var actualSeries *client.PreallocTimeseries
var actualToken uint32

for token, ts := range ingesters[0].timeseries {
actualSeries = ts
actualToken = token
}

// Ensure the series and the sharding token are the expected ones
assert.Equal(t, testData.expectedSeries, client.FromLabelAdaptersToLabels(actualSeries.Labels))
assert.Equal(t, testData.expectedToken, actualToken)
})
}
}
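
Both new tests read the pushed data back through the mock ingesters that prepare now returns, keyed by sharding token. As a rough sketch of that capture pattern (the real mockIngester, its Push handling, and client.PreallocTimeseries are simplified away here), the idea is just a map from token to the stored series:

```go
package main

import "fmt"

// series is a simplified stand-in for client.PreallocTimeseries.
type series struct {
	labels map[string]string
}

// mockIngester records each pushed series keyed by the sharding token it was
// routed with, so a test can assert both the stored labels and the token.
type mockIngester struct {
	timeseries map[uint32]*series
}

func (i *mockIngester) push(token uint32, s *series) {
	if i.timeseries == nil {
		i.timeseries = map[uint32]*series{}
	}
	i.timeseries[token] = s
}

func main() {
	ing := &mockIngester{}

	// In the real tests the distributor computes this token from the cleaned
	// label set; here it is a fixed value purely for illustration.
	ing.push(0x58b1e325, &series{labels: map[string]string{
		"__name__": "metric_1",
		"key":      "value_1",
		"cluster":  "cluster_1",
	}})

	// Mirror of the tests' read-back: exactly one series, and both its token
	// and labels are available to assert on.
	for token, s := range ing.timeseries {
		fmt.Printf("token=%#x labels=%v\n", token, s.labels)
	}
}
```
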

@@ -410,7 +500,7 @@ func TestSlowQueries(t *testing.T) {
if nIngesters-happy > 1 {
expectedErr = promql.ErrStorage{Err: errFail}
}
d := prepare(t, nIngesters, happy, 100*time.Millisecond, shardByAllLabels, nil)
d, _ := prepare(t, nIngesters, happy, 100*time.Millisecond, shardByAllLabels, nil)
defer d.Stop()

_, err := d.Query(ctx, 0, 10, nameMatcher)
@@ -476,7 +566,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}

// Create distributor
d := prepare(t, 3, 3, time.Duration(0), true, nil)
d, _ := prepare(t, 3, 3, time.Duration(0), true, nil)
defer d.Stop()

// Push fixtures
@@ -520,7 +610,7 @@ func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cli
return client.ToWriteRequest([]labels.Labels{lbls}, samples, client.API)
}

func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool, limits *validation.Limits) *Distributor {
func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Duration, shardByAllLabels bool, limits *validation.Limits) (*Distributor, []mockIngester) {
ingesters := []mockIngester{}
for i := 0; i < happyIngesters; i++ {
ingesters = append(ingesters, mockIngester{
Expand Down Expand Up @@ -577,7 +667,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
d, err := New(cfg, clientConfig, overrides, ring)
require.NoError(t, err)

return d
return d, ingesters
}

func makeWriteRequest(samples int) *client.WriteRequest {
@@ -945,7 +1035,7 @@ func TestDistributorValidation(t *testing.T) {
limits.RejectOldSamplesMaxAge = 24 * time.Hour
limits.MaxLabelNamesPerSeries = 2

d := prepare(t, 3, 3, 0, true, &limits)
d, _ := prepare(t, 3, 3, 0, true, &limits)
defer d.Stop()

_, err := d.Push(ctx, client.ToWriteRequest(tc.labels, tc.samples, client.API))