Skip to content

Commit 1b40b8d

Browse files
authored
Client: support pd client access to the router service (#9925)
close #9212 Signed-off-by: bufferflies <1045931706@qq.com>
1 parent 00221d3 commit 1b40b8d

File tree

14 files changed

+955
-154
lines changed

14 files changed

+955
-154
lines changed

client/client.go

Lines changed: 121 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/pingcap/failpoint"
3131
"github.com/pingcap/kvproto/pkg/metapb"
3232
"github.com/pingcap/kvproto/pkg/pdpb"
33+
"github.com/pingcap/kvproto/pkg/routerpb"
3334
"github.com/pingcap/log"
3435

3536
"github.com/tikv/pd/client/clients/gc"
@@ -60,7 +61,7 @@ type RPCClient interface {
6061
// GetStore gets a store from PD by store id.
6162
// The store may expire later. Caller is responsible for caching and taking care
6263
// of store change.
63-
GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error)
64+
GetStore(ctx context.Context, storeID uint64, opts ...opt.GetStoreOption) (*metapb.Store, error)
6465
// GetAllStores gets all stores from pd.
6566
// The store may expire later. Caller is responsible for caching and taking care
6667
// of store change.
@@ -155,6 +156,7 @@ type serviceModeKeeper struct {
155156
tsoSvcDiscovery sd.ServiceDiscovery
156157
routerClient *router.Cli
157158
resourceManagerDiscovery *sd.ResourceManagerDiscovery
159+
msDiscovery sd.ServiceDiscovery
158160
}
159161

160162
func (k *serviceModeKeeper) close() {
@@ -168,6 +170,10 @@ func (k *serviceModeKeeper) close() {
168170
k.resourceManagerDiscovery.Close()
169171
k.resourceManagerDiscovery = nil
170172
}
173+
if k.msDiscovery != nil {
174+
k.msDiscovery.Close()
175+
}
176+
171177
k.tsoClient.Close()
172178
}
173179

@@ -715,22 +721,30 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
715721
if routerClient := c.getRouterClient(); routerClient != nil {
716722
return routerClient.GetRegion(ctx, key, opts...)
717723
}
718-
719-
options := &opt.GetRegionOp{}
724+
option := &opt.GetRegionOp{}
720725
for _, opt := range opts {
721-
opt(options)
726+
opt(option)
722727
}
723728
req := &pdpb.GetRegionRequest{
724729
Header: c.requestHeader(),
725730
RegionKey: key,
726-
NeedBuckets: options.NeedBuckets,
731+
NeedBuckets: option.NeedBuckets,
727732
}
728-
serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx,
729-
options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle())
733+
var (
734+
resp *pdpb.GetRegionResponse
735+
err error
736+
)
737+
serviceClient, cctx, isRouterServiceClient := c.inner.getServiceClient(ctx, option)
730738
if serviceClient == nil {
731739
return nil, errs.ErrClientGetProtoClient
732740
}
733-
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
741+
742+
if isRouterServiceClient {
743+
resp, err = routerpb.NewRouterClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
744+
} else {
745+
resp, err = pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
746+
}
747+
734748
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
735749
protoClient, cctx := c.getClientAndContext(ctx)
736750
if protoClient == nil {
@@ -760,21 +774,28 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
760774
return routerClient.GetPrevRegion(ctx, key, opts...)
761775
}
762776

763-
options := &opt.GetRegionOp{}
777+
option := &opt.GetRegionOp{}
764778
for _, opt := range opts {
765-
opt(options)
779+
opt(option)
766780
}
767781
req := &pdpb.GetRegionRequest{
768782
Header: c.requestHeader(),
769783
RegionKey: key,
770-
NeedBuckets: options.NeedBuckets,
784+
NeedBuckets: option.NeedBuckets,
771785
}
772-
serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx,
773-
options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle())
786+
var (
787+
resp *pdpb.GetRegionResponse
788+
err error
789+
)
790+
serviceClient, cctx, isRouterServiceClient := c.inner.getServiceClient(ctx, option)
774791
if serviceClient == nil {
775792
return nil, errs.ErrClientGetProtoClient
776793
}
777-
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
794+
if isRouterServiceClient {
795+
resp, err = routerpb.NewRouterClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
796+
} else {
797+
resp, err = pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
798+
}
778799
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
779800
protoClient, cctx := c.getClientAndContext(ctx)
780801
if protoClient == nil {
@@ -804,22 +825,30 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
804825
return routerClient.GetRegionByID(ctx, regionID, opts...)
805826
}
806827

807-
options := &opt.GetRegionOp{}
828+
option := &opt.GetRegionOp{}
808829
for _, opt := range opts {
809-
opt(options)
830+
opt(option)
810831
}
811832
req := &pdpb.GetRegionByIDRequest{
812833
Header: c.requestHeader(),
813834
RegionId: regionID,
814-
NeedBuckets: options.NeedBuckets,
835+
NeedBuckets: option.NeedBuckets,
815836
}
816-
serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx,
817-
options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle())
837+
var (
838+
resp *pdpb.GetRegionResponse
839+
err error
840+
)
841+
serviceClient, cctx, isRouterServiceClient := c.inner.getServiceClient(ctx, option)
818842
if serviceClient == nil {
819843
return nil, errs.ErrClientGetProtoClient
820844
}
821845

