Skip to content

Commit 0720b5d

Browse files
committed
api: add affinity forwarding
Signed-off-by: lhy1024 <admin@liudos.us>
1 parent 0551820 commit 0720b5d

File tree

7 files changed

+391
-58
lines changed

7 files changed

+391
-58
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2025 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package apis
16+
17+
import (
18+
"net/http"
19+
20+
"github.com/gin-gonic/gin"
21+
22+
"github.com/tikv/pd/pkg/errs"
23+
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
24+
"github.com/tikv/pd/pkg/schedule/affinity"
25+
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
26+
)
27+
28+
// AffinityGroupsResponse defines the response payload for listing affinity groups.
29+
type AffinityGroupsResponse struct {
30+
AffinityGroups map[string]*affinity.GroupState `json:"affinity_groups"`
31+
}
32+
33+
// RegisterAffinityRouter registers affinity routes to the v1 API group.
34+
func (s *Service) RegisterAffinityRouter() {
35+
redirector := multiservicesapi.ServiceRedirector()
36+
router := s.root.Group("affinity-groups")
37+
router.Use(redirector)
38+
router.GET("", getAllAffinityGroups)
39+
router.GET("/:group_id", getAffinityGroup)
40+
}
41+
42+
func getAffinityManager(c *gin.Context) (*affinity.Manager, bool) {
43+
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
44+
if svr.IsClosed() {
45+
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrServerNotStarted.FastGenByArgs().Error())
46+
return nil, false
47+
}
48+
cluster := svr.GetCluster()
49+
if cluster == nil {
50+
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
51+
return nil, false
52+
}
53+
manager := cluster.GetAffinityManager()
54+
if manager == nil {
55+
c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrAffinityInternal.FastGenByArgs().Error())
56+
return nil, false
57+
}
58+
return manager, true
59+
}
60+
61+
func getAllAffinityGroups(c *gin.Context) {
62+
manager, ok := getAffinityManager(c)
63+
if !ok {
64+
return
65+
}
66+
allGroupStates := manager.GetAllAffinityGroupStates()
67+
resp := AffinityGroupsResponse{
68+
AffinityGroups: make(map[string]*affinity.GroupState, len(allGroupStates)),
69+
}
70+
for _, state := range allGroupStates {
71+
resp.AffinityGroups[state.ID] = state
72+
}
73+
c.IndentedJSON(http.StatusOK, resp)
74+
}
75+
76+
func getAffinityGroup(c *gin.Context) {
77+
manager, ok := getAffinityManager(c)
78+
if !ok {
79+
return
80+
}
81+
groupID := c.Param("group_id")
82+
groupState, err := manager.CheckAndGetAffinityGroupState(groupID)
83+
if err != nil {
84+
switch {
85+
case errs.ErrInvalidGroupID.Equal(err):
86+
c.AbortWithStatusJSON(http.StatusBadRequest, err.Error())
87+
case errs.ErrAffinityGroupNotFound.Equal(err):
88+
c.AbortWithStatusJSON(http.StatusNotFound, err.Error())
89+
default:
90+
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
91+
}
92+
return
93+
}
94+
c.IndentedJSON(http.StatusOK, groupState)
95+
}

pkg/mcs/scheduling/server/apis/v1/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func NewService(srv *scheserver.Service) *Service {
127127
s.RegisterRegionsRouter()
128128
s.RegisterStoresRouter()
129129
s.RegisterPrimaryRouter()
130+
s.RegisterAffinityRouter()
130131
return s
131132
}
132133

pkg/utils/apiutil/serverapi/middleware.go

Lines changed: 70 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package serverapi
1616

