Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ func WithAllowRouterServiceHandleStoreRequest() GetStoreOption {
return func(op *GetStoreOp) { op.AllowRouterServiceHandle = true }
}

// WithPDLeaderHandleStoreRequestOnly means the store request must be handled by PD leader.
func WithPDLeaderHandleStoreRequestOnly() GetStoreOption {
return func(op *GetStoreOp) {
op.AllowRouterServiceHandle = false
}
}

// RegionsOp represents available options when operate regions
type RegionsOp struct {
Group string
Expand Down Expand Up @@ -317,6 +324,14 @@ func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.OutputMustContainAllKeyRange = true }
}

// WithAllowPDLeaderOnly means the request must be handled by PD leader.
func WithAllowPDLeaderOnly() GetRegionOption {
return func(op *GetRegionOp) {
op.AllowRouterServiceHandle = false
op.AllowFollowerHandle = false
}
}

// MetaStorageOp represents available options when using meta storage client.
type MetaStorageOp struct {
RangeEnd []byte
Expand Down
20 changes: 20 additions & 0 deletions client/opt/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ func TestDynamicOptionChange(t *testing.T) {
ensureNoNotification(t, o.EnableRouterClientCh)
}

func TestOptions(t *testing.T) {
re := require.New(t)
op := GetRegionOp{}
re.False(op.AllowFollowerHandle)
WithAllowFollowerHandle()(&op)
WithAllowRouterServiceHandle()(&op)
re.True(op.AllowFollowerHandle)
re.True(op.AllowRouterServiceHandle)
WithAllowPDLeaderOnly()(&op)
re.False(op.AllowFollowerHandle)
re.False(op.AllowRouterServiceHandle)

storeOp := GetStoreOp{}
re.False(storeOp.AllowRouterServiceHandle)
WithAllowRouterServiceHandleStoreRequest()(&storeOp)
re.True(storeOp.AllowRouterServiceHandle)
WithPDLeaderHandleStoreRequestOnly()(&storeOp)
re.False(storeOp.AllowRouterServiceHandle)
}

// clearChannel drains any pending events from the channel.
func clearChannel(ch chan struct{}) {
select {
Expand Down
14 changes: 7 additions & 7 deletions pkg/mcs/router/server/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,6 @@ func (s *RegionSyncer) updatePDMemberLoop() {
defer ticker.Stop()
var curLeader uint64
for {
select {
case <-s.serverCtx.Done():
log.Info("server is closed, exit update member loop")
return
case <-ticker.C:
case <-s.checkMembershipCh:
}
members, err := etcdutil.ListEtcdMembers(s.serverCtx, s.getClient())
if err != nil {
log.Warn("failed to list members", errs.ZapError(err))
Expand Down Expand Up @@ -157,6 +150,13 @@ func (s *RegionSyncer) updatePDMemberLoop() {
break
}
}
select {
case <-s.serverCtx.Done():
log.Info("server is closed, exit update member loop")
return
case <-ticker.C:
case <-s.checkMembershipCh:
}
}
}

Expand Down
Loading