Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
Expand Down
12 changes: 11 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -341,7 +342,7 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro
// saveKeyspaceGroups will try to save the given keyspace groups into the storage.
// If any keyspace group already exists and `overwrite` is false, it will return ErrKeyspaceGroupExists.
func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup, overwrite bool) error {
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
for _, keyspaceGroup := range keyspaceGroups {
// Check if keyspace group has already existed.
oldKG, err := m.store.LoadKeyspaceGroup(txn, keyspaceGroup.ID)
Expand Down Expand Up @@ -370,6 +371,15 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
}
return nil
})
if err != nil {
return err
}
// Update metrics only after the transaction has committed, so that gauges
// never reflect a state that was not persisted (e.g. after a rollback).
for _, kg := range keyspaceGroups {
tso.SetKeyspaceGroupKeyspaceCountGauge(kg.ID, float64(len(kg.Keyspaces)))
}
return nil
}

// GetKeyspaceConfigByKind returns the keyspace config for the given user kind.
Expand Down
19 changes: 16 additions & 3 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (

"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/keyspace/constant"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
Expand All @@ -55,6 +54,7 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo/kerneltype"
)

const (
Expand All @@ -68,6 +68,16 @@ const (
groupPatrolInterval = time.Minute
)

// getBootstrapKeyspaceID returns the keyspace ID used for bootstrapping.
// It mirrors keyspace.GetBootstrapKeyspaceID() to avoid importing pkg/keyspace (which would
// create an import cycle with keyspace importing pkg/tso for SetKeyspaceGroupKeyspaceCountGauge).
func getBootstrapKeyspaceID() uint32 {
if kerneltype.IsNextGen() {
return constant.SystemKeyspaceID
}
return constant.DefaultKeyspaceID
}

type state struct {
syncutil.RWMutex
// allocators stores the allocators of the keyspace groups. Each keyspace group is
Expand Down Expand Up @@ -902,7 +912,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
j++
}
}
keyspaceID := keyspace.GetBootstrapKeyspaceID()
keyspaceID := getBootstrapKeyspaceID()
kgm.checkReserveKeyspace(newGroup, newKeyspaces, keyspaceID)
}
// Check the split state.
Expand Down Expand Up @@ -975,6 +985,9 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
}
}
kgm.kgs[groupID] = nil
if kgm.metrics != nil {
kgm.metrics.DeleteKeyspaceListLength(groupID)
}
}

allocator := kgm.allocators[groupID]
Expand All @@ -987,7 +1000,7 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
}

