Skip to content

Commit 2b5dac6

Browse files
lhy1024ti-chi-bot
authored andcommitted
This is an automated cherry-pick of #10042
ref #9764 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
1 parent 9c9a8c8 commit 2b5dac6

File tree

13 files changed

+1114
-44
lines changed

13 files changed

+1114
-44
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Copyright 2026 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package affinity
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"strings"
21+
"sync"
22+
23+
"go.etcd.io/etcd/api/v3/mvccpb"
24+
clientv3 "go.etcd.io/etcd/client/v3"
25+
"go.uber.org/zap"
26+
27+
"github.com/pingcap/log"
28+
29+
"github.com/tikv/pd/pkg/schedule/affinity"
30+
"github.com/tikv/pd/pkg/schedule/labeler"
31+
"github.com/tikv/pd/pkg/utils/etcdutil"
32+
"github.com/tikv/pd/pkg/utils/keypath"
33+
)
34+
35+
// Watcher is used to watch the affinity group and label rule changes from PD.
36+
// It watches two paths:
37+
// affinityGroupsPrefix:
38+
// - Key: /pd/{cluster_id}/affinity_groups/{group_id}
39+
// - Value: affinity.Group
40+
//
41+
// regionLabelPathPrefix:
42+
// - Key: /pd/{cluster_id}/region_label/affinity_group/{group_id}
43+
// - Value: labeler.LabelRule
44+
// - Filtered to only process rules with ID prefix "affinity_group/"
45+
type Watcher struct {
46+
cancel context.CancelFunc
47+
wg *sync.WaitGroup
48+
etcdClient *clientv3.Client
49+
50+
// affinityManager is used to manage the affinity groups in the scheduling server.
51+
affinityManager *affinity.Manager
52+
53+
groupWatcher *etcdutil.LoopWatcher
54+
labelWatcher *etcdutil.LoopWatcher
55+
}
56+
57+
// NewWatcher creates a new watcher to watch the affinity changes from PD.
58+
func NewWatcher(
59+
ctx context.Context,
60+
etcdClient *clientv3.Client,
61+
affinityManager *affinity.Manager,
62+
) (*Watcher, error) {
63+
ctx, cancel := context.WithCancel(ctx)
64+
w := &Watcher{
65+
cancel: cancel,
66+
wg: &sync.WaitGroup{},
67+
etcdClient: etcdClient,
68+
affinityManager: affinityManager,
69+
}
70+
err := w.initializeGroupWatcher(ctx)
71+
if err != nil {
72+
w.Close()
73+
return nil, err
74+
}
75+
err = w.initializeAffinityLabelWatcher(ctx)
76+
if err != nil {
77+
w.Close()
78+
return nil, err
79+
}
80+
return w, nil
81+
}
82+
83+
// initializeGroupWatcher initializes the watcher for affinity group changes.
84+
func (w *Watcher) initializeGroupWatcher(ctx context.Context) error {
85+
putFn := func(kv *mvccpb.KeyValue) error {
86+
log.Info("update affinity group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
87+
group := &affinity.Group{}
88+
if err := json.Unmarshal(kv.Value, group); err != nil {
89+
log.Warn("failed to unmarshal affinity group", zap.String("key", string(kv.Key)), zap.Error(err))
90+
return err
91+
}
92+
w.affinityManager.SyncGroupFromEtcd(group)
93+
return nil
94+
}
95+
96+
deleteFn := func(kv *mvccpb.KeyValue) error {
97+
key := string(kv.Key)
98+
log.Info("delete affinity group", zap.String("key", key))
99+
groupID := strings.TrimPrefix(key, keypath.AffinityGroupsPrefix())
100+
w.affinityManager.SyncGroupDeleteFromEtcd(groupID)
101+
return nil
102+
}
103+
104+
w.groupWatcher = etcdutil.NewLoopWatcher(
105+
ctx, w.wg,
106+
w.etcdClient,
107+
"scheduling-affinity-group-watcher",
108+
strings.TrimSuffix(keypath.AffinityGroupsPrefix(), "/"),
109+
func([]*clientv3.Event) error { return nil },
110+
putFn, deleteFn,
111+
func([]*clientv3.Event) error { return nil },
112+
true, /* withPrefix */
113+
)
114+
w.groupWatcher.StartWatchLoop()
115+
return w.groupWatcher.WaitLoad()
116+
}
117+
118+
// initializeAffinityLabelWatcher initializes the watcher for affinity label rule changes.
119+
// It watches the region_label path but only processes rules with affinity group prefix.
120+
func (w *Watcher) initializeAffinityLabelWatcher(ctx context.Context) error {
121+
// Note: labelWatcher does not need preEventsFn/postEventsFn locking
122+
// because the sync methods will handle locking internally
123+
124+
putFn := func(kv *mvccpb.KeyValue) error {
125+
key := string(kv.Key)
126+
log.Info("update affinity label rule", zap.String("key", key), zap.String("value", string(kv.Value)))
127+
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
128+
if err != nil {
129+
log.Warn("failed to unmarshal affinity label rule", zap.String("key", key), zap.Error(err))
130+
return err
131+
}
132+
return w.affinityManager.SyncKeyRangesFromEtcd(rule)
133+
}
134+
135+
deleteFn := func(kv *mvccpb.KeyValue) error {
136+
key := string(kv.Key)
137+
log.Info("delete affinity label rule", zap.String("key", key))
138+
ruleID := strings.TrimPrefix(key, keypath.RegionLabelPathPrefix())
139+
w.affinityManager.SyncKeyRangesDeleteFromEtcd(ruleID)
140+
return nil
141+
}
142+
143+
w.labelWatcher = etcdutil.NewLoopWatcher(
144+
ctx, w.wg,
145+
w.etcdClient,
146+
"scheduling-affinity-label-watcher",
147+
strings.TrimSuffix(keypath.RegionLabelKeyPath(affinity.LabelRuleIDPrefix), "/"), // Filter: only process affinity group label rules
148+
func([]*clientv3.Event) error { return nil },
149+
putFn, deleteFn,
150+
func([]*clientv3.Event) error { return nil },
151+
true, /* withPrefix */
152+
)
153+
w.labelWatcher.StartWatchLoop()
154+
return w.labelWatcher.WaitLoad()
155+
}
156+
157+
// Close closes the watcher.
158+
func (w *Watcher) Close() {
159+
if w.cancel != nil {
160+
w.cancel()
161+
}
162+
if w.wg != nil {
163+
w.wg.Wait()
164+
}
165+
}

0 commit comments

Comments
 (0)