@@ -18,16 +18,147 @@ import (
1818 "context"
1919 "fmt"
2020 "io"
21+ "math"
2122 "net/http"
2223 "net/http/httptest"
24+ "sort"
2325 "sync"
2426 "time"
2527
2628 "github.com/influxdata/tdigest"
29+ "github.com/prometheus/client_golang/prometheus"
30+ dto "github.com/prometheus/client_model/go"
2731)
2832
2933var latencyTDigest * tdigest.TDigest = tdigest .New ()
3034
// tsoBatchSizeMetricName is the name of the Prometheus metric family that
// ShowStats snapshots to report batch-size statistics. The family must be a
// histogram, otherwise gatherHistogramSnapshot reports an error.
const tsoBatchSizeMetricName = "pd_client_request_handle_tso_batch_size"
36+
// histogramSnapshot is a point-in-time copy of a histogram metric family,
// summed over every series in that family. It is also used to hold the
// difference between two such copies.
type histogramSnapshot struct {
	// count is the total number of observations.
	count uint64
	// sum is the sum of all observed values.
	sum float64
	// buckets maps a bucket's upper bound to its cumulative observation count.
	buckets map[float64]uint64
}
42+
43+ func gatherHistogramSnapshot (metricName string ) (histogramSnapshot , bool , error ) {
44+ mfs , err := prometheus .DefaultGatherer .Gather ()
45+ if err != nil {
46+ return histogramSnapshot {}, false , err
47+ }
48+
49+ for _ , mf := range mfs {
50+ if mf .GetName () != metricName {
51+ continue
52+ }
53+ if mf .GetType () != dto .MetricType_HISTOGRAM {
54+ return histogramSnapshot {}, false , fmt .Errorf ("metric %q is not a histogram" , metricName )
55+ }
56+
57+ snap := histogramSnapshot {
58+ buckets : make (map [float64 ]uint64 ),
59+ }
60+ for _ , m := range mf .Metric {
61+ h := m .GetHistogram ()
62+ if h == nil {
63+ continue
64+ }
65+ snap .count += h .GetSampleCount ()
66+ snap .sum += h .GetSampleSum ()
67+ for _ , b := range h .Bucket {
68+ snap .buckets [b .GetUpperBound ()] += b .GetCumulativeCount ()
69+ }
70+ }
71+ return snap , true , nil
72+ }
73+
74+ return histogramSnapshot {}, false , nil
75+ }
76+
77+ func diffHistogramSnapshot (cur , prev histogramSnapshot ) histogramSnapshot {
78+ if cur .count < prev .count {
79+ // Metrics got reset/re-registered, treat as starting from zero.
80+ return cur
81+ }
82+
83+ delta := histogramSnapshot {
84+ count : cur .count - prev .count ,
85+ sum : cur .sum - prev .sum ,
86+ buckets : make (map [float64 ]uint64 , len (cur .buckets )),
87+ }
88+ for upperBound , curCum := range cur .buckets {
89+ prevCum := prev .buckets [upperBound ]
90+ if curCum < prevCum {
91+ // Unexpected, but keep it safe.
92+ delta .buckets [upperBound ] = 0
93+ continue
94+ }
95+ delta .buckets [upperBound ] = curCum - prevCum
96+ }
97+ return delta
98+ }
99+
// histogramQuantile approximates the q-quantile of a Prometheus-style
// cumulative histogram, using the same linear interpolation assumption as
// PromQL's histogram_quantile.
//
// q must lie in [0, 1]. count is the total number of observations and buckets
// maps each bucket's upper bound to its cumulative count.
//
// It returns NaN when q is NaN or out of range, when count is zero, or when
// buckets is empty. When the quantile falls into the lowest bucket, the lower
// bound is taken to be 0 if the bucket's upper bound is positive; otherwise
// the upper bound itself is returned. When the quantile falls into the +Inf
// bucket, or beyond every bucket present, the largest preceding upper bound is
// returned.
func histogramQuantile(q float64, count uint64, buckets map[float64]uint64) float64 {
	if math.IsNaN(q) || q < 0 || q > 1 {
		return math.NaN()
	}
	if count == 0 || len(buckets) == 0 {
		return math.NaN()
	}

	// Map iteration order is random; walk the buckets in ascending bound order.
	upperBounds := make([]float64, 0, len(buckets))
	for ub := range buckets {
		upperBounds = append(upperBounds, ub)
	}
	sort.Float64s(upperBounds)

	// rank is the (fractional) index of the requested observation.
	rank := q * float64(count)

	var (
		prevCum   uint64
		prevBound float64
	)
	for i, ub := range upperBounds {
		cum := buckets[ub]
		if float64(cum) < rank {
			prevCum, prevBound = cum, ub
			continue
		}

		lowerBound := prevBound
		if i == 0 {
			if ub <= 0 {
				// The lowest bucket has no known lower bound. Interpolating
				// from 0 would yield a value above the bucket's own upper
				// bound, so return the upper bound, as PromQL does.
				return ub
			}
			// For a positive first bucket, PromQL assumes a lower bound of 0.
			lowerBound = 0
		}
		if math.IsInf(ub, 1) {
			// Cannot interpolate towards +Inf; report the bucket's lower bound.
			return lowerBound
		}

		// cum >= rank > prevCum here (or prevCum is still 0 for the first
		// bucket), so this unsigned subtraction cannot wrap around.
		bucketCount := cum - prevCum
		if bucketCount == 0 {
			return lowerBound
		}
		pos := (rank - float64(prevCum)) / float64(bucketCount)
		return lowerBound + (ub-lowerBound)*pos
	}

	// Reached when no bucket present covers the rank (e.g. the +Inf bucket is
	// not part of the map); fall back to the largest upper bound.
	return upperBounds[len(upperBounds)-1]
}
151+
152+ func printBatchSizeStats (prefix string , snap histogramSnapshot ) {
153+ if snap .count == 0 {
154+ fmt .Printf ("batch-size(%s): count=0\n " , prefix )
155+ return
156+ }
157+ avg := snap .sum / float64 (snap .count )
158+ p99 := histogramQuantile (0.99 , snap .count , snap .buckets )
159+ fmt .Printf ("batch-size(%s): count=%d avg=%.3f p99=%.3f\n " , prefix , snap .count , avg , p99 )
160+ }
161+
31162// ShowStats shows the current stats and updates them with the given duration.
32163func ShowStats (
33164 ctx context.Context ,
@@ -45,12 +176,32 @@ func ShowStats(
45176
46177 s , total := NewStats (), NewStats ()
47178 fmt .Println ("start stats collecting" )
179+
180+ var (
181+ baseBatchSnap histogramSnapshot
182+ prevBatchSnap histogramSnapshot
183+ hasBatchSnap bool
184+ )
185+ if verbose {
186+ if snap , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
187+ baseBatchSnap , prevBatchSnap , hasBatchSnap = snap , snap , true
188+ }
189+ }
48190 for {
49191 select {
50192 case <- ticker .C :
51193 // runtime.GC()
52194 if verbose {
53195 fmt .Println (s .counter ())
196+ // Print batch-size stats for the latest interval.
197+ if ! hasBatchSnap {
198+ if snap , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
199+ baseBatchSnap , prevBatchSnap , hasBatchSnap = snap , snap , true
200+ }
201+ } else if cur , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
202+ printBatchSizeStats ("interval" , diffHistogramSnapshot (cur , prevBatchSnap ))
203+ prevBatchSnap = cur
204+ }
54205 }
55206 total .merge (s )
56207 s = NewStats ()
@@ -64,6 +215,18 @@ func ShowStats(
64215 fmt .Printf ("P0.5: %.4fms, P0.8: %.4fms, P0.9: %.4fms, P0.99: %.4fms\n \n " ,
65216 latencyTDigest .Quantile (0.5 ), latencyTDigest .Quantile (0.8 ), latencyTDigest .Quantile (0.9 ), latencyTDigest .Quantile (0.99 ))
66217 if verbose {
218+ // Print batch-size stats for the whole run.
219+ if ! hasBatchSnap {
220+ if snap , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
221+ baseBatchSnap , prevBatchSnap , hasBatchSnap = snap , snap , true
222+ }
223+ }
224+ if hasBatchSnap {
225+ if cur , ok , err := gatherHistogramSnapshot (tsoBatchSizeMetricName ); err == nil && ok {
226+ printBatchSizeStats ("total" , diffHistogramSnapshot (cur , baseBatchSnap ))
227+ prevBatchSnap = cur
228+ }
229+ }
67230 fmt .Println (collectMetrics (promServer ))
68231 }
69232 return
0 commit comments