Skip to content

Commit 79c553c

Browse files
authored
Cherry pick #8610, #8615 to v1.76.x (#8621)
Original PRs: #8610, #8615 RELEASE NOTES: * balancer/pick_first: * Fix bug that can cause balancer to get stuck in `IDLE` state on backend address change. * When configured, shuffle addresses in resolver updates that lack endpoints. Since gRPC automatically adds endpoints to resolver updates, this bug should only affect implementers of custom LB policies that use pick_first for delegation but don't forward the endpoints.
1 parent 0513350 commit 79c553c

File tree

4 files changed

+189
-3
lines changed

4 files changed

+189
-3
lines changed

balancer/pickfirst/pickfirst.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
169169
addrs = state.ResolverState.Addresses
170170
if cfg.ShuffleAddressList {
171171
addrs = append([]resolver.Address{}, addrs...)
172-
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
172+
internal.RandShuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
173173
}
174174
}
175175

balancer/pickfirst/pickfirst_ext_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package pickfirst_test
2020

2121
import (
2222
"context"
23+
"encoding/json"
2324
"errors"
2425
"fmt"
2526
"strings"
@@ -28,11 +29,14 @@ import (
2829

2930
"google.golang.org/grpc"
3031
"google.golang.org/grpc/backoff"
32+
"google.golang.org/grpc/balancer"
33+
pfbalancer "google.golang.org/grpc/balancer/pickfirst"
3134
pfinternal "google.golang.org/grpc/balancer/pickfirst/internal"
3235
"google.golang.org/grpc/codes"
3336
"google.golang.org/grpc/connectivity"
3437
"google.golang.org/grpc/credentials/insecure"
3538
"google.golang.org/grpc/internal"
39+
"google.golang.org/grpc/internal/balancer/stub"
3640
"google.golang.org/grpc/internal/channelz"
3741
"google.golang.org/grpc/internal/grpctest"
3842
"google.golang.org/grpc/internal/stubserver"
@@ -463,6 +467,85 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
463467
}
464468
}
465469

470+
// Tests the PF LB policy with shuffling enabled. It explicitly unsets the
471+
// Endpoints field in the resolver update to test the shuffling of the
472+
// Addresses.
473+
func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) {
474+
// Install a shuffler that always reverses two entries.
475+
origShuf := pfinternal.RandShuffle
476+
defer func() { pfinternal.RandShuffle = origShuf }()
477+
pfinternal.RandShuffle = func(n int, f func(int, int)) {
478+
if n != 2 {
479+
t.Errorf("Shuffle called with n=%v; want 2", n)
480+
return
481+
}
482+
f(0, 1) // reverse the two addresses
483+
}
484+
485+
pfBuilder := balancer.Get(pfbalancer.Name)
486+
shuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": true }`))
487+
if err != nil {
488+
t.Fatal(err)
489+
}
490+
noShuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": false }`))
491+
if err != nil {
492+
t.Fatal(err)
493+
}
494+
var activeCfg serviceconfig.LoadBalancingConfig
495+
496+
bf := stub.BalancerFuncs{
497+
Init: func(bd *stub.BalancerData) {
498+
bd.ChildBalancer = pfBuilder.Build(bd.ClientConn, bd.BuildOptions)
499+
},
500+
Close: func(bd *stub.BalancerData) {
501+
bd.ChildBalancer.Close()
502+
},
503+
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
504+
ccs.BalancerConfig = activeCfg
505+
ccs.ResolverState.Endpoints = nil
506+
return bd.ChildBalancer.UpdateClientConnState(ccs)
507+
},
508+
}
509+
510+
stub.Register(t.Name(), bf)
511+
svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())
512+
// Set up our backends.
513+
cc, r, backends := setupPickFirst(t, 2, grpc.WithDefaultServiceConfig(svcCfg))
514+
addrs := stubBackendsToResolverAddrs(backends)
515+
516+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
517+
defer cancel()
518+
519+
// Push an update with both addresses and shuffling disabled. We should
520+
// connect to backend 0.
521+
activeCfg = noShuffleConfig
522+
resolverState := resolver.State{Addresses: addrs}
523+
r.UpdateState(resolverState)
524+
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
525+
t.Fatal(err)
526+
}
527+
528+
// Send a config with shuffling enabled. This will reverse the addresses,
529+
// but the channel should still be connected to backend 0.
530+
activeCfg = shuffleConfig
531+
r.UpdateState(resolverState)
532+
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
533+
t.Fatal(err)
534+
}
535+
536+
// Send a resolver update with no addresses. This should push the channel
537+
// into TransientFailure.
538+
r.UpdateState(resolver.State{})
539+
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
540+
541+
// Send the same config as last time with shuffling enabled. Since we are
542+
// not connected to backend 0, we should connect to backend 1.
543+
r.UpdateState(resolverState)
544+
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
545+
t.Fatal(err)
546+
}
547+
}
548+
466549
// Test config parsing with the env var turned on and off for various scenarios.
467550
func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
468551
// Install a shuffler that always reverses two entries.

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
283283
newAddrs = state.ResolverState.Addresses
284284
if cfg.ShuffleAddressList {
285285
newAddrs = append([]resolver.Address{}, newAddrs...)
286-
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
286+
internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
287287
}
288288
}
289289

