Skip to content

Commit f813228

Browse files
[coordinator] Set default namespace tag to avoid colliding with commonly used "namespace" label (#2878)
* [coordinator] Set default namespace tag to avoid colliding with common "namespace" default value * Use defined constant * Add downsampler test case to demonstrate override namespace tag Co-authored-by: Wesley Kim <[email protected]>
1 parent 41ea054 commit f813228

File tree

2 files changed

+94
-2
lines changed

2 files changed

+94
-2
lines changed

src/cmd/services/m3coordinator/downsample/downsampler_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,6 +1274,87 @@ func TestDownsamplerAggregationWithRemoteAggregatorClient(t *testing.T) {
12741274
testDownsamplerRemoteAggregation(t, testDownsampler)
12751275
}
12761276

1277+
func TestDownsamplerWithOverrideNamespace(t *testing.T) {
1278+
overrideNamespaceTag := "override_namespace_tag"
1279+
1280+
gaugeMetric := testGaugeMetric{
1281+
tags: map[string]string{
1282+
nameTag: "http_requests",
1283+
"app": "nginx_edge",
1284+
"status_code": "500",
1285+
"endpoint": "/foo/bar",
1286+
"not_rolled_up": "not_rolled_up_value",
1287+
// Set namespace tags on ingested metrics.
1288+
// The test demonstrates that overrideNamespaceTag is respected, meaning setting
1289+
// values on defaultNamespaceTag won't affect aggregation.
1290+
defaultNamespaceTag: "namespace_ignored",
1291+
},
1292+
timedSamples: []testGaugeMetricTimedSample{
1293+
{value: 42},
1294+
{value: 64, offset: 5 * time.Second},
1295+
},
1296+
}
1297+
res := 5 * time.Second
1298+
ret := 30 * 24 * time.Hour
1299+
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
1300+
rulesConfig: &RulesConfiguration{
1301+
RollupRules: []RollupRuleConfiguration{
1302+
{
1303+
Filter: fmt.Sprintf(
1304+
"%s:http_requests app:* status_code:* endpoint:*",
1305+
nameTag),
1306+
Transforms: []TransformConfiguration{
1307+
{
1308+
Transform: &TransformOperationConfiguration{
1309+
Type: transformation.PerSecond,
1310+
},
1311+
},
1312+
{
1313+
Rollup: &RollupOperationConfiguration{
1314+
MetricName: "http_requests_by_status_code",
1315+
GroupBy: []string{"app", "status_code", "endpoint"},
1316+
Aggregations: []aggregation.Type{aggregation.Sum},
1317+
},
1318+
},
1319+
},
1320+
StoragePolicies: []StoragePolicyConfiguration{
1321+
{
1322+
Resolution: res,
1323+
Retention: ret,
1324+
},
1325+
},
1326+
},
1327+
},
1328+
},
1329+
matcherConfig: MatcherConfiguration{NamespaceTag: overrideNamespaceTag},
1330+
ingest: &testDownsamplerOptionsIngest{
1331+
gaugeMetrics: []testGaugeMetric{gaugeMetric},
1332+
},
1333+
expect: &testDownsamplerOptionsExpect{
1334+
writes: []testExpectedWrite{
1335+
{
1336+
tags: map[string]string{
1337+
nameTag: "http_requests_by_status_code",
1338+
string(rollupTagName): string(rollupTagValue),
1339+
"app": "nginx_edge",
1340+
"status_code": "500",
1341+
"endpoint": "/foo/bar",
1342+
},
1343+
values: []expectedValue{{value: 4.4}},
1344+
attributes: &storagemetadata.Attributes{
1345+
MetricsType: storagemetadata.AggregatedMetricsType,
1346+
Resolution: res,
1347+
Retention: ret,
1348+
},
1349+
},
1350+
},
1351+
},
1352+
})
1353+
1354+
// Test expected output
1355+
testDownsamplerAggregation(t, testDownsampler)
1356+
}
1357+
12771358
func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas {
12781359
ds, ok := testDownsampler.downsampler.(*downsampler)
12791360
require.True(t, ok)
@@ -1751,6 +1832,7 @@ type testDownsamplerOptions struct {
17511832
sampleAppenderOpts *SampleAppenderOptions
17521833
remoteClientMock *client.MockClient
17531834
rulesConfig *RulesConfiguration
1835+
matcherConfig MatcherConfiguration
17541836

17551837
// Test ingest and expectations overrides
17561838
ingest *testDownsamplerOptionsIngest
@@ -1821,6 +1903,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
18211903
if opts.rulesConfig != nil {
18221904
cfg.Rules = opts.rulesConfig
18231905
}
1906+
cfg.Matcher = opts.matcherConfig
18241907

18251908
instance, err := cfg.NewDownsampler(DownsamplerOptions{
18261909
Storage: storage,

src/cmd/services/m3coordinator/downsample/options.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ const (
8686
)
8787

8888
var (
89-
numShards = runtime.NumCPU()
89+
numShards = runtime.NumCPU()
90+
defaultNamespaceTag = metric.M3MetricsPrefixString + "_namespace__"
9091

9192
errNoStorage = errors.New("downsampling enabled with storage not set")
9293
errNoClusterClient = errors.New("downsampling enabled with cluster client not set")
@@ -267,6 +268,9 @@ type Configuration struct {
267268
type MatcherConfiguration struct {
268269
// Cache if non-zero will set the capacity of the rules matching cache.
269270
Cache MatcherCacheConfiguration `yaml:"cache"`
271+
// NamespaceTag defines the namespace tag to use to select rules
272+
// namespace to evaluate against. Default is "__m3_namespace__".
273+
NamespaceTag string `yaml:"namespaceTag"`
270274
}
271275

272276
// MatcherCacheConfiguration is the configuration for the rule matcher cache.
@@ -647,13 +651,17 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
647651
logger = instrumentOpts.Logger()
648652
openTimeout = defaultOpenTimeout
649653
m3PrefixFilter = false
654+
namespaceTag = defaultNamespaceTag
650655
)
651656
if o.StorageFlushConcurrency > 0 {
652657
storageFlushConcurrency = o.StorageFlushConcurrency
653658
}
654659
if o.OpenTimeout > 0 {
655660
openTimeout = o.OpenTimeout
656661
}
662+
if cfg.Matcher.NamespaceTag != "" {
663+
namespaceTag = cfg.Matcher.NamespaceTag
664+
}
657665

658666
pools := o.newAggregatorPools()
659667
ruleSetOpts := o.newAggregatorRulesOptions(pools)
@@ -662,7 +670,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
662670
SetClockOptions(clockOpts).
663671
SetInstrumentOptions(instrumentOpts).
664672
SetRuleSetOptions(ruleSetOpts).
665-
SetKVStore(o.RulesKVStore)
673+
SetKVStore(o.RulesKVStore).
674+
SetNamespaceTag([]byte(namespaceTag))
666675

667676
// NB(r): If rules are being explicitly set in config then we are
668677
// going to use an in memory KV store for rules and explicitly set them up.

0 commit comments

Comments
 (0)