
Commit e6da572

Added per labelset support
Signed-off-by: Essam Eldaly <[email protected]>
Parent: ba079d6

File tree

4 files changed: +168, -104 lines

pkg/ingester/ingester.go

Lines changed: 10 additions & 10 deletions
```diff
@@ -1251,35 +1251,35 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 			}
 		}
 
-		handleAppendFailure = func(err error, timestampMs int64, lbls []cortexpb.LabelAdapter, copiedLabels labels.Labels, matchedLabelSetLimits []validation.LimitsPerLabelSet, seriesHash uint64) (rollback bool) {
+		handleAppendFailure = func(err error, timestampMs int64, lbls []cortexpb.LabelAdapter, copiedLabels labels.Labels, matchedLabelSetLimits []validation.LimitsPerLabelSet) (rollback bool) {
 			// Check if the error is a soft error we can proceed on. If so, we keep track
 			// of it, so that we can return it back to the distributor, which will return a
 			// 400 error to the client. The client (Prometheus) will not retry on 400, and
 			// we actually ingested all samples which haven't failed.
 			switch cause := errors.Cause(err); {
 			case errors.Is(cause, storage.ErrOutOfBounds):
 				sampleOutOfBoundsCount++
-				i.validateMetrics.DiscardedSeriesTracker.Track("sample_out_of_bounds", userID, seriesHash)
+				i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, &copiedLabels)
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 			case errors.Is(cause, storage.ErrOutOfOrderSample):
 				sampleOutOfOrderCount++
-				i.validateMetrics.DiscardedSeriesTracker.Track("sample_out_of_order", userID, seriesHash)
+				i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, &copiedLabels)
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 			case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
 				newValueForTimestampCount++
-				i.validateMetrics.DiscardedSeriesTracker.Track("new_value_for_timestamp", userID, seriesHash)
+				i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, &copiedLabels)
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 			case errors.Is(cause, storage.ErrTooOldSample):
 				sampleTooOldCount++
-				i.validateMetrics.DiscardedSeriesTracker.Track("sample_too_old", userID, seriesHash)
+				i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, &copiedLabels)
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 			case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
 				perUserSeriesLimitCount++
-				i.validateMetrics.DiscardedSeriesTracker.Track("per_user_series_limit", userID, seriesHash)
+				i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, &copiedLabels)
 				updateFirstPartial(func() error {
 					return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
 				})
@@ -1292,14 +1292,14 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 
 			case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
 				perMetricSeriesLimitCount++
-				i.validateMetrics.DiscardedSeriesTracker.Track("per_metric_series_limit", userID, seriesHash)
+				i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, &copiedLabels)
 				updateFirstPartial(func() error {
 					return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels))
 				})
 
 			case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
 				perLabelSetSeriesLimitCount++
-				i.validateMetrics.DiscardedSeriesTracker.Track("per_labelset_series_limit", userID, seriesHash)
+				i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, &copiedLabels)
 				// We only track per labelset discarded samples for throttling by labelset limit.
 				reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit)
 				updateFirstPartial(func() error {
@@ -1381,7 +1381,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 
 			failedSamplesCount++
 
-			if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits, tsLabelsHash); !rollback {
+			if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback {
 				continue
 			}
 			// The error looks an issue on our side, so we should rollback
@@ -1426,7 +1426,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 
 			failedHistogramsCount++
 
-			if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits, tsLabelsHash); !rollback {
+			if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback {
 				continue
 			}
 			// The error looks an issue on our side, so we should rollback
```
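Note on the call-site change above: `Track` no longer receives the precomputed `tsLabelsHash`; it takes the already-copied labelset and derives the hash itself (via `labels.Hash()` in `tracker.go` below). A minimal, self-contained sketch of why that is equivalent; the metric name and label values here are illustrative, not taken from this commit:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// copiedLabels stands in for the labelset the ingester has already copied
	// for the series being rejected (names here are made up for illustration).
	copiedLabels := labels.FromStrings(labels.MetricName, "http_requests_total", "job", "api")

	// This is the key the tracker now computes internally, replacing the
	// tsLabelsHash value the caller used to pass in.
	fmt.Printf("series key: %x\n", copiedLabels.Hash())

	// The same labelset always hashes to the same key, so passing the labels
	// is equivalent for deduplication, and it additionally gives the tracker
	// the labelset string it needs for the new per-labelset gauge.
	same := labels.FromStrings("job", "api", labels.MetricName, "http_requests_total")
	fmt.Println(copiedLabels.Hash() == same.Hash()) // true
}
```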

pkg/util/discardedseries/tracker.go

Lines changed: 58 additions & 20 deletions
```diff
@@ -5,15 +5,22 @@ import (
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/labels"
 )
 
 const (
 	vendMetricsInterval = 30 * time.Second
 )
 
+type labelCounterStruct struct {
+	*sync.RWMutex
+	*labels.Labels
+	inCurrentCycle bool
+}
+
 type seriesCounterStruct struct {
 	*sync.RWMutex
-	seriesCountMap map[uint64]struct{}
+	seriesCountMap map[uint64]*labelCounterStruct
 }
 
 type userCounterStruct struct {
@@ -23,32 +30,33 @@ type userCounterStruct struct {
 
 type DiscardedSeriesTracker struct {
 	*sync.RWMutex
-	labelUserMap map[string]*userCounterStruct
+	reasonUserMap map[string]*userCounterStruct
 	discardedSeriesGauge *prometheus.GaugeVec
 }
 
 func NewDiscardedSeriesTracker(discardedSeriesGauge *prometheus.GaugeVec) *DiscardedSeriesTracker {
 	tracker := &DiscardedSeriesTracker{
 		RWMutex: &sync.RWMutex{},
-		labelUserMap: make(map[string]*userCounterStruct),
+		reasonUserMap: make(map[string]*userCounterStruct),
 		discardedSeriesGauge: discardedSeriesGauge,
 	}
 	return tracker
 }
 
-func (t *DiscardedSeriesTracker) Track(reason string, user string, series uint64) {
+func (t *DiscardedSeriesTracker) Track(reason string, user string, labels *labels.Labels) {
+	series := labels.Hash()
 	t.RLock()
-	userCounter, ok := t.labelUserMap[reason]
+	userCounter, ok := t.reasonUserMap[reason]
 	t.RUnlock()
 	if !ok {
 		t.Lock()
-		userCounter, ok = t.labelUserMap[reason]
+		userCounter, ok = t.reasonUserMap[reason]
 		if !ok {
 			userCounter = &userCounterStruct{
 				RWMutex: &sync.RWMutex{},
 				userSeriesMap: make(map[string]*seriesCounterStruct),
 			}
-			t.labelUserMap[reason] = userCounter
+			t.reasonUserMap[reason] = userCounter
 		}
 		t.Unlock()
 	}
@@ -62,42 +70,66 @@ func (t *DiscardedSeriesTracker) Track(reason string, user string, series uint64
 		if !ok {
 			seriesCounter = &seriesCounterStruct{
 				RWMutex: &sync.RWMutex{},
-				seriesCountMap: make(map[uint64]struct{}),
+				seriesCountMap: make(map[uint64]*labelCounterStruct),
 			}
 			userCounter.userSeriesMap[user] = seriesCounter
 		}
 		userCounter.Unlock()
 	}
 
 	seriesCounter.RLock()
-	_, ok = seriesCounter.seriesCountMap[series]
+	labelCounter, ok := seriesCounter.seriesCountMap[series]
 	seriesCounter.RUnlock()
 	if !ok {
 		seriesCounter.Lock()
-		seriesCounter.seriesCountMap[series] = struct{}{}
+		labelCounter, ok = seriesCounter.seriesCountMap[series]
+		if !ok {
+			labelCounter = &labelCounterStruct{
+				Labels: labels,
+				RWMutex: &sync.RWMutex{},
+				inCurrentCycle: true,
+			}
+			seriesCounter.seriesCountMap[series] = labelCounter
+		}
 		seriesCounter.Unlock()
 	}
+
+	labelCounter.Lock()
+	labelCounter.inCurrentCycle = true
+	labelCounter.Unlock()
 }
 
 func (t *DiscardedSeriesTracker) UpdateMetrics() {
 	usersToDelete := make([]string, 0)
 	t.RLock()
-	for label, userCounter := range t.labelUserMap {
+	for reason, userCounter := range t.reasonUserMap {
 		userCounter.RLock()
 		for user, seriesCounter := range userCounter.userSeriesMap {
 			seriesCounter.Lock()
-			seriesCount := len(seriesCounter.seriesCountMap)
-			t.discardedSeriesGauge.WithLabelValues(label, user).Set(float64(seriesCount))
-			clear(seriesCounter.seriesCountMap)
-			if seriesCount == 0 {
+			for hash, labelCounter := range seriesCounter.seriesCountMap {
+				labelCounter.Lock()
+				val := 0.0
+				if labelCounter.inCurrentCycle {
+					val = 1.0
+				}
+				t.discardedSeriesGauge.WithLabelValues(reason, user, labelCounter.Labels.String()).Set(val)
+				if !labelCounter.inCurrentCycle {
+					delete(seriesCounter.seriesCountMap, hash)
+				} else {
+					labelCounter.inCurrentCycle = false
+				}
+				labelCounter.Unlock()
+			}
+			if len(seriesCounter.seriesCountMap) == 0 {
 				usersToDelete = append(usersToDelete, user)
 			}
 			seriesCounter.Unlock()
 		}
 		userCounter.RUnlock()
 		userCounter.Lock()
 		for _, user := range usersToDelete {
-			if len(userCounter.userSeriesMap[user].seriesCountMap) == 0 {
+			_, ok := userCounter.userSeriesMap[user]
+			if ok && userCounter.userSeriesMap[user].seriesCountMap != nil {
 				delete(userCounter.userSeriesMap, user)
 			}
 		}
@@ -116,11 +148,17 @@ func (t *DiscardedSeriesTracker) StartDiscardedSeriesGoroutine() {
 }
 
 // only used in testing
-func (t *DiscardedSeriesTracker) getSeriesCount(label string, user string) int {
-	if userCounter, ok := t.labelUserMap[label]; ok {
+func (t *DiscardedSeriesTracker) getSeriesCount(reason string, user string) int {
+	count := -1
+	if userCounter, ok := t.reasonUserMap[reason]; ok {
 		if seriesCounter, ok := userCounter.userSeriesMap[user]; ok {
-			return len(seriesCounter.seriesCountMap)
+			count = 0
+			for _, label := range seriesCounter.seriesCountMap {
+				if label.inCurrentCycle {
+					count++
+				}
+			}
 		}
 	}
-	return -1
+	return count
 }
```
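Taken together, the tracker now exports one gauge child per discarded labelset instead of a single per-user count. A rough lifecycle sketch, assuming the `GaugeVec` is declared with `reason`, `user`, and `labels` label names; the gauge wiring, metric name, and module path below are assumptions, not part of this diff:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/cortexproject/cortex/pkg/util/discardedseries" // assumed module path
)

func main() {
	// Assumed label names; the gauge itself is created outside this diff.
	gauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "discarded_series"}, // hypothetical metric name
		[]string{"reason", "user", "labels"},
	)
	tracker := discardedseries.NewDiscardedSeriesTracker(gauge)

	lbls := labels.FromStrings(labels.MetricName, "up", "job", "api")
	tracker.Track("per_labelset_series_limit", "user-1", &lbls)

	// Cycle 1: the labelset was seen since the last cycle, so its gauge child
	// is set to 1 and its inCurrentCycle flag is cleared.
	tracker.UpdateMetrics()
	fmt.Println(testutil.ToFloat64(gauge.WithLabelValues("per_labelset_series_limit", "user-1", lbls.String()))) // 1

	// Cycle 2: no Track call in between, so the child is set to 0 and the
	// tracker drops its internal entry for this labelset.
	tracker.UpdateMetrics()
	fmt.Println(testutil.ToFloat64(gauge.WithLabelValues("per_labelset_series_limit", "user-1", lbls.String()))) // 0
}
```

A labelset that keeps being discarded stays at 1 across cycles; one that stops being discarded is exported as 0 for one cycle and then its entry is removed from the map.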
