Skip to content

Commit 40104c0

Browse files
jbrinkmanYury-Fridlyand
authored andcommitted
GO: pubsub channels command (#3665)
* Add PubSubHandler interface and GetQueue method to BaseClient; enhance PubSubMessageQueue with signal channel support; refactor integration tests to utilize new client creation methods and improve message handling. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor PubSub integration tests to consolidate message receipt patterns into a single comprehensive test function, improving maintainability and readability. This change introduces a parameterized approach to test various client types and message reading methods. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor and enhance PubSub integration tests by consolidating message verification logic and introducing parameterized tests for various client types and message reading methods. This update improves test maintainability and readability while ensuring comprehensive coverage of PubSub functionality. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSubChannels and PubSubChannelsWithPattern methods to baseClient for enhanced channel management. Update PubSubCommands interface to include new methods. Introduce example utility functions for standalone and cluster clients with subscription capabilities, ensuring a clean state before tests. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Enhance client management by adding strong reference checks in close_client function. Update Go module dependencies to include new libraries for improved functionality. Refactor baseClient methods for PubSubChannels and PubSubChannelsWithPattern, ensuring proper error handling and client closure in tests. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * add pubsub integration test for PubSub channels Signed-off-by: jbrinkman <joe.brinkman@improving.com> * fix formatting Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Update changelog Signed-off-by: jbrinkman <joe.brinkman@improving.com> * code review feedback updates Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Update go/api/base_client.go Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com> Signed-off-by: Joseph Brinkman <joe.brinkman@improving.com> * remove unneeded logic Signed-off-by: jbrinkman <joe.brinkman@improving.com> --------- Signed-off-by: jbrinkman <joe.brinkman@improving.com> Signed-off-by: Joseph Brinkman <joe.brinkman@improving.com> Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com>
1 parent aea0163 commit 40104c0

File tree

6 files changed

+403
-24
lines changed

6 files changed

+403
-24
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#### Changes
2+
23
* Core: Add an OK response type to FFI ([#3630](https://github.com/valkey-io/valkey-glide/pull/3630))
34
* Core: Move UDS Socket Filename to tmp ([#3615](https://github.com/valkey-io/valkey-glide/pull/3615))
45
* Core: Ensure UDS socket filename is truly unique. ([#3596](https://github.com/valkey-io/valkey-glide/pull/3596))
@@ -16,7 +17,7 @@
1617
* Go: Add `ZLEXCOUNT` ([#3140](https://github.com/valkey-io/valkey-glide/pull/3140))
1718
* Go: Updating examples for `ZInterCard` ([#3232](https://github.com/valkey-io/valkey-glide/pull/3232))
1819
* Go: Fix channel passing from Go to Rust by using `runtime.Pinner` or `cgo.Handle` ([#3208](https://github.com/valkey-io/valkey-glide/pull/3208))
19-
* Go: Add `BZPOPMAX` and ` ZMPOP` [#3257](https://github.com/valkey-io/valkey-glide/pull/3257)
20+
* Go: Add `BZPOPMAX` and `ZMPOP` [#3257](https://github.com/valkey-io/valkey-glide/pull/3257)
2021
* Go: Fix data race on the coreClient with `sync.Mutex` and a channel map ([#3236](https://github.com/valkey-io/valkey-glide/pull/3236))
2122
* Go: Adding support for Az Affinity ([#3235](https://github.com/valkey-io/valkey-glide/pull/3235))
2223
* Go: Adding support for advanced client configs and connectionTimeout ([#3290](https://github.com/valkey-io/valkey-glide/pull/3290))
@@ -48,6 +49,7 @@
4849
* Core/FFI/Go: Add support for async and sync client types in FFI ([#3451](https://github.com/valkey-io/valkey-glide/pull/3451))
4950
* Go: Add PubSub support ([#3417](https://github.com/valkey-io/valkey-glide/pull/3417))
5051
* Go: Add `Publish` ([#3417](https://github.com/valkey-io/valkey-glide/pull/3417))
52+
* Go: Add `PubSubChannels` ([#3665](https://github.com/valkey-io/valkey-glide/pull/3665))
5153
* Go: Add `Config Rewrite` ([#3156](https://github.com/valkey-io/valkey-glide/pull/3156))
5254
* Go: Add `Random Key` ([#3358](https://github.com/valkey-io/valkey-glide/pull/3358))
5355
* Go: Add Function Load, Function Flush, FCall and FCall_RO ([#3474](https://github.com/valkey-io/valkey-glide/pull/3474))

go/api/base_client.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7772,6 +7772,46 @@ func (client *baseClient) Publish(channel string, message string) (int64, error)
77727772
return handleIntResponse(result)
77737773
}
77747774

7775+
// Lists the currently active channels.
7776+
//
7777+
// When used in cluster mode, the command is routed to all nodes and aggregates
7778+
// the responses into a single array.
7779+
//
7780+
// See [valkey.io] for details.
7781+
//
7782+
// [valkey.io]: https://valkey.io/commands/pubsub-channels
7783+
func (client *baseClient) PubSubChannels() ([]string, error) {
7784+
result, err := client.executeCommand(C.PubSubChannels, []string{})
7785+
if err != nil {
7786+
return nil, err
7787+
}
7788+
7789+
return handleStringArrayResponse(result)
7790+
}
7791+
7792+
// PubSubChannelsWithPattern lists the currently active channels matching the specified pattern.
7793+
//
7794+
// Pattern can be any glob-style pattern:
7795+
// - h?llo matches hello, hallo and hxllo
7796+
// - h*llo matches hllo and heeeello
7797+
// - h[ae]llo matches hello and hallo, but not hillo
7798+
//
7799+
// When used in cluster mode, the command is routed to all nodes and aggregates
7800+
// the responses into a single array.
7801+
//
7802+
// See [valkey.io] for details.
7803+
//
7804+
// [valkey.io]: https://valkey.io/commands/pubsub-channels
7805+
func (client *baseClient) PubSubChannelsWithPattern(pattern string) ([]string, error) {
7806+
args := []string{pattern}
7807+
result, err := client.executeCommand(C.PubSubChannels, args)
7808+
if err != nil {
7809+
return nil, err
7810+
}
7811+
7812+
return handleStringArrayResponse(result)
7813+
}
7814+
77757815
// Kills a function that is currently executing.
77767816
//
77777817
// `FUNCTION KILL` terminates read-only functions only.

go/api/example_utils.go

Lines changed: 96 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ var (
1919
)
2020

2121
var (
22-
clusterClient *GlideClusterClient
22+
clusterClients []*GlideClusterClient
2323
clusterOnce sync.Once
24+
clusterSubOnce sync.Once
2425
clusterAddresses []NodeAddress
25-
standaloneClient *GlideClient
26+
standaloneClients []*GlideClient
2627
standaloneOnce sync.Once
28+
standaloneSubOnce sync.Once
2729
standaloneAddresses []NodeAddress
2830
initOnce sync.Once
2931
)
@@ -53,51 +55,112 @@ func getClusterAddresses() []NodeAddress {
5355
func getExampleGlideClient() *GlideClient {
5456
standaloneOnce.Do(func() {
5557
initFlags()
56-
config := NewGlideClientConfiguration().
57-
WithAddress(&standaloneAddresses[0])
58+
})
59+
config := NewGlideClientConfiguration().
60+
WithAddress(&standaloneAddresses[0])
5861

59-
client, err := NewGlideClient(config)
60-
if err != nil {
61-
fmt.Println("error connecting to server: ", err)
62-
}
62+
client, err := NewGlideClient(config)
63+
if err != nil {
64+
fmt.Println("error connecting to server: ", err)
65+
}
6366

64-
standaloneClient = client.(*GlideClient)
65-
})
67+
thisClient := client.(*GlideClient)
68+
standaloneClients = append(standaloneClients, thisClient)
6669

6770
// Flush the database before each test to ensure a clean state.
68-
_, err := standaloneClient.FlushAllWithOptions(options.SYNC)
71+
_, err = thisClient.FlushAllWithOptions(options.SYNC)
6972
if err != nil {
7073
fmt.Println("error flushing database: ", err)
7174
}
7275

73-
return standaloneClient
76+
return thisClient
7477
}
7578

7679
func getExampleGlideClusterClient() *GlideClusterClient {
7780
clusterOnce.Do(func() {
7881
initFlags()
79-
config := NewGlideClusterClientConfiguration().
80-
WithAddress(&clusterAddresses[0]).
81-
WithRequestTimeout(5000)
82+
})
83+
cConfig := NewGlideClusterClientConfiguration().
84+
WithAddress(&clusterAddresses[0]).
85+
WithRequestTimeout(5000)
8286

83-
client, err := NewGlideClusterClient(config)
84-
if err != nil {
85-
fmt.Println("error connecting to server: ", err)
86-
}
87+
client, err := NewGlideClusterClient(cConfig)
88+
if err != nil {
89+
fmt.Println("error connecting to server: ", err)
90+
}
8791

88-
clusterClient = client.(*GlideClusterClient)
89-
})
92+
thisClient := client.(*GlideClusterClient)
93+
clusterClients = append(clusterClients, thisClient)
9094

9195
// Flush the database before each test to ensure a clean state.
9296
mode := options.SYNC
93-
_, err := clusterClient.FlushAllWithOptions(
97+
_, err = thisClient.FlushAllWithOptions(
9498
options.FlushClusterOptions{FlushMode: &mode, RouteOption: &options.RouteOption{Route: config.AllPrimaries}},
9599
)
96100
if err != nil {
97101
fmt.Println("error flushing database: ", err)
98102
}
99103

100-
return clusterClient
104+
return thisClient
105+
}
106+
107+
func getExampleGlideClientWithSubscription(mode PubSubChannelMode, channelOrPattern string) *GlideClient {
108+
standaloneSubOnce.Do(func() {
109+
initFlags()
110+
})
111+
sConfig := NewStandaloneSubscriptionConfig().
112+
WithSubscription(mode, channelOrPattern)
113+
114+
config := NewGlideClientConfiguration().
115+
WithAddress(&standaloneAddresses[0]).
116+
WithSubscriptionConfig(sConfig)
117+
118+
client, err := NewGlideClient(config)
119+
if err != nil {
120+
fmt.Println("error connecting to server: ", err)
121+
}
122+
123+
thisClient := client.(*GlideClient)
124+
standaloneClients = append(standaloneClients, thisClient)
125+
126+
// Flush the database before each test to ensure a clean state.
127+
_, err = thisClient.FlushAllWithOptions(options.SYNC)
128+
if err != nil {
129+
fmt.Println("error flushing database: ", err)
130+
}
131+
132+
return thisClient
133+
}
134+
135+
func getExampleGlideClusterClientWithSubscription(mode PubSubClusterChannelMode, channelOrPattern string) *GlideClusterClient {
136+
clusterSubOnce.Do(func() {
137+
initFlags()
138+
})
139+
cConfig := NewClusterSubscriptionConfig().
140+
WithSubscription(mode, channelOrPattern)
141+
142+
ccConfig := NewGlideClusterClientConfiguration().
143+
WithAddress(&clusterAddresses[0]).
144+
WithSubscriptionConfig(cConfig)
145+
146+
client, err := NewGlideClusterClient(ccConfig)
147+
if err != nil {
148+
fmt.Println("error connecting to server: ", err)
149+
}
150+
151+
thisClient := client.(*GlideClusterClient)
152+
clusterClients = append(clusterClients, thisClient)
153+
154+
// Flush the database before each test to ensure a clean state.
155+
syncmode := options.SYNC
156+
_, err = thisClient.FlushAllWithOptions(
157+
options.FlushClusterOptions{FlushMode: &syncmode, RouteOption: &options.RouteOption{Route: config.AllPrimaries}},
158+
)
159+
if err != nil {
160+
fmt.Println("error flushing database: ", err)
161+
}
162+
163+
return thisClient
101164
}
102165

103166
func parseHosts(addresses string) []NodeAddress {
@@ -120,3 +183,13 @@ func parseHosts(addresses string) []NodeAddress {
120183
}
121184
return result
122185
}
186+
187+
// close all clients
188+
func closeAllClients() {
189+
for _, client := range standaloneClients {
190+
client.Close()
191+
}
192+
for _, client := range clusterClients {
193+
client.Close()
194+
}
195+
}

go/api/pubsub_commands.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ package api
66
type PubSubCommands interface {
77
// Publish publishes a message to a channel. Returns the number of clients that received the message.
88
Publish(channel string, message string) (int64, error)
9+
// PubSubChannels returns a list of all channels in the database.
10+
PubSubChannels() ([]string, error)
11+
// PubSubChannelsWithPattern returns a list of all channels that match the given pattern.
12+
PubSubChannelsWithPattern(pattern string) ([]string, error)
913
}
1014

1115
// PubSubClusterCommands defines additional Pub/Sub operations available only in cluster mode.

0 commit comments

Comments
 (0)