@@ -1122,19 +1122,21 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1122
1122
// Keep track of some stats which are tracked only if the samples will be
1123
1123
// successfully committed
1124
1124
var (
1125
- succeededSamplesCount = 0
1126
- failedSamplesCount = 0
1127
- succeededExemplarsCount = 0
1128
- failedExemplarsCount = 0
1129
- startAppend = time .Now ()
1130
- sampleOutOfBoundsCount = 0
1131
- sampleOutOfOrderCount = 0
1132
- sampleTooOldCount = 0
1133
- newValueForTimestampCount = 0
1134
- perUserSeriesLimitCount = 0
1135
- perLabelSetSeriesLimitCount = 0
1136
- perMetricSeriesLimitCount = 0
1137
- nativeHistogramCount = 0
1125
+ succeededSamplesCount = 0
1126
+ failedSamplesCount = 0
1127
+ succeededHistogramCount = 0
1128
+ failedHistogramCount = 0
1129
+ succeededExemplarsCount = 0
1130
+ failedExemplarsCount = 0
1131
+ startAppend = time .Now ()
1132
+ sampleOutOfBoundsCount = 0
1133
+ sampleOutOfOrderCount = 0
1134
+ sampleTooOldCount = 0
1135
+ newValueForTimestampCount = 0
1136
+ perUserSeriesLimitCount = 0
1137
+ perLabelSetSeriesLimitCount = 0
1138
+ perMetricSeriesLimitCount = 0
1139
+ discardedNativeHistogramCount = 0
1138
1140
1139
1141
updateFirstPartial = func (errFn func () error ) {
1140
1142
if firstPartialErr == nil {
@@ -1213,8 +1215,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1213
1215
tsLabelsHash := tsLabels .Hash ()
1214
1216
ref , copiedLabels := app .GetRef (tsLabels , tsLabelsHash )
1215
1217
1216
- // To find out if any sample was added to this series, we keep old value.
1218
+ // To find out if any sample was added to this series, we keep fold value.
1217
1219
oldSucceededSamplesCount := succeededSamplesCount
1220
+ // To find out if any histogram was added to this series, we keep old value.
1221
+ oldSucceededHistogramsCount := succeededHistogramCount
1218
1222
1219
1223
for _ , s := range ts .Samples {
1220
1224
var err error
@@ -1266,19 +1270,19 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1266
1270
1267
1271
if ref != 0 {
1268
1272
if _ , err = app .AppendHistogram (ref , copiedLabels , hp .TimestampMs , h , fh ); err == nil {
1269
- succeededSamplesCount ++
1273
+ succeededHistogramCount ++
1270
1274
continue
1271
1275
}
1272
1276
} else {
1273
1277
// Copy the label set because both TSDB and the active series tracker may retain it.
1274
1278
copiedLabels = cortexpb .FromLabelAdaptersToLabelsWithCopy (ts .Labels )
1275
1279
if ref , err = app .AppendHistogram (0 , copiedLabels , hp .TimestampMs , h , fh ); err == nil {
1276
- succeededSamplesCount ++
1280
+ succeededHistogramCount ++
1277
1281
continue
1278
1282
}
1279
1283
}
1280
1284
1281
- failedSamplesCount ++
1285
+ failedHistogramCount ++
1282
1286
1283
1287
if rollback := handleAppendFailure (err , hp .TimestampMs , ts .Labels , copiedLabels ); ! rollback {
1284
1288
continue
@@ -1290,7 +1294,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1290
1294
return nil , wrapWithUser (err , userID )
1291
1295
}
1292
1296
} else {
1293
- nativeHistogramCount += len (ts .Histograms )
1297
+ discardedNativeHistogramCount += len (ts .Histograms )
1294
1298
}
1295
1299
1296
1300
if i .cfg .ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
@@ -1300,6 +1304,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1300
1304
})
1301
1305
}
1302
1306
1307
+ if i .cfg .ActiveSeriesMetricsEnabled && succeededHistogramCount > oldSucceededHistogramsCount {
1308
+ db .activeSeries .UpdateSeries (tsLabels , tsLabelsHash , startAppend , func (l labels.Labels ) labels.Labels {
1309
+ // we must already have copied the labels if succeededHistogramCount has been incremented.
1310
+ return copiedLabels
1311
+ })
1312
+ }
1313
+
1303
1314
maxExemplarsForUser := i .getMaxExemplars (userID )
1304
1315
if maxExemplarsForUser > 0 {
1305
1316
// app.AppendExemplar currently doesn't create the series, it must
@@ -1344,7 +1355,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1344
1355
i .TSDBState .appenderCommitDuration .Observe (time .Since (startCommit ).Seconds ())
1345
1356
1346
1357
// If only invalid samples are pushed, don't change "last update", as TSDB was not modified.
1347
- if succeededSamplesCount > 0 {
1358
+ if succeededSamplesCount > 0 || succeededHistogramCount > 0 {
1348
1359
db .setLastUpdate (time .Now ())
1349
1360
}
1350
1361
@@ -1353,6 +1364,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1353
1364
// which will be converted into an HTTP 5xx and the client should/will retry.
1354
1365
i .metrics .ingestedSamples .Add (float64 (succeededSamplesCount ))
1355
1366
i .metrics .ingestedSamplesFail .Add (float64 (failedSamplesCount ))
1367
+ i .metrics .ingestedHistograms .Add (float64 (succeededHistogramCount ))
1368
+ i .metrics .ingestedHistogramsFail .Add (float64 (failedHistogramCount ))
1356
1369
i .metrics .ingestedExemplars .Add (float64 (succeededExemplarsCount ))
1357
1370
i .metrics .ingestedExemplarsFail .Add (float64 (failedExemplarsCount ))
1358
1371
@@ -1378,20 +1391,20 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1378
1391
i .validateMetrics .DiscardedSamples .WithLabelValues (perLabelsetSeriesLimit , userID ).Add (float64 (perLabelSetSeriesLimitCount ))
1379
1392
}
1380
1393
1381
- if ! i .cfg .BlocksStorageConfig .TSDB .EnableNativeHistograms && nativeHistogramCount > 0 {
1382
- i .validateMetrics .DiscardedSamples .WithLabelValues (nativeHistogramSample , userID ).Add (float64 (nativeHistogramCount ))
1394
+ if ! i .cfg .BlocksStorageConfig .TSDB .EnableNativeHistograms && discardedNativeHistogramCount > 0 {
1395
+ i .validateMetrics .DiscardedSamples .WithLabelValues (nativeHistogramSample , userID ).Add (float64 (discardedNativeHistogramCount ))
1383
1396
}
1384
1397
1385
1398
// Distributor counts both samples, metadata and histograms, so for consistency ingester does the same.
1386
- i .ingestionRate .Add (int64 (succeededSamplesCount + ingestedMetadata ))
1399
+ i .ingestionRate .Add (int64 (succeededSamplesCount + succeededHistogramCount + ingestedMetadata ))
1387
1400
1388
1401
switch req .Source {
1389
1402
case cortexpb .RULE :
1390
1403
db .ingestedRuleSamples .Add (int64 (succeededSamplesCount ))
1391
1404
case cortexpb .API :
1392
1405
fallthrough
1393
1406
default :
1394
- db .ingestedAPISamples .Add (int64 (succeededSamplesCount ))
1407
+ db .ingestedAPISamples .Add (int64 (succeededSamplesCount + succeededHistogramCount ))
1395
1408
}
1396
1409
1397
1410
if firstPartialErr != nil {
@@ -1400,7 +1413,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
1400
1413
if errors .As (firstPartialErr , & ve ) {
1401
1414
code = ve .code
1402
1415
}
1403
- level .Debug (logutil .WithContext (ctx , i .logger )).Log ("msg" , "partial failures to push" , "totalSamples" , succeededSamplesCount + failedSamplesCount , "failedSamples" , failedSamplesCount , "firstPartialErr" , firstPartialErr )
1416
+ level .Debug (logutil .WithContext (ctx , i .logger )).Log ("msg" , "partial failures to push" , "totalSamples" , succeededSamplesCount + succeededHistogramCount + failedSamplesCount + failedHistogramCount , "failedSamples" , failedSamplesCount , "failedHistogram" , failedHistogramCount , "firstPartialErr" , firstPartialErr )
1404
1417
return & cortexpb.WriteResponse {}, httpgrpc .Errorf (code , wrapWithUser (firstPartialErr , userID ).Error ())
1405
1418
}
1406
1419
0 commit comments