Skip to content

Commit 9c910b7

Browse files
committed
add pubsub shardchannels command
Signed-off-by: jbrinkman <joe.brinkman@improving.com>
1 parent f6d02a2 commit 9c910b7

File tree

5 files changed

+142
-5
lines changed

5 files changed

+142
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@
5353
* Go: Add `PubSubChannels` ([#3665](https://github.com/valkey-io/valkey-glide/pull/3665))
5454
* Go: Add `PubSubNumPat` ([#3666](https://github.com/valkey-io/valkey-glide/pull/3666))
5555
* Go: Add `PubSubNumSub` ([#3667](https://github.com/valkey-io/valkey-glide/pull/3667))
56-
* Go: Add `Sharded Publish` ([#3667](https://github.com/valkey-io/valkey-glide/pull/3667))
56+
* Go: Add `Sharded Publish` ([#3692](https://github.com/valkey-io/valkey-glide/pull/3692))
57+
* Go: Add `PubSub ShardChannels` ([#](https://github.com/valkey-io/valkey-glide/pull/))
5758
* Go: Add `Config Rewrite` ([#3156](https://github.com/valkey-io/valkey-glide/pull/3156))
5859
* Go: Add `Random Key` ([#3358](https://github.com/valkey-io/valkey-glide/pull/3358))
5960
* Go: Add Function Load, Function Flush, FCall and FCall_RO ([#3474](https://github.com/valkey-io/valkey-glide/pull/3474))

go/api/glide_cluster_client.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,3 +1616,51 @@ func (client *GlideClusterClient) Publish(channel string, message string, sharde
16161616

16171617
return handleIntResponse(result)
16181618
}
1619+
1620+
// PubSubShardChannels returns a list of all shard channels.
1621+
//
1622+
// Since:
1623+
//
1624+
// Valkey 7.0 and above.
1625+
//
1626+
// See [valkey.io] for details.
1627+
//
1628+
// Return value:
1629+
//
1630+
// A list of shard channels.
1631+
//
1632+
// [valkey.io]: https://valkey.io/commands/pubsub-shard-channels
1633+
func (client *GlideClusterClient) PubSubShardChannels() ([]string, error) {
1634+
result, err := client.executeCommand(C.PubSubShardChannels, []string{})
1635+
if err != nil {
1636+
return nil, err
1637+
}
1638+
1639+
return handleStringArrayResponse(result)
1640+
}
1641+
1642+
// PubSubShardChannelsWithPattern returns a list of all shard channels that match the given pattern.
1643+
//
1644+
// Since:
1645+
//
1646+
// Valkey 7.0 and above.
1647+
//
1648+
// See [valkey.io] for details.
1649+
//
1650+
// Parameters:
1651+
//
1652+
// pattern - A glob-style pattern to match active shard channels.
1653+
//
1654+
// Return value:
1655+
//
1656+
// A list of shard channels that match the given pattern.
1657+
//
1658+
// [valkey.io]: https://valkey.io/commands/pubsub-shard-channels-with-pattern
1659+
func (client *GlideClusterClient) PubSubShardChannelsWithPattern(pattern string) ([]string, error) {
1660+
result, err := client.executeCommand(C.PubSubShardChannels, []string{pattern})
1661+
if err != nil {
1662+
return nil, err
1663+
}
1664+
1665+
return handleStringArrayResponse(result)
1666+
}

go/api/pubsub_commands.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type PubSubStandaloneCommands interface {
2323
type PubSubClusterCommands interface {
2424
// Publish publishes a message to a channel. Returns the number of clients that received the message.
2525
Publish(channel string, message string, sharded bool) (int64, error)
26+
PubSubShardChannels() ([]string, error)
27+
PubSubShardChannelsWithPattern(pattern string) ([]string, error)
2628
}
2729

2830
type PubSubHandler interface {

go/api/pubsub_commands_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,48 @@ func ExampleGlideClusterClient_PubSubChannelsWithPattern() {
155155
// Output: [news.sports news.weather]
156156
}
157157

158+
func ExampleGlideClusterClient_PubSubShardChannels() {
159+
var publisher *GlideClusterClient = getExampleGlideClusterClient() // example helper function
160+
defer closeAllClients()
161+
162+
// Create subscribers with subscriptions
163+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "channel1")
164+
165+
// Allow subscriptions to establish
166+
time.Sleep(100 * time.Millisecond)
167+
168+
result, err := publisher.PubSubShardChannels()
169+
if err != nil {
170+
fmt.Println("Glide example failed with an error: ", err)
171+
}
172+
fmt.Println(result)
173+
174+
// Output: [channel1]
175+
}
176+
177+
func ExampleGlideClusterClient_PubSubShardChannelsWithPattern() {
178+
var publisher *GlideClusterClient = getExampleGlideClusterClient() // example helper function
179+
defer closeAllClients()
180+
181+
// Create subscribers with subscriptions to different channels
182+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "news.sports")
183+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "news.weather")
184+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "events.local")
185+
// Allow subscriptions to establish
186+
time.Sleep(100 * time.Millisecond)
187+
188+
// Get channels matching the "news.*" pattern
189+
result, err := publisher.PubSubShardChannelsWithPattern("news.*")
190+
if err != nil {
191+
fmt.Println("Glide example failed with an error: ", err)
192+
}
193+
194+
sort.Strings(result)
195+
fmt.Println(result)
196+
197+
// Output: [news.sports news.weather]
198+
}
199+
158200
func ExampleGlideClient_PubSubNumPat() {
159201
var publisher *GlideClient = getExampleGlideClient() // example helper function
160202
defer closeAllClients()

go/integTest/pubsub_commands_test.go

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,79 @@ func (suite *GlideTestSuite) TestPubSub_Commands_Channels() {
1919
channelNames []string
2020
pattern string
2121
expectedNames []string
22+
sharded bool
2223
}{
2324
{
2425
name: "Standalone Empty Pattern",
2526
clientType: GlideClient,
2627
channelNames: []string{"news.sports", "news.weather", "events.local"},
2728
pattern: "",
2829
expectedNames: []string{"news.sports", "news.weather", "events.local"},
30+
sharded: false,
2931
},
3032
{
3133
name: "Standalone Exact Match",
3234
clientType: GlideClient,
3335
channelNames: []string{"news.sports", "news.weather", "events.local"},
3436
pattern: "news.sports",
3537
expectedNames: []string{"news.sports"},
38+
sharded: false,
3639
},
3740
{
3841
name: "Standalone Glob Pattern",
3942
clientType: GlideClient,
4043
channelNames: []string{"news.sports", "news.weather", "events.local"},
4144
pattern: "news.*",
4245
expectedNames: []string{"news.sports", "news.weather"},
46+
sharded: false,
4347
},
4448
{
4549
name: "Cluster Empty Pattern",
4650
clientType: GlideClusterClient,
4751
channelNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
4852
pattern: "",
4953
expectedNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
54+
sharded: false,
5055
},
5156
{
5257
name: "Cluster Exact Match",
5358
clientType: GlideClusterClient,
5459
channelNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
5560
pattern: "cluster.news.sports",
5661
expectedNames: []string{"cluster.news.sports"},
62+
sharded: false,
5763
},
5864
{
5965
name: "Cluster Glob Pattern",
6066
clientType: GlideClusterClient,
6167
channelNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
6268
pattern: "cluster.news.*",
6369
expectedNames: []string{"cluster.news.sports", "cluster.news.weather"},
70+
sharded: false,
71+
},
72+
{
73+
name: "Cluster Sharded Empty Pattern",
74+
clientType: GlideClusterClient,
75+
channelNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
76+
pattern: "",
77+
expectedNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
78+
sharded: true,
79+
},
80+
{
81+
name: "Cluster Sharded Exact Match",
82+
clientType: GlideClusterClient,
83+
channelNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
84+
pattern: "cluster.shard.news.sports",
85+
expectedNames: []string{"cluster.shard.news.sports"},
86+
sharded: true,
87+
},
88+
{
89+
name: "Cluster Sharded Glob Pattern",
90+
clientType: GlideClusterClient,
91+
channelNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
92+
pattern: "cluster.shard.news.*",
93+
expectedNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather"},
94+
sharded: true,
6495
},
6596
}
6697

@@ -69,7 +100,7 @@ func (suite *GlideTestSuite) TestPubSub_Commands_Channels() {
69100
// Create channel definitions for all channels
70101
channels := make([]ChannelDefn, len(tt.channelNames))
71102
for i, channelName := range tt.channelNames {
72-
channels[i] = ChannelDefn{Channel: channelName, Mode: 0} // ExactMode
103+
channels[i] = ChannelDefn{Channel: channelName, Mode: getChannelMode(tt.sharded)}
73104
}
74105

75106
// Create a client with subscriptions
@@ -82,10 +113,23 @@ func (suite *GlideTestSuite) TestPubSub_Commands_Channels() {
82113
// Get active channels
83114
var activeChannels []string
84115
var err error
85-
if tt.pattern == "" {
86-
activeChannels, err = receiver.PubSubChannels()
116+
if tt.sharded {
117+
// For sharded channels, we need to use the cluster-specific methods
118+
clusterClient, ok := receiver.(*api.GlideClusterClient)
119+
if !ok {
120+
t.Fatal("Expected GlideClusterClient for sharded channels")
121+
}
122+
if tt.pattern == "" {
123+
activeChannels, err = clusterClient.PubSubShardChannels()
124+
} else {
125+
activeChannels, err = clusterClient.PubSubShardChannelsWithPattern(tt.pattern)
126+
}
87127
} else {
88-
activeChannels, err = receiver.PubSubChannelsWithPattern(tt.pattern)
128+
if tt.pattern == "" {
129+
activeChannels, err = receiver.PubSubChannels()
130+
} else {
131+
activeChannels, err = receiver.PubSubChannelsWithPattern(tt.pattern)
132+
}
89133
}
90134
assert.NoError(t, err)
91135

0 commit comments

Comments
 (0)