Skip to content

Commit 0b85d76

Browse files
committed
api: add affinity forwarding and watcher support (tikv#10042)
Signed-off-by: lhy1024 <admin@liudos.us>
1 parent 2fba0b0 commit 0b85d76

File tree

14 files changed

+1052
-80
lines changed

14 files changed

+1052
-80
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package affinity
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"strings"
21+
"sync"
22+
23+
"go.etcd.io/etcd/api/v3/mvccpb"
24+
clientv3 "go.etcd.io/etcd/client/v3"
25+
"go.uber.org/zap"
26+
27+
"github.com/pingcap/log"
28+
29+
"github.com/tikv/pd/pkg/schedule/affinity"
30+
"github.com/tikv/pd/pkg/schedule/labeler"
31+
"github.com/tikv/pd/pkg/utils/etcdutil"
32+
"github.com/tikv/pd/pkg/utils/keypath"
33+
)
34+
35+
// Watcher is used to watch the affinity group and label rule changes from PD.
36+
// It watches two paths:
37+
// affinityGroupsPrefix:
38+
// - Key: /pd/{cluster_id}/affinity_groups/{group_id}
39+
// - Value: affinity.Group
40+
//
41+
// regionLabelPathPrefix:
42+
// - Key:
43+
// - Value: labeler.LabelRule
44+
// - Filtered to only process rules with ID prefix "affinity_group/"
45+
type Watcher struct {
46+
ctx context.Context
47+
cancel context.CancelFunc
48+
wg sync.WaitGroup
49+
etcdClient *clientv3.Client
50+
51+
// affinityManager is used to manage the affinity groups in the scheduling server.
52+
affinityManager *affinity.Manager
53+
54+
groupWatcher *etcdutil.LoopWatcher
55+
labelWatcher *etcdutil.LoopWatcher
56+
}
57+
58+
// NewWatcher creates a new watcher to watch the affinity changes from PD.
59+
func NewWatcher(
60+
ctx context.Context,
61+
etcdClient *clientv3.Client,
62+
affinityManager *affinity.Manager,
63+
) (*Watcher, error) {
64+
ctx, cancel := context.WithCancel(ctx)
65+
w := &Watcher{
66+
ctx: ctx,
67+
cancel: cancel,
68+
etcdClient: etcdClient,
69+
affinityManager: affinityManager,
70+
}
71+
err := w.initializeGroupWatcher()
72+
if err != nil {
73+
w.Close()
74+
return nil, err
75+
}
76+
err = w.initializeAffinityLabelWatcher()
77+
if err != nil {
78+
w.Close()
79+
return nil, err
80+
}
81+
return w, nil
82+
}
83+
84+
// initializeGroupWatcher initializes the watcher for affinity group changes.
85+
func (w *Watcher) initializeGroupWatcher() error {
86+
putFn := func(kv *mvccpb.KeyValue) error {
87+
log.Info("update affinity group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
88+
group := &affinity.Group{}
89+
if err := json.Unmarshal(kv.Value, group); err != nil {
90+
log.Warn("failed to unmarshal affinity group", zap.String("key", string(kv.Key)), zap.Error(err))
91+
return err
92+
}
93+
w.affinityManager.SyncGroupFromEtcd(group)
94+
return nil
95+
}
96+
97+
deleteFn := func(kv *mvccpb.KeyValue) error {
98+
key := string(kv.Key)
99+
log.Info("delete affinity group", zap.String("key", key))
100+
groupID := strings.TrimPrefix(key, keypath.AffinityGroupsPathPrefix())
101+
w.affinityManager.SyncGroupDeleteFromEtcd(groupID)
102+
return nil
103+
}
104+
105+
w.groupWatcher = etcdutil.NewLoopWatcher(
106+
w.ctx, &w.wg,
107+
w.etcdClient,
108+
"scheduling-affinity-group-watcher",
109+
strings.TrimSuffix(keypath.AffinityGroupsPathPrefix(), "/"),
110+
func([]*clientv3.Event) error { return nil },
111+
putFn, deleteFn,
112+
func([]*clientv3.Event) error { return nil },
113+
true, /* withPrefix */
114+
)
115+
w.groupWatcher.StartWatchLoop()
116+
return w.groupWatcher.WaitLoad()
117+
}
118+
119+
// initializeAffinityLabelWatcher initializes the watcher for affinity label rule changes.
120+
// It watches the region_label path but only processes rules with affinity group prefix.
121+
func (w *Watcher) initializeAffinityLabelWatcher() error {
122+
// Note: labelWatcher does not need preEventsFn/postEventsFn locking
123+
// because the sync methods will handle locking internally
124+
preEventsFn := func([]*clientv3.Event) error {
125+
return nil
126+
}
127+
128+
putFn := func(kv *mvccpb.KeyValue) error {
129+
key := string(kv.Key)
130+
log.Info("update affinity label rule", zap.String("key", key), zap.String("value", string(kv.Value)))
131+
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
132+
if err != nil {
133+
log.Warn("failed to unmarshal affinity label rule", zap.String("key", key), zap.Error(err))
134+
return err
135+
}
136+
return w.affinityManager.SyncKeyRangesFromEtcd(rule)
137+
}
138+
139+
deleteFn := func(kv *mvccpb.KeyValue) error {
140+
key := string(kv.Key)
141+
log.Info("delete affinity label rule", zap.String("key", key))
142+
ruleID := strings.TrimPrefix(key, keypath.RegionLabelPathPrefix())
143+
w.affinityManager.SyncKeyRangesDeleteFromEtcd(ruleID)
144+
return nil
145+
}
146+
147+
postEventsFn := func([]*clientv3.Event) error {
148+
return nil
149+
}
150+
151+
w.labelWatcher = etcdutil.NewLoopWatcher(
152+
w.ctx, &w.wg,
153+
w.etcdClient,
154+
"scheduling-affinity-label-watcher",
155+
strings.TrimSuffix(keypath.RegionLabelKeyPath(affinity.LabelRuleIDPrefix), "/"), // Filter: only process affinity group label rules
156+
preEventsFn,
157+
putFn, deleteFn,
158+
postEventsFn,
159+
true, /* withPrefix */
160+
)
161+
w.labelWatcher.StartWatchLoop()
162+
return w.labelWatcher.WaitLoad()
163+
}
164+
165+
// Close closes the watcher.
166+
func (w *Watcher) Close() {
167+
w.cancel()
168+
w.wg.Wait()
169+
}

0 commit comments

Comments
 (0)