Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
fe84f00
Add PubSubHandler interface and GetQueue method to BaseClient; enhanc…
jbrinkman Apr 21, 2025
2f86051
Refactor PubSub integration tests to consolidate message receipt patt…
jbrinkman Apr 21, 2025
3b8d224
Refactor and enhance PubSub integration tests by consolidating messag…
jbrinkman Apr 22, 2025
d6caa36
Add PubSubChannels and PubSubChannelsWithPattern methods to baseClien…
jbrinkman Apr 23, 2025
569f940
Enhance client management by adding strong reference checks in close_…
jbrinkman Apr 23, 2025
b107f62
fix formatting
jbrinkman Apr 24, 2025
f582a58
Add PubSubNumPat method to baseClient for counting unique subscribed …
jbrinkman Apr 24, 2025
ee1480e
Add PubSub NumSub
jbrinkman Apr 24, 2025
1bacd31
Refactor PubSubNumSub method in baseClient to simplify response handl…
jbrinkman Apr 25, 2025
5ff1dc6
fix merge errors
jbrinkman Apr 26, 2025
438bec0
docs: added detailed parameter and return value descriptions to impro…
jbrinkman Apr 27, 2025
b1e4312
Add PubSubChannels and PubSubChannelsWithPattern methods to baseClien…
jbrinkman Apr 23, 2025
8d9ff03
fix formatting
jbrinkman Apr 24, 2025
fe645ea
Add PubSubNumPat method to baseClient for counting unique subscribed …
jbrinkman Apr 24, 2025
0cc64e6
sharded publish
jbrinkman Apr 25, 2025
fbacab6
implement sharded mode support in pubsub tests for subscriptions and …
jbrinkman Apr 28, 2025
6d3128a
update publish command docs
jbrinkman Apr 28, 2025
26cae60
refactor: update skipIfServerVersionLowerThanBy to work with subtests
jbrinkman Apr 28, 2025
f6d02a2
fix linting error
jbrinkman Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* Go: Add `PubSubChannels` ([#3665](https://github.com/valkey-io/valkey-glide/pull/3665))
* Go: Add `PubSubNumPat` ([#3666](https://github.com/valkey-io/valkey-glide/pull/3666))
* Go: Add `PubSubNumSub` ([#3667](https://github.com/valkey-io/valkey-glide/pull/3667))
* Go: Add `Sharded Publish` ([#3667](https://github.com/valkey-io/valkey-glide/pull/3667))
* Go: Add `Config Rewrite` ([#3156](https://github.com/valkey-io/valkey-glide/pull/3156))
* Go: Add `Random Key` ([#3358](https://github.com/valkey-io/valkey-glide/pull/3358))
* Go: Add Function Load, Function Flush, FCall and FCall_RO ([#3474](https://github.com/valkey-io/valkey-glide/pull/3474))
Expand Down
32 changes: 5 additions & 27 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7750,33 +7750,6 @@ func (client *baseClient) FCallReadOnlyWithKeysAndArgs(
return handleAnyResponse(result)
}

// Posts a message to the specified channel. Returns the number of clients that received the message.
//
// Channel can be any string, but common patterns include using "." to create namespaces like
// "news.sports" or "news.weather".
//
// See [valkey.io] for details.
//
// Parameters:
//
// channel - The channel to publish to.
// message - The message to publish.
//
// Return value:
//
// The number of clients that received the message.
//
// [valkey.io]: https://valkey.io/commands/publish
func (client *baseClient) Publish(channel string, message string) (int64, error) {
args := []string{channel, message}
result, err := client.executeCommand(C.Publish, args)
if err != nil {
return 0, err
}

return handleIntResponse(result)
}

// Lists the currently active channels.
//
// When used in cluster mode, the command is routed to all nodes and aggregates
Expand Down Expand Up @@ -7873,6 +7846,11 @@ func (client *baseClient) PubSubNumPat() (int64, error) {
//
// [valkey.io]: https://valkey.io/commands/pubsub-numsub
func (client *baseClient) PubSubNumSub(channels []string) (map[string]int64, error) {
if len(channels) == 0 {
// If no channels specified, just return an empty map
return make(map[string]int64), nil
}

result, err := client.executeCommand(C.PubSubNumSub, channels)
if err != nil {
return nil, err
Expand Down
27 changes: 27 additions & 0 deletions go/api/glide_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,30 @@ func (client *GlideClient) FunctionDelete(libName string) (string, error) {
}
return handleOkResponse(result)
}

// Publish posts a message to the specified channel. Returns the number of clients that received the message.
//
// Channel can be any string, but common patterns include using "." to create namespaces like
// "news.sports" or "news.weather".
//
// See [valkey.io] for details.
//
// Parameters:
//
// channel - The channel to publish the message to.
// message - The message to publish.
//
// Return value:
//
// The number of clients that received the message.
//
// [valkey.io]: https://valkey.io/commands/publish
func (client *GlideClient) Publish(channel string, message string) (int64, error) {
args := []string{channel, message}
result, err := client.executeCommand(C.Publish, args)
if err != nil {
return 0, err
}

return handleIntResponse(result)
}
35 changes: 35 additions & 0 deletions go/api/glide_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,3 +1581,38 @@ func (client *GlideClusterClient) FunctionKillWithRoute(route options.RouteOptio
}
return handleOkResponse(result)
}

// Publish posts a message to the specified channel. Returns the number of clients that received the message.
//
// Channel can be any string, but common patterns include using "." to create namespaces like
// "news.sports" or "news.weather".
//
// See [valkey.io] for details.
//
// Parameters:
//
// channel - The channel to publish the message to.
// message - The message to publish.
// sharded - Whether the channel is sharded.
//
// Return value:
//
// The number of clients that received the message.
//
// [valkey.io]: https://valkey.io/commands/publish
func (client *GlideClusterClient) Publish(channel string, message string, sharded bool) (int64, error) {
args := []string{channel, message}

var requestType C.RequestType
if sharded {
requestType = C.SPublish
} else {
requestType = C.Publish
}
result, err := client.executeCommand(requestType, args)
if err != nil {
return 0, err
}

return handleIntResponse(result)
}
12 changes: 9 additions & 3 deletions go/api/pubsub_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ package api

// PubSubCommands defines the interface for Pub/Sub operations available in both standalone and cluster modes.
type PubSubCommands interface {
// Publish publishes a message to a channel. Returns the number of clients that received the message.
Publish(channel string, message string) (int64, error)
// PubSubChannels returns a list of all channels in the database.
PubSubChannels() ([]string, error)
// PubSubChannelsWithPattern returns a list of all channels that match the given pattern.
Expand All @@ -16,8 +14,16 @@ type PubSubCommands interface {
PubSubNumSub(channels []string) (map[string]int64, error)
}

type PubSubStandaloneCommands interface {
// Publish publishes a message to a channel. Returns the number of clients that received the message.
Publish(channel string, message string) (int64, error)
}

// PubSubClusterCommands defines additional Pub/Sub operations available only in cluster mode.
type PubSubClusterCommands interface{}
type PubSubClusterCommands interface {
// Publish publishes a message to a channel. Returns the number of clients that received the message.
Publish(channel string, message string, sharded bool) (int64, error)
}

type PubSubHandler interface {
GetQueue() (*PubSubMessageQueue, error)
Expand Down
2 changes: 1 addition & 1 deletion go/api/pubsub_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func ExampleGlideClusterClient_Publish() {
time.Sleep(100 * time.Millisecond)

// Publish a message
result, err := publisher.Publish("my_channel", "Hello, World!")
result, err := publisher.Publish("my_channel", "Hello, World!", false)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
Expand Down
17 changes: 15 additions & 2 deletions go/integTest/glide_test_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@ func (suite *GlideTestSuite) verifyOK(result string, err error) {
assert.Equal(suite.T(), api.OK, result)
}

func (suite *GlideTestSuite) SkipIfServerVersionLowerThanBy(version string) {
func (suite *GlideTestSuite) SkipIfServerVersionLowerThanBy(version string, t *testing.T) {
if suite.serverVersion < version {
suite.T().Skipf("This feature is added in version %s", version)
t.Skipf("This feature is added in version %s", version)
}
}

Expand Down Expand Up @@ -453,6 +453,7 @@ type TestChannelMode int
const (
ExactMode TestChannelMode = iota
PatternMode
ShardedMode
)

type ChannelDefn struct {
Expand Down Expand Up @@ -622,6 +623,11 @@ func (suite *GlideTestSuite) CreatePubSubReceiver(
}
switch clientType {
case GlideClient:
if channels[0].Mode == ShardedMode {
assert.Fail(suite.T(), "Sharded mode is not supported for standalone client")
return nil
}

sConfig := api.NewStandaloneSubscriptionConfig()
for _, channel := range channels {
mode := api.PubSubChannelMode(channel.Mode)
Expand All @@ -646,3 +652,10 @@ func (suite *GlideTestSuite) CreatePubSubReceiver(
return nil
}
}

func getChannelMode(sharded bool) TestChannelMode {
if sharded {
return ShardedMode
}
return ExactMode
}
Loading
Loading