822-
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
846+
if isRouterServiceClient {
847+
resp, err = routerpb.NewRouterClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
848+
} else {
849+
resp, err = pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
850+
}
851+
823852
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
824853
protoClient, cctx := c.getClientAndContext(ctx)
825854
if protoClient == nil {
@@ -849,26 +878,30 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
849878
scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.Timeout)
850879
defer cancel()
851880
}
852-
options := &opt.GetRegionOp{}
881+
option := &opt.GetRegionOp{}
853882
for _, opt := range opts {
854-
opt(options)
883+
opt(option)
855884
}
885+
856886
req := &pdpb.ScanRegionsRequest{
857887
Header: c.requestHeader(),
858888
StartKey: key,
859889
EndKey: endKey,
860890
Limit: int32(limit),
861891
}
862-
serviceClient, cctx := c.inner.getRegionAPIClientAndContext(scanCtx,
863-
options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle())
892+
var (
893+
serviceClient sd.ServiceClient
894+
cctx context.Context
895+
)
896+
// the scan region api is not supported by the router service, so we need to use the leader client.
897+
option.AllowRouterServiceHandle = false
898+
serviceClient, cctx, _ = c.inner.getServiceClient(scanCtx, option)
899+
864900
if serviceClient == nil {
865901
return nil, errs.ErrClientGetProtoClient
866902
}
867903
//nolint:staticcheck
868904
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
869-
failpoint.Inject("responseNil", func() {
870-
resp = nil
871-
})
872905
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
873906
protoClient, cctx := c.getClientAndContext(scanCtx)
874907
if protoClient == nil {
@@ -900,27 +933,36 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []router.KeyRange,
900933
scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.Timeout)
901934
defer cancel()
902935
}
903-
options := &opt.GetRegionOp{}
936+
option := &opt.GetRegionOp{}
904937
for _, opt := range opts {
905-
opt(options)
938+
opt(option)
906939
}
907940
pbRanges := make([]*pdpb.KeyRange, 0, len(ranges))
908941
for _, r := range ranges {
909942
pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey})
910943
}
911944
req := &pdpb.BatchScanRegionsRequest{
912945
Header: c.requestHeader(),
913-
NeedBuckets: options.NeedBuckets,
946+
NeedBuckets: option.NeedBuckets,
914947
Ranges: pbRanges,
915948
Limit: int32(limit),
916-
ContainAllKeyRange: options.OutputMustContainAllKeyRange,
949+
ContainAllKeyRange: option.OutputMustContainAllKeyRange,
917950
}
918-
serviceClient, cctx := c.inner.getRegionAPIClientAndContext(scanCtx,
919-
options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle())
951+
var (
952+
resp *pdpb.BatchScanRegionsResponse
953+
err error
954+
)
955+
serviceClient, cctx, isRouterServiceClient := c.inner.getServiceClient(scanCtx, option)
920956
if serviceClient == nil {
921957
return nil, errs.ErrClientGetProtoClient
922958
}
923-
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
959+
960+
if isRouterServiceClient {
961+
resp, err = routerpb.NewRouterClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
962+
} else {
963+
resp, err = pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
964+
}
965+
924966
failpoint.Inject("responseNil", func() {
925967
resp = nil
926968
})
@@ -986,7 +1028,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*router.Region {
9861028
}
9871029

9881030
// GetStore implements the RPCClient interface.
989-
func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
1031+
func (c *client) GetStore(ctx context.Context, storeID uint64, opts ...opt.GetStoreOption) (*metapb.Store, error) {
9901032
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
9911033
span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
9921034
defer span.Finish()
@@ -1000,11 +1042,33 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
10001042
Header: c.requestHeader(),
10011043
StoreId: storeID,
10021044
}
1003-
protoClient, ctx := c.getClientAndContext(ctx)
1004-
if protoClient == nil {
1045+
var (
1046+
resp *pdpb.GetStoreResponse
1047+
err error
1048+
)
1049+
option := &opt.GetStoreOp{}
1050+
for _, opt := range opts {
1051+
opt(option)
1052+
}
1053+
1054+
serviceClient, cctx, isRouterServiceClient := c.inner.getServiceClient(ctx, &opt.GetRegionOp{}, option)
1055+
if serviceClient == nil {
10051056
return nil, errs.ErrClientGetProtoClient
10061057
}
1007-
resp, err := protoClient.GetStore(ctx, req)
1058+
1059+
if isRouterServiceClient {
1060+
resp, err = routerpb.NewRouterClient(serviceClient.GetClientConn()).GetStore(cctx, req)
1061+
} else {
1062+
resp, err = pdpb.NewPDClient(serviceClient.GetClientConn()).GetStore(cctx, req)
1063+
}
1064+
1065+
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
1066+
protoClient, cctx := c.getClientAndContext(ctx)
1067+
if protoClient == nil {
1068+
return nil, errs.ErrClientGetProtoClient
1069+
}
1070+
resp, err = protoClient.GetStore(cctx, req)
1071+
}
10081072

