Skip to content

Commit c099589

Browse files
okJiang authored and ti-chi-bot committed
This is an automated cherry-pick of #9945
close #9942 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
1 parent 9b18ac3 commit c099589

File tree

6 files changed

+190
-1
lines changed

6 files changed

+190
-1
lines changed

pkg/core/store.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,22 @@ func (s *StoreInfo) IsTiFlash() bool {
165165
return IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash)
166166
}
167167

168+
<<<<<<< HEAD
169+
=======
170+
// IsTiFlashCompute returns true if the store is TiFlash compute node.
171+
func (s *StoreInfo) IsTiFlashCompute() bool {
172+
return IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlashCompute)
173+
}
174+
175+
// Engine returns the engine type of the store.
176+
func (s *StoreInfo) Engine() string {
177+
if s.IsTiKV() {
178+
return EngineTiKV
179+
}
180+
return EngineTiFlash
181+
}
182+
183+
>>>>>>> c87b73a1e0 (statistics: delete cluster status metrics on store deletion (#9945))
168184
// IsUp returns true if store is serving or preparing.
169185
func (s *StoreInfo) IsUp() bool {
170186
return s.IsServing() || s.IsPreparing()
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package meta
16+
17+
import (
18+
"context"
19+
"strconv"
20+
"sync"
21+
22+
"github.com/gogo/protobuf/proto"
23+
"go.etcd.io/etcd/api/v3/mvccpb"
24+
clientv3 "go.etcd.io/etcd/client/v3"
25+
"go.uber.org/zap"
26+
27+
"github.com/pingcap/kvproto/pkg/metapb"
28+
"github.com/pingcap/log"
29+
30+
"github.com/tikv/pd/pkg/core"
31+
"github.com/tikv/pd/pkg/statistics"
32+
"github.com/tikv/pd/pkg/utils/etcdutil"
33+
"github.com/tikv/pd/pkg/utils/keypath"
34+
)
35+
36+
// Watcher is used to watch the PD for any meta changes.
37+
type Watcher struct {
38+
wg sync.WaitGroup
39+
ctx context.Context
40+
cancel context.CancelFunc
41+
42+
etcdClient *clientv3.Client
43+
basicCluster *core.BasicCluster
44+
storeWatcher *etcdutil.LoopWatcher
45+
}
46+
47+
// NewWatcher creates a new watcher to watch the meta change from PD.
48+
func NewWatcher(
49+
ctx context.Context,
50+
etcdClient *clientv3.Client,
51+
basicCluster *core.BasicCluster,
52+
) (*Watcher, error) {
53+
ctx, cancel := context.WithCancel(ctx)
54+
w := &Watcher{
55+
ctx: ctx,
56+
cancel: cancel,
57+
etcdClient: etcdClient,
58+
basicCluster: basicCluster,
59+
}
60+
err := w.initializeStoreWatcher()
61+
if err != nil {
62+
return nil, err
63+
}
64+
return w, nil
65+
}
66+
67+
func (w *Watcher) initializeStoreWatcher() error {
68+
putFn := func(kv *mvccpb.KeyValue) error {
69+
store := &metapb.Store{}
70+
if err := proto.Unmarshal(kv.Value, store); err != nil {
71+
log.Warn("failed to unmarshal store entry",
72+
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
73+
return err
74+
}
75+
log.Debug("update store meta", zap.Stringer("store", store))
76+
origin := w.basicCluster.GetStore(store.GetId())
77+
if origin == nil {
78+
w.basicCluster.PutStore(core.NewStoreInfo(store))
79+
} else {
80+
w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store)))
81+
}
82+
83+
if store.GetNodeState() == metapb.NodeState_Removed {
84+
statistics.ResetStoreStatistics(store.GetAddress(), strconv.FormatUint(store.GetId(), 10))
85+
// TODO: remove hot stats
86+
}
87+
88+
return nil
89+
}
90+
deleteFn := func(kv *mvccpb.KeyValue) error {
91+
key := string(kv.Key)
92+
storeID, err := keypath.ExtractStoreIDFromPath(key)
93+
if err != nil {
94+
return err
95+
}
96+
origin := w.basicCluster.GetStore(storeID)
97+
if origin != nil {
98+
statistics.DeleteClusterStatusMetrics(origin)
99+
w.basicCluster.DeleteStore(origin)
100+
log.Info("delete store meta", zap.Uint64("store-id", storeID))
101+
}
102+
return nil
103+
}
104+
w.storeWatcher = etcdutil.NewLoopWatcher(
105+
w.ctx, &w.wg,
106+
w.etcdClient,
107+
"router-store-watcher",
108+
// Watch meta store proto
109+
keypath.StorePathPrefix(),
110+
func([]*clientv3.Event) error { return nil },
111+
putFn, deleteFn,
112+
func([]*clientv3.Event) error { return nil },
113+
true, /* withPrefix */
114+
)
115+
w.storeWatcher.StartWatchLoop()
116+
return w.storeWatcher.WaitLoad()
117+
}
118+
119+
// Close closes the watcher.
120+
func (w *Watcher) Close() {
121+
w.cancel()
122+
w.wg.Wait()
123+
}
124+
125+
// GetStoreWatcher returns the store watcher.
126+
func (w *Watcher) GetStoreWatcher() *etcdutil.LoopWatcher {
127+
return w.storeWatcher
128+
}

pkg/mcs/scheduling/server/meta/watcher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ func (w *Watcher) initializeStoreWatcher() error {
101101
}
102102
origin := w.basicCluster.GetStore(storeID)
103103
if origin != nil {
104+
statistics.DeleteClusterStatusMetrics(origin)
104105
w.basicCluster.DeleteStore(origin)
105106
log.Info("delete store meta", zap.Uint64("store-id", storeID))
106107
}

pkg/statistics/metrics.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,13 @@
1414

1515
package statistics
1616

17-
import "github.com/prometheus/client_golang/prometheus"
17+
import (
18+
"strconv"
19+
20+
"github.com/prometheus/client_golang/prometheus"
21+
22+
"github.com/tikv/pd/pkg/core"
23+
)
1824

1925
var (
2026
hotCacheStatusGauge = prometheus.NewGaugeVec(
@@ -197,3 +203,11 @@ func init() {
197203
prometheus.MustRegister(hotCacheFlowQueueStatusGauge)
198204
prometheus.MustRegister(hotPeerSummary)
199205
}
206+
207+
// DeleteClusterStatusMetrics deletes the cluster status metrics of a store.
208+
func DeleteClusterStatusMetrics(store *core.StoreInfo) {
209+
engine := store.Engine()
210+
for _, status := range storeStatuses {
211+
clusterStatusGauge.DeleteLabelValues(status, engine, strconv.FormatUint(store.GetID(), 10))
212+
}
213+
}

pkg/statistics/store_collection.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,21 @@ const (
3131
labelType = "label"
3232
)
3333

34+
var storeStatuses = []string{
35+
clusterStatusStoreUpCount,
36+
clusterStatusStoreDisconnectedCount,
37+
clusterStatusStoreSlowCount,
38+
clusterStatusStoreDownCount,
39+
clusterStatusStoreUnhealthCount,
40+
clusterStatusStoreOfflineCount,
41+
clusterStatusStoreTombstoneCount,
42+
clusterStatusStoreLowSpaceCount,
43+
clusterStatusStorePreparingCount,
44+
clusterStatusStoreServingCount,
45+
clusterStatusStoreRemovingCount,
46+
clusterStatusStoreRemovedCount,
47+
}
48+
3449
type storeStatistics struct {
3550
opt config.ConfProvider
3651
Up int
@@ -75,6 +90,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) {
7590
}
7691
storeAddress := store.GetAddress()
7792
id := strconv.FormatUint(store.GetID(), 10)
93+
<<<<<<< HEAD
7894
// Store state.
7995
isDown := false
8096
switch store.GetNodeState() {
@@ -102,6 +118,15 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) {
102118
case metapb.NodeState_Removed:
103119
s.Tombstone++
104120
s.Removed++
121+
=======
122+
engine := store.Engine()
123+
storeStatusStats := s.observeStoreStatus(store)
124+
for statusType, value := range storeStatusStats {
125+
clusterStatusGauge.WithLabelValues(statusType, engine, id).Set(value)
126+
}
127+
// skip tombstone store avoid to overwrite metrics
128+
if store.GetNodeState() == metapb.NodeState_Removed {
129+
>>>>>>> c87b73a1e0 (statistics: delete cluster status metrics on store deletion (#9945))
105130
return
106131
}
107132

server/cluster/cluster.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2057,7 +2057,12 @@ func (c *RaftCluster) deleteStore(store *core.StoreInfo) error {
20572057
return err
20582058
}
20592059
}
2060+
<<<<<<< HEAD
20602061
c.core.DeleteStore(store)
2062+
=======
2063+
statistics.DeleteClusterStatusMetrics(store)
2064+
c.DeleteStore(store)
2065+
>>>>>>> c87b73a1e0 (statistics: delete cluster status metrics on store deletion (#9945))
20612066
return nil
20622067
}
20632068

0 commit comments

Comments
 (0)