@@ -351,6 +351,13 @@ func (b *pickfirstBalancer) ExitIdle() {
351351
b.mu.Lock()
352352
defer b.mu.Unlock()
353353
if b.state == connectivity.Idle {
354+
// Move the balancer into CONNECTING state immediately. This is done to
355+
// avoid staying in IDLE if a resolver update arrives before the first
356+
// SubConn reports CONNECTING.
357+
b.updateBalancerState(balancer.State{
358+
ConnectivityState: connectivity.Connecting,
359+
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
360+
})
354361
b.startFirstPassLocked()
355362
}
356363
}
@@ -604,7 +611,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
604611
if !b.addressList.seekTo(sd.addr) {
605612
// This should not fail as we should have only one SubConn after
606613
// entering READY. The SubConn should be present in the addressList.
607-
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
614+
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
608615
return
609616
}
610617
if !b.healthCheckingEnabled {

balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,6 +1504,102 @@ func (s) TestPickFirstLeaf_AddressUpdateWithMetadata(t *testing.T) {
15041504
}
15051505
}
15061506

1507+
// Tests the scenario where a connection is established and then breaks, leading
1508+
// to a reconnection attempt. While the reconnection is in progress, a resolver
1509+
// update with a new address is received. The test verifies that the balancer
1510+
// creates a new SubConn for the new address and that the ClientConn eventually
1511+
// becomes READY.
1512+
func (s) TestPickFirstLeaf_Reconnection(t *testing.T) {
1513+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1514+
defer cancel()
1515+
cc := testutils.NewBalancerClientConn(t)
1516+
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
1517+
defer bal.Close()
1518+
ccState := balancer.ClientConnState{
1519+
ResolverState: resolver.State{
1520+
Endpoints: []resolver.Endpoint{
1521+
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
1522+
},
1523+
},
1524+
}
1525+
if err := bal.UpdateClientConnState(ccState); err != nil {
1526+
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
1527+
}
1528+
1529+
select {
1530+
case state := <-cc.NewStateCh:
1531+
if got, want := state, connectivity.Connecting; got != want {
1532+
t.Fatalf("Received unexpected ClientConn state: got %v, want %v", got, want)
1533+
}
1534+
case <-ctx.Done():
1535+
t.Fatal("Context timed out waiting for ClientConn state update.")
1536+
}
1537+
1538+
sc1 := <-cc.NewSubConnCh
1539+
select {
1540+
case <-sc1.ConnectCh:
1541+
case <-ctx.Done():
1542+
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
1543+
}
1544+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
1545+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
1546+
1547+
if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
1548+
t.Fatalf("Context timed out waiting for ClientConn to become READY.")
1549+
}
1550+
1551+
// Simulate a connection breakage; this should result in the channel
1552+
// transitioning to IDLE.
1553+
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
1554+
if err := cc.WaitForConnectivityState(ctx, connectivity.Idle); err != nil {
1555+
t.Fatalf("Context timed out waiting for ClientConn to enter IDLE.")
1556+
}
1557+
1558+
// Calling the idle picker should result in the SubConn being re-connected.
1559+
picker := <-cc.NewPickerCh
1560+
if _, err := picker.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
1561+
t.Fatalf("picker.Pick() returned error: %v, want %v", err, balancer.ErrNoSubConnAvailable)
1562+
}
1563+
1564+
select {
1565+
case <-sc1.ConnectCh:
1566+
case <-ctx.Done():
1567+
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
1568+
}
1569+
1570+
// Send a resolver update, removing the existing SubConn. Since the balancer
1571+
// is connecting, it should create a new SubConn for the new backend
1572+
// address.
1573+
ccState = balancer.ClientConnState{
1574+
ResolverState: resolver.State{
1575+
Endpoints: []resolver.Endpoint{
1576+
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
1577+
},
1578+
},
1579+
}
1580+
if err := bal.UpdateClientConnState(ccState); err != nil {
1581+
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
1582+
}
1583+
1584+
var sc2 *testutils.TestSubConn
1585+
select {
1586+
case sc2 = <-cc.NewSubConnCh:
1587+
case <-ctx.Done():
1588+
t.Fatal("Context timed out waiting for new SubConn to be created.")
1589+
}
1590+
1591+
select {
1592+
case <-sc2.ConnectCh:
1593+
case <-ctx.Done():
1594+
t.Fatal("Context timed out waiting for Connect() to be called on sc2.")
1595+
}
1596+
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
1597+
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
1598+
if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
1599+
t.Fatalf("Context timed out waiting for ClientConn to become READY.")
1600+
}
1601+
}
1602+
15071603
// healthListenerCapturingCCWrapper is used to capture the health listener so
15081604
// that health updates can be mocked for testing.
15091605
type healthListenerCapturingCCWrapper struct {

0 commit comments

Comments
 (0)