Skip to content

Commit 5fffd9f

Browse files
committed
add fetched samples and chunks count into querier stats
Signed-off-by: Ben Ye <[email protected]>
1 parent 713542c commit 5fffd9f

File tree

8 files changed

+242
-30
lines changed

8 files changed

+242
-30
lines changed

pkg/distributor/query.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
400400
reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
401401
reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))
402402
reqStats.AddFetchedDataBytes(uint64(resp.Size()))
403+
reqStats.AddFetchedChunks(uint64(resp.ChunksCount()))
404+
reqStats.AddFetchedSamples(uint64(resp.SamplesCount()))
403405

404406
return resp, nil
405407
}

pkg/frontend/transport/handler.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type Handler struct {
6464
// Metrics.
6565
querySeconds *prometheus.CounterVec
6666
querySeries *prometheus.CounterVec
67+
querySamples *prometheus.CounterVec
68+
queryChunks *prometheus.CounterVec
6769
queryBytes *prometheus.CounterVec
6870
queryDataBytes *prometheus.CounterVec
6971
activeUsers *util.ActiveUsersCleanupService
@@ -230,13 +232,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
230232
userID := tenant.JoinTenantIDs(tenantIDs)
231233
wallTime := stats.LoadWallTime()
232234
numSeries := stats.LoadFetchedSeries()
233-
numBytes := stats.LoadFetchedChunkBytes()
235+
numChunks := stats.LoadFetchedChunks()
236+
numSamples := stats.LoadFetchedSamples()
237+
numChunkBytes := stats.LoadFetchedChunkBytes()
234238
numDataBytes := stats.LoadFetchedDataBytes()
235239

236240
// Track stats.
237241
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
238242
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
239-
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
243+
f.queryBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
240244
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
241245
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
242246

@@ -249,7 +253,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
249253
"response_time", queryResponseTime,
250254
"query_wall_time_seconds", wallTime.Seconds(),
251255
"fetched_series_count", numSeries,
252-
"fetched_chunks_bytes", numBytes,
256+
"fetched_chunks_count", numChunks,
257+
"fetched_samples_count", numSamples,
258+
"fetched_chunks_bytes", numChunkBytes,
253259
"fetched_data_bytes", numDataBytes,
254260
"status_code", statusCode,
255261
}, stats.LoadExtraFields()...)

pkg/ingester/client/custom.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package client
22

