Skip to content

Commit 1076d81

Browse files
committed
go: Add multi-database support for cluster mode clients
- Add DatabaseId field to baseClientConfiguration for both standalone and cluster clients - Implement WithDatabaseId() method for ClusterClientConfiguration - Add database ID validation with proper error handling for negative values - Refactor standalone client to use shared database ID logic from base configuration - Add Select() and SelectWithOptions() methods for cluster clients with routing support - Include comprehensive test coverage for database isolation, reconnection persistence, and error handling - Add backward compatibility support - clients default to database 0 when no database_id specified - Add server version compatibility checks (cluster multi-database requires Valkey 9.0+) - Update documentation with warnings about SELECT command limitations and recommended configuration approach This enables cluster mode clients to connect to specific databases at initialization time, with the database selection persisting across reconnections, unlike the SELECT command which resets on reconnection. Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
1 parent 6b24985 commit 1076d81

File tree

10 files changed

+921
-5
lines changed

10 files changed

+921
-5
lines changed

go/config/config.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ func mapReadFrom(readFrom ReadFrom) protobuf.ReadFrom {
9292
return protobuf.ReadFrom_Primary
9393
}
9494

95+
// validateDatabaseId validates the database ID parameter.
96+
func validateDatabaseId(databaseId int) error {
97+
if databaseId < 0 {
98+
return errors.New("database_id must be non-negative")
99+
}
100+
return nil
101+
}
102+
95103
type baseClientConfiguration struct {
96104
addresses []NodeAddress
97105
useTLS bool
@@ -102,6 +110,7 @@ type baseClientConfiguration struct {
102110
clientAZ string
103111
reconnectStrategy *BackoffStrategy
104112
lazyConnect bool
113+
DatabaseId *int `json:"database_id,omitempty"`
105114
}
106115

107116
func (config *baseClientConfiguration) toProtobuf() (*protobuf.ConnectionRequest, error) {
@@ -152,6 +161,13 @@ func (config *baseClientConfiguration) toProtobuf() (*protobuf.ConnectionRequest
152161
request.LazyConnect = config.lazyConnect
153162
}
154163

164+
if config.DatabaseId != nil {
165+
if err := validateDatabaseId(*config.DatabaseId); err != nil {
166+
return nil, err
167+
}
168+
request.DatabaseId = uint32(*config.DatabaseId)
169+
}
170+
155171
return &request, nil
156172
}
157173

@@ -214,7 +230,6 @@ func (strategy *BackoffStrategy) toProtobuf() *protobuf.ConnectionRetryStrategy
214230
// ClientConfiguration represents the configuration settings for a Standalone client.
215231
type ClientConfiguration struct {
216232
baseClientConfiguration
217-
databaseId int
218233
subscriptionConfig *StandaloneSubscriptionConfig
219234
AdvancedClientConfiguration
220235
}
@@ -232,9 +247,6 @@ func (config *ClientConfiguration) ToProtobuf() (*protobuf.ConnectionRequest, er
232247
}
233248
request.ClusterModeEnabled = false
234249

235-
if config.databaseId != 0 {
236-
request.DatabaseId = uint32(config.databaseId)
237-
}
238250
if config.subscriptionConfig != nil && len(config.subscriptionConfig.subscriptions) > 0 {
239251
request.PubsubSubscriptions = config.subscriptionConfig.toProtobuf()
240252
}
@@ -330,7 +342,7 @@ func (config *ClientConfiguration) WithReconnectStrategy(strategy *BackoffStrate
330342

331343
// WithDatabaseId sets the index of the logical database to connect to.
332344
func (config *ClientConfiguration) WithDatabaseId(id int) *ClientConfiguration {
333-
config.databaseId = id
345+
config.DatabaseId = &id
334346
return config
335347
}
336348

@@ -481,6 +493,12 @@ func (config *ClusterClientConfiguration) WithReconnectStrategy(
481493
return config
482494
}
483495

496+
// WithDatabaseId sets the index of the logical database to connect to.
497+
func (config *ClusterClientConfiguration) WithDatabaseId(id int) *ClusterClientConfiguration {
498+
config.DatabaseId = &id
499+
return config
500+
}
501+
484502
// WithAdvancedConfiguration sets the advanced configuration settings for the client.
485503
func (config *ClusterClientConfiguration) WithAdvancedConfiguration(
486504
advancedConfig *AdvancedClusterClientConfiguration,

go/config/config_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,3 +340,122 @@ func TestConfig_LazyConnect(t *testing.T) {
340340

341341
assert.False(t, defaultClusterResult.LazyConnect)
342342
}
343+
344+
func TestConfig_DatabaseId(t *testing.T) {
345+
// Test standalone client with database ID
346+
standaloneConfig := NewClientConfiguration().WithDatabaseId(5)
347+
standaloneResult, err := standaloneConfig.ToProtobuf()
348+
if err != nil {
349+
t.Fatalf("Failed to convert standalone config to protobuf: %v", err)
350+
}
351+
assert.Equal(t, uint32(5), standaloneResult.DatabaseId)
352+
353+
// Test cluster client with database ID
354+
clusterConfig := NewClusterClientConfiguration().WithDatabaseId(3)
355+
clusterResult, err := clusterConfig.ToProtobuf()
356+
if err != nil {
357+
t.Fatalf("Failed to convert cluster config to protobuf: %v", err)
358+
}
359+
assert.Equal(t, uint32(3), clusterResult.DatabaseId)
360+
361+
// Test default behavior (no database ID set)
362+
defaultStandaloneConfig := NewClientConfiguration()
363+
defaultStandaloneResult, err := defaultStandaloneConfig.ToProtobuf()
364+
if err != nil {
365+
t.Fatalf("Failed to convert default standalone config to protobuf: %v", err)
366+
}
367+
assert.Equal(t, uint32(0), defaultStandaloneResult.DatabaseId)
368+
369+
defaultClusterConfig := NewClusterClientConfiguration()
370+
defaultClusterResult, err := defaultClusterConfig.ToProtobuf()
371+
if err != nil {
372+
t.Fatalf("Failed to convert default cluster config to protobuf: %v", err)
373+
}
374+
assert.Equal(t, uint32(0), defaultClusterResult.DatabaseId)
375+
}
376+
377+
func TestConfig_DatabaseId_Validation(t *testing.T) {
378+
// Test negative database ID validation for standalone
379+
standaloneConfig := NewClientConfiguration().WithDatabaseId(-1)
380+
_, err := standaloneConfig.ToProtobuf()
381+
assert.EqualError(t, err, "database_id must be non-negative")
382+
383+
// Test negative database ID validation for cluster
384+
clusterConfig := NewClusterClientConfiguration().WithDatabaseId(-1)
385+
_, err = clusterConfig.ToProtobuf()
386+
assert.EqualError(t, err, "database_id must be non-negative")
387+
388+
// Test valid database ID (0 should be valid)
389+
validStandaloneConfig := NewClientConfiguration().WithDatabaseId(0)
390+
_, err = validStandaloneConfig.ToProtobuf()
391+
assert.NoError(t, err)
392+
393+
validClusterConfig := NewClusterClientConfiguration().WithDatabaseId(0)
394+
_, err = validClusterConfig.ToProtobuf()
395+
assert.NoError(t, err)
396+
}
397+
398+
func TestConfig_DatabaseId_ExtendedValidation(t *testing.T) {
399+
// Test various valid database IDs for both standalone and cluster
400+
validDatabaseIds := []int{0, 1, 15, 50, 100, 999}
401+
402+
for _, dbId := range validDatabaseIds {
403+
t.Run(fmt.Sprintf("ValidDatabaseId_%d", dbId), func(t *testing.T) {
404+
// Test standalone configuration
405+
standaloneConfig := NewClientConfiguration().WithDatabaseId(dbId)
406+
standaloneResult, err := standaloneConfig.ToProtobuf()
407+
assert.NoError(t, err)
408+
assert.Equal(t, uint32(dbId), standaloneResult.DatabaseId)
409+
410+
// Test cluster configuration
411+
clusterConfig := NewClusterClientConfiguration().WithDatabaseId(dbId)
412+
clusterResult, err := clusterConfig.ToProtobuf()
413+
assert.NoError(t, err)
414+
assert.Equal(t, uint32(dbId), clusterResult.DatabaseId)
415+
})
416+
}
417+
418+
// Test invalid database IDs
419+
invalidDatabaseIds := []int{-1, -10, -100}
420+
421+
for _, dbId := range invalidDatabaseIds {
422+
t.Run(fmt.Sprintf("InvalidDatabaseId_%d", dbId), func(t *testing.T) {
423+
// Test standalone configuration
424+
standaloneConfig := NewClientConfiguration().WithDatabaseId(dbId)
425+
_, err := standaloneConfig.ToProtobuf()
426+
assert.EqualError(t, err, "database_id must be non-negative")
427+
428+
// Test cluster configuration
429+
clusterConfig := NewClusterClientConfiguration().WithDatabaseId(dbId)
430+
_, err = clusterConfig.ToProtobuf()
431+
assert.EqualError(t, err, "database_id must be non-negative")
432+
})
433+
}
434+
}
435+
436+
func TestConfig_DatabaseId_BaseConfiguration(t *testing.T) {
437+
// Test that database_id is properly handled in base configuration for both client types
438+
439+
// Test standalone client inherits database_id from base configuration
440+
standaloneConfig := NewClientConfiguration().WithDatabaseId(5)
441+
standaloneResult, err := standaloneConfig.ToProtobuf()
442+
assert.NoError(t, err)
443+
assert.Equal(t, uint32(5), standaloneResult.DatabaseId)
444+
assert.False(t, standaloneResult.ClusterModeEnabled)
445+
446+
// Test cluster client inherits database_id from base configuration
447+
clusterConfig := NewClusterClientConfiguration().WithDatabaseId(3)
448+
clusterResult, err := clusterConfig.ToProtobuf()
449+
assert.NoError(t, err)
450+
assert.Equal(t, uint32(3), clusterResult.DatabaseId)
451+
assert.True(t, clusterResult.ClusterModeEnabled)
452+
453+
// Test that both configurations use the same base validation logic
454+
standaloneConfigInvalid := NewClientConfiguration().WithDatabaseId(-1)
455+
_, err = standaloneConfigInvalid.ToProtobuf()
456+
assert.EqualError(t, err, "database_id must be non-negative")
457+
458+
clusterConfigInvalid := NewClusterClientConfiguration().WithDatabaseId(-1)
459+
_, err = clusterConfigInvalid.ToProtobuf()
460+
assert.EqualError(t, err, "database_id must be non-negative")
461+
}

go/glide_client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,20 @@ func (client *Client) ConfigGet(ctx context.Context, args []string) (map[string]
210210

211211
// Select changes the currently selected database.
212212
//
213+
// WARNING: This command is NOT RECOMMENDED for production use.
214+
// Upon reconnection, the client will revert to the database_id specified
215+
// in the client configuration (default: 0), NOT the database selected
216+
// via this command.
217+
//
218+
// RECOMMENDED APPROACH: Use the database_id parameter in client
219+
// configuration instead:
220+
//
221+
// config := &config.ClientConfiguration{
222+
// Addresses: []config.NodeAddress{{Host: "localhost", Port: 6379}},
223+
// DatabaseId: &databaseId, // Recommended: persists across reconnections
224+
// }
225+
// client, err := NewClient(config)
226+
//
213227
// See [valkey.io] for details.
214228
//
215229
// Parameters:

go/glide_cluster_client.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,87 @@ func (client *ClusterClient) CustomCommandWithRoute(ctx context.Context,
335335
return models.CreateClusterValue[any](data), nil
336336
}
337337

338+
// Select changes the currently selected database on cluster nodes.
339+
// The command will be routed to all primary nodes by default.
340+
//
341+
// WARNING: This command is NOT RECOMMENDED for production use.
342+
// Upon reconnection, nodes will revert to the database_id specified
343+
// in the client configuration (default: 0), NOT the database selected
344+
// via this command.
345+
//
346+
// RECOMMENDED APPROACH: Use the database_id parameter in client
347+
// configuration instead:
348+
//
349+
// config := &config.ClusterClientConfiguration{
350+
// Addresses: []config.NodeAddress{{Host: "localhost", Port: 6379}},
351+
// DatabaseId: &databaseId, // Recommended: persists across reconnections
352+
// }
353+
// client, err := NewClusterClient(config)
354+
//
355+
// CLUSTER BEHAVIOR: This command routes to all nodes by default
356+
// to maintain consistency across the cluster.
357+
//
358+
// See [valkey.io] for details.
359+
//
360+
// Parameters:
361+
//
362+
// ctx - The context for controlling the command execution.
363+
// index - The index of the database to select.
364+
//
365+
// Return value:
366+
//
367+
// A simple `"OK"` response.
368+
//
369+
// [valkey.io]: https://valkey.io/commands/select/
370+
func (client *ClusterClient) Select(ctx context.Context, index int64) (string, error) {
371+
result, err := client.executeCommand(ctx, C.Select, []string{utils.IntToString(index)})
372+
if err != nil {
373+
return models.DefaultStringResponse, err
374+
}
375+
376+
return handleOkResponse(result)
377+
}
378+
379+
// Select changes the currently selected database on cluster nodes.
380+
//
381+
// WARNING: This command is NOT RECOMMENDED for production use.
382+
// Upon reconnection, nodes will revert to the database_id specified
383+
// in the client configuration (default: 0), NOT the database selected
384+
// via this command.
385+
//
386+
// RECOMMENDED APPROACH: Use the database_id parameter in client
387+
// configuration instead.
388+
//
389+
// CLUSTER BEHAVIOR: This command routes to all nodes by default
390+
// to maintain consistency across the cluster.
391+
//
392+
// See [valkey.io] for details.
393+
//
394+
// Parameters:
395+
//
396+
// ctx - The context for controlling the command execution.
397+
// index - The index of the database to select.
398+
// routeOption - Specifies the routing configuration for the command. The client will route the
399+
// command to the nodes defined by routeOption.Route. Defaults to all primary nodes.
400+
//
401+
// Return value:
402+
//
403+
// A simple `"OK"` response.
404+
//
405+
// [valkey.io]: https://valkey.io/commands/select/
406+
func (client *ClusterClient) SelectWithOptions(
407+
ctx context.Context,
408+
index int64,
409+
routeOption options.RouteOption,
410+
) (string, error) {
411+
result, err := client.executeCommandWithRoute(ctx, C.Select, []string{utils.IntToString(index)}, routeOption.Route)
412+
if err != nil {
413+
return models.DefaultStringResponse, err
414+
}
415+
416+
return handleOkResponse(result)
417+
}
418+
338419
// Pings the server.
339420
// The command will be routed to all primary nodes.
340421
//

go/integTest/batch_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2385,6 +2385,12 @@ func CreateServerManagementTests(batch *pipeline.ClusterBatch, isAtomic bool, se
23852385
testData = append(testData, CommandTestData{ExpectedResponse: "OK", TestName: "ConfigResetStat()"})
23862386
// ConfigRewrite skipped, because depends on config
23872387

2388+
// SELECT command is only available in Valkey 9+ for cluster mode
2389+
if serverVer >= "9.0.0" {
2390+
batch.Select(1)
2391+
testData = append(testData, CommandTestData{ExpectedResponse: "OK", TestName: "Select(1)"})
2392+
}
2393+
23882394
return BatchTestData{CommandTestData: testData, TestName: "Server Management commands"}
23892395
}
23902396

0 commit comments

Comments
 (0)