Skip to content

Commit 3a61b85

Browse files
authored
Merge branch 'master' into parquet-converter
Signed-off-by: Alan Protasio <[email protected]>
2 parents 9c0dcb1 + 6331a76 commit 3a61b85

29 files changed

+684
-483
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
1111
* [FEATURE] Ruler: Add support for group labels. #6665
1212
* [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
13+
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
1314
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
1415
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695
1516
* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4354,6 +4354,11 @@ dynamic_query_splits:
43544354
# CLI flag: -querier.max-fetched-data-duration-per-query
43554355
[max_fetched_data_duration_per_query: <duration> | default = 0s]
43564356
4357+
# [EXPERIMENTAL] Dynamically adjust vertical shard size to maximize the total
4358+
# combined number of query shards and splits.
4359+
# CLI flag: -querier.enable-dynamic-vertical-sharding
4360+
[enable_dynamic_vertical_sharding: <boolean> | default = false]
4361+
43574362
# Mutate incoming queries to align their start and end with their step.
43584363
# CLI flag: -querier.align-querier-with-step
43594364
[align_queries_with_step: <boolean> | default = false]
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/cortexproject/cortex/integration/e2e"
12+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
13+
"github.com/cortexproject/cortex/integration/e2ecortex"
14+
)
15+
16+
func Test_ResourceBasedLimiter_shouldStartWithoutError(t *testing.T) {
17+
s, err := e2e.NewScenario(networkName)
18+
require.NoError(t, err)
19+
defer s.Close()
20+
21+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
22+
"-monitored.resources": "cpu,heap",
23+
})
24+
25+
// Start dependencies.
26+
consul := e2edb.NewConsul()
27+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
28+
require.NoError(t, s.StartAndWaitReady(consul, minio))
29+
30+
// Start Cortex components.
31+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
32+
"-ingester.instance-limits.cpu-utilization": "0.8",
33+
"-ingester.instance-limits.heap-utilization": "0.8",
34+
}), "")
35+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
36+
"-store-gateway.instance-limits.cpu-utilization": "0.8",
37+
"-store-gateway.instance-limits.heap-utilization": "0.8",
38+
}), "")
39+
require.NoError(t, s.StartAndWaitReady(ingester, storeGateway))
40+
}

pkg/cortex/modules.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -777,26 +777,26 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
777777
}
778778

779779
func (t *Cortex) initResourceMonitor() (services.Service, error) {
780-
if len(t.Cfg.MonitoredResources) == 0 {
780+
if t.Cfg.MonitoredResources.String() == "" || len(t.Cfg.MonitoredResources) == 0 {
781781
return nil, nil
782782
}
783783

784+
util_log.WarnExperimentalUse(fmt.Sprintf("resource monitor for [%s]", t.Cfg.MonitoredResources.String()))
785+
784786
containerLimits := make(map[resource.Type]float64)
785787
for _, res := range t.Cfg.MonitoredResources {
786788
switch resource.Type(res) {
787789
case resource.CPU:
788790
containerLimits[resource.Type(res)] = float64(runtime.GOMAXPROCS(0))
789791
case resource.Heap:
790792
containerLimits[resource.Type(res)] = float64(debug.SetMemoryLimit(-1))
793+
default:
794+
return nil, fmt.Errorf("unknown resource type: %s", res)
791795
}
792796
}
793797

794798
var err error
795799
t.ResourceMonitor, err = resource.NewMonitor(containerLimits, prometheus.DefaultRegisterer)
796-
if t.ResourceMonitor != nil {
797-
util_log.WarnExperimentalUse("resource monitor")
798-
}
799-
800800
return t.ResourceMonitor, err
801801
}
802802

@@ -805,7 +805,7 @@ func (t *Cortex) setupModuleManager() error {
805805

806806
// Register all modules here.
807807
// RegisterModule(name string, initFn func()(services.Service, error))
808-
mm.RegisterModule(ResourceMonitor, t.initResourceMonitor)
808+
mm.RegisterModule(ResourceMonitor, t.initResourceMonitor, modules.UserInvisibleModule)
809809
mm.RegisterModule(Server, t.initServer, modules.UserInvisibleModule)
810810
mm.RegisterModule(API, t.initAPI, modules.UserInvisibleModule)
811811
mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)

pkg/cortex/modules_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,3 +232,16 @@ func Test_setupModuleManager(t *testing.T) {
232232
}
233233
}
234234
}
235+
236+
func Test_initResourceMonitor_shouldFailOnInvalidResource(t *testing.T) {
237+
cortex := &Cortex{
238+
Server: &server.Server{},
239+
Cfg: Config{
240+
MonitoredResources: []string{"invalid"},
241+
},
242+
}
243+
244+
// log warning message and spin up other cortex services
245+
_, err := cortex.initResourceMonitor()
246+
require.ErrorContains(t, err, "unknown resource type")
247+
}