func (kgm *KeyspaceGroupManager) genDefaultKeyspaceGroupMeta() *endpoint.KeyspaceGroup {
keyspaces := []uint32{keyspace.GetBootstrapKeyspaceID()}
keyspaces := []uint32{getBootstrapKeyspaceID()}
return &endpoint.KeyspaceGroup{
ID: constant.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{
Expand Down
74 changes: 74 additions & 0 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -1147,6 +1149,78 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() {
wg.Wait()
}

// TestKeyspaceListLengthMetric tests that tso_keyspace_group_keyspace_list_length can be set
// (e.g. by PD API saveKeyspaceGroups via SetKeyspaceGroupKeyspaceCountGauge) and removed
// when the group is deleted (DeleteKeyspaceListLength on TSO service).
func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceListLengthMetric() {
re := suite.Require()
groupID := uint32(1)
kgm := &KeyspaceGroupManager{
state: state{keyspaceLookupTable: make(map[uint32]uint32)},
metrics: newKeyspaceGroupMetrics(),
}

getGaugeValue := func(g prometheus.Gauge) float64 {
var out dto.Metric
re.NoError(g.(prometheus.Metric).Write(&out))
return out.GetGauge().GetValue()
}

// Test SetKeyspaceGroupKeyspaceCountGauge (used by PD API saveKeyspaceGroups)
SetKeyspaceGroupKeyspaceCountGauge(groupID, 3)
gauge, err := keyspaceGroupKeyspaceCountGauge.GetMetricWithLabelValues(strconv.FormatUint(uint64(groupID), 10))
re.NoError(err)
re.Equal(3.0, getGaugeValue(gauge))

SetKeyspaceGroupKeyspaceCountGauge(groupID, 5)
gauge, err = keyspaceGroupKeyspaceCountGauge.GetMetricWithLabelValues(strconv.FormatUint(uint64(groupID), 10))
re.NoError(err)
re.Equal(5.0, getGaugeValue(gauge))

SetKeyspaceGroupKeyspaceCountGauge(groupID, 0)
gauge, err = keyspaceGroupKeyspaceCountGauge.GetMetricWithLabelValues(strconv.FormatUint(uint64(groupID), 10))
re.NoError(err)
re.Equal(0.0, getGaugeValue(gauge))

// Test DeleteKeyspaceListLength: set group 2 via metrics, then delete, gather and ensure group 2 is not present
groupID2 := uint32(2)
kgm.metrics.SetKeyspaceListLength(groupID2, 10)
mfs, err := prometheus.DefaultGatherer.Gather()
re.NoError(err)
var foundGroup2Before bool
for _, mf := range mfs {
if mf.GetName() == "tso_keyspace_group_keyspace_list_length" {
for _, m := range mf.GetMetric() {
for _, lp := range m.GetLabel() {
if lp.GetName() == "group" && lp.GetValue() == "2" {
foundGroup2Before = true
re.Equal(10.0, m.GetGauge().GetValue())
break
}
}
}
break
}
}
re.True(foundGroup2Before, "metric for group 2 should exist before delete")

kgm.metrics.DeleteKeyspaceListLength(groupID2)
mfs, err = prometheus.DefaultGatherer.Gather()
re.NoError(err)
for _, mf := range mfs {
if mf.GetName() == "tso_keyspace_group_keyspace_list_length" {
for _, m := range mf.GetMetric() {
for _, lp := range m.GetLabel() {
if lp.GetName() == "group" && lp.GetValue() == "2" {
re.Fail("metric for group 2 should be removed after DeleteKeyspaceListLength")
}
}
}
break
}
}
}

// Register TSO server.
func (suite *keyspaceGroupManagerTestSuite) registerTSOServer(
re *require.Assertions, svcAddr string, cfg *TestServiceConfig,
Expand Down
37 changes: 36 additions & 1 deletion pkg/tso/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package tso

import "github.com/prometheus/client_golang/prometheus"
import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
)

const (
pdNamespace = "pd"
Expand Down Expand Up @@ -83,6 +87,15 @@ var (
Help: "Bucketed histogram of processing time(s) of the Keyspace Group operations.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{typeLabel})

// keyspaceGroupKeyspaceCountGauge records the keyspace list length of each keyspace group.
keyspaceGroupKeyspaceCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: tsoNamespace,
Subsystem: "keyspace_group",
Name: "keyspace_list_length",
Help: "The length of keyspace list in each TSO keyspace group.",
}, []string{groupLabel})
)

func init() {
Expand All @@ -93,6 +106,7 @@ func init() {
prometheus.MustRegister(tsoAllocatorRole)
prometheus.MustRegister(keyspaceGroupStateGauge)
prometheus.MustRegister(keyspaceGroupOpDuration)
prometheus.MustRegister(keyspaceGroupKeyspaceCountGauge)
}

type tsoMetrics struct {
Expand Down Expand Up @@ -176,6 +190,9 @@ type keyspaceGroupMetrics struct {
finishSplitDuration prometheus.Observer
finishMergeSendDuration prometheus.Observer
finishMergeDuration prometheus.Observer

// keyspaceListLengthGauge records the keyspace list length per keyspace group.
keyspaceListLengthGauge *prometheus.GaugeVec
}

func newKeyspaceGroupMetrics() *keyspaceGroupMetrics {
Expand All @@ -190,5 +207,23 @@ func newKeyspaceGroupMetrics() *keyspaceGroupMetrics {
finishSplitDuration: keyspaceGroupOpDuration.WithLabelValues("finish-split"),
finishMergeSendDuration: keyspaceGroupOpDuration.WithLabelValues("finish-merge-send"),
finishMergeDuration: keyspaceGroupOpDuration.WithLabelValues("finish-merge"),

keyspaceListLengthGauge: keyspaceGroupKeyspaceCountGauge,
}
}

// SetKeyspaceListLength sets the keyspace list length metric for the given keyspace group.
func (m *keyspaceGroupMetrics) SetKeyspaceListLength(groupID uint32, length float64) {
m.keyspaceListLengthGauge.WithLabelValues(strconv.FormatUint(uint64(groupID), 10)).Set(length)
}

// DeleteKeyspaceListLength removes the keyspace list length metric for the given keyspace group.
func (m *keyspaceGroupMetrics) DeleteKeyspaceListLength(groupID uint32) {
m.keyspaceListLengthGauge.DeleteLabelValues(strconv.FormatUint(uint64(groupID), 10))
}

// SetKeyspaceGroupKeyspaceCountGauge sets the keyspace list length metric for the given keyspace group.
// It is used by PD API service when saveKeyspaceGroups is executed.
func SetKeyspaceGroupKeyspaceCountGauge(groupID uint32, length float64) {
keyspaceGroupKeyspaceCountGauge.WithLabelValues(strconv.FormatUint(uint64(groupID), 10)).Set(length)
}
14 changes: 12 additions & 2 deletions pkg/utils/tsoutil/tso_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/keyspace/constant"
"github.com/tikv/pd/pkg/versioninfo/kerneltype"
)

// Request is an interface wrapping tsopb.TsoRequest and pdpb.TsoRequest so
Expand Down Expand Up @@ -58,6 +58,16 @@ func NewPDProtoRequest(forwardedHost string, clientConn *grpc.ClientConn, reques
return tsoRequest
}

// getBootstrapKeyspaceID returns the keyspace ID used for bootstrapping.
// It mirrors keyspace.GetBootstrapKeyspaceID() to avoid importing pkg/keyspace (which would
// create an import cycle: keyspace -> tso -> tsoutil -> keyspace).
func getBootstrapKeyspaceID() uint32 {
if kerneltype.IsNextGen() {
return constant.SystemKeyspaceID
}
return constant.DefaultKeyspaceID
}

// getForwardedHost returns the forwarded host
func (r *PDProtoRequest) getForwardedHost() string {
return r.forwardedHost
Expand All @@ -76,7 +86,7 @@ func (r *PDProtoRequest) getCount() uint32 {
// process sends request and receive response via stream.
// count defines the count of timestamps to retrieve.
func (r *PDProtoRequest) process(forwardStream stream, count uint32) (tsoResp, error) {
keyspaceID := keyspace.GetBootstrapKeyspaceID()
keyspaceID := getBootstrapKeyspaceID()
return forwardStream.process(r.request.GetHeader().GetClusterId(), count,
keyspaceID, constant.DefaultKeyspaceGroupID)
}
Expand Down