Skip to content

Commit 648cb85

Browse files
committed
address comment
Signed-off-by: bufferflies <1045931706@qq.com>
1 parent b24add0 commit 648cb85

File tree

5 files changed

+39
-54
lines changed

5 files changed

+39
-54
lines changed

client/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ type serviceModeKeeper struct {
156156
tsoSvcDiscovery sd.ServiceDiscovery
157157
routerClient *router.Cli
158158
resourceManagerDiscovery *sd.ResourceManagerDiscovery
159-
mcsSvcDiscovery sd.ServiceDiscovery
159+
msDiscovery sd.ServiceDiscovery
160160
}
161161

162162
func (k *serviceModeKeeper) close() {
@@ -170,8 +170,8 @@ func (k *serviceModeKeeper) close() {
170170
k.resourceManagerDiscovery.Close()
171171
k.resourceManagerDiscovery = nil
172172
}
173-
if k.mcsSvcDiscovery != nil {
174-
k.mcsSvcDiscovery.Close()
173+
if k.msDiscovery != nil {
174+
k.msDiscovery.Close()
175175
}
176176

177177
k.tsoClient.Close()

client/clients/router/client.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ type Cli struct {
180180
// updateConnectionCh is used to trigger the connection update actively.
181181
updateConnectionCh chan struct{}
182182

183-
// msDiscovery is the service discovery for the router mcs service.
183+
// msDiscovery is the service discovery for the router service.
184184
msDiscovery sd.ServiceDiscovery
185185
// msConCtxMgr is used to store the context of the router service stream connection.
186186
msConCtxMgr *cctx.Manager[routerpb.Router_QueryRegionClient]
@@ -196,7 +196,7 @@ type Cli struct {
196196
func NewClient(
197197
ctx context.Context,
198198
svcDiscovery sd.ServiceDiscovery,
199-
mcsSvcDiscovery sd.ServiceDiscovery,
199+
msDiscovery sd.ServiceDiscovery,
200200
option *opt.Option,
201201
) *Cli {
202202
ctx, cancel := context.WithCancel(ctx)
@@ -208,7 +208,7 @@ func NewClient(
208208
conCtxMgr: cctx.NewManager[pdpb.PD_QueryRegionClient](),
209209
updateConnectionCh: make(chan struct{}, 1),
210210
msConCtxMgr: cctx.NewManager[routerpb.Router_QueryRegionClient](),
211-
msDiscovery: mcsSvcDiscovery,
211+
msDiscovery: msDiscovery,
212212
bo: retry.InitialBackoffer(
213213
sd.UpdateMemberBackOffBaseTime,
214214
sd.UpdateMemberMaxBackoffTime,
@@ -376,7 +376,7 @@ func (c *Cli) getAllClientConns() map[string]*grpc.ClientConn {
376376
return conns
377377
}
378378

379-
func (c *Cli) getAllMcsClientConns() map[string]*grpc.ClientConn {
379+
func (c *Cli) getAllMsClientConns() map[string]*grpc.ClientConn {
380380
if c.msDiscovery == nil {
381381
return nil
382382
}
@@ -417,7 +417,7 @@ func (c *Cli) connectionDaemon() {
417417
log.Info("[router] connection daemon is started")
418418
for {
419419
c.updateConnection(updaterCtx)
420-
c.updateMcsServiceConnection(updaterCtx)
420+
c.updateMsConnection(updaterCtx)
421421
select {
422422
case <-updaterCtx.Done():
423423
log.Info("[router] connection daemon is exiting")
@@ -434,11 +434,11 @@ func (c *Cli) connectionDaemon() {
434434
}
435435
}
436436

437-
func (c *Cli) updateMcsServiceConnection(ctx context.Context) {
437+
func (c *Cli) updateMsConnection(ctx context.Context) {
438438
if c.msDiscovery == nil {
439439
return
440440
}
441-
conns := c.getAllMcsClientConns()
441+
conns := c.getAllMsClientConns()
442442
if len(conns) == 0 {
443443
log.Warn("[router] no router service node found")
444444
return
@@ -610,7 +610,7 @@ batchLoop:
610610
continue batchLoop
611611
default:
612612
}
613-
processQueryFunc, streamURL = c.sendToMcs(ctx)
613+
processQueryFunc, streamURL = c.sendToMs(ctx)
614614
if processQueryFunc == nil {
615615
processQueryFunc, streamURL, retry = c.sendToPD(ctx)
616616
}
@@ -670,8 +670,8 @@ func (c *Cli) sendToPD(ctx context.Context) (processFn, string, bool) {
670670

671671
type processFn func() error
672672

673-
// sendToMcs returns the stream function, stream url
674-
func (c *Cli) sendToMcs(ctx context.Context) (processFn, string) {
673+
// sendToMs returns the stream function, stream url
674+
func (c *Cli) sendToMs(ctx context.Context) (processFn, string) {
675675
allowRouterServiceHandle := c.msConCtxMgr.Size() > 0
676676
if allowRouterServiceHandle {
677677
// We need to ensure all requests in a same batch allow to be handled by the router service.
@@ -690,7 +690,7 @@ func (c *Cli) sendToMcs(ctx context.Context) (processFn, string) {
690690
connectionCtx := c.msConCtxMgr.RandomlyPick()
691691
if connectionCtx == nil {
692692
log.Info("[router] router service stream connection is not ready")
693-
c.updateMcsServiceConnection(ctx)
693+
c.updateMsConnection(ctx)
694694
return nil, ""
695695
}
696696
streamCtx, streamURL, stream := connectionCtx.Ctx, connectionCtx.StreamURL, connectionCtx.Stream

client/errs/errno.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,26 +50,25 @@ var (
5050

5151
// client errors
5252
var (
53-
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
54-
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
55-
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
56-
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
57-
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
58-
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
59-
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
60-
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
61-
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
62-
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
63-
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
64-
ErrClientNoTargetMember = errors.Normalize("no target member", errors.RFCCodeText("PD:client:ErrClientNoTargetMember"))
65-
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
66-
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
67-
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
68-
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
69-
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
70-
ErrCircuitBreakerOpen = errors.Normalize("circuit breaker is open", errors.RFCCodeText("PD:client:ErrCircuitBreakerOpen"))
71-
ErrClientRouterConnectionTimeout = errors.Normalize("router connection is not ready until timeout", errors.RFCCodeText("PD:client:ErrClientRouterConnectionTimeout"))
72-
ErrClientRouterServiceNotInitialized = errors.Normalize("router service discovery is not initialized", errors.RFCCodeText("PD:client:ErrClientRouterServiceNotInitialized"))
53+
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
54+
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
55+
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
56+
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
57+
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
58+
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
59+
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
60+
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
61+
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
62+
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
63+
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
64+
ErrClientNoTargetMember = errors.Normalize("no target member", errors.RFCCodeText("PD:client:ErrClientNoTargetMember"))
65+
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
66+
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
67+
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
68+
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
69+
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
70+
ErrCircuitBreakerOpen = errors.Normalize("circuit breaker is open", errors.RFCCodeText("PD:client:ErrCircuitBreakerOpen"))
71+
ErrClientRouterConnectionTimeout = errors.Normalize("router connection is not ready until timeout", errors.RFCCodeText("PD:client:ErrClientRouterConnectionTimeout"))
7372
)
7473

7574
// grpcutil errors

client/inner_client.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,9 @@ func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
7474
}
7575
return err
7676
}
77-
c.mcsSvcDiscovery = sd.NewRouterServiceDiscovery(c.ctx, c, c.serviceDiscovery,
77+
c.msDiscovery = sd.NewRouterServiceDiscovery(c.ctx, c, c.serviceDiscovery,
7878
c.tlsCfg, c.option)
7979

80-
// Check if the router service client has been enabled.
81-
c.enableRouterServiceClient()
82-
8380
// Check if the router client has been enabled.
8481
if c.option.GetEnableRouterClient() {
8582
c.enableRouterClient()
@@ -119,7 +116,7 @@ func (c *innerClient) enableRouterClient() {
119116
}
120117
c.RUnlock()
121118
// Create a new router client first before acquiring the lock.
122-
routerClient := router.NewClient(c.ctx, c.serviceDiscovery, c.mcsSvcDiscovery, c.option)
119+
routerClient := router.NewClient(c.ctx, c.serviceDiscovery, c.msDiscovery, c.option)
123120
c.Lock()
124121
// Double check if the router client has been enabled.
125122
if c.routerClient != nil {
@@ -145,14 +142,6 @@ func (c *innerClient) disableRouterClient() {
145142
routerClient.Close()
146143
}
147144

148-
func (c *innerClient) enableRouterServiceClient() {
149-
c.Lock()
150-
defer c.Unlock()
151-
if err := c.mcsSvcDiscovery.Init(); err != nil {
152-
log.Warn("[pd] failed to initialize router service discovery", zap.Error(err))
153-
}
154-
}
155-
156145
func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
157146
c.Lock()
158147
defer c.Unlock()
@@ -323,7 +312,7 @@ func (c *innerClient) getServiceClient(ctx context.Context, regionOp *opt.GetReg
323312
switch {
324313
case allowRouterServiceHandle:
325314
isRouterClient = true
326-
serviceClient = c.mcsSvcDiscovery.GetServiceClient()
315+
serviceClient = c.msDiscovery.GetServiceClient()
327316
mustLeader = false
328317
case regionOp.AllowFollowerHandle && c.option.GetEnableFollowerHandle():
329318
mustLeader = false

client/servicediscovery/router_service_discovery.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,6 @@ func (r *routerServiceDiscovery) GetServiceClientByKind(_ APIKind) ServiceClient
101101

102102
// GetOrCreateGRPCConn creates a gRPC connection to the router service.
103103
func (r *routerServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) {
104-
if r.ctx == nil {
105-
return nil, errs.ErrClientRouterServiceNotInitialized.FastGen("router service discovery is not initialized")
106-
}
107104
return grpcutil.GetOrCreateGRPCConn(r.ctx, &r.clientConns, url, r.tlsCfg, r.option.GRPCDialOptions...)
108105
}
109106

@@ -119,9 +116,6 @@ func (r *routerServiceDiscovery) ScheduleCheckMemberChanged() {
119116

120117
// CheckMemberChanged checks if there is any membership change among the members of the router service.
121118
func (r *routerServiceDiscovery) CheckMemberChanged() error {
122-
if r.ctx == nil {
123-
return errs.ErrClientRouterServiceNotInitialized.FastGen("router service discovery is not initialized")
124-
}
125119
if err := retry.WithConfig(r.ctx, r.updateMember); err != nil {
126120
log.Warn("[router service] failed to update member", errs.ZapError(err))
127121
return err
@@ -158,6 +152,9 @@ func NewRouterServiceDiscovery(
158152
zap.Uint64("cluster-id", c.GetClusterID()),
159153
zap.Uint32("keyspace-id", c.GetKeyspaceID()),
160154
zap.String("default-discovery-key", c.defaultDiscoveryKey))
155+
if err := c.Init(); err != nil {
156+
log.Warn("[pd] failed to initialize router service discovery, will retry init later", zap.Error(err))
157+
}
161158
return c
162159
}
163160

0 commit comments

Comments
 (0)