pkg/cortexpb/signature.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package cortexpb
2+
3+
import (
4+
"github.com/prometheus/common/model"
5+
"github.com/prometheus/prometheus/model/labels"
6+
)
7+
8+
// Inline and byte-free variant of hash/fnv's fnv64a.
9+
// Ref: https://github.com/prometheus/common/blob/main/model/fnv.go
10+
11+
func LabelsToFingerprint(lset labels.Labels) model.Fingerprint {
12+
if len(lset) == 0 {
13+
return model.Fingerprint(hashNew())
14+
}
15+
16+
sum := hashNew()
17+
lset.Range(func(l labels.Label) {
18+
sum = hashAdd(sum, string(l.Name))
19+
sum = hashAddByte(sum, model.SeparatorByte)
20+
sum = hashAdd(sum, string(l.Value))
21+
sum = hashAddByte(sum, model.SeparatorByte)
22+
})
23+
return model.Fingerprint(sum)
24+
}
25+
26+
const (
27+
offset64 = 14695981039346656037
28+
prime64 = 1099511628211
29+
)
30+
31+
// hashNew initializes a new fnv64a hash value.
32+
func hashNew() uint64 {
33+
return offset64
34+
}
35+
36+
// hashAdd adds a string to a fnv64a hash value, returning the updated hash.
37+
func hashAdd(h uint64, s string) uint64 {
38+
for i := 0; i < len(s); i++ {
39+
h ^= uint64(s[i])
40+
h *= prime64
41+
}
42+
return h
43+
}
44+
45+
// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash.
46+
func hashAddByte(h uint64, b byte) uint64 {
47+
h ^= uint64(b)
48+
h *= prime64
49+
return h
50+
}

pkg/distributor/distributor.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,8 +1367,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint
13671367
}
13681368

13691369
// MetricsForLabelMatchers gets the metrics that match said matchers
1370-
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
1371-
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1370+
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]labels.Labels, error) {
1371+
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]labels.Labels, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
13721372
_, err := d.ForReplicationSet(ctx, rs, false, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13731373
resp, err := client.MetricsForLabelMatchers(ctx, req)
13741374
if err != nil {
@@ -1380,8 +1380,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
13801380
s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
13811381
for _, m := range resp.Metric {
13821382
s = append(s, m.Labels)
1383-
m := cortexpb.FromLabelAdaptersToMetric(m.Labels)
1384-
fingerprint := m.Fingerprint()
1383+
m := cortexpb.FromLabelAdaptersToLabels(m.Labels)
1384+
fingerprint := cortexpb.LabelsToFingerprint(m)
13851385
mutex.Lock()
13861386
(*metrics)[fingerprint] = m
13871387
mutex.Unlock()
@@ -1396,8 +1396,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
13961396
}, matchers...)
13971397
}
13981398

1399-
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
1400-
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1399+
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]labels.Labels, error) {
1400+
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]labels.Labels, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
14011401
_, err := d.ForReplicationSet(ctx, rs, false, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
14021402
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
14031403
if err != nil {
@@ -1417,9 +1417,9 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
14171417

14181418
s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
14191419
for _, metric := range resp.Metric {
1420-
m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)
1420+
m := cortexpb.FromLabelAdaptersToLabels(metric.Labels)
14211421
s = append(s, metric.Labels)
1422-
fingerprint := m.Fingerprint()
1422+
fingerprint := cortexpb.LabelsToFingerprint(m)
14231423
mutex.Lock()
14241424
(*metrics)[fingerprint] = m
14251425
mutex.Unlock()
@@ -1436,7 +1436,7 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
14361436
}, matchers...)
14371437
}
14381438

1439-
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, hints *storage.SelectHints, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
1439+
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, hints *storage.SelectHints, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]labels.Labels, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]labels.Labels, error) {
14401440
replicationSet, err := d.GetIngestersForMetadata(ctx)
14411441
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
14421442
if err != nil {
@@ -1448,7 +1448,7 @@ func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, t
14481448
return nil, err
14491449
}
14501450
mutex := sync.Mutex{}
1451-
metrics := map[model.Fingerprint]model.Metric{}
1451+
metrics := map[model.Fingerprint]labels.Labels{}
14521452

14531453
err = f(ctx, replicationSet, req, &metrics, &mutex, queryLimiter)
14541454

@@ -1457,7 +1457,7 @@ func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, t
14571457
}
14581458

14591459
mutex.Lock()
1460-
result := make([]model.Metric, 0, len(metrics))
1460+
result := make([]labels.Labels, 0, len(metrics))
14611461
for _, m := range metrics {
14621462
result = append(result, m)
14631463
}

