Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redis

import (
"context"
"errors"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -967,3 +968,106 @@ func TestInitConn_EntraidLike_NoLeakAcrossReinits(t *testing.T) {
t.Fatalf("listener count after orphaned unsubs = %d, want 0", got)
}
}

// TestPubSubConn_PassesPersistedChannelsToNewConn is a regression test for
// https://github.com/redis/go-redis/issues/3806.
//
// ClusterClient.pubSub() installs a newConn closure that picks the target
// node by hash slot of channels[0]. Before the fix, PubSub.conn() only
// passed c.channels (regular SUBSCRIBE) into that closure — sharded
// SSubscribe channels stored in c.schannels were silently dropped.
//
// For an SSubscribe-only PubSub (the common ClusterClient.SSubscribe case)
// the channel list was empty on reconnect, so the closure fell through to
// nodes.Random(). The reconnected SSUBSCRIBE went to the wrong shard, the
// MOVED reply was never read, and the PubSub looked healthy while receiving
// no messages.
//
// This test exercises PubSub.conn() with a stub newConn that records the
// channel list it receives. The cluster routing layer is not under test
// here — the regression is whether the channel list reaches the closure.
//
// Every case asserts that the expected channels are present in the recorded
// list. Cases marked ordered additionally assert that they arrive in the
// listed relative order, because the first element of the list is what the
// routing closure keys on.
func TestPubSubConn_PassesPersistedChannelsToNewConn(t *testing.T) {
	const (
		regularCh = "regular-ch"
		shardCh   = "{shardA}-ch1"
		freshCh   = "fresh-ch"
	)

	tests := []struct {
		name        string
		channels    map[string]struct{}
		schannels   map[string]struct{}
		newChannels []string
		wantIn      []string // strings that MUST be present in the captured slice
		ordered     bool     // when true, wantIn must also appear in captured in this relative order
	}{
		{
			name:     "subscribe-only reconnect: existing channels forwarded",
			channels: map[string]struct{}{regularCh: {}},
			wantIn:   []string{regularCh},
		},
		{
			// The bug from #3806: SSubscribe-only reconnect dropped
			// c.schannels and reached newConn with an empty list.
			name:      "ssubscribe-only reconnect: schannels forwarded (#3806)",
			schannels: map[string]struct{}{shardCh: {}},
			wantIn:    []string{shardCh},
		},
		{
			name:        "initial ssubscribe: newChannels forwarded",
			newChannels: []string{shardCh},
			wantIn:      []string{shardCh},
		},
		{
			name:      "mixed subscribe + ssubscribe: both kinds forwarded",
			channels:  map[string]struct{}{regularCh: {}},
			schannels: map[string]struct{}{shardCh: {}},
			wantIn:    []string{regularCh, shardCh},
		},
		{
			// Each persisted set holds a single key here, so map iteration
			// order cannot make the expected order non-deterministic.
			name:        "newChannels appended after persisted ones",
			schannels:   map[string]struct{}{shardCh: {}},
			newChannels: []string{freshCh},
			wantIn:      []string{shardCh, freshCh},
			ordered:     true,
		},
	}

	// indexOf returns the position of s in list, or -1 when it is absent.
	indexOf := func(list []string, s string) int {
		for i, v := range list {
			if v == s {
				return i
			}
		}
		return -1
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var captured []string
			stubErr := errors.New("stub: newConn does not create a real conn")

			ps := &PubSub{
				opt:       &Options{Addr: "stub:6379"},
				channels:  tt.channels,
				schannels: tt.schannels,
				newConn: func(_ context.Context, _ string, channels []string) (*pool.Conn, error) {
					// Copy so later mutation of the caller's slice cannot
					// change what the assertions below observe.
					captured = append([]string(nil), channels...)
					return nil, stubErr
				},
				closeConn: func(*pool.Conn) error { return nil },
			}

			// conn() is invoked with ps.mu held, as in the original test.
			ps.mu.Lock()
			_, err := ps.conn(context.Background(), tt.newChannels)
			ps.mu.Unlock()
			// The stub error can only originate from newConn, so matching it
			// also proves the closure was actually reached.
			if !errors.Is(err, stubErr) {
				t.Fatalf("conn err = %v, want stub", err)
			}

			prev := -1
			for _, want := range tt.wantIn {
				idx := indexOf(captured, want)
				if idx < 0 {
					t.Errorf("missing %q in channels passed to newConn (got %v); see #3806",
						want, captured)
					continue
				}
				if tt.ordered && idx < prev {
					t.Errorf("%q found at index %d, before an earlier expected channel (got %v, want relative order %v)",
						want, idx, captured, tt.wantIn)
				}
				prev = idx
			}
		})
	}
}

9 changes: 9 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,16 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er
c.opt.Addr = internal.RedisNull
}

// Include c.schannels so reconnect-time routing of an SSubscribe-only
// PubSub picks the slot owner (channels[0] in ClusterClient.pubSub()'s
// newConn closure) instead of a random node.
// See https://github.com/redis/go-redis/issues/3806.
// c.patterns is intentionally NOT included: patterns are not slot-
// addressable, and adding them would force PSubscribe-only PubSubs to
// pin to a single node based on pattern-string hash, regressing the
// existing random-node behaviour.
channels := slices.Collect(maps.Keys(c.channels))
channels = append(channels, slices.Collect(maps.Keys(c.schannels))...)
channels = append(channels, newChannels...)

cn, err := c.newConn(ctx, c.opt.Addr, channels)
Expand Down
Loading