diff --git a/pkg/core/store.go b/pkg/core/store.go index fc9f5726712..428f88e4cf9 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -165,6 +165,22 @@ func (s *StoreInfo) IsTiFlash() bool { return IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) } +<<<<<<< HEAD +======= +// IsTiFlashCompute returns true if the store is TiFlash compute node. +func (s *StoreInfo) IsTiFlashCompute() bool { + return IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlashCompute) +} + +// Engine returns the engine type of the store. +func (s *StoreInfo) Engine() string { + if s.IsTiKV() { + return EngineTiKV + } + return EngineTiFlash +} + +>>>>>>> c87b73a1e0 (statistics: delete cluster status metrics on store deletion (#9945)) // IsUp returns true if store is serving or preparing. func (s *StoreInfo) IsUp() bool { return s.IsServing() || s.IsPreparing() diff --git a/pkg/mcs/router/server/meta/watcher.go b/pkg/mcs/router/server/meta/watcher.go new file mode 100644 index 00000000000..fe591680e93 --- /dev/null +++ b/pkg/mcs/router/server/meta/watcher.go @@ -0,0 +1,128 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meta + +import ( + "context" + "strconv" + "sync" + + "github.com/gogo/protobuf/proto" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" +) + +// Watcher is used to watch the PD for any meta changes. +type Watcher struct { + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + + etcdClient *clientv3.Client + basicCluster *core.BasicCluster + storeWatcher *etcdutil.LoopWatcher +} + +// NewWatcher creates a new watcher to watch the meta change from PD. +func NewWatcher( + ctx context.Context, + etcdClient *clientv3.Client, + basicCluster *core.BasicCluster, +) (*Watcher, error) { + ctx, cancel := context.WithCancel(ctx) + w := &Watcher{ + ctx: ctx, + cancel: cancel, + etcdClient: etcdClient, + basicCluster: basicCluster, + } + err := w.initializeStoreWatcher() + if err != nil { + return nil, err + } + return w, nil +} + +func (w *Watcher) initializeStoreWatcher() error { + putFn := func(kv *mvccpb.KeyValue) error { + store := &metapb.Store{} + if err := proto.Unmarshal(kv.Value, store); err != nil { + log.Warn("failed to unmarshal store entry", + zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + return err + } + log.Debug("update store meta", zap.Stringer("store", store)) + origin := w.basicCluster.GetStore(store.GetId()) + if origin == nil { + w.basicCluster.PutStore(core.NewStoreInfo(store)) + } else { + w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store))) + } + + if store.GetNodeState() == metapb.NodeState_Removed { + statistics.ResetStoreStatistics(store.GetAddress(), strconv.FormatUint(store.GetId(), 10)) + // TODO: remove hot stats + } + + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + key := string(kv.Key) + storeID, err := keypath.ExtractStoreIDFromPath(key) + if err != nil { + return err + } + origin := w.basicCluster.GetStore(storeID) + if origin != nil { + statistics.DeleteClusterStatusMetrics(origin) + w.basicCluster.DeleteStore(origin) + log.Info("delete store meta", zap.Uint64("store-id", storeID)) + } + return nil + } + w.storeWatcher = etcdutil.NewLoopWatcher( + w.ctx, &w.wg, + w.etcdClient, + "router-store-watcher", + // Watch meta store proto + keypath.StorePathPrefix(), + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, + true, /* withPrefix */ + ) + w.storeWatcher.StartWatchLoop() + return w.storeWatcher.WaitLoad() +} + +// Close closes the watcher. +func (w *Watcher) Close() { + w.cancel() + w.wg.Wait() +} + +// GetStoreWatcher returns the store watcher. +func (w *Watcher) GetStoreWatcher() *etcdutil.LoopWatcher { + return w.storeWatcher +} diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 3dbd0fc8c92..efca0d24f3b 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -92,6 +92,7 @@ func (w *Watcher) initializeStoreWatcher() error { } origin := w.basicCluster.GetStore(storeID) if origin != nil { + statistics.DeleteClusterStatusMetrics(origin) w.basicCluster.DeleteStore(origin) } return nil diff --git a/pkg/statistics/metrics.go b/pkg/statistics/metrics.go index a5ea07f4f55..7888242be8c 100644 --- a/pkg/statistics/metrics.go +++ b/pkg/statistics/metrics.go @@ -14,7 +14,13 @@ package statistics -import "github.com/prometheus/client_golang/prometheus" +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/tikv/pd/pkg/core" +) var ( hotCacheStatusGauge = prometheus.NewGaugeVec( @@ -197,3 +203,11 @@ func init() { prometheus.MustRegister(hotCacheFlowQueueStatusGauge) prometheus.MustRegister(hotPeerSummary) } + +// DeleteClusterStatusMetrics deletes the cluster status metrics of a store. +func DeleteClusterStatusMetrics(store *core.StoreInfo) { + engine := store.Engine() + for _, status := range storeStatuses { + clusterStatusGauge.DeleteLabelValues(status, engine, strconv.FormatUint(store.GetID(), 10)) + } +} diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index dcdd77d9112..a74c739f589 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -31,6 +31,21 @@ const ( labelType = "label" ) +var storeStatuses = []string{ + clusterStatusStoreUpCount, + clusterStatusStoreDisconnectedCount, + clusterStatusStoreSlowCount, + clusterStatusStoreDownCount, + clusterStatusStoreUnhealthCount, + clusterStatusStoreOfflineCount, + clusterStatusStoreTombstoneCount, + clusterStatusStoreLowSpaceCount, + clusterStatusStorePreparingCount, + clusterStatusStoreServingCount, + clusterStatusStoreRemovingCount, + clusterStatusStoreRemovedCount, +} + type storeStatistics struct { opt config.ConfProvider Up int @@ -75,6 +90,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { } storeAddress := store.GetAddress() id := strconv.FormatUint(store.GetID(), 10) +<<<<<<< HEAD // Store state. isDown := false switch store.GetNodeState() { @@ -102,6 +118,15 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { case metapb.NodeState_Removed: s.Tombstone++ s.Removed++ +======= + engine := store.Engine() + storeStatusStats := s.observeStoreStatus(store) + for statusType, value := range storeStatusStats { + clusterStatusGauge.WithLabelValues(statusType, engine, id).Set(value) + } + // skip tombstone store avoid to overwrite metrics + if store.GetNodeState() == metapb.NodeState_Removed { +>>>>>>> c87b73a1e0 (statistics: delete cluster status metrics on store deletion (#9945)) return } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 9033b33d2ec..8fafe774a87 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2170,7 +2170,12 @@ func (c *RaftCluster) deleteStore(store *core.StoreInfo) error { return err } } +<<<<<<< HEAD c.core.DeleteStore(store) +======= + statistics.DeleteClusterStatusMetrics(store) + c.DeleteStore(store) +>>>>>>> c87b73a1e0 (statistics: delete cluster status metrics on store deletion (#9945)) return nil }