Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ const (
KeyspaceConfig = "/pd/api/v2/keyspaces/%s/config"
GetKeyspaceMetaByName = "/pd/api/v2/keyspaces/%s"
GetKeyspaceMetaByID = "/pd/api/v2/keyspaces/id/%d"
// Affinity
AffinityGroups = "/pd/api/v2/affinity-groups"
AffinityGroupByID = "/pd/api/v2/affinity-groups/%s"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
Expand Down
186 changes: 186 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ type Client interface {
GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) (*keyspacepb.KeyspaceMeta, error)
GetKeyspaceMetaByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error)

/* Affinity group interfaces */

// CreateAffinityGroups creates one or more affinity groups with key ranges.
// The affinityGroups parameter is a map from group ID to a list of key ranges.
CreateAffinityGroups(ctx context.Context, affinityGroups map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error)
// GetAffinityGroup gets an affinity group by group ID.
GetAffinityGroup(ctx context.Context, groupID string) (*AffinityGroupState, error)
// GetAllAffinityGroups gets all affinity groups.
GetAllAffinityGroups(ctx context.Context) (map[string]*AffinityGroupState, error)
// UpdateAffinityGroupPeers updates the leader and voter stores of an affinity group.
UpdateAffinityGroupPeers(ctx context.Context, groupID string, leaderStoreID uint64, voterStoreIDs []uint64) (*AffinityGroupState, error)
// DeleteAffinityGroup deletes an affinity group by group ID.
DeleteAffinityGroup(ctx context.Context, groupID string, force bool) error
// BatchDeleteAffinityGroups deletes multiple affinity groups in batch.
BatchDeleteAffinityGroups(ctx context.Context, groupIDs []string, force bool) error
// AddAffinityGroupKeyRanges adds key ranges to affinity groups.
AddAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error)
// RemoveAffinityGroupKeyRanges removes key ranges from affinity groups.
RemoveAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error)

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
WithCallerID(string) Client
Expand Down Expand Up @@ -1213,3 +1233,169 @@ func (c *client) DeleteGCSafePoint(ctx context.Context, serviceID string) (strin
}
return msg, nil
}

// CreateAffinityGroups creates one or more affinity groups with key ranges.
func (c *client) CreateAffinityGroups(ctx context.Context, affinityGroups map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error) {
// Construct the request body
reqGroups := make(map[string]CreateAffinityGroupInput, len(affinityGroups))
for groupID, ranges := range affinityGroups {
reqGroups[groupID] = CreateAffinityGroupInput{Ranges: ranges}
}
req := CreateAffinityGroupsRequest{AffinityGroups: reqGroups}

reqJSON, err := json.Marshal(req)
if err != nil {
return nil, errors.Trace(err)
}
var resp AffinityGroupsResponse
err = c.request(ctx, newRequestInfo().
WithName("CreateAffinityGroups").
WithURI(AffinityGroups).
WithMethod(http.MethodPost).
WithBody(reqJSON).
WithResp(&resp))
if err != nil {
return nil, err
}
return resp.AffinityGroups, nil
}

// GetAffinityGroup gets an affinity group by group ID.
func (c *client) GetAffinityGroup(ctx context.Context, groupID string) (*AffinityGroupState, error) {
var state AffinityGroupState
err := c.request(ctx, newRequestInfo().
WithName("GetAffinityGroup").
WithURI(fmt.Sprintf(AffinityGroupByID, groupID)).
WithMethod(http.MethodGet).
WithResp(&state))
if err != nil {
return nil, err
}
return &state, nil
}

// GetAllAffinityGroups gets all affinity groups.
func (c *client) GetAllAffinityGroups(ctx context.Context) (map[string]*AffinityGroupState, error) {
var resp AffinityGroupsResponse
err := c.request(ctx, newRequestInfo().
WithName("GetAllAffinityGroups").
WithURI(AffinityGroups).
WithMethod(http.MethodGet).
WithResp(&resp))
if err != nil {
return nil, err
}
return resp.AffinityGroups, nil
}