3+
import (
4+
"encoding/binary"
5+
"github.com/cortexproject/cortex/pkg/chunk/encoding"
6+
)
7+
38
// ChunksCount returns the number of chunks in response.
49
func (m *QueryStreamResponse) ChunksCount() int {
510
if len(m.Chunkseries) == 0 {
@@ -27,3 +32,17 @@ func (m *QueryStreamResponse) ChunksSize() int {
2732
}
2833
return size
2934
}
35+
36+
func (m *QueryStreamResponse) SamplesCount() (count int) {
37+
for _, ts := range m.Timeseries {
38+
count += len(ts.Samples)
39+
}
40+
for _, cs := range m.Chunkseries {
41+
for _, c := range cs.Chunks {
42+
if c.Encoding == int32(encoding.PrometheusXorChunk) {
43+
count += int(binary.BigEndian.Uint16(c.Data))
44+
}
45+
}
46+
}
47+
return
48+
}

pkg/querier/blocks_store_queryable.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package querier
22

33
import (
44
"context"
5+
"encoding/binary"
56
"fmt"
67
"io"
78
"sort"
@@ -692,16 +693,21 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
692693
}
693694

694695
numSeries := len(mySeries)
696+
numSamples, chunksCount := countSamplesAndChunks(mySeries...)
695697
chunkBytes := countChunkBytes(mySeries...)
696698
dataBytes := countDataBytes(mySeries...)
697699

698700
reqStats.AddFetchedSeries(uint64(numSeries))
701+
reqStats.AddFetchedChunks(chunksCount)
702+
reqStats.AddFetchedSamples(numSamples)
699703
reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
700704
reqStats.AddFetchedDataBytes(uint64(dataBytes))
701705

702706
level.Debug(spanLog).Log("msg", "received series from store-gateway",
703707
"instance", c.RemoteAddress(),
704708
"fetched series", numSeries,
709+
"fetched chunks", chunksCount,
710+
"fetched samples", numSamples,
705711
"fetched chunk bytes", chunkBytes,
706712
"fetched data bytes", dataBytes,
707713
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
@@ -1018,6 +1024,27 @@ func countDataBytes(series ...*storepb.Series) (count int) {
10181024
return count
10191025
}
10201026

1027+
// countChunks
1028+
func countChunks(series ...*storepb.Series) (count int) {
1029+
for _, s := range series {
1030+
count += len(s.Chunks)
1031+
}
1032+
return
1033+
}
1034+
1035+
// countSamples
1036+
func countSamplesAndChunks(series ...*storepb.Series) (samplesCount, chunksCount uint64) {
1037+
for _, s := range series {
1038+
chunksCount += uint64(len(s.Chunks))
1039+
for _, c := range s.Chunks {
1040+
if c.Raw != nil && c.Raw.Type == storepb.Chunk_XOR {
1041+
samplesCount += uint64(binary.BigEndian.Uint16(c.Raw.Data))
1042+
}
1043+
}
1044+
}
1045+
return
1046+
}
1047+
10211048
// only retry connection issues
10221049
func isRetryableError(err error) bool {
10231050
switch status.Code(err) {

pkg/querier/stats/stats.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,39 @@ func (s *QueryStats) LoadFetchedDataBytes() uint64 {
148148
return atomic.LoadUint64(&s.FetchedDataBytes)
149149
}
150150

151-
// Merge the provide Stats into this one.
151+
func (s *QueryStats) AddFetchedSamples(count uint64) {
152+
if s == nil {
153+
return
154+
}
155+
156+
atomic.AddUint64(&s.FetchedSamplesCount, count)
157+
}
158+
159+
func (s *QueryStats) LoadFetchedSamples() uint64 {
160+
if s == nil {
161+
return 0
162+
}
163+
164+
return atomic.LoadUint64(&s.FetchedSamplesCount)
165+
}
166+
167+
func (s *QueryStats) AddFetchedChunks(count uint64) {
168+
if s == nil {
169+
return
170+
}
171+
172+
atomic.AddUint64(&s.FetchedChunksCount, count)
173+
}
174+
175+
func (s *QueryStats) LoadFetchedChunks() uint64 {
176+
if s == nil {
177+
return 0
178+
}
179+
180+
return atomic.LoadUint64(&s.FetchedChunksCount)
181+
}
182+
183+
// Merge the provided Stats into this one.
152184
func (s *QueryStats) Merge(other *QueryStats) {
153185
if s == nil || other == nil {
154186
return
@@ -158,6 +190,8 @@ func (s *QueryStats) Merge(other *QueryStats) {
158190
s.AddFetchedSeries(other.LoadFetchedSeries())
159191
s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes())
160192
s.AddFetchedDataBytes(other.LoadFetchedDataBytes())
193+
s.AddFetchedSamples(other.LoadFetchedSamples())
194+
s.AddFetchedChunks(other.LoadFetchedChunks())
161195
s.AddExtraFields(other.LoadExtraFields()...)
162196
}
163197

pkg/querier/stats/stats.pb.go

Lines changed: 110 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/querier/stats/stats.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,8 @@ message Stats {
2222
uint64 fetched_data_bytes = 4;
2323
// Extra fields to be reported on the stats log
2424
map<string, string> extra_fields = 5;
25+
// The number of chunks fetched for the query
26+
uint64 fetched_chunks_count = 6;
27+
// The number of samples fetched for the query
28+
uint64 fetched_samples_count = 7;
2529
}

0 commit comments

Comments
 (0)