1717
import (
18+
"context"
1819
"net/http"
1920
"net/url"
2021
"strings"
@@ -74,15 +75,16 @@ func IsServiceAllowed(s *server.Server, group apiutil.APIServiceGroup) bool {
7475
type redirector struct {
7576
s *server.Server
7677

77-
microserviceRedirectRules []*microserviceRedirectRule
78+
microserviceRedirectRules []RedirectRule
7879
}
7980

80-
type microserviceRedirectRule struct {
81-
matchPath string
82-
targetPath string
83-
targetServiceName string
84-
matchMethods []string
85-
filter func(*http.Request) bool
81+
// RedirectRule describes how to match and rewrite microservice paths.
82+
type RedirectRule struct {
83+
MatchPath string
84+
TargetPath string
85+
TargetServiceName string
86+
MatchMethods []string
87+
Filter func(*http.Request) bool
8688
}
8789

8890
// NewRedirector redirects request to the leader if needs to be handled in the leader.
@@ -101,24 +103,38 @@ type RedirectorOption func(*redirector)
101103
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
102104
methods []string, filters ...func(*http.Request) bool) RedirectorOption {
103105
return func(s *redirector) {
104-
rule := &microserviceRedirectRule{
105-
matchPath: matchPath,
106-
targetPath: targetPath,
107-
targetServiceName: targetServiceName,
108-
matchMethods: methods,
106+
rule := RedirectRule{
107+
MatchPath: matchPath,
108+
TargetPath: targetPath,
109+
TargetServiceName: targetServiceName,
110+
MatchMethods: methods,
109111
}
110112
if len(filters) > 0 {
111-
rule.filter = filters[0]
113+
rule.Filter = filters[0]
112114
}
113115
s.microserviceRedirectRules = append(s.microserviceRedirectRules, rule)
114116
}
115117
}
116118

117119
func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) {
118-
if !h.s.IsKeyspaceGroupEnabled() {
119-
return false, ""
120-
}
121-
if len(h.microserviceRedirectRules) == 0 {
120+
return MatchMicroserviceRedirect(
121+
r,
122+
h.microserviceRedirectRules,
123+
h.s.IsKeyspaceGroupEnabled(),
124+
h.s.IsServiceIndependent,
125+
h.s.GetServicePrimaryAddr)
126+
}
127+
128+
// MatchMicroserviceRedirect checks rules, rewrites path in-place, and returns (matched, targetAddr).
129+
// If matched but no primary is available, it returns matched=true with empty addr so caller can handle the redirect error.
130+
func MatchMicroserviceRedirect(
131+
r *http.Request,
132+
rules []RedirectRule,
133+
isKeyspaceGroupEnabled bool,
134+
isServiceIndependent func(string) bool,
135+
getPrimary func(context.Context, string) (string, bool),
136+
) (bool, string) {
137+
if !isKeyspaceGroupEnabled || len(rules) == 0 {
122138
return false, ""
123139
}
124140
if r.Header.Get(apiutil.XForbiddenForwardToMicroserviceHeader) == "true" {
@@ -127,48 +143,46 @@ func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, stri
127143
// Remove trailing '/' from the URL path
128144
// It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}"
129145
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
130-
for _, rule := range h.microserviceRedirectRules {
146+
for _, rule := range rules {
131147
// Now we only support checking the scheduling service whether it is independent
132-
if rule.targetServiceName == constant.SchedulingServiceName {
133-
if !h.s.IsServiceIndependent(constant.SchedulingServiceName) {
134-
continue
135-
}
148+
if rule.TargetServiceName == constant.SchedulingServiceName && !isServiceIndependent(constant.SchedulingServiceName) {
149+
continue
136150
}
137-
if strings.HasPrefix(r.URL.Path, rule.matchPath) &&
138-
slice.Contains(rule.matchMethods, r.Method) {
139-
if rule.filter != nil && !rule.filter(r) {
140-
continue
141-
}
142-
// we check the service primary addr here,
143-
// if the service is not available, we will return ErrRedirect by returning an empty addr.
144-
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
145-
if !ok || addr == "" {
146-
log.Warn("failed to get the service primary addr when trying to match redirect rules",
147-
zap.String("path", r.URL.Path))
148-
return true, ""
149-
}
150-
// If the URL contains escaped characters, use RawPath instead of Path
151-
origin := r.URL.Path
152-
path := r.URL.Path
153-
if r.URL.RawPath != "" {
154-
path = r.URL.RawPath
155-
}
156-
// Extract parameters from the URL path
157-
// e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect)
158-
// matchPath = /pd/api/v1/operators
159-
// targetPath = /scheduling/api/v1/operators
160-
// r.URL.Path = /scheduling/api/v1/operator/1 (after redirect)
161-
pathParams := strings.TrimPrefix(path, rule.matchPath)
162-
pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/'
163-
if len(pathParams) > 0 {
164-
r.URL.Path = rule.targetPath + "/" + pathParams
165-
} else {
166-
r.URL.Path = rule.targetPath
167-
}
168-
log.Debug("redirect to microservice", zap.String("path", r.URL.Path), zap.String("origin-path", origin),
169-
zap.String("target", addr), zap.String("method", r.Method))
170-
return true, addr
151+
if !strings.HasPrefix(r.URL.Path, rule.MatchPath) || !slice.Contains(rule.MatchMethods, r.Method) {
152+
continue
153+
}
154+
if rule.Filter != nil && !rule.Filter(r) {
155+
continue
156+
}
157+
// we check the service primary addr here,
158+
// if the service is not available, we will return ErrRedirect by returning an empty addr.
159+
addr, ok := getPrimary(r.Context(), rule.TargetServiceName)
160+
if !ok || addr == "" {
161+
log.Warn("failed to get the service primary addr when trying to match redirect rules",
162+
zap.String("path", r.URL.Path))
163+
return true, ""
164+
}
165+
// If the URL contains escaped characters, use RawPath instead of Path
166+
origin := r.URL.Path
167+
path := r.URL.Path
168+
if r.URL.RawPath != "" {
169+
path = r.URL.RawPath
170+
}
171+
// Extract parameters from the URL path
172+
// e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect)
173+
// matchPath = /pd/api/v1/operators
174+
// targetPath = /scheduling/api/v1/operators
175+
// r.URL.Path = /scheduling/api/v1/operator/1 (after redirect)
176+
pathParams := strings.TrimPrefix(path, rule.MatchPath)
177+
pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/'
178+
if len(pathParams) > 0 {
179+
r.URL.Path = rule.TargetPath + "/" + pathParams
180+
} else {
181+
r.URL.Path = rule.TargetPath
171182
}
183+
log.Debug("redirect to microservice", zap.String("path", r.URL.Path), zap.String("origin-path", origin),
184+
zap.String("target", addr), zap.String("method", r.Method))
185+
return true, addr
172186
}
173187
return false, ""
174188
}

0 commit comments

Comments
 (0)