10091073
if err = c.respForErr(metrics.CmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
10101074
return nil, err
@@ -1025,30 +1089,39 @@ func handleStoreResponse(resp *pdpb.GetStoreResponse) (*metapb.Store, error) {
10251089

10261090
// GetAllStores implements the RPCClient interface.
10271091
func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
1028-
// Applies options
1029-
options := &opt.GetStoreOp{}
1030-
for _, opt := range opts {
1031-
opt(options)
1032-
}
1033-
10341092
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
10351093
span = span.Tracer().StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
10361094
defer span.Finish()
10371095
}
10381096
start := time.Now()
10391097
defer func() { metrics.CmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()
1098+
// Applies option
1099+
option := &opt.GetStoreOp{}
1100+
for _, opt := range opts {
1101+
opt(option)
1102+
}
10401103

10411104
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
10421105
defer cancel()
10431106
req := &pdpb.GetAllStoresRequest{
10441107
Header: c.requestHeader(),
1045-
ExcludeTombstoneStores: options.ExcludeTombstone,
1108+
ExcludeTombstoneStores: option.ExcludeTombstone,
10461109
}
1047-
protoClient, ctx := c.getClientAndContext(ctx)
1048-
if protoClient == nil {
1110+
1111+
var (
1112+
resp *pdpb.GetAllStoresResponse
1113+
err error
1114+
)
1115+
serviceClient, cctx, isRouterServiceClient := c.inner.getServiceClient(ctx, &opt.GetRegionOp{}, option)
1116+
1117+
if serviceClient == nil {
10491118
return nil, errs.ErrClientGetProtoClient
10501119
}
1051-
resp, err := protoClient.GetAllStores(ctx, req)
1120+
if isRouterServiceClient {
1121+
resp, err = routerpb.NewRouterClient(serviceClient.GetClientConn()).GetAllStores(cctx, req)
1122+
} else {
1123+
resp, err = pdpb.NewPDClient(serviceClient.GetClientConn()).GetAllStores(cctx, req)
1124+
}
10521125

10531126
if err = c.respForErr(metrics.CmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
10541127
return nil, err

client/clients/metastorage/client.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,16 @@ package metastorage
1616

1717
import (
1818
"context"
19+
"encoding/json"
20+
"sort"
1921

22+
"go.uber.org/zap"
23+
24+
"github.com/pingcap/errors"
2025
"github.com/pingcap/kvproto/pkg/meta_storagepb"
26+
"github.com/pingcap/log"
2127

28+
"github.com/tikv/pd/client/errs"
2229
"github.com/tikv/pd/client/opt"
2330
)
2431

@@ -31,3 +38,48 @@ type Client interface {
3138
// Put puts a key-value pair into meta storage.
3239
Put(ctx context.Context, key []byte, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error)
3340
}
41+
42+
// DiscoveryMSAddrs returns all the sorted members address of the specified service name.
43+
func DiscoveryMSAddrs(ctx context.Context, serviceKey string, client Client) ([]string, error) {
44+
resp, err := client.Get(ctx, []byte(serviceKey), opt.WithPrefix())
45+
if err != nil {
46+
return nil, errs.ErrClientGetMetaStorageClient.Wrap(err).GenWithStackByCause()
47+
}
48+
if err := resp.GetHeader().GetError(); err != nil {
49+
return nil, errs.ErrClientGetProtoClient.Wrap(errors.New(err.GetMessage())).GenWithStackByCause()
50+
}
51+
sortedAddrs := make([]string, 0, len(resp.GetKvs()))
52+
for _, kv := range resp.GetKvs() {
53+
var entry ServiceRegistryEntry
54+
if err = entry.Deserialize(kv.Value); err != nil {
55+
log.Warn("try to deserialize service registry entry failed",
56+
zap.String("service-key", serviceKey),
57+
zap.String("key", string(kv.Key)), zap.Error(err))
58+
continue
59+
}
60+
sortedAddrs = append(sortedAddrs, entry.ServiceAddr)
61+
}
62+
sort.Strings(sortedAddrs)
63+
return sortedAddrs, nil
64+
}
65+
66+
// ServiceRegistryEntry is the registry entry of a service
67+
type ServiceRegistryEntry struct {
68+
// The specific value will be assigned only if the startup parameter is added.
69+
// If not assigned, the default value(service-hostname) will be used.
70+
Name string `json:"name"`
71+
ServiceAddr string `json:"service-addr"`
72+
Version string `json:"version"`
73+
GitHash string `json:"git-hash"`
74+
DeployPath string `json:"deploy-path"`
75+
StartTimestamp int64 `json:"start-timestamp"`
76+
}
77+
78+
// Deserialize the data to this service registry entry
79+
func (e *ServiceRegistryEntry) Deserialize(data []byte) error {
80+
if err := json.Unmarshal(data, e); err != nil {
81+
log.Warn("json unmarshal the service registry entry failed", zap.Error(err))
82+
return err
83+
}
84+
return nil
85+
}

0 commit comments

Comments
 (0)