pkg/distributor/distributor_test.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
502502
# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
503503
# TYPE cortex_distributor_exemplars_in_total counter
504504
cortex_distributor_exemplars_in_total{user="userA"} 5
505-
505+
506506
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
507507
# TYPE cortex_distributor_ingester_append_failures_total counter
508508
cortex_distributor_ingester_append_failures_total{ingester="ingester-0",status="2xx",type="metadata"} 1
@@ -2459,7 +2459,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
24592459
shuffleShardEnabled bool
24602460
shuffleShardSize int
24612461
matchers []*labels.Matcher
2462-
expectedResult []model.Metric
2462+
expectedResult []labels.Labels
24632463
expectedIngesters int
24642464
queryLimiter *limiter.QueryLimiter
24652465
expectedErr error
@@ -2468,7 +2468,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
24682468
matchers: []*labels.Matcher{
24692469
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown"),
24702470
},
2471-
expectedResult: []model.Metric{},
2471+
expectedResult: []labels.Labels{},
24722472
expectedIngesters: numIngesters,
24732473
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
24742474
expectedErr: nil,
@@ -2477,9 +2477,9 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
24772477
matchers: []*labels.Matcher{
24782478
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
24792479
},
2480-
expectedResult: []model.Metric{
2481-
util.LabelsToMetric(fixtures[0].lbls),
2482-
util.LabelsToMetric(fixtures[1].lbls),
2480+
expectedResult: []labels.Labels{
2481+
fixtures[0].lbls,
2482+
fixtures[1].lbls,
24832483
},
24842484
expectedIngesters: numIngesters,
24852485
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
@@ -2490,8 +2490,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
24902490
mustNewMatcher(labels.MatchEqual, "status", "200"),
24912491
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
24922492
},
2493-
expectedResult: []model.Metric{
2494-
util.LabelsToMetric(fixtures[0].lbls),
2493+
expectedResult: []labels.Labels{
2494+
fixtures[0].lbls,
24952495
},
24962496
expectedIngesters: numIngesters,
24972497
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
@@ -2501,9 +2501,9 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
25012501
matchers: []*labels.Matcher{
25022502
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "fast_fingerprint_collision"),
25032503
},
2504-
expectedResult: []model.Metric{
2505-
util.LabelsToMetric(fixtures[3].lbls),
2506-
util.LabelsToMetric(fixtures[4].lbls),
2504+
expectedResult: []labels.Labels{
2505+
fixtures[3].lbls,
2506+
fixtures[4].lbls,
25072507
},
25082508
expectedIngesters: numIngesters,
25092509
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
@@ -2515,9 +2515,9 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
25152515
matchers: []*labels.Matcher{
25162516
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
25172517
},
2518-
expectedResult: []model.Metric{
2519-
util.LabelsToMetric(fixtures[0].lbls),
2520-
util.LabelsToMetric(fixtures[1].lbls),
2518+
expectedResult: []labels.Labels{
2519+
fixtures[0].lbls,
2520+
fixtures[1].lbls,
25212521
},
25222522
expectedIngesters: 3,
25232523
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
@@ -2529,9 +2529,9 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
25292529
matchers: []*labels.Matcher{
25302530
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
25312531
},
2532-
expectedResult: []model.Metric{
2533-
util.LabelsToMetric(fixtures[0].lbls),
2534-
util.LabelsToMetric(fixtures[1].lbls),
2532+
expectedResult: []labels.Labels{
2533+
fixtures[0].lbls,
2534+
fixtures[1].lbls,
25352535
},
25362536
expectedIngesters: numIngesters,
25372537
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
@@ -2563,8 +2563,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
25632563
matchers: []*labels.Matcher{
25642564
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"),
25652565
},
2566-
expectedResult: []model.Metric{
2567-
util.LabelsToMetric(fixtures[2].lbls),
2566+
expectedResult: []labels.Labels{
2567+
fixtures[2].lbls,
25682568
},
25692569
expectedIngesters: numIngesters,
25702570
queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0),

pkg/frontend/transport/handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
219219

220220
tenantIDs, err := tenant.TenantIDs(r.Context())
221221
if err != nil {
222+
http.Error(w, err.Error(), http.StatusBadRequest)
222223
return
223224
}
224225

pkg/frontend/transport/handler_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,46 @@ func TestReportQueryStatsFormat(t *testing.T) {
553553
}
554554
}
555555

556+
func Test_ExtractTenantIDs(t *testing.T) {
557+
roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
558+
return &http.Response{
559+
StatusCode: http.StatusOK,
560+
Body: io.NopCloser(strings.NewReader("{}")),
561+
}, nil
562+
})
563+
564+
tests := []struct {
565+
name string
566+
orgId string
567+
expectedStatusCode int
568+
}{
569+
{
570+
name: "invalid tenantID",
571+
orgId: "aaa\\/",
572+
expectedStatusCode: http.StatusBadRequest,
573+
},
574+
{
575+
name: "valid tenantID",
576+
orgId: "user-1",
577+
expectedStatusCode: http.StatusOK,
578+
},
579+
}
580+
581+
for _, test := range tests {
582+
t.Run(test.name, func(t *testing.T) {
583+
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, roundTripper, log.NewNopLogger(), nil)
584+
handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler)
585+
586+
req := httptest.NewRequest("GET", "http://fake", nil)
587+
req.Header.Set("X-Scope-OrgId", test.orgId)
588+
resp := httptest.NewRecorder()
589+
590+
handlerWithAuth.ServeHTTP(resp, req)
591+
require.Equal(t, test.expectedStatusCode, resp.Code)
592+
})
593+
}
594+
}
595+
556596
func Test_TenantFederation_MaxTenant(t *testing.T) {
557597
// set a multi tenant resolver
558598
tenant.WithDefaultResolver(tenant.NewMultiResolver())

0 commit comments

Comments
 (0)