Skip to content

Commit 97419bc

Browse files
committed
api: add affinity forwarding and watcher support (tikv#10042)
Signed-off-by: lhy1024 <admin@liudos.us>
1 parent 2fba0b0 commit 97419bc

File tree

14 files changed

+1060
-80
lines changed

14 files changed

+1060
-80
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package affinity
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"path"
21+
"strings"
22+
"sync"
23+
24+
"go.etcd.io/etcd/api/v3/mvccpb"
25+
clientv3 "go.etcd.io/etcd/client/v3"
26+
"go.uber.org/zap"
27+
28+
"github.com/pingcap/log"
29+
30+
"github.com/tikv/pd/pkg/schedule/affinity"
31+
"github.com/tikv/pd/pkg/schedule/labeler"
32+
"github.com/tikv/pd/pkg/utils/etcdutil"
33+
"github.com/tikv/pd/pkg/utils/keypath"
34+
)
35+
36+
// Watcher is used to watch the affinity group and label rule changes from PD.
37+
// It watches two paths:
38+
// affinityGroupsPrefix:
39+
// - Key: /pd/{cluster_id}/affinity_groups/{group_id}
40+
// - Value: affinity.Group
41+
//
42+
// regionLabelPathPrefix:
43+
// - Key:
44+
// - Value: labeler.LabelRule
45+
// - Filtered to only process rules with ID prefix "affinity_group/"
46+
type Watcher struct {
47+
ctx context.Context
48+
cancel context.CancelFunc
49+
wg sync.WaitGroup
50+
etcdClient *clientv3.Client
51+
52+
// affinityManager is used to manage the affinity groups in the scheduling server.
53+
affinityManager *affinity.Manager
54+
55+
regionLabelPathPrefix string
56+
affinityGroupsPathPrefix string
57+
affinityLabelRulesPathPrefix string
58+
59+
groupWatcher *etcdutil.LoopWatcher
60+
labelWatcher *etcdutil.LoopWatcher
61+
}
62+
63+
// NewWatcher creates a new watcher to watch the affinity changes from PD.
64+
func NewWatcher(
65+
ctx context.Context,
66+
etcdClient *clientv3.Client,
67+
affinityManager *affinity.Manager,
68+
) (*Watcher, error) {
69+
ctx, cancel := context.WithCancel(ctx)
70+
w := &Watcher{
71+
ctx: ctx,
72+
cancel: cancel,
73+
etcdClient: etcdClient,
74+
affinityManager: affinityManager,
75+
regionLabelPathPrefix: keypath.RegionLabelPathPrefix(),
76+
affinityGroupsPathPrefix: keypath.AffinityGroupsPathPrefix(),
77+
}
78+
w.affinityLabelRulesPathPrefix = path.Join(w.regionLabelPathPrefix, affinity.LabelRuleIDPrefix)
79+
err := w.initializeGroupWatcher()
80+
if err != nil {
81+
w.Close()
82+
return nil, err
83+
}
84+
err = w.initializeAffinityLabelWatcher()
85+
if err != nil {
86+
w.Close()
87+
return nil, err
88+
}
89+
return w, nil
90+
}
91+
92+
// initializeGroupWatcher initializes the watcher for affinity group changes.
93+
func (w *Watcher) initializeGroupWatcher() error {
94+
putFn := func(kv *mvccpb.KeyValue) error {
95+
log.Info("update affinity group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
96+
group := &affinity.Group{}
97+
if err := json.Unmarshal(kv.Value, group); err != nil {
98+
log.Warn("failed to unmarshal affinity group", zap.String("key", string(kv.Key)), zap.Error(err))
99+
return err
100+
}
101+
w.affinityManager.SyncGroupFromEtcd(group)
102+
return nil
103+
}
104+
105+
deleteFn := func(kv *mvccpb.KeyValue) error {
106+
key := string(kv.Key)
107+
log.Info("delete affinity group", zap.String("key", key))
108+
groupID := strings.TrimPrefix(key, w.affinityGroupsPathPrefix+"/")
109+
w.affinityManager.SyncGroupDeleteFromEtcd(groupID)
110+
return nil
111+
}
112+
113+
w.groupWatcher = etcdutil.NewLoopWatcher(
114+
w.ctx, &w.wg,
115+
w.etcdClient,
116+
"scheduling-affinity-group-watcher",
117+
w.affinityGroupsPathPrefix,
118+
func([]*clientv3.Event) error { return nil },
119+
putFn, deleteFn,
120+
func([]*clientv3.Event) error { return nil },
121+
true, /* withPrefix */
122+
)
123+
w.groupWatcher.StartWatchLoop()
124+
return w.groupWatcher.WaitLoad()
125+
}
126+
127+
// initializeAffinityLabelWatcher initializes the watcher for affinity label rule changes.
128+
// It watches the region_label path but only processes rules with affinity group prefix.
129+
func (w *Watcher) initializeAffinityLabelWatcher() error {
130+
// Note: labelWatcher does not need preEventsFn/postEventsFn locking
131+
// because the sync methods will handle locking internally
132+
preEventsFn := func([]*clientv3.Event) error {
133+
return nil
134+
}
135+
136+
putFn := func(kv *mvccpb.KeyValue) error {
137+
key := string(kv.Key)
138+
log.Info("update affinity label rule", zap.String("key", key), zap.String("value", string(kv.Value)))
139+
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
140+
if err != nil {
141+
log.Warn("failed to unmarshal affinity label rule", zap.String("key", key), zap.Error(err))
142+
return err
143+
}
144+
return w.affinityManager.SyncKeyRangesFromEtcd(rule)
145+
}
146+
147+
deleteFn := func(kv *mvccpb.KeyValue) error {
148+
key := string(kv.Key)
149+
log.Info("delete affinity label rule", zap.String("key", key))
150+
ruleID := strings.TrimPrefix(key, w.regionLabelPathPrefix+"/")
151+
w.affinityManager.SyncKeyRangesDeleteFromEtcd(ruleID)
152+
return nil
153+
}
154+
155+
postEventsFn := func([]*clientv3.Event) error {
156+
return nil
157+
}
158+
159+
w.labelWatcher = etcdutil.NewLoopWatcher(
160+
w.ctx, &w.wg,
161+
w.etcdClient,
162+
"scheduling-affinity-label-watcher",
163+
w.affinityLabelRulesPathPrefix, // Filter: only process affinity group label rules
164+
preEventsFn,
165+
putFn, deleteFn,
166+
postEventsFn,
167+
true, /* withPrefix */
168+
)
169+
w.labelWatcher.StartWatchLoop()
170+
return w.labelWatcher.WaitLoad()
171+
}
172+
173+
// Close closes the watcher.
174+
func (w *Watcher) Close() {
175+
w.cancel()
176+
w.wg.Wait()
177+
}

0 commit comments

Comments
 (0)