Skip to content

Commit cee954e

Browse files
committed
fix test
Signed-off-by: bufferflies <1045931706@qq.com>
1 parent 37f4e51 commit cee954e

File tree

2 files changed

+36
-14
lines changed

2 files changed

+36
-14
lines changed

client/clients/router/client.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func (c *Cli) updateRouterServiceConnection(ctx context.Context) {
460460
// GC all the router router stream connections.
461461
c.routerConCtxMgr.GC(func(url string) bool {
462462
if url != c.getLeaderURL() {
463-
log.Info("[router] release the router service stream connection", zap.String("url", url))
463+
log.Info("[router] release the router service stream connection", zap.String("url", url))
464464
return true
465465
}
466466
return false
@@ -607,9 +607,9 @@ batchLoop:
607607
}
608608
if fn == nil {
609609
fn, streamURL, retry = c.dispatcherAPI(ctx)
610-
}
611-
if retry {
612-
continue connectionCtxChoosingLoop
610+
if retry {
611+
continue connectionCtxChoosingLoop
612+
}
613613
}
614614
break connectionCtxChoosingLoop
615615
}
@@ -664,6 +664,7 @@ func (c *Cli) dispatcherAPI(ctx context.Context) (processFn, string, bool) {
664664

665665
type processFn func() error
666666

667+
// dispatcherRouterService returns the stream function, stream url and need retry again
667668
func (c *Cli) dispatcherRouterService(ctx context.Context) (processFn, string, bool) {
668669
allowRouterServiceHandle := c.option.GetEnableRouterServiceHandler()
669670
if allowRouterServiceHandle {
@@ -680,24 +681,45 @@ func (c *Cli) dispatcherRouterService(ctx context.Context) (processFn, string, b
680681
if allowRouterServiceHandle {
681682
return nil, "", false
682683
}
683-
connect := c.routerConCtxMgr.RandomlyPick()
684-
if connect == nil {
685-
log.Info("[router] router service stream connection is not ready")
686-
c.updateRouterServiceConnection(ctx)
684+
685+
// Check whether allow the follower to handle this batch of requests.
686+
allowFollowerHandle := c.option.GetEnableFollowerHandle()
687+
if allowFollowerHandle {
688+
// We need to ensure all requests in a same batch allow to be handled by the follower.
689+
// IMPROVE: separate into the follower and leader handle batches.
690+
c.batchController.IterCollectedRequests(func(req *Request) bool {
691+
if !req.options.AllowFollowerHandle {
692+
allowFollowerHandle = false
693+
return false
694+
}
695+
return true
696+
})
697+
}
698+
allowFollowerHandle = allowFollowerHandle && c.option.GetEnableFollowerHandle()
699+
var connectionCtx *cctx.ConnectionCtx[pdpb.PD_QueryRegionClient]
700+
if allowFollowerHandle {
701+
connectionCtx = c.conCtxMgr.RandomlyPick()
702+
} else {
703+
connectionCtx = c.conCtxMgr.GetConnectionCtx(c.getLeaderURL())
704+
705+
}
706+
if connectionCtx == nil {
707+
log.Info("[router] router stream connection is not ready")
708+
c.updateConnection(ctx)
687709
return nil, "", true
688710
}
689711
// Check if the stream connection is canceled.
690712
select {
691-
case <-connect.Ctx.Done():
692-
log.Info("[router] router service stream connection is canceled", zap.String("stream-url", connect.StreamURL))
693-
c.routerConCtxMgr.Release(connect.StreamURL)
713+
case <-connectionCtx.Ctx.Done():
714+
log.Info("[router] router stream connection is canceled", zap.String("stream-url", connectionCtx.StreamURL))
715+
c.routerConCtxMgr.Release(connectionCtx.StreamURL)
694716
return nil, "", true
695717
default:
696718
}
697719

698720
return func() error {
699-
return c.processRouterRequests(connect.Stream)
700-
}, connect.StreamURL, false
721+
return c.processRouterRequests(connectionCtx.Stream)
722+
}, connectionCtx.StreamURL, false
701723
}
702724

703725
type recvFn func() (*pdpb.QueryRegionResponse, error)

client/servicediscovery/router_service_discovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (r *routerServiceDiscovery) CheckMemberChanged() error {
165165
log.Warn("[router] failed to check member changed", errs.ZapError(err))
166166
}
167167
if err := innerRetry(r.ctx, queryRetryMaxTimes, r.updateMember); err != nil {
168-
log.Error("[router] failed to update member", errs.ZapError(err))
168+
log.Warn("[router] failed to update member", errs.ZapError(err))
169169
return err
170170
}
171171
return nil

0 commit comments

Comments
 (0)