@@ -30,6 +30,7 @@ import (
3030 "github.com/pingcap/failpoint"
3131 "github.com/pingcap/kvproto/pkg/metapb"
3232 "github.com/pingcap/kvproto/pkg/pdpb"
33+ "github.com/pingcap/kvproto/pkg/routerpb"
3334 "github.com/pingcap/log"
3435
3536 "github.com/tikv/pd/client/clients/gc"
@@ -60,7 +61,7 @@ type RPCClient interface {
6061 // GetStore gets a store from PD by store id.
6162 // The store may expire later. Caller is responsible for caching and taking care
6263 // of store change.
63- GetStore (ctx context.Context , storeID uint64 ) (* metapb.Store , error )
64+ GetStore (ctx context.Context , storeID uint64 , opts ... opt. GetStoreOption ) (* metapb.Store , error )
6465 // GetAllStores gets all stores from pd.
6566 // The store may expire later. Caller is responsible for caching and taking care
6667 // of store change.
@@ -155,6 +156,7 @@ type serviceModeKeeper struct {
155156 tsoSvcDiscovery sd.ServiceDiscovery
156157 routerClient * router.Cli
157158 resourceManagerDiscovery * sd.ResourceManagerDiscovery
159+ msDiscovery sd.ServiceDiscovery
158160}
159161
160162func (k * serviceModeKeeper ) close () {
@@ -168,6 +170,10 @@ func (k *serviceModeKeeper) close() {
168170 k .resourceManagerDiscovery .Close ()
169171 k .resourceManagerDiscovery = nil
170172 }
173+ if k .msDiscovery != nil {
174+ k .msDiscovery .Close ()
175+ }
176+
171177 k .tsoClient .Close ()
172178}
173179
@@ -715,22 +721,30 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
715721 if routerClient := c .getRouterClient (); routerClient != nil {
716722 return routerClient .GetRegion (ctx , key , opts ... )
717723 }
718-
719- options := & opt.GetRegionOp {}
724+ option := & opt.GetRegionOp {}
720725 for _ , opt := range opts {
721- opt (options )
726+ opt (option )
722727 }
723728 req := & pdpb.GetRegionRequest {
724729 Header : c .requestHeader (),
725730 RegionKey : key ,
726- NeedBuckets : options .NeedBuckets ,
731+ NeedBuckets : option .NeedBuckets ,
727732 }
728- serviceClient , cctx := c .inner .getRegionAPIClientAndContext (ctx ,
729- options .AllowFollowerHandle && c .inner .option .GetEnableFollowerHandle ())
733+ var (
734+ resp * pdpb.GetRegionResponse
735+ err error
736+ )
737+ serviceClient , cctx , isRouterServiceClient := c .inner .getServiceClient (ctx , option )
730738 if serviceClient == nil {
731739 return nil , errs .ErrClientGetProtoClient
732740 }
733- resp , err := pdpb .NewPDClient (serviceClient .GetClientConn ()).GetRegion (cctx , req )
741+
742+ if isRouterServiceClient {
743+ resp , err = routerpb .NewRouterClient (serviceClient .GetClientConn ()).GetRegion (cctx , req )
744+ } else {
745+ resp , err = pdpb .NewPDClient (serviceClient .GetClientConn ()).GetRegion (cctx , req )
746+ }
747+
734748 if serviceClient .NeedRetry (resp .GetHeader ().GetError (), err ) {
735749 protoClient , cctx := c .getClientAndContext (ctx )
736750 if protoClient == nil {
@@ -760,21 +774,28 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
760774 return routerClient .GetPrevRegion (ctx , key , opts ... )
761775 }
762776
763- options := & opt.GetRegionOp {}
777+ option := & opt.GetRegionOp {}
764778 for _ , opt := range opts {
765- opt (options )
779+ opt (option )
766780 }
767781 req := & pdpb.GetRegionRequest {
768782 Header : c .requestHeader (),
769783 RegionKey : key ,
770- NeedBuckets : options .NeedBuckets ,
784+ NeedBuckets : option .NeedBuckets ,
771785 }
772- serviceClient , cctx := c .inner .getRegionAPIClientAndContext (ctx ,
773- options .AllowFollowerHandle && c .inner .option .GetEnableFollowerHandle ())
786+ var (
787+ resp * pdpb.GetRegionResponse
788+ err error
789+ )
790+ serviceClient , cctx , isRouterServiceClient := c .inner .getServiceClient (ctx , option )
774791 if serviceClient == nil {
775792 return nil , errs .ErrClientGetProtoClient
776793 }
777- resp , err := pdpb .NewPDClient (serviceClient .GetClientConn ()).GetPrevRegion (cctx , req )
794+ if isRouterServiceClient {
795+ resp , err = routerpb .NewRouterClient (serviceClient .GetClientConn ()).GetPrevRegion (cctx , req )
796+ } else {
797+ resp , err = pdpb .NewPDClient (serviceClient .GetClientConn ()).GetPrevRegion (cctx , req )
798+ }
778799 if serviceClient .NeedRetry (resp .GetHeader ().GetError (), err ) {
779800 protoClient , cctx := c .getClientAndContext (ctx )
780801 if protoClient == nil {
@@ -804,22 +825,30 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
804825 return routerClient .GetRegionByID (ctx , regionID , opts ... )
805826 }
806827
807- options := & opt.GetRegionOp {}
828+ option := & opt.GetRegionOp {}
808829 for _ , opt := range opts {
809- opt (options )
830+ opt (option )
810831 }
811832 req := & pdpb.GetRegionByIDRequest {
812833 Header : c .requestHeader (),
813834 RegionId : regionID ,
814- NeedBuckets : options .NeedBuckets ,
835+ NeedBuckets : option .NeedBuckets ,
815836 }
816- serviceClient , cctx := c .inner .getRegionAPIClientAndContext (ctx ,
817- options .AllowFollowerHandle && c .inner .option .GetEnableFollowerHandle ())
837+ var (
838+ resp * pdpb.GetRegionResponse
839+ err error
840+ )
841+ serviceClient , cctx , isRouterServiceClient := c .inner .getServiceClient (ctx , option )
818842 if serviceClient == nil {
819843 return nil , errs .ErrClientGetProtoClient
820844 }
821845
822- resp , err := pdpb .NewPDClient (serviceClient .GetClientConn ()).GetRegionByID (cctx , req )
846+ if isRouterServiceClient {
847+ resp , err = routerpb .NewRouterClient (serviceClient .GetClientConn ()).GetRegionByID (cctx , req )
848+ } else {
849+ resp , err = pdpb .NewPDClient (serviceClient .GetClientConn ()).GetRegionByID (cctx , req )
850+ }
851+
823852 if serviceClient .NeedRetry (resp .GetHeader ().GetError (), err ) {
824853 protoClient , cctx := c .getClientAndContext (ctx )
825854 if protoClient == nil {
@@ -849,26 +878,30 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
849878 scanCtx , cancel = context .WithTimeout (ctx , c .inner .option .Timeout )
850879 defer cancel ()
851880 }
852- options := & opt.GetRegionOp {}
881+ option := & opt.GetRegionOp {}
853882 for _ , opt := range opts {
854- opt (options )
883+ opt (option )
855884 }
885+
856886 req := & pdpb.ScanRegionsRequest {
857887 Header : c .requestHeader (),
858888 StartKey : key ,
859889 EndKey : endKey ,
860890 Limit : int32 (limit ),
861891 }
862- serviceClient , cctx := c .inner .getRegionAPIClientAndContext (scanCtx ,
863- options .AllowFollowerHandle && c .inner .option .GetEnableFollowerHandle ())
892+ var (
893+ serviceClient sd.ServiceClient
894+ cctx context.Context
895+ )
896+ // the scan region api is not supported by the router service, so we need to use the leader client.
897+ option .AllowRouterServiceHandle = false
898+ serviceClient , cctx , _ = c .inner .getServiceClient (scanCtx , option )
899+
864900 if serviceClient == nil {
865901 return nil , errs .ErrClientGetProtoClient
866902 }
867903 //nolint:staticcheck
868904 resp , err := pdpb .NewPDClient (serviceClient .GetClientConn ()).ScanRegions (cctx , req )
869- failpoint .Inject ("responseNil" , func () {
870- resp = nil
871- })
872905 if serviceClient .NeedRetry (resp .GetHeader ().GetError (), err ) {
873906 protoClient , cctx := c .getClientAndContext (scanCtx )
874907 if protoClient == nil {
@@ -900,27 +933,36 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []router.KeyRange,
900933 scanCtx , cancel = context .WithTimeout (ctx , c .inner .option .Timeout )
901934 defer cancel ()
902935 }
903- options := & opt.GetRegionOp {}
936+ option := & opt.GetRegionOp {}
904937 for _ , opt := range opts {
905- opt (options )
938+ opt (option )
906939 }
907940 pbRanges := make ([]* pdpb.KeyRange , 0 , len (ranges ))
908941 for _ , r := range ranges {
909942 pbRanges = append (pbRanges , & pdpb.KeyRange {StartKey : r .StartKey , EndKey : r .EndKey })
910943 }
911944 req := & pdpb.BatchScanRegionsRequest {
912945 Header : c .requestHeader (),
913- NeedBuckets : options .NeedBuckets ,
946+ NeedBuckets : option .NeedBuckets ,
914947 Ranges : pbRanges ,
915948 Limit : int32 (limit ),
916- ContainAllKeyRange : options .OutputMustContainAllKeyRange ,
949+ ContainAllKeyRange : option .OutputMustContainAllKeyRange ,
917950 }
918- serviceClient , cctx := c .inner .getRegionAPIClientAndContext (scanCtx ,
919- options .AllowFollowerHandle && c .inner .option .GetEnableFollowerHandle ())
951+ var (
952+ resp * pdpb.BatchScanRegionsResponse
953+ err error
954+ )
955+ serviceClient , cctx , isRouterServiceClient := c .inner .getServiceClient (scanCtx , option )
920956 if serviceClient == nil {
921957 return nil , errs .ErrClientGetProtoClient
922958 }
923- resp , err := pdpb .NewPDClient (serviceClient .GetClientConn ()).BatchScanRegions (cctx , req )
959+
960+ if isRouterServiceClient {
961+ resp , err = routerpb .NewRouterClient (serviceClient .GetClientConn ()).BatchScanRegions (cctx , req )
962+ } else {
963+ resp , err = pdpb .NewPDClient (serviceClient .GetClientConn ()).BatchScanRegions (cctx , req )
964+ }
965+
924966 failpoint .Inject ("responseNil" , func () {
925967 resp = nil
926968 })
@@ -986,7 +1028,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*router.Region {
9861028}
9871029
9881030// GetStore implements the RPCClient interface.
989- func (c * client ) GetStore (ctx context.Context , storeID uint64 ) (* metapb.Store , error ) {
1031+ func (c * client ) GetStore (ctx context.Context , storeID uint64 , opts ... opt. GetStoreOption ) (* metapb.Store , error ) {
9901032 if span := opentracing .SpanFromContext (ctx ); span != nil && span .Tracer () != nil {
9911033 span = span .Tracer ().StartSpan ("pdclient.GetStore" , opentracing .ChildOf (span .Context ()))
9921034 defer span .Finish ()
@@ -1000,11 +1042,33 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
10001042 Header : c .requestHeader (),
10011043 StoreId : storeID ,
10021044 }
1003- protoClient , ctx := c .getClientAndContext (ctx )
1004- if protoClient == nil {
1045+ var (
1046+ resp * pdpb.GetStoreResponse
1047+ err error
1048+ )
1049+ option := & opt.GetStoreOp {}
1050+ for _ , opt := range opts {
1051+ opt (option )
1052+ }
1053+
1054+ serviceClient , cctx , isRouterServiceClient := c .inner .getServiceClient (ctx , & opt.GetRegionOp {}, option )
1055+ if serviceClient == nil {
10051056 return nil , errs .ErrClientGetProtoClient
10061057 }
1007- resp , err := protoClient .GetStore (ctx , req )
1058+
1059+ if isRouterServiceClient {
1060+ resp , err = routerpb .NewRouterClient (serviceClient .GetClientConn ()).GetStore (cctx , req )
1061+ } else {
1062+ resp , err = pdpb .NewPDClient (serviceClient .GetClientConn ()).GetStore (cctx , req )
1063+ }
1064+
1065+ if serviceClient .NeedRetry (resp .GetHeader ().GetError (), err ) {
1066+ protoClient , cctx := c .getClientAndContext (ctx )
1067+ if protoClient == nil {
1068+ return nil , errs .ErrClientGetProtoClient
1069+ }
1070+ resp , err = protoClient .GetStore (cctx , req )
1071+ }
10081072
10091073 if err = c .respForErr (metrics .CmdFailedDurationGetStore , start , err , resp .GetHeader ()); err != nil {
10101074 return nil , err
@@ -1025,30 +1089,39 @@ func handleStoreResponse(resp *pdpb.GetStoreResponse) (*metapb.Store, error) {
10251089
10261090// GetAllStores implements the RPCClient interface.
10271091func (c * client ) GetAllStores (ctx context.Context , opts ... opt.GetStoreOption ) ([]* metapb.Store , error ) {
1028- // Applies options
1029- options := & opt.GetStoreOp {}
1030- for _ , opt := range opts {
1031- opt (options )
1032- }
1033-
10341092 if span := opentracing .SpanFromContext (ctx ); span != nil && span .Tracer () != nil {
10351093 span = span .Tracer ().StartSpan ("pdclient.GetAllStores" , opentracing .ChildOf (span .Context ()))
10361094 defer span .Finish ()
10371095 }
10381096 start := time .Now ()
10391097 defer func () { metrics .CmdDurationGetAllStores .Observe (time .Since (start ).Seconds ()) }()
1098+ // Applies option
1099+ option := & opt.GetStoreOp {}
1100+ for _ , opt := range opts {
1101+ opt (option )
1102+ }
10401103
10411104 ctx , cancel := context .WithTimeout (ctx , c .inner .option .Timeout )
10421105 defer cancel ()
10431106 req := & pdpb.GetAllStoresRequest {
10441107 Header : c .requestHeader (),
1045- ExcludeTombstoneStores : options .ExcludeTombstone ,
1108+ ExcludeTombstoneStores : option .ExcludeTombstone ,
10461109 }
1047- protoClient , ctx := c .getClientAndContext (ctx )
1048- if protoClient == nil {
1110+
1111+ var (
1112+ resp * pdpb.GetAllStoresResponse
1113+ err error
1114+ )
1115+ serviceClient , cctx , isRouterServiceClient := c .inner .getServiceClient (ctx , & opt.GetRegionOp {}, option )
1116+
1117+ if serviceClient == nil {
10491118 return nil , errs .ErrClientGetProtoClient
10501119 }
1051- resp , err := protoClient .GetAllStores (ctx , req )
1120+ if isRouterServiceClient {
1121+ resp , err = routerpb .NewRouterClient (serviceClient .GetClientConn ()).GetAllStores (cctx , req )
1122+ } else {
1123+ resp , err = pdpb .NewPDClient (serviceClient .GetClientConn ()).GetAllStores (cctx , req )
1124+ }
10521125
10531126 if err = c .respForErr (metrics .CmdFailedDurationGetAllStores , start , err , resp .GetHeader ()); err != nil {
10541127 return nil , err
0 commit comments