diff --git a/client/opt/option.go b/client/opt/option.go index 2ceb83ffaec..c46edb8f924 100644 --- a/client/opt/option.go +++ b/client/opt/option.go @@ -261,6 +261,13 @@ func WithAllowRouterServiceHandleStoreRequest() GetStoreOption { return func(op *GetStoreOp) { op.AllowRouterServiceHandle = true } } +// WithPDLeaderHandleStoreRequestOnly means the store request must be handled by PD leader. +func WithPDLeaderHandleStoreRequestOnly() GetStoreOption { + return func(op *GetStoreOp) { + op.AllowRouterServiceHandle = false + } +} + // RegionsOp represents available options when operate regions type RegionsOp struct { Group string @@ -317,6 +324,14 @@ func WithOutputMustContainAllKeyRange() GetRegionOption { return func(op *GetRegionOp) { op.OutputMustContainAllKeyRange = true } } +// WithAllowPDLeaderOnly means the request must be handled by PD leader. +func WithAllowPDLeaderOnly() GetRegionOption { + return func(op *GetRegionOp) { + op.AllowRouterServiceHandle = false + op.AllowFollowerHandle = false + } +} + // MetaStorageOp represents available options when using meta storage client. type MetaStorageOp struct { RangeEnd []byte diff --git a/client/opt/option_test.go b/client/opt/option_test.go index 4d0df3ff468..08caea59a69 100644 --- a/client/opt/option_test.go +++ b/client/opt/option_test.go @@ -112,6 +112,26 @@ func TestDynamicOptionChange(t *testing.T) { ensureNoNotification(t, o.EnableRouterClientCh) } +func TestOptions(t *testing.T) { + re := require.New(t) + op := GetRegionOp{} + re.False(op.AllowFollowerHandle) + WithAllowFollowerHandle()(&op) + WithAllowRouterServiceHandle()(&op) + re.True(op.AllowFollowerHandle) + re.True(op.AllowRouterServiceHandle) + WithAllowPDLeaderOnly()(&op) + re.False(op.AllowFollowerHandle) + re.False(op.AllowRouterServiceHandle) + + storeOp := GetStoreOp{} + re.False(storeOp.AllowRouterServiceHandle) + WithAllowRouterServiceHandleStoreRequest()(&storeOp) + re.True(storeOp.AllowRouterServiceHandle) + WithPDLeaderHandleStoreRequestOnly()(&storeOp) + re.False(storeOp.AllowRouterServiceHandle) +} + // clearChannel drains any pending events from the channel. func clearChannel(ch chan struct{}) { select { diff --git a/pkg/mcs/router/server/sync.go b/pkg/mcs/router/server/sync.go index 53ada7aff18..f73054c9dfc 100644 --- a/pkg/mcs/router/server/sync.go +++ b/pkg/mcs/router/server/sync.go @@ -122,13 +122,6 @@ func (s *RegionSyncer) updatePDMemberLoop() { defer ticker.Stop() var curLeader uint64 for { - select { - case <-s.serverCtx.Done(): - log.Info("server is closed, exit update member loop") - return - case <-ticker.C: - case <-s.checkMembershipCh: - } members, err := etcdutil.ListEtcdMembers(s.serverCtx, s.getClient()) if err != nil { log.Warn("failed to list members", errs.ZapError(err)) @@ -157,6 +150,13 @@ func (s *RegionSyncer) updatePDMemberLoop() { break } } + select { + case <-s.serverCtx.Done(): + log.Info("server is closed, exit update member loop") + return + case <-ticker.C: + case <-s.checkMembershipCh: + } } }