Skip to content

Commit 2a9a8a8

Browse files
authored
Go: pubsub numpat command (#3666)
* Add PubSubHandler interface and GetQueue method to BaseClient; enhance PubSubMessageQueue with signal channel support; refactor integration tests to utilize new client creation methods and improve message handling. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor PubSub integration tests to consolidate message receipt patterns into a single comprehensive test function, improving maintainability and readability. This change introduces a parameterized approach to test various client types and message reading methods. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor and enhance PubSub integration tests by consolidating message verification logic and introducing parameterized tests for various client types and message reading methods. This update improves test maintainability and readability while ensuring comprehensive coverage of PubSub functionality. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSubChannels and PubSubChannelsWithPattern methods to baseClient for enhanced channel management. Update PubSubCommands interface to include new methods. Introduce example utility functions for standalone and cluster clients with subscription capabilities, ensuring a clean state before tests. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Enhance client management by adding strong reference checks in close_client function. Update Go module dependencies to include new libraries for improved functionality. Refactor baseClient methods for PubSubChannels and PubSubChannelsWithPattern, ensuring proper error handling and client closure in tests. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSubNumPat method to baseClient for counting unique subscribed patterns. Update PubSubCommands interface and implement integration tests for standalone and cluster clients. Add example usage for documentation. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Update changelog with corrected PR number Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor baseClient methods by removing redundant error checks in Publish and PubSubChannelsWithPattern functions. Clean up unused imports and comments for improved code clarity. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * remove unneeded check Signed-off-by: jbrinkman <joe.brinkman@improving.com> * fix linting errors Signed-off-by: jbrinkman <joe.brinkman@improving.com> --------- Signed-off-by: jbrinkman <joe.brinkman@improving.com>
1 parent 0c2917f commit 2a9a8a8

File tree

5 files changed

+142
-9
lines changed

5 files changed

+142
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* Go: Add PubSub support ([#3417](https://github.com/valkey-io/valkey-glide/pull/3417))
5151
* Go: Add `Publish` ([#3417](https://github.com/valkey-io/valkey-glide/pull/3417))
5252
* Go: Add `PubSubChannels` ([#3665](https://github.com/valkey-io/valkey-glide/pull/3665))
53+
* Go: Add `PubSubNumPat` ([#3666](https://github.com/valkey-io/valkey-glide/pull/3666))
5354
* Go: Add `Config Rewrite` ([#3156](https://github.com/valkey-io/valkey-glide/pull/3156))
5455
* Go: Add `Random Key` ([#3358](https://github.com/valkey-io/valkey-glide/pull/3358))
5556
* Go: Add Function Load, Function Flush, FCall and FCall_RO ([#3474](https://github.com/valkey-io/valkey-glide/pull/3474))

go/api/base_client.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package api
2121
import "C"
2222

2323
import (
24-
goErr "errors"
2524
"fmt"
2625
"math"
2726
"strconv"
@@ -7760,9 +7759,6 @@ func (client *baseClient) FCallReadOnlyWithKeysAndArgs(
77607759
//
77617760
// [valkey.io]: https://valkey.io/commands/publish
77627761
func (client *baseClient) Publish(channel string, message string) (int64, error) {
7763-
if message == "" || channel == "" {
7764-
return 0, goErr.New("both message and channel are required for Publish command")
7765-
}
77667762
args := []string{channel, message}
77677763
result, err := client.executeCommand(C.Publish, args)
77687764
if err != nil {
@@ -7812,6 +7808,26 @@ func (client *baseClient) PubSubChannelsWithPattern(pattern string) ([]string, e
78127808
return handleStringArrayResponse(result)
78137809
}
78147810

7811+
// PubSubNumPat returns the number of patterns that are subscribed to by clients.
7812+
//
7813+
// This returns the total number of unique patterns that all clients are subscribed to,
7814+
// not the count of clients subscribed to patterns.
7815+
//
7816+
// When used in cluster mode, the command is routed to all nodes and aggregates
7817+
// the responses.
7818+
//
7819+
// See [valkey.io] for details.
7820+
//
7821+
// [valkey.io]: https://valkey.io/commands/pubsub-numpat
7822+
func (client *baseClient) PubSubNumPat() (int64, error) {
7823+
result, err := client.executeCommand(C.PubSubNumPat, []string{})
7824+
if err != nil {
7825+
return 0, err
7826+
}
7827+
7828+
return handleIntResponse(result)
7829+
}
7830+
78157831
// Kills a function that is currently executing.
78167832
//
78177833
// `FUNCTION KILL` terminates read-only functions only.

go/api/pubsub_commands.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ type PubSubCommands interface {
1010
PubSubChannels() ([]string, error)
1111
// PubSubChannelsWithPattern returns a list of all channels that match the given pattern.
1212
PubSubChannelsWithPattern(pattern string) ([]string, error)
13+
// PubSubNumPat returns the number of patterns that are subscribed to by clients.
14+
PubSubNumPat() (int64, error)
1315
}
1416

1517
// PubSubClusterCommands defines additional Pub/Sub operations available only in cluster mode.

go/api/pubsub_commands_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,43 @@ func ExampleGlideClusterClient_PubSubChannelsWithPattern() {
154154

155155
// Output: [news.sports news.weather]
156156
}
157+
158+
func ExampleGlideClient_PubSubNumPat() {
159+
var publisher *GlideClient = getExampleGlideClient() // example helper function
160+
defer closeAllClients()
161+
162+
// Create subscribers with subscriptions
163+
getExampleGlideClientWithSubscription(PatternChannelMode, "news.*")
164+
getExampleGlideClientWithSubscription(PatternChannelMode, "events.*")
165+
166+
// Allow subscriptions to establish
167+
time.Sleep(100 * time.Millisecond)
168+
169+
result, err := publisher.PubSubNumPat()
170+
if err != nil {
171+
fmt.Println("Glide example failed with an error: ", err)
172+
}
173+
fmt.Println(result)
174+
175+
// Output: 2
176+
}
177+
178+
func ExampleGlideClusterClient_PubSubNumPat() {
179+
var publisher *GlideClusterClient = getExampleGlideClusterClient() // example helper function
180+
defer closeAllClients()
181+
182+
// Create subscribers with subscriptions
183+
getExampleGlideClusterClientWithSubscription(PatternClusterChannelMode, "news.*")
184+
getExampleGlideClusterClientWithSubscription(PatternClusterChannelMode, "events.*")
185+
186+
// Allow subscriptions to establish
187+
time.Sleep(100 * time.Millisecond)
188+
189+
result, err := publisher.PubSubNumPat()
190+
if err != nil {
191+
fmt.Println("Glide example failed with an error: ", err)
192+
}
193+
fmt.Println(result)
194+
195+
// Output: 2
196+
}

go/integTest/pubsub_commands_test.go

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,6 @@ func (suite *GlideTestSuite) TestPubSubChannels() {
6565

6666
for _, tt := range tests {
6767
suite.T().Run(tt.name, func(t *testing.T) {
68-
// Skip cluster tests if cluster hosts are not available
69-
if tt.clientType == 1 && len(suite.clusterHosts) == 0 {
70-
t.Skip("Cluster not available")
71-
}
72-
7368
// Create channel definitions for all channels
7469
channels := make([]ChannelDefn, len(tt.channelNames))
7570
for i, channelName := range tt.channelNames {
@@ -102,3 +97,82 @@ func (suite *GlideTestSuite) TestPubSubChannels() {
10297
})
10398
}
10499
}
100+
101+
// TestPubSubNumPat tests the PubSubNumPat command for standalone and cluster clients
102+
func (suite *GlideTestSuite) TestPubSubNumPat() {
103+
tests := []struct {
104+
name string
105+
clientType ClientType
106+
channelDefns []ChannelDefn
107+
expectedCount int64
108+
}{
109+
{
110+
name: "Standalone Single Pattern",
111+
clientType: GlideClient,
112+
channelDefns: []ChannelDefn{{Channel: "news.*", Mode: PatternMode}},
113+
expectedCount: 1,
114+
},
115+
{
116+
name: "Standalone Multiple Patterns",
117+
clientType: GlideClient,
118+
channelDefns: []ChannelDefn{
119+
{Channel: "news.*", Mode: PatternMode},
120+
{Channel: "events.*", Mode: PatternMode},
121+
{Channel: "sports.*", Mode: PatternMode},
122+
},
123+
expectedCount: 3,
124+
},
125+
{
126+
name: "Standalone Mixed Modes",
127+
clientType: GlideClient,
128+
channelDefns: []ChannelDefn{
129+
{Channel: "news.*", Mode: PatternMode},
130+
{Channel: "events.local", Mode: ExactMode},
131+
{Channel: "sports.*", Mode: PatternMode},
132+
},
133+
expectedCount: 2,
134+
},
135+
{
136+
name: "Cluster Single Pattern",
137+
clientType: GlideClusterClient,
138+
channelDefns: []ChannelDefn{{Channel: "cluster.news.*", Mode: PatternMode}},
139+
expectedCount: 1,
140+
},
141+
{
142+
name: "Cluster Multiple Patterns",
143+
clientType: GlideClusterClient,
144+
channelDefns: []ChannelDefn{
145+
{Channel: "cluster.news.*", Mode: PatternMode},
146+
{Channel: "cluster.events.*", Mode: PatternMode},
147+
{Channel: "cluster.sports.*", Mode: PatternMode},
148+
},
149+
expectedCount: 3,
150+
},
151+
{
152+
name: "Cluster Mixed Modes",
153+
clientType: GlideClusterClient,
154+
channelDefns: []ChannelDefn{
155+
{Channel: "cluster.news.*", Mode: PatternMode},
156+
{Channel: "cluster.events.local", Mode: ExactMode},
157+
{Channel: "cluster.sports.*", Mode: PatternMode},
158+
},
159+
expectedCount: 2,
160+
},
161+
}
162+
163+
for _, tt := range tests {
164+
suite.T().Run(tt.name, func(t *testing.T) {
165+
// Create a client with subscriptions
166+
receiver := suite.CreatePubSubReceiver(tt.clientType, tt.channelDefns, 1, false)
167+
defer receiver.Close()
168+
169+
// Allow subscription to establish
170+
time.Sleep(MESSAGE_PROCESSING_DELAY * time.Millisecond)
171+
172+
// Get pattern subscription count
173+
count, err := receiver.PubSubNumPat()
174+
assert.NoError(t, err)
175+
assert.Equal(t, tt.expectedCount, count)
176+
})
177+
}
178+
}

0 commit comments

Comments
 (0)