
Commit 2013ce2

server,tests: add tso batch metric and fix test server lock
Signed-off-by: okjiang <819421878@qq.com>
1 parent 5eddccd commit 2013ce2

File tree: 4 files changed, +37 -11 lines changed

  metrics/grafana/pd.json
  server/grpc_service.go
  server/metrics.go
  tests/cluster.go

metrics/grafana/pd.json

Lines changed: 4 additions & 4 deletions
@@ -11003,7 +11003,7 @@
       "targets": [
         {
           "exemplar": true,
-          "expr": "histogram_quantile(0.99, sum(rate(pd_client_request_handle_tso_batch_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])) by (le))",
+          "expr": "histogram_quantile(0.99, sum(rate(pd_server_handle_tso_batch_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])) by (le))",
           "hide": false,
           "interval": "",
           "intervalFactor": 1,
@@ -11012,7 +11012,7 @@
         },
         {
           "exemplar": true,
-          "expr": "histogram_quantile(0.90, sum(rate(pd_client_request_handle_tso_batch_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])) by (le))",
+          "expr": "histogram_quantile(0.90, sum(rate(pd_server_handle_tso_batch_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])) by (le))",
           "hide": false,
           "interval": "",
           "intervalFactor": 1,
@@ -11021,7 +11021,7 @@
         },
         {
           "exemplar": true,
-          "expr": "sum(rate(pd_client_request_handle_tso_batch_size_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])) / sum(rate(pd_client_request_handle_tso_batch_size_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m]))",
+          "expr": "sum(rate(pd_server_handle_tso_batch_size_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])) / sum(rate(pd_server_handle_tso_batch_size_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m]))",
           "hide": false,
           "interval": "",
           "intervalFactor": 1,
@@ -16269,4 +16269,4 @@
   "title": "Test-Cluster-PD",
   "uid": "Q6RuHYIWk",
   "version": 1
-}
+}
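
The three panel targets switch from the client-side histogram pd_client_request_handle_tso_batch_size to the server-side pd_server_handle_tso_batch_size added in server/metrics.go below; the name follows the Prometheus namespace_subsystem_name convention (pd + server + handle_tso_batch_size). As a minimal sketch, not part of the commit, the renamed series can be spot-checked through the Prometheus HTTP API; the address and the label-free expression here are illustrative assumptions, while the real panel targets carry the k8s_cluster and tidb_cluster labels shown above.

// Sketch only: query the renamed series via the Prometheus HTTP API (stdlib only).
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	const promAddr = "http://127.0.0.1:9090" // assumed Prometheus address, not from the commit

	expr := `histogram_quantile(0.99, sum(rate(pd_server_handle_tso_batch_size_bucket[1m])) by (le))`

	resp, err := http.Get(promAddr + "/api/v1/query?query=" + url.QueryEscape(expr))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // an empty "result" array means the new metric is not being scraped yet
}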

server/grpc_service.go

Lines changed: 2 additions & 0 deletions
@@ -569,6 +569,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
 			err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
 			return errs.ErrUnknown(err)
 		}
+		tsoBatchSize.Observe(float64(request.GetCount()))
 		tsoStreamErr, err = s.handleTSOForwarding(stream.Context(), forwarder, request, tsDeadlineCh)
 		if tsoStreamErr != nil {
 			return tsoStreamErr
@@ -584,6 +585,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
 			return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
 		}
 		count := request.GetCount()
+		tsoBatchSize.Observe(float64(count))
 		ctx, task := trace.NewTask(ctx, "tso")
 		ts, err := s.tsoAllocator.GenerateTSO(ctx, count)
 		task.End()
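
Both hunks record the requested batch size before timestamps are produced: the first on the forwarding path (handleTSOForwarding), the second on the local-allocation path (tsoAllocator.GenerateTSO), so every accepted Tso request contributes exactly one observation. A hedged sketch of that pattern follows; the request type and the stripped-down histogram are stand-ins, not PD's real definitions.

// Sketch only: toy types stand in for *pdpb.TsoRequest and for the real tsoBatchSize
// histogram defined in server/metrics.go.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Stand-in histogram; the real options (namespace "pd", subsystem "server",
// exponential buckets) live in server/metrics.go below.
var tsoBatchSize = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "handle_tso_batch_size_sketch",
})

type tsoRequest struct{ count uint32 } // stand-in for the Count field of a TsoRequest

func handleTso(req tsoRequest) error {
	if req.count == 0 {
		return fmt.Errorf("tso count should be positive")
	}
	// One observation per accepted request, mirroring the two hunks above
	// (forwarding path and local-allocation path).
	tsoBatchSize.Observe(float64(req.count))
	// ... forward the request or generate req.count timestamps locally ...
	return nil
}

func main() {
	prometheus.MustRegister(tsoBatchSize)
	fmt.Println(handleTso(tsoRequest{count: 8}))
}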

server/metrics.go

Lines changed: 10 additions & 0 deletions
@@ -101,6 +101,15 @@ var (
 			Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
 		})
 
+	tsoBatchSize = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "pd",
+			Subsystem: "server",
+			Name:      "handle_tso_batch_size",
+			Help:      "Bucketed histogram of the batch size of handled tso requests.",
+			Buckets:   prometheus.ExponentialBuckets(1, 2, 13),
+		})
+
 	queryRegionDuration = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace: "pd",
@@ -207,6 +216,7 @@ func init() {
 	prometheus.MustRegister(tsoProxyBatchSize)
 	prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
 	prometheus.MustRegister(tsoHandleDuration)
+	prometheus.MustRegister(tsoBatchSize)
 	prometheus.MustRegister(queryRegionDuration)
 	prometheus.MustRegister(regionHeartbeatHandleDuration)
 	prometheus.MustRegister(storeHeartbeatHandleDuration)
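
Once registered, the histogram is exported under the full name pd_server_handle_tso_batch_size with 13 exponential buckets (1, 2, 4, ..., 4096); the text exposition adds the _bucket, _sum and _count series that the updated Grafana expressions consume. A small self-contained sketch, using a private registry purely for illustration, shows the derived family name.

// Sketch only: demonstrate the series name the new histogram exports.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	tsoBatchSize := prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "pd",
		Subsystem: "server",
		Name:      "handle_tso_batch_size",
		Help:      "Bucketed histogram of the batch size of handled tso requests.",
		// 13 exponential buckets: 1, 2, 4, ..., 4096.
		Buckets: prometheus.ExponentialBuckets(1, 2, 13),
	})

	reg := prometheus.NewRegistry()
	reg.MustRegister(tsoBatchSize)
	tsoBatchSize.Observe(8)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, f := range families {
		// Prints "pd_server_handle_tso_batch_size"; the exposition format appends
		// the _bucket, _sum and _count suffixes queried by the dashboard.
		fmt.Println(f.GetName())
	}
}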

tests/cluster.go

Lines changed: 21 additions & 7 deletions
@@ -59,6 +59,7 @@ import (
 const (
 	Initial int32 = iota
 	Running
+	Starting
 	Stop
 	Destroy
 )
@@ -142,22 +143,35 @@ func NewTestServer(ctx context.Context, cfg *config.Config, services []string, h
 // Run starts to run a TestServer.
 func (s *TestServer) Run() error {
 	s.Lock()
-	defer s.Unlock()
 	if s.state != Initial && s.state != Stop {
+		s.Unlock()
 		return errors.Errorf("server(state%d) cannot run", s.state)
 	}
+	s.state = Starting
+	s.Unlock()
+
 	if err := s.server.Run(); err != nil {
+		s.Lock()
+		if s.state == Starting {
+			s.state = Stop
+		}
+		s.Unlock()
 		return err
 	}
-	s.state = Running
+
+	s.Lock()
+	if s.state == Starting {
+		s.state = Running
+	}
+	s.Unlock()
 	return nil
 }
 
 // Stop is used to stop a TestServer.
 func (s *TestServer) Stop() error {
 	s.Lock()
 	defer s.Unlock()
-	if s.state != Running {
+	if s.state != Running && s.state != Starting {
 		return errors.Errorf("server(state%d) cannot stop", s.state)
 	}
 	s.server.Close()
@@ -169,7 +183,7 @@ func (s *TestServer) Stop() error {
 func (s *TestServer) Destroy() error {
 	s.Lock()
 	defer s.Unlock()
-	if s.state == Running {
+	if s.state == Running || s.state == Starting {
 		s.server.Close()
 	}
 	if err := os.RemoveAll(s.server.GetConfig().DataDir); err != nil {
@@ -699,7 +713,7 @@ func (c *TestCluster) runInitialServersWithRetry(maxRetries int) error {
 
 	// Stop and destroy all servers
 	for _, s := range servers {
-		if s.State() == Running {
+		if state := s.State(); state == Running || state == Starting {
 			_ = s.Stop()
 		}
 		_ = s.Destroy()
@@ -738,7 +752,7 @@ func (c *TestCluster) runInitialServersWithRetry(maxRetries int) error {
 		case strings.Contains(errMsg, "ErrStartEtcd"):
 			log.Warn("etcd start failed, will retry", zap.Error(lastErr))
 			for _, s := range servers {
-				if s.State() == Running {
+				if state := s.State(); state == Running || state == Starting {
 					_ = s.Stop()
 				}
 			}
@@ -767,7 +781,7 @@ func RunServersWithRetry(servers []*TestServer, maxRetries int) error {
 			log.Warn("etcd start failed, will retry", zap.Error(lastErr))
 			// Stop any partially started servers before retrying
 			for _, s := range servers {
-				if s.State() == Running {
+				if state := s.State(); state == Running || state == Starting {
 					_ = s.Stop()
 				}
 			}
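
The test-server fix stops holding the TestServer mutex across the blocking s.server.Run(): Run now flips the state to the new Starting value and releases the lock, only promotes to Running if no concurrent Stop intervened, and Stop, Destroy, and the retry helpers treat Starting like Running. A toy sketch of that pattern follows; the types and timings are illustrative, not the real tests/cluster.go code.

// Sketch only: mark Starting, drop the mutex during the slow start, then re-check
// the state before promoting to Running, so a concurrent Stop is never overwritten.
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	Initial int32 = iota
	Running
	Starting
	Stop
)

type server struct {
	sync.Mutex
	state int32
}

func (s *server) Run() error {
	s.Lock()
	if s.state != Initial && s.state != Stop {
		s.Unlock()
		return fmt.Errorf("server(state%d) cannot run", s.state)
	}
	s.state = Starting
	s.Unlock() // do not hold the lock across the slow start

	time.Sleep(50 * time.Millisecond) // stands in for the blocking s.server.Run()

	s.Lock()
	defer s.Unlock()
	if s.state == Starting { // a concurrent Stop may already have moved us to Stop
		s.state = Running
	}
	return nil
}

func (s *server) Stop() {
	s.Lock()
	defer s.Unlock()
	if s.state == Running || s.state == Starting {
		s.state = Stop
	}
}

func main() {
	s := &server{}
	var wg sync.WaitGroup
	wg.Add(1)
	// In the old code this Stop would block behind Run's long critical section;
	// here it only waits for the short ones.
	go func() { defer wg.Done(); s.Stop() }()
	_ = s.Run()
	wg.Wait()
	fmt.Println("final state:", s.state) // Running or Stop depending on timing, never a deadlock
}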
