Skip to content

Commit 64d7903

Browse files
committed
api: add affinity forwarding
Signed-off-by: lhy1024 <admin@liudos.us>
1 parent 0551820 commit 64d7903

File tree

6 files changed

+380
-58
lines changed

6 files changed

+380
-58
lines changed

pkg/mcs/scheduling/server/apis/v1/api.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
4141
"github.com/tikv/pd/pkg/mcs/utils/constant"
4242
"github.com/tikv/pd/pkg/response"
43+
"github.com/tikv/pd/pkg/schedule/affinity"
4344
sche "github.com/tikv/pd/pkg/schedule/core"
4445
"github.com/tikv/pd/pkg/schedule/handler"
4546
"github.com/tikv/pd/pkg/schedule/operator"
@@ -85,6 +86,11 @@ type server struct {
8586
*scheserver.Server
8687
}
8788

89+
// AffinityGroupsResponse defines the response payload for listing affinity groups.
90+
type AffinityGroupsResponse struct {
91+
AffinityGroups map[string]*affinity.GroupState `json:"affinity_groups"`
92+
}
93+
8894
// GetCluster returns the cluster.
8995
func (s *server) GetCluster() sche.SchedulerCluster {
9096
return s.Server.GetCluster()
@@ -127,6 +133,7 @@ func NewService(srv *scheserver.Service) *Service {
127133
s.RegisterRegionsRouter()
128134
s.RegisterStoresRouter()
129135
s.RegisterPrimaryRouter()
136+
s.RegisterAffinityRouter()
130137
return s
131138
}
132139

@@ -257,6 +264,15 @@ func (s *Service) RegisterPrimaryRouter() {
257264
router.POST("transfer", transferPrimary)
258265
}
259266

267+
// RegisterAffinityRouter registers affinity routes to the v1 API group.
268+
func (s *Service) RegisterAffinityRouter() {
269+
redirector := multiservicesapi.ServiceRedirector()
270+
router := s.root.Group("affinity-groups")
271+
router.Use(redirector)
272+
router.GET("", getAllAffinityGroups)
273+
router.GET("/:group_id", getAffinityGroup)
274+
}
275+
260276
// getHealth returns the health status of the TSO service.
261277
func getHealth(c *gin.Context) {
262278
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
@@ -1597,3 +1613,72 @@ func transferPrimary(c *gin.Context) {
15971613
}
15981614
c.IndentedJSON(http.StatusOK, "success")
15991615
}
1616+
1617+
func getAffinityManager(c *gin.Context) (*affinity.Manager, bool) {
1618+
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
1619+
if svr.IsClosed() {
1620+
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrServerNotStarted.FastGenByArgs().Error())
1621+
return nil, false
1622+
}
1623+
cluster := svr.GetCluster()
1624+
if cluster == nil {
1625+
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
1626+
return nil, false
1627+
}
1628+
manager := cluster.GetAffinityManager()
1629+
if manager == nil {
1630+
c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrAffinityInternal.FastGenByArgs().Error())
1631+
return nil, false
1632+
}
1633+
return manager, true
1634+
}
1635+
1636+
// @Tags affinity-groups
1637+
// @Summary List all affinity groups.
1638+
// @Produce json
1639+
// @Success 200 {object} AffinityGroupsResponse
1640+
// @Failure 500 {string} string "PD server failed to proceed the request."
1641+
// @Router /affinity-groups [get]
1642+
func getAllAffinityGroups(c *gin.Context) {
1643+
manager, ok := getAffinityManager(c)
1644+
if !ok {
1645+
return
1646+
}
1647+
allGroupStates := manager.GetAllAffinityGroupStates()
1648+
resp := AffinityGroupsResponse{
1649+
AffinityGroups: make(map[string]*affinity.GroupState, len(allGroupStates)),
1650+
}
1651+
for _, state := range allGroupStates {
1652+
resp.AffinityGroups[state.ID] = state
1653+
}
1654+
c.IndentedJSON(http.StatusOK, resp)
1655+
}
1656+
1657+
// @Tags affinity-groups
1658+
// @Summary Get an affinity group by group id.
1659+
// @Param group_id path string true "The group id of the affinity group"
1660+
// @Produce json
1661+
// @Success 200 {object} *affinity.GroupState
1662+
// @Failure 404 {string} string "Affinity group not found."
1663+
// @Failure 500 {string} string "PD server failed to proceed the request."
1664+
// @Router /affinity-groups/{group_id} [get]
1665+
func getAffinityGroup(c *gin.Context) {
1666+
manager, ok := getAffinityManager(c)
1667+
if !ok {
1668+
return
1669+
}
1670+
groupID := c.Param("group_id")
1671+
groupState, err := manager.CheckAndGetAffinityGroupState(groupID)
1672+
if err != nil {
1673+
switch {
1674+
case errs.ErrInvalidGroupID.Equal(err):
1675+
c.AbortWithStatusJSON(http.StatusBadRequest, err.Error())
1676+
case errs.ErrAffinityGroupNotFound.Equal(err):
1677+
c.AbortWithStatusJSON(http.StatusNotFound, err.Error())
1678+
default:
1679+
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
1680+
}
1681+
return
1682+
}
1683+
c.IndentedJSON(http.StatusOK, groupState)
1684+
}

pkg/utils/apiutil/serverapi/middleware.go

Lines changed: 70 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package serverapi
1616

1717
import (
18+
"context"
1819
"net/http"
1920
"net/url"
2021
"strings"
@@ -74,15 +75,16 @@ func IsServiceAllowed(s *server.Server, group apiutil.APIServiceGroup) bool {
7475
type redirector struct {
7576
s *server.Server
7677

77-
microserviceRedirectRules []*microserviceRedirectRule
78+
microserviceRedirectRules []RedirectRule
7879
}
7980

80-
type microserviceRedirectRule struct {
81-
matchPath string
82-
targetPath string
83-
targetServiceName string
84-
matchMethods []string
85-
filter func(*http.Request) bool
81+
// RedirectRule describes how to match and rewrite microservice paths.
82+
type RedirectRule struct {
83+
MatchPath string
84+
TargetPath string
85+
TargetServiceName string
86+
MatchMethods []string
87+
Filter func(*http.Request) bool
8688
}
8789

8890
// NewRedirector redirects request to the leader if needs to be handled in the leader.
@@ -101,24 +103,38 @@ type RedirectorOption func(*redirector)
101103
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
102104
methods []string, filters ...func(*http.Request) bool) RedirectorOption {
103105
return func(s *redirector) {
104-
rule := &microserviceRedirectRule{
105-
matchPath: matchPath,
106-
targetPath: targetPath,
107-
targetServiceName: targetServiceName,
108-
matchMethods: methods,
106+
rule := RedirectRule{
107+
MatchPath: matchPath,
108+
TargetPath: targetPath,
109+
TargetServiceName: targetServiceName,
110+
MatchMethods: methods,
109111
}
110112
if len(filters) > 0 {
111-
rule.filter = filters[0]
113+
rule.Filter = filters[0]
112114
}
113115
s.microserviceRedirectRules = append(s.microserviceRedirectRules, rule)
114116
}
115117
}
116118

117119
func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) {
118-
if !h.s.IsKeyspaceGroupEnabled() {
119-
return false, ""
120-
}
121-
if len(h.microserviceRedirectRules) == 0 {
120+
return MatchMicroserviceRedirect(
121+
r,
122+
h.microserviceRedirectRules,
123+
h.s.IsKeyspaceGroupEnabled(),
124+
h.s.IsServiceIndependent,
125+
h.s.GetServicePrimaryAddr)
126+
}
127+
128+
// MatchMicroserviceRedirect checks rules, rewrites path in-place, and returns (matched, targetAddr).
129+
// If matched but no primary is available, it returns matched=true with empty addr so caller can handle the redirect error.
130+
func MatchMicroserviceRedirect(
131+
r *http.Request,
132+
rules []RedirectRule,
133+
isKeyspaceGroupEnabled bool,
134+
isServiceIndependent func(string) bool,
135+
getPrimary func(context.Context, string) (string, bool),
136+
) (bool, string) {
137+
if !isKeyspaceGroupEnabled || len(rules) == 0 {
122138
return false, ""
123139
}
124140
if r.Header.Get(apiutil.XForbiddenForwardToMicroserviceHeader) == "true" {
@@ -127,48 +143,46 @@ func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, stri
127143
// Remove trailing '/' from the URL path
128144
// It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}"
129145
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
130-
for _, rule := range h.microserviceRedirectRules {
146+
for _, rule := range rules {
131147
// Now we only support checking the scheduling service whether it is independent
132-
if rule.targetServiceName == constant.SchedulingServiceName {
133-
if !h.s.IsServiceIndependent(constant.SchedulingServiceName) {
134-
continue
135-
}
148+
if rule.TargetServiceName == constant.SchedulingServiceName && !isServiceIndependent(constant.SchedulingServiceName) {
149+
continue
136150
}
137-
if strings.HasPrefix(r.URL.Path, rule.matchPath) &&
138-
slice.Contains(rule.matchMethods, r.Method) {
139-
if rule.filter != nil && !rule.filter(r) {
140-
continue
141-
}
142-
// we check the service primary addr here,
143-
// if the service is not available, we will return ErrRedirect by returning an empty addr.
144-
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
145-
if !ok || addr == "" {
146-
log.Warn("failed to get the service primary addr when trying to match redirect rules",
147-
zap.String("path", r.URL.Path))
148-
return true, ""
149-
}
150-
// If the URL contains escaped characters, use RawPath instead of Path
151-
origin := r.URL.Path
152-
path := r.URL.Path
153-
if r.URL.RawPath != "" {
154-
path = r.URL.RawPath
155-
}
156-
// Extract parameters from the URL path
157-
// e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect)
158-
// matchPath = /pd/api/v1/operators
159-
// targetPath = /scheduling/api/v1/operators
160-
// r.URL.Path = /scheduling/api/v1/operator/1 (after redirect)
161-
pathParams := strings.TrimPrefix(path, rule.matchPath)
162-
pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/'
163-
if len(pathParams) > 0 {
164-
r.URL.Path = rule.targetPath + "/" + pathParams
165-
} else {
166-
r.URL.Path = rule.targetPath
167-
}
168-
log.Debug("redirect to microservice", zap.String("path", r.URL.Path), zap.String("origin-path", origin),
169-
zap.String("target", addr), zap.String("method", r.Method))
170-
return true, addr
151+
if !strings.HasPrefix(r.URL.Path, rule.MatchPath) || !slice.Contains(rule.MatchMethods, r.Method) {
152+
continue
153+
}
154+
if rule.Filter != nil && !rule.Filter(r) {
155+
continue
156+
}
157+
// we check the service primary addr here,
158+
// if the service is not available, we will return ErrRedirect by returning an empty addr.
159+
addr, ok := getPrimary(r.Context(), rule.TargetServiceName)
160+
if !ok || addr == "" {
161+
log.Warn("failed to get the service primary addr when trying to match redirect rules",
162+
zap.String("path", r.URL.Path))
163+
return true, ""
164+
}
165+
// If the URL contains escaped characters, use RawPath instead of Path
166+
origin := r.URL.Path
167+
path := r.URL.Path
168+
if r.URL.RawPath != "" {
169+
path = r.URL.RawPath
170+
}
171+
// Extract parameters from the URL path
172+
// e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect)
173+
// matchPath = /pd/api/v1/operators
174+
// targetPath = /scheduling/api/v1/operators
175+
// r.URL.Path = /scheduling/api/v1/operator/1 (after redirect)
176+
pathParams := strings.TrimPrefix(path, rule.MatchPath)
177+
pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/'
178+
if len(pathParams) > 0 {
179+
r.URL.Path = rule.TargetPath + "/" + pathParams
180+
} else {
181+
r.URL.Path = rule.TargetPath
171182
}
183+
log.Debug("redirect to microservice", zap.String("path", r.URL.Path), zap.String("origin-path", origin),
184+
zap.String("target", addr), zap.String("method", r.Method))
185+
return true, addr
172186
}
173187
return false, ""
174188
}

0 commit comments

Comments
 (0)