// UpdateAffinityGroupPeers updates the leader and voter stores of an affinity group.
func (c *client) UpdateAffinityGroupPeers(ctx context.Context, groupID string, leaderStoreID uint64, voterStoreIDs []uint64) (*AffinityGroupState, error) {
req := UpdateAffinityGroupPeersRequest{
LeaderStoreID: leaderStoreID,
VoterStoreIDs: voterStoreIDs,
}
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, errors.Trace(err)
}
var state AffinityGroupState
err = c.request(ctx, newRequestInfo().
WithName("UpdateAffinityGroupPeers").
WithURI(fmt.Sprintf(AffinityGroupByID, groupID)).
WithMethod(http.MethodPut).
WithBody(reqJSON).
WithResp(&state))
if err != nil {
return nil, err
}
return &state, nil
}

// DeleteAffinityGroup deletes an affinity group by group ID.
func (c *client) DeleteAffinityGroup(ctx context.Context, groupID string, force bool) error {
uri := fmt.Sprintf(AffinityGroupByID, groupID)
if force {
uri = fmt.Sprintf("%s?force=true", uri)
}
return c.request(ctx, newRequestInfo().
WithName("DeleteAffinityGroup").
WithURI(uri).
WithMethod(http.MethodDelete))
}

// BatchDeleteAffinityGroups deletes multiple affinity groups in batch.
func (c *client) BatchDeleteAffinityGroups(ctx context.Context, groupIDs []string, force bool) error {
req := BatchDeleteAffinityGroupsRequest{
IDs: groupIDs,
Force: force,
}
reqJSON, err := json.Marshal(req)
if err != nil {
return errors.Trace(err)
}
// Use POST with ?delete query parameter for batch deletion
url := AffinityGroups + "?delete"
return c.request(ctx, newRequestInfo().
WithName("BatchDeleteAffinityGroups").
WithURI(url).
WithMethod(http.MethodPost).
WithBody(reqJSON))
}

// AddAffinityGroupKeyRanges adds key ranges to affinity groups.
func (c *client) AddAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error) {
// Convert to the request format
add := make([]GroupRangesModification, 0, len(groupKeyRanges))
for groupID, ranges := range groupKeyRanges {
add = append(add, GroupRangesModification{
ID: groupID,
Ranges: ranges,
})
}
req := BatchModifyAffinityGroupsRequest{Add: add}

reqJSON, err := json.Marshal(req)
if err != nil {
return nil, errors.Trace(err)
}
var resp AffinityGroupsResponse
err = c.request(ctx, newRequestInfo().
WithName("AddAffinityGroupKeyRanges").
WithURI(AffinityGroups).
WithMethod(http.MethodPatch).
WithBody(reqJSON).
WithResp(&resp))
if err != nil {
return nil, err
}
return resp.AffinityGroups, nil
}

// RemoveAffinityGroupKeyRanges removes key ranges from affinity groups.
func (c *client) RemoveAffinityGroupKeyRanges(ctx context.Context, groupKeyRanges map[string][]AffinityGroupKeyRange) (map[string]*AffinityGroupState, error) {
// Convert to the request format
remove := make([]GroupRangesModification, 0, len(groupKeyRanges))
for groupID, ranges := range groupKeyRanges {
remove = append(remove, GroupRangesModification{
ID: groupID,
Ranges: ranges,
})
}
req := BatchModifyAffinityGroupsRequest{Remove: remove}

reqJSON, err := json.Marshal(req)
if err != nil {
return nil, errors.Trace(err)
}
var resp AffinityGroupsResponse
err = c.request(ctx, newRequestInfo().
WithName("RemoveAffinityGroupKeyRanges").
WithURI(AffinityGroups).
WithMethod(http.MethodPatch).
WithBody(reqJSON).
WithResp(&resp))
if err != nil {
return nil, err
}
return resp.AffinityGroups, nil
}
78 changes: 78 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,3 +703,81 @@ type Health struct {
ClientUrls []string `json:"client_urls"`
Health bool `json:"health"`
}

// AffinityGroupKeyRange represents a key range for affinity group operations.
type AffinityGroupKeyRange struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`
}

// CreateAffinityGroupInput defines the input for a single group in the creation request.
type CreateAffinityGroupInput struct {
Ranges []AffinityGroupKeyRange `json:"ranges"`
}

// CreateAffinityGroupsRequest defines the body for the POST request to create affinity groups.
type CreateAffinityGroupsRequest struct {
AffinityGroups map[string]CreateAffinityGroupInput `json:"affinity_groups"`
}

// AffinityGroup defines an affinity group.
type AffinityGroup struct {
ID string `json:"id"`
CreateTimestamp uint64 `json:"create_timestamp"`
LeaderStoreID uint64 `json:"leader_store_id,omitempty"`
VoterStoreIDs []uint64 `json:"voter_store_ids,omitempty"`
}

// AffinityGroupState defines the runtime state of an affinity group.
type AffinityGroupState struct {
AffinityGroup
Phase string `json:"phase"`
RangeCount int `json:"range_count"`
RegionCount int `json:"region_count"`
AffinityRegionCount int `json:"affinity_region_count"`
}

// IsPending indicates that the Group is still determining the StoreIDs.
// If the Group has no KeyRanges, it remains in pending forever.
func (s *AffinityGroupState) IsPending() bool {
return s.Phase == "pending"
}

// IsPreparing indicates that the Group is scheduling Regions according to the required Peers.
func (s *AffinityGroupState) IsPreparing() bool {
return s.Phase == "preparing"
}

// IsStable indicates that the Group has completed the required scheduling and is currently in a stable state.
func (s *AffinityGroupState) IsStable() bool {
return s.Phase == "stable"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The phase name is strange, but we can rename it later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is public. Can we rename it in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I means "Can" we update it?

}

// AffinityGroupsResponse defines the success response for affinity group operations.
type AffinityGroupsResponse struct {
AffinityGroups map[string]*AffinityGroupState `json:"affinity_groups"`
}

// BatchDeleteAffinityGroupsRequest defines the body for batch delete request.
type BatchDeleteAffinityGroupsRequest struct {
IDs []string `json:"ids"`
Force bool `json:"force,omitempty"`
}

// GroupRangesModification defines add or remove operations for a specific group.
type GroupRangesModification struct {
ID string `json:"id"`
Ranges []AffinityGroupKeyRange `json:"ranges"`
}

// BatchModifyAffinityGroupsRequest defines the body for batch modify request.
type BatchModifyAffinityGroupsRequest struct {
Add []GroupRangesModification `json:"add,omitempty"`
Remove []GroupRangesModification `json:"remove,omitempty"`
}

// UpdateAffinityGroupPeersRequest defines the body for updating peer distribution of an affinity group.
type UpdateAffinityGroupPeersRequest struct {
LeaderStoreID uint64 `json:"leader_store_id"`
VoterStoreIDs []uint64 `json:"voter_store_ids"`
}
11 changes: 11 additions & 0 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ func PatchJSON(client *http.Client, url string, data []byte) (*http.Response, er
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
return client.Do(req)
}

// PutJSON is used to do put request
func PutJSON(client *http.Client, url string, data []byte) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(data))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
return client.Do(req)
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ func CheckPatchJSON(client *http.Client, url string, data []byte, checkOpts ...f
return checkResp(resp, checkOpts...)
}

// CheckPutJSON is used to do put request and do check options.
func CheckPutJSON(client *http.Client, url string, data []byte, checkOpts ...func([]byte, int, http.Header)) error {
resp, err := apiutil.PutJSON(client, url, data)
if err != nil {
return err
}
return checkResp(resp, checkOpts...)
}

// CheckDelete is used to do delete request and do check options.
func CheckDelete(client *http.Client, url string, checkOpts ...func([]byte, int, http.Header)) error {
resp, err := apiutil.DoDelete(client, url)
Expand Down
Loading