@@ -18,147 +18,16 @@ import (
1818 "context"
1919 "fmt"
2020 "io"
21- "math"
2221 "net/http"
2322 "net/http/httptest"
24- "sort"
2523 "sync"
2624 "time"
2725
2826 "github.com/influxdata/tdigest"
29- "github.com/prometheus/client_golang/prometheus"
30- dto "github.com/prometheus/client_model/go"
3127)
3228
3329var latencyTDigest * tdigest.TDigest = tdigest .New ()
3430
35- const tsoBatchSizeMetricName = "pd_client_request_handle_tso_batch_size"
36-
37- type histogramSnapshot struct {
38- count uint64
39- sum float64
40- buckets map [float64 ]uint64 // upperBound => cumulativeCount
41- }
42-
43- func gatherHistogramSnapshot (metricName string ) (histogramSnapshot , bool , error ) {
44- mfs , err := prometheus .DefaultGatherer .Gather ()
45- if err != nil {
46- return histogramSnapshot {}, false , err
47- }
48-
49- for _ , mf := range mfs {
50- if mf .GetName () != metricName {
51- continue
52- }
53- if mf .GetType () != dto .MetricType_HISTOGRAM {
54- return histogramSnapshot {}, false , fmt .Errorf ("metric %q is not a histogram" , metricName )
55- }
56-
57- snap := histogramSnapshot {
58- buckets : make (map [float64 ]uint64 ),
59- }
60- for _ , m := range mf .GetMetric () {
61- h := m .GetHistogram ()
62- if h == nil {
63- continue
64- }
65- snap .count += h .GetSampleCount ()
66- snap .sum += h .GetSampleSum ()
67- for _ , b := range h .GetBucket () {
68- snap .buckets [b .GetUpperBound ()] += b .GetCumulativeCount ()
69- }
70- }
71- return snap , true , nil
72- }
73-
74- return histogramSnapshot {}, false , nil
75- }
76-
77- func diffHistogramSnapshot (cur , prev histogramSnapshot ) histogramSnapshot {
78- if cur .count < prev .count {
79- // Metrics got reset/re-registered, treat as starting from zero.
80- return cur
81- }
82-
83- delta := histogramSnapshot {
84- count : cur .count - prev .count ,
85- sum : cur .sum - prev .sum ,
86- buckets : make (map [float64 ]uint64 , len (cur .buckets )),
87- }
88- for upperBound , curCum := range cur .buckets {
89- prevCum := prev .buckets [upperBound ]
90- if curCum < prevCum {
91- // Unexpected, but keep it safe.
92- delta .buckets [upperBound ] = 0
93- continue
94- }
95- delta .buckets [upperBound ] = curCum - prevCum
96- }
97- return delta
98- }
99-
100- // histogramQuantile approximates a quantile from a Prometheus histogram, using the same linear interpolation
101- // assumption as PromQL's histogram_quantile.
102- func histogramQuantile (q float64 , count uint64 , buckets map [float64 ]uint64 ) float64 {
103- if math .IsNaN (q ) || q < 0 || q > 1 {
104- return math .NaN ()
105- }
106- if count == 0 || len (buckets ) == 0 {
107- return math .NaN ()
108- }
109-
110- upperBounds := make ([]float64 , 0 , len (buckets ))
111- for ub := range buckets {
112- upperBounds = append (upperBounds , ub )
113- }
114- sort .Float64s (upperBounds )
115-
116- rank := q * float64 (count )
117-
118- var (
119- prevCum uint64
120- prevBound float64
121- )
122- for i , ub := range upperBounds {
123- cum := buckets [ub ]
124- if float64 (cum ) < rank {
125- prevCum = cum
126- prevBound = ub
127- continue
128- }
129-
130- lowerBound := prevBound
131- if i == 0 && ub > 0 {
132- // For the first bucket, PromQL assumes a lower bound of 0.
133- lowerBound = 0
134- prevCum = 0
135- }
136- if math .IsInf (ub , 1 ) {
137- return lowerBound
138- }
139-
140- bucketCount := cum - prevCum
141- if bucketCount == 0 {
142- return lowerBound
143- }
144- pos := (rank - float64 (prevCum )) / float64 (bucketCount )
145- return lowerBound + (ub - lowerBound )* pos
146- }
147-
148- // Should be unreachable if +Inf bucket exists; return the largest finite upper bound as a fallback.
149- return upperBounds [len (upperBounds )- 1 ]
150- }
151-
152- func printBatchSizeStats (prefix string , snap histogramSnapshot ) {
153- if snap .count == 0 {
154- fmt .Printf ("batch-size(%s): count=0\n " , prefix )
155- return
156- }
157- avg := snap .sum / float64 (snap .count )
158- p99 := histogramQuantile (0.99 , snap .count , snap .buckets )
159- fmt .Printf ("batch-size(%s): count=%d avg=%.3f p99=%.3f\n " , prefix , snap .count , avg , p99 )
160- }
161-
16231// ShowStats shows the current stats and updates them with the given duration.
16332func ShowStats (
16433 ctx context.Context ,
@@ -176,32 +45,12 @@ func ShowStats(
17645
17746 s , total := NewStats (), NewStats ()
17847 fmt .Println ("start stats collecting" )
179-
180- var (
181- baseBatchSnap histogramSnapshot
182- prevBatchSnap histogramSnapshot
183- hasBatchSnap bool
184- )
185- if verbose {
186- if snap , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
187- baseBatchSnap , prevBatchSnap , hasBatchSnap = snap , snap , true
188- }
189- }
19048 for {
19149 select {
19250 case <- ticker .C :
19351 // runtime.GC()
19452 if verbose {
19553 fmt .Println (s .counter ())
196- // Print batch-size stats for the latest interval.
197- if ! hasBatchSnap {
198- if snap , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
199- baseBatchSnap , prevBatchSnap , hasBatchSnap = snap , snap , true
200- }
201- } else if cur , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
202- printBatchSizeStats ("interval" , diffHistogramSnapshot (cur , prevBatchSnap ))
203- prevBatchSnap = cur
204- }
20554 }
20655 total .merge (s )
20756 s = NewStats ()
@@ -215,17 +64,6 @@ func ShowStats(
21564 fmt .Printf ("P0.5: %.4fms, P0.8: %.4fms, P0.9: %.4fms, P0.99: %.4fms\n \n " ,
21665 latencyTDigest .Quantile (0.5 ), latencyTDigest .Quantile (0.8 ), latencyTDigest .Quantile (0.9 ), latencyTDigest .Quantile (0.99 ))
21766 if verbose {
218- // Print batch-size stats for the whole run.
219- if ! hasBatchSnap {
220- if snap , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
221- baseBatchSnap , hasBatchSnap = snap , true
222- }
223- }
224- if hasBatchSnap {
225- if cur , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
226- printBatchSizeStats ("total" , diffHistogramSnapshot (cur , baseBatchSnap ))
227- }
228- }
22967 fmt .Println (collectMetrics (promServer ))
23068 }
23169 return
0 commit comments