Skip to content

Commit 585428c

Browse files
committed
add fetched samples and chunks count into querier stats
Signed-off-by: Ben Ye <[email protected]>
1 parent 64b6c2b commit 585428c

File tree

9 files changed

+300
-37
lines changed

9 files changed

+300
-37
lines changed

pkg/distributor/query.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
400400
reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
401401
reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))
402402
reqStats.AddFetchedDataBytes(uint64(resp.Size()))
403+
reqStats.AddFetchedChunks(uint64(resp.ChunksCount()))
404+
reqStats.AddFetchedSamples(uint64(resp.SamplesCount()))
403405

404406
return resp, nil
405407
}

pkg/frontend/transport/handler.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ type Handler struct {
6262
roundTripper http.RoundTripper
6363

6464
// Metrics.
65-
querySeconds *prometheus.CounterVec
66-
querySeries *prometheus.CounterVec
67-
queryBytes *prometheus.CounterVec
68-
queryDataBytes *prometheus.CounterVec
69-
activeUsers *util.ActiveUsersCleanupService
65+
querySeconds *prometheus.CounterVec
66+
querySeries *prometheus.CounterVec
67+
queryChunkBytes *prometheus.CounterVec
68+
queryDataBytes *prometheus.CounterVec
69+
activeUsers *util.ActiveUsersCleanupService
7070
}
7171

7272
// NewHandler creates a new frontend handler.
@@ -88,7 +88,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
8888
Help: "Number of series fetched to execute a query.",
8989
}, []string{"user"})
9090

91-
h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
91+
h.queryChunkBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
9292
Name: "cortex_query_fetched_chunks_bytes_total",
9393
Help: "Size of all chunks fetched to execute a query in bytes.",
9494
}, []string{"user"})
@@ -101,7 +101,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
101101
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
102102
h.querySeconds.DeleteLabelValues(user)
103103
h.querySeries.DeleteLabelValues(user)
104-
h.queryBytes.DeleteLabelValues(user)
104+
h.queryChunkBytes.DeleteLabelValues(user)
105105
h.queryDataBytes.DeleteLabelValues(user)
106106
})
107107
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
@@ -230,13 +230,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
230230
userID := tenant.JoinTenantIDs(tenantIDs)
231231
wallTime := stats.LoadWallTime()
232232
numSeries := stats.LoadFetchedSeries()
233-
numBytes := stats.LoadFetchedChunkBytes()
233+
numChunks := stats.LoadFetchedChunks()
234+
numSamples := stats.LoadFetchedSamples()
235+
numChunkBytes := stats.LoadFetchedChunkBytes()
234236
numDataBytes := stats.LoadFetchedDataBytes()
235237

236238
// Track stats.
237239
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
238240
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
239-
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
241+
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
240242
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
241243
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
242244

@@ -249,7 +251,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
249251
"response_time", queryResponseTime,
250252
"query_wall_time_seconds", wallTime.Seconds(),
251253
"fetched_series_count", numSeries,
252-
"fetched_chunks_bytes", numBytes,
254+
"fetched_chunks_count", numChunks,
255+
"fetched_samples_count", numSamples,
256+
"fetched_chunks_bytes", numChunkBytes,
253257
"fetched_data_bytes", numDataBytes,
254258
"status_code", statusCode,
255259
}, stats.LoadExtraFields()...)

pkg/ingester/client/custom.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package client
22

3+
import (
4+
"encoding/binary"
5+
6+
"github.com/cortexproject/cortex/pkg/chunk/encoding"
7+
)
8+
39
// ChunksCount returns the number of chunks in response.
410
func (m *QueryStreamResponse) ChunksCount() int {
511
if len(m.Chunkseries) == 0 {
@@ -27,3 +33,17 @@ func (m *QueryStreamResponse) ChunksSize() int {
2733
}
2834
return size
2935
}
36+
37+
func (m *QueryStreamResponse) SamplesCount() (count int) {
38+
for _, ts := range m.Timeseries {
39+
count += len(ts.Samples)
40+
}
41+
for _, cs := range m.Chunkseries {
42+
for _, c := range cs.Chunks {
43+
if c.Encoding == int32(encoding.PrometheusXorChunk) {
44+
count += int(binary.BigEndian.Uint16(c.Data))
45+
}
46+
}
47+
}
48+
return
49+
}

pkg/querier/blocks_store_queryable.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,16 +692,21 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
692692
}
693693

694694
numSeries := len(mySeries)
695+
numSamples, chunksCount := countSamplesAndChunks(mySeries...)
695696
chunkBytes := countChunkBytes(mySeries...)
696697
dataBytes := countDataBytes(mySeries...)
697698

698699
reqStats.AddFetchedSeries(uint64(numSeries))
700+
reqStats.AddFetchedChunks(chunksCount)
701+
reqStats.AddFetchedSamples(numSamples)
699702
reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
700703
reqStats.AddFetchedDataBytes(uint64(dataBytes))
701704

702705
level.Debug(spanLog).Log("msg", "received series from store-gateway",
703706
"instance", c.RemoteAddress(),
704707
"fetched series", numSeries,
708+
"fetched chunks", chunksCount,
709+
"fetched samples", numSamples,
705710
"fetched chunk bytes", chunkBytes,
706711
"fetched data bytes", dataBytes,
707712
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
@@ -1018,6 +1023,19 @@ func countDataBytes(series ...*storepb.Series) (count int) {
10181023
return count
10191024
}
10201025

1026+
// countSamplesAndChunks counts the number of samples and number counts from the series.
1027+
func countSamplesAndChunks(series ...*storepb.Series) (samplesCount, chunksCount uint64) {
1028+
for _, s := range series {
1029+
chunksCount += uint64(len(s.Chunks))
1030+
for _, c := range s.Chunks {
1031+
if c.Raw != nil {
1032+
samplesCount += uint64(c.Raw.XORNumSamples())
1033+
}
1034+
}
1035+
}
1036+
return
1037+
}
1038+
10211039
// only retry connection issues
10221040
func isRetryableError(err error) bool {
10231041
switch status.Code(err) {

pkg/querier/blocks_store_queryable_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1687,3 +1687,64 @@ func valuesFromSeries(name string, series ...labels.Labels) []string {
16871687
sort.Strings(values)
16881688
return values
16891689
}
1690+
1691+
func TestCountSamplesAndChunks(t *testing.T) {
1692+
c := chunkenc.NewXORChunk()
1693+
appender, err := c.Appender()
1694+
require.NoError(t, err)
1695+
samples := 300
1696+
for i := 0; i < samples; i++ {
1697+
appender.Append(int64(i), float64(i))
1698+
}
1699+
1700+
for i, tc := range []struct {
1701+
serieses []*storepb.Series
1702+
expectedChunks uint64
1703+
expectedSamples uint64
1704+
}{
1705+
{
1706+
serieses: []*storepb.Series{
1707+
{
1708+
Chunks: []storepb.AggrChunk{
1709+
{
1710+
Raw: &storepb.Chunk{
1711+
Type: storepb.Chunk_XOR,
1712+
Data: c.Bytes(),
1713+
},
1714+
},
1715+
},
1716+
},
1717+
},
1718+
expectedSamples: uint64(samples),
1719+
expectedChunks: 1,
1720+
},
1721+
{
1722+
serieses: []*storepb.Series{
1723+
{
1724+
Chunks: []storepb.AggrChunk{
1725+
{
1726+
Raw: &storepb.Chunk{
1727+
Type: storepb.Chunk_XOR,
1728+
Data: c.Bytes(),
1729+
},
1730+
},
1731+
{
1732+
Raw: &storepb.Chunk{
1733+
Type: storepb.Chunk_XOR,
1734+
Data: c.Bytes(),
1735+
},
1736+
},
1737+
},
1738+
},
1739+
},
1740+
expectedSamples: uint64(int64(samples) * 2),
1741+
expectedChunks: 2,
1742+
},
1743+
} {
1744+
t.Run(fmt.Sprintf("test_case_%d", i), func(t *testing.T) {
1745+
samples, chunks := countSamplesAndChunks(tc.serieses...)
1746+
require.Equal(t, tc.expectedSamples, samples)
1747+
require.Equal(t, tc.expectedChunks, chunks)
1748+
})
1749+
}
1750+
}

pkg/querier/stats/stats.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,39 @@ func (s *QueryStats) LoadFetchedDataBytes() uint64 {
148148
return atomic.LoadUint64(&s.FetchedDataBytes)
149149
}
150150

151-
// Merge the provide Stats into this one.
151+
func (s *QueryStats) AddFetchedSamples(count uint64) {
152+
if s == nil {
153+
return
154+
}
155+
156+
atomic.AddUint64(&s.FetchedSamplesCount, count)
157+
}
158+
159+
func (s *QueryStats) LoadFetchedSamples() uint64 {
160+
if s == nil {
161+
return 0
162+
}
163+
164+
return atomic.LoadUint64(&s.FetchedSamplesCount)
165+
}
166+
167+
func (s *QueryStats) AddFetchedChunks(count uint64) {
168+
if s == nil {
169+
return
170+
}
171+
172+
atomic.AddUint64(&s.FetchedChunksCount, count)
173+
}
174+
175+
func (s *QueryStats) LoadFetchedChunks() uint64 {
176+
if s == nil {
177+
return 0
178+
}
179+
180+
return atomic.LoadUint64(&s.FetchedChunksCount)
181+
}
182+
183+
// Merge the provided Stats into this one.
152184
func (s *QueryStats) Merge(other *QueryStats) {
153185
if s == nil || other == nil {
154186
return
@@ -158,6 +190,8 @@ func (s *QueryStats) Merge(other *QueryStats) {
158190
s.AddFetchedSeries(other.LoadFetchedSeries())
159191
s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes())
160192
s.AddFetchedDataBytes(other.LoadFetchedDataBytes())
193+
s.AddFetchedSamples(other.LoadFetchedSamples())
194+
s.AddFetchedChunks(other.LoadFetchedChunks())
161195
s.AddExtraFields(other.LoadExtraFields()...)
162196
}
163197

0 commit comments

Comments
 (0)