Skip to content

Commit e70ec13

Browse files
committed
add more test
Signed-off-by: 童剑 <1045931706@qq.com>
1 parent 2a5abbd commit e70ec13

File tree

8 files changed

+109
-75
lines changed

8 files changed

+109
-75
lines changed

client/clients/router/client.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func (c *Cli) updateRouterServiceConnection(ctx context.Context) {
438438
cctx, cancel := context.WithCancel(ctx)
439439
stream, err := routerpb.NewRouterClient(conn).QueryRegion(cctx)
440440
if err != nil {
441-
log.Error("[router] failed to create the router service stream connection", errs.ZapError(err))
441+
log.Warn("[router] failed to create the router service stream connection", errs.ZapError(err))
442442
}
443443
// Store the stream connection context if it is successfully created.
444444
if stream != nil {
@@ -461,7 +461,6 @@ func (c *Cli) updateRouterServiceConnection(ctx context.Context) {
461461
// GC all the router service stream connections.
462462
log.Info("[router] release all router service stream connection")
463463
c.routerConCtxMgr.ReleaseAll()
464-
465464
}
466465
}
467466

client/inner_client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ func (c *innerClient) disableRouterServiceClient() {
175175
func (c *innerClient) enableRouterServiceClient() {
176176
c.Lock()
177177
defer c.Unlock()
178-
c.routerSvcDiscovery.Init()
178+
if err := c.routerSvcDiscovery.Init(); err != nil {
179+
log.Warn("[pd] failed to initialize router service discovery", zap.Error(err))
180+
}
179181
}
180182

181183
func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) {
@@ -291,6 +293,7 @@ func (c *innerClient) close() {
291293
c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr)
292294
c.tokenDispatcher.dispatcherCancel()
293295
}
296+
log.Info("[pd] close client successfully")
294297
}
295298

296299
func (c *innerClient) setup() error {
@@ -334,6 +337,8 @@ func (c *innerClient) getServiceClient(ctx context.Context, options *opt.GetRegi
334337
if serviceClient != nil && serviceClient.GetClientConn() != nil && serviceClient.Available() {
335338
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, mustLeader), isRouterClient
336339
}
340+
// Fallback to the leader client.
341+
isRouterClient = false
337342
serviceClient = c.serviceDiscovery.GetServiceClient()
338343
if serviceClient == nil || serviceClient.GetClientConn() == nil {
339344
return nil, ctx, false

client/servicediscovery/router_service_discovery.go

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ import (
3939
)
4040

4141
const (
42-
servicePathFormat = "/ms/%d/router/registry/" // "/ms/{cluster_id}/router/registry/"
42+
// "/ms/{cluster_id}/router/registry/"
43+
servicePathFormat = "/ms/%d/router/registry/"
4344
)
4445

4546
var _ ServiceDiscovery = (*routerServiceDiscovery)(nil)
@@ -64,9 +65,10 @@ type routerServiceDiscovery struct {
6465

6566
checkMembershipCh chan struct{}
6667

67-
ctx context.Context
68-
cancel context.CancelFunc
69-
wg sync.WaitGroup
68+
parentCtx context.Context
69+
ctx context.Context
70+
cancel context.CancelFunc
71+
wg sync.WaitGroup
7072

7173
// Client option.
7274
option *opt.Option
@@ -108,6 +110,8 @@ func (r *routerServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientCo
108110
// ScheduleCheckMemberChanged schedules a check for member changes.
109111
func (r *routerServiceDiscovery) ScheduleCheckMemberChanged() {
110112
select {
113+
case <-r.parentCtx.Done():
114+
log.Info("[router service] service discovery is shutting down")
111115
case r.checkMembershipCh <- struct{}{}:
112116
default:
113117
}
@@ -132,12 +136,10 @@ func NewRouterServiceDiscovery(
132136
ctx context.Context, metaCli metastorage.Client, serviceDiscovery ServiceDiscovery,
133137
tlsCfg *tls.Config, option *opt.Option,
134138
) ServiceDiscovery {
135-
ctx, cancel := context.WithCancel(ctx)
136139
balancer := newServiceBalancer(emptyErrorFn)
137140
c := &routerServiceDiscovery{
138-
ctx: ctx,
141+
parentCtx: ctx,
139142
ServiceDiscovery: serviceDiscovery,
140-
cancel: cancel,
141143
metaCli: metaCli,
142144
tlsCfg: tlsCfg,
143145
option: option,
@@ -149,7 +151,7 @@ func NewRouterServiceDiscovery(
149151
// will be discovered later.
150152
c.defaultDiscoveryKey = fmt.Sprintf(servicePathFormat, c.GetClusterID())
151153

152-
log.Info("created router service discovery",
154+
log.Info("[router service] created router service discovery",
153155
zap.Uint64("cluster-id", c.GetClusterID()),
154156
zap.Uint32("keyspace-id", c.GetKeyspaceID()),
155157
zap.String("default-discovery-key", c.defaultDiscoveryKey))
@@ -158,13 +160,13 @@ func NewRouterServiceDiscovery(
158160

159161
// Init initialize the concrete client underlying
160162
func (r *routerServiceDiscovery) Init() error {
161-
log.Info("initializing router service discovery",
163+
log.Info("[router service] initializing router service discovery",
162164
zap.Int("max-retry-times", r.option.MaxRetryTimes),
163165
zap.Duration("retry-interval", initRetryInterval))
166+
r.ctx, r.cancel = context.WithCancel(r.parentCtx)
164167
if err := r.CheckMemberChanged(); err != nil {
165-
r.cancel()
166-
log.Warn("failed to initialize router service discovery", zap.Error(err))
167-
return err
168+
// Initial check failed, log and continue to run the background loop.
169+
log.Warn("[router service] failed to initialize router service discovery", zap.Error(err))
168170
}
169171
r.wg.Add(2)
170172
go r.startCheckMemberLoop()
@@ -203,19 +205,21 @@ func (r *routerServiceDiscovery) updateNodes(urls []string) {
203205
if client.(*serviceClient).GetClientConn() == nil {
204206
conn, err := r.GetOrCreateGRPCConn(url)
205207
if err != nil || conn == nil {
206-
log.Warn("[pd] failed to connect follower", zap.String("follower", url), errs.ZapError(err))
208+
log.Warn("[router service] failed to connect router service",
209+
zap.String("new-url", newURL), errs.ZapError(err))
207210
continue
208211
}
209212
node := newPDServiceClient(url, r.GetServingURL(), conn, false)
210213
r.nodes.Store(url, node)
211214
}
212215
} else {
213216
conn, err := r.GetOrCreateGRPCConn(url)
214-
follower := newPDServiceClient(url, r.GetServingURL(), conn, false)
215217
if err != nil || conn == nil {
216-
log.Warn("[pd] failed to connect follower", zap.String("follower", url), errs.ZapError(err))
218+
log.Warn("[router service] failed to connect follower",
219+
zap.String("url", url), errs.ZapError(err))
217220
}
218-
r.nodes.LoadOrStore(url, follower)
221+
nodeClient := newPDServiceClient(url, r.GetServingURL(), conn, false)
222+
r.nodes.LoadOrStore(url, nodeClient)
219223
}
220224
}
221225
}
@@ -224,6 +228,9 @@ func (r *routerServiceDiscovery) updateNodes(urls []string) {
224228
clients = append(clients, value.(*serviceClient))
225229
return true
226230
})
231+
log.Info("[router service] updating nodes succeeded",
232+
zap.Strings("urls", urls),
233+
zap.Int("clients-length", len(clients)))
227234
r.balancer.set(clients)
228235
}
229236

@@ -261,7 +268,7 @@ func (r *routerServiceDiscovery) startCheckMemberLoop() {
261268
// so that we can speed up the process of router service discovery when failover happens on the
262269
// router service side and also ensures it won't call updateMember too frequently during normal time.
263270
if err := r.CheckMemberChanged(); err != nil {
264-
log.Error("[router service] failed to update member", errs.ZapError(err))
271+
log.Warn("[router service] failed to update member", errs.ZapError(err))
265272
}
266273
}
267274
}
@@ -287,19 +294,23 @@ func innerRetry(
287294

288295
// Close releases all resources
289296
func (r *routerServiceDiscovery) Close() {
290-
log.Info("closing router service discovery")
291-
r.cancel()
292-
r.wg.Wait()
297+
log.Info("[router service] closing router service discovery")
298+
if r.cancel != nil {
299+
r.cancel()
300+
}
293301

294302
r.clientConns.Range(func(key, cc any) bool {
295303
if err := cc.(*grpc.ClientConn).Close(); err != nil {
296-
log.Error("[router service] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
304+
log.Warn("[router service] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
297305
}
298306
r.clientConns.Delete(key)
299307
return true
300308
})
301-
302-
log.Info("router service discovery is closed")
309+
r.sortedUrls.Store([]string{})
310+
r.balancer.clean()
311+
r.nodes.Clear()
312+
r.wg.Wait()
313+
log.Info("[router service] is closed")
303314
}
304315

305316
// getMSMembers returns all the members of the specified service name.
@@ -315,7 +326,8 @@ func getMSMembers(ctx context.Context, serviceKey string, client metastorage.Cli
315326
for _, kv := range resp.GetKvs() {
316327
var entry ServiceRegistryEntry
317328
if err = entry.Deserialize(kv.Value); err != nil {
318-
log.Error("try to deserialize service registry entry failed", zap.String("key", string(kv.Key)), zap.Error(err))
329+
log.Warn("[router service] try to deserialize service registry entry failed",
330+
zap.String("key", string(kv.Key)), zap.Error(err))
319331
continue
320332
}
321333
ret = append(ret, entry.ServiceAddr)
@@ -335,20 +347,10 @@ type ServiceRegistryEntry struct {
335347
StartTimestamp int64 `json:"start-timestamp"`
336348
}
337349

338-
// Serialize this service registry entry
339-
func (e *ServiceRegistryEntry) Serialize() (serializedValue string, err error) {
340-
data, err := json.Marshal(e)
341-
if err != nil {
342-
log.Error("json marshal the service registry entry failed", zap.Error(err))
343-
return "", err
344-
}
345-
return string(data), nil
346-
}
347-
348350
// Deserialize the data to this service registry entry
349351
func (e *ServiceRegistryEntry) Deserialize(data []byte) error {
350352
if err := json.Unmarshal(data, e); err != nil {
351-
log.Error("json unmarshal the service registry entry failed", zap.Error(err))
353+
log.Warn("[router service] json unmarshal the service registry entry failed", zap.Error(err))
352354
return err
353355
}
354356
return nil
@@ -366,6 +368,7 @@ func (r *routerServiceDiscovery) nodeHealthCheckLoop() {
366368
for {
367369
select {
368370
case <-r.ctx.Done():
371+
log.Info("[router service] exit health check member loop")
369372
return
370373
case <-ticker.C:
371374
r.checkNodeHealth(nodeCheckLoopCtx)

client/servicediscovery/service_discovery.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,13 @@ func (c *serviceBalancer) get() (ret ServiceClient) {
398398
return
399399
}
400400

401+
func (c *serviceBalancer) clean() {
402+
c.mu.Lock()
403+
defer c.mu.Unlock()
404+
c.now = nil
405+
c.totalNode = 0
406+
}
407+
401408
// UpdateKeyspaceIDFunc is the function type for updating the keyspace ID.
402409
type UpdateKeyspaceIDFunc func() error
403410

pkg/mcs/utils/util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
clientv3 "go.etcd.io/etcd/client/v3"
3434
"go.uber.org/zap"
3535
"google.golang.org/grpc"
36+
"google.golang.org/grpc/health"
37+
"google.golang.org/grpc/health/grpc_health_v1"
3638
"google.golang.org/grpc/keepalive"
3739

3840
"github.com/pingcap/kvproto/pkg/diagnosticspb"
@@ -172,6 +174,8 @@ func StartGRPCAndHTTPServers(s server, serverReadyChan chan<- struct{}, l net.Li
172174
MinTime: 5 * time.Second,
173175
}),
174176
)
177+
hs := health.NewServer()
178+
grpc_health_v1.RegisterHealthServer(grpcServer, hs)
175179
s.SetGRPCServer(grpcServer)
176180
s.RegisterGRPCService(grpcServer)
177181
diagnosticspb.RegisterDiagnosticsServer(grpcServer, s)

tests/integrations/mcs/router/server_test.go

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -139,26 +139,39 @@ func (suite *serverTestSuite) TestStoreAPI() {
139139
re.Error(err)
140140
defer cli.Close()
141141

142-
// test router store apis
143-
re.NoError(cli.UpdateOption(opt.EnableRouterServiceHandler, true))
144-
145-
// wait the router service watch the store info
146-
testutil.Eventually(re, func() bool {
147-
store, err := cli.GetStore(suite.ctx, store1)
148-
if err != nil {
149-
return false
142+
checkStore := func() {
143+
// test router store apis
144+
re.NoError(cli.UpdateOption(opt.EnableRouterServiceHandler, true))
145+
// wait the router service watch the store info
146+
testutil.Eventually(re, func() bool {
147+
store, err := cli.GetStore(suite.ctx, store1)
148+
if err != nil {
149+
return false
150+
}
151+
re.Equal(store1, store.GetId())
152+
return true
153+
})
154+
ctx, cancel := context.WithTimeout(suite.ctx, 3*time.Second)
155+
defer func() {
156+
cancel()
157+
}()
158+
for {
159+
select {
160+
case <-ctx.Done():
161+
re.NoError(cli.UpdateOption(opt.EnableRouterServiceHandler, false))
162+
_, err = cli.GetStore(suite.ctx, store1)
163+
re.Error(err)
164+
return
165+
default:
166+
stores, err := cli.GetAllStores(suite.ctx)
167+
re.NoError(err)
168+
re.Len(stores, 2)
169+
}
150170
}
151-
re.Equal(store1, store.GetId())
152-
return true
153-
})
154-
155-
stores, err := cli.GetAllStores(suite.ctx)
156-
re.NoError(err)
157-
re.Len(stores, 2)
158-
159-
re.NoError(cli.UpdateOption(opt.EnableRouterServiceHandler, false))
160-
_, err = cli.GetStore(suite.ctx, store1)
161-
re.Error(err)
171+
}
172+
checkStore()
173+
// enable router client and check store api again
174+
checkStore()
162175
}
163176

164177
func (suite *serverTestSuite) TestRegionAPI() {
@@ -168,26 +181,22 @@ func (suite *serverTestSuite) TestRegionAPI() {
168181
re.NoError(failpoint.Disable("github.com/tikv/pd/server/customTimeout"))
169182
}()
170183
// make sure pd server can't support region grpc request
171-
pdcli, err := pd.NewClientWithContext(suite.ctx, caller.TestComponent,
184+
cli, err := pd.NewClientWithContext(suite.ctx, caller.TestComponent,
172185
[]string{suite.backendEndpoints}, pd.SecurityOption{})
173186
re.NoError(err)
174-
_, err = pdcli.GetRegionByID(suite.ctx, 1)
187+
_, err = cli.GetRegionByID(suite.ctx, 1)
175188
re.Error(err)
176-
pdcli.Close()
189+
defer func() {
190+
cli.Close()
191+
}()
177192

178193
// test region apis
179-
cli, err := pd.NewClientWithContext(suite.ctx, caller.TestComponent,
180-
[]string{suite.backendEndpoints}, pd.SecurityOption{}, opt.WithEnableRouterServiceHandler(true))
181-
re.NoError(err)
194+
re.NoError(cli.UpdateOption(opt.EnableRouterServiceHandler, true))
182195
suite.checkRegionAPI(cli)
183-
cli.Close()
184196

185197
// test region apis with router client enabled
186-
cli2, err := pd.NewClientWithContext(suite.ctx, caller.TestComponent,
187-
[]string{suite.backendEndpoints}, pd.SecurityOption{}, opt.WithEnableRouterServiceHandler(true), opt.WithEnableRouterClient(true))
188-
re.NoError(err)
189-
suite.checkRegionAPI(cli2)
190-
cli2.Close()
198+
re.NoError(cli.UpdateOption(opt.EnableRouterClient, true))
199+
suite.checkRegionAPI(cli)
191200
}
192201

193202
func (suite *serverTestSuite) checkRegionAPI(cli pd.Client) {
@@ -196,9 +205,16 @@ func (suite *serverTestSuite) checkRegionAPI(cli pd.Client) {
196205
// get region by id
197206
allowEnableRouterOpt := opt.WithAllowRouterServiceHandle()
198207
regionID := uint64(1)
199-
r1, err := cli.GetRegionByID(suite.ctx, regionID, allowEnableRouterOpt)
200-
re.NoError(err)
201-
re.Equal(regionID, r1.Meta.Id)
208+
var r1 *router.Region
209+
var err error
210+
testutil.Eventually(re, func() bool {
211+
r1, err = cli.GetRegionByID(suite.ctx, regionID, allowEnableRouterOpt)
212+
if err != nil {
213+
return false
214+
}
215+
re.Equal(regionID, r1.Meta.Id)
216+
return true
217+
})
202218

203219
// get region by key
204220
r2, err := cli.GetRegion(suite.ctx, r1.Meta.GetStartKey(), allowEnableRouterOpt)

tools/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
1313
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
1414
github.com/coreos/go-semver v0.3.1
15-
github.com/docker/go-units v0.5.0
15+
github.com/docker/go-units v0.4.0
1616
github.com/gin-contrib/cors v1.6.0
1717
github.com/gin-contrib/gzip v0.0.1
1818
github.com/gin-contrib/pprof v1.4.0

tools/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
169169
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
170170
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
171171
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
172-
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
173-
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
172+
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
173+
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
174174
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
175175
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
176176
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o=

0 commit comments

Comments
 (0)