Skip to content

Commit 17bfe24

Browse files
committed
GO: Lazy connect implementation
Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
1 parent 08dde85 commit 17bfe24

File tree

4 files changed

+286
-0
lines changed

4 files changed

+286
-0
lines changed

go/config/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ type baseClientConfiguration struct {
101101
clientName string
102102
clientAZ string
103103
reconnectStrategy *BackoffStrategy
104+
lazyConnect bool
104105
}
105106

106107
func (config *baseClientConfiguration) toProtobuf() (*protobuf.ConnectionRequest, error) {
@@ -147,6 +148,10 @@ func (config *baseClientConfiguration) toProtobuf() (*protobuf.ConnectionRequest
147148
request.ConnectionRetryStrategy = config.reconnectStrategy.toProtobuf()
148149
}
149150

151+
if config.lazyConnect {
152+
request.LazyConnect = config.lazyConnect
153+
}
154+
150155
return &request, nil
151156
}
152157

@@ -269,6 +274,14 @@ func (config *ClientConfiguration) WithUseTLS(useTLS bool) *ClientConfiguration
269274
return config
270275
}
271276

277+
// WithLazyConnect configures whether the client should establish connections lazily. When set to true,
278+
// the client will only establish connections when needed for the first operation, rather than
279+
// immediately upon client creation.
280+
func (config *ClientConfiguration) WithLazyConnect(lazyConnect bool) *ClientConfiguration {
281+
config.lazyConnect = lazyConnect
282+
return config
283+
}
284+
272285
// WithCredentials sets the credentials for the authentication process. If none are set, the client will not authenticate
273286
// itself with the server.
274287
func (config *ClientConfiguration) WithCredentials(credentials *ServerCredentials) *ClientConfiguration {
@@ -410,6 +423,14 @@ func (config *ClusterClientConfiguration) WithUseTLS(useTLS bool) *ClusterClient
410423
return config
411424
}
412425

426+
// WithLazyConnect configures whether the client should establish connections lazily. When set to true,
427+
// the client will only establish connections when needed for the first operation, rather than
428+
// immediately upon client creation.
429+
func (config *ClusterClientConfiguration) WithLazyConnect(lazyConnect bool) *ClusterClientConfiguration {
430+
config.lazyConnect = lazyConnect
431+
return config
432+
}
433+
413434
// WithCredentials sets the credentials for the authentication process. If none are set, the client will not authenticate
414435
// itself with the server.
415436
func (config *ClusterClientConfiguration) WithCredentials(

go/config/config_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,3 +296,47 @@ func TestConfig_InvalidRequestAndConnectionTimeouts(t *testing.T) {
296296
_, err8 := config8.ToProtobuf()
297297
assert.EqualError(t, err8, "setting connection timeout returned an error: invalid duration was specified")
298298
}
299+
300+
func TestConfig_LazyConnect(t *testing.T) {
301+
// Test for ClientConfiguration
302+
clientConfig := NewClientConfiguration().
303+
WithLazyConnect(true)
304+
305+
clientResult, err := clientConfig.ToProtobuf()
306+
if err != nil {
307+
t.Fatalf("Failed to convert client config to protobuf: %v", err)
308+
}
309+
310+
assert.True(t, clientResult.LazyConnect)
311+
312+
// Test for ClusterClientConfiguration
313+
clusterConfig := NewClusterClientConfiguration().
314+
WithLazyConnect(true)
315+
316+
clusterResult, err := clusterConfig.ToProtobuf()
317+
if err != nil {
318+
t.Fatalf("Failed to convert cluster config to protobuf: %v", err)
319+
}
320+
321+
assert.True(t, clusterResult.LazyConnect)
322+
323+
// Test default value (false) for ClientConfiguration
324+
defaultClientConfig := NewClientConfiguration()
325+
326+
defaultClientResult, err := defaultClientConfig.ToProtobuf()
327+
if err != nil {
328+
t.Fatalf("Failed to convert default client config to protobuf: %v", err)
329+
}
330+
331+
assert.False(t, defaultClientResult.LazyConnect)
332+
333+
// Test default value (false) for ClusterClientConfiguration
334+
defaultClusterConfig := NewClusterClientConfiguration()
335+
336+
defaultClusterResult, err := defaultClusterConfig.ToProtobuf()
337+
if err != nil {
338+
t.Fatalf("Failed to convert default cluster config to protobuf: %v", err)
339+
}
340+
341+
assert.False(t, defaultClusterResult.LazyConnect)
342+
}

go/integTest/connection_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package integTest
44

55
import (
66
"context"
7+
"fmt"
78
"strings"
89
"sync"
910
"time"
@@ -14,6 +15,147 @@ import (
1415
"github.com/valkey-io/valkey-glide/go/v2/internal/interfaces"
1516
)
1617

18+
func newDedicatedValkey(suite *GlideTestSuite, clusterMode bool) (string, error) {
19+
// Build command arguments
20+
args := []string{}
21+
args = append(args, "start")
22+
if clusterMode {
23+
args = append(args, "--cluster-mode")
24+
}
25+
26+
// shardCount := 1
27+
// if clusterMode {
28+
// shardCount = 3
29+
// }
30+
// args = append(args, fmt.Sprintf("-n %d", shardCount))
31+
args = append(args, fmt.Sprintf("-r %d", 1))
32+
33+
// Execute cluster manager script
34+
output := runClusterManager(suite, args, false)
35+
36+
return output, nil
37+
}
38+
39+
func stopDedicatedValkey(suite *GlideTestSuite, clusterFolder string) {
40+
args := []string{}
41+
args = append(args, "stop", "--cluster-folder", clusterFolder)
42+
43+
runClusterManager(suite, args, false)
44+
}
45+
46+
func createDedicatedClient(
47+
addresses []config.NodeAddress,
48+
clusterMode bool,
49+
lazyConnect bool,
50+
) (interfaces.BaseClientCommands, error) {
51+
if clusterMode {
52+
cfg := config.NewClusterClientConfiguration()
53+
for _, addr := range addresses {
54+
cfg.WithAddress(&addr)
55+
}
56+
57+
cfg.WithRequestTimeout(3 * time.Second)
58+
advCfg := config.NewAdvancedClusterClientConfiguration()
59+
advCfg.WithConnectionTimeout(3 * time.Second)
60+
cfg.WithAdvancedConfiguration(advCfg)
61+
cfg.WithLazyConnect(lazyConnect)
62+
63+
return glide.NewClusterClient(cfg)
64+
}
65+
66+
cfg := config.NewClientConfiguration()
67+
for _, addr := range addresses {
68+
cfg.WithAddress(&addr)
69+
}
70+
71+
cfg.WithRequestTimeout(3 * time.Second)
72+
advCfg := config.NewAdvancedClientConfiguration()
73+
advCfg.WithConnectionTimeout(3 * time.Second)
74+
cfg.WithAdvancedConfiguration(advCfg)
75+
cfg.WithLazyConnect(lazyConnect)
76+
77+
return glide.NewClient(cfg)
78+
}
79+
80+
// getClientListOutputCount parses CLIENT LIST output and returns the number of clients
81+
func getClientListOutputCount(output interface{}) int {
82+
if output == nil {
83+
return 0
84+
}
85+
86+
var text string
87+
switch v := output.(type) {
88+
case []byte:
89+
text = string(v)
90+
case string:
91+
text = v
92+
default:
93+
return 0
94+
}
95+
96+
if text = strings.TrimSpace(text); text == "" {
97+
return 0
98+
}
99+
100+
return len(strings.Split(text, "\n"))
101+
}
102+
103+
// getClientCount returns the number of connected clients
104+
func getClientCount(ctx context.Context, client interfaces.BaseClientCommands) (int, error) {
105+
if clusterClient, ok := client.(interfaces.GlideClusterClientCommands); ok {
106+
// For cluster client, execute CLIENT LIST on all nodes
107+
result, err := clusterClient.CustomCommandWithRoute(ctx, []string{"CLIENT", "LIST"}, config.AllNodes)
108+
if err != nil {
109+
return 0, err
110+
}
111+
112+
// Result will be a map with node addresses as keys and CLIENT LIST output as values
113+
totalCount := 0
114+
for _, nodeOutput := range result.MultiValue() {
115+
totalCount += getClientListOutputCount(nodeOutput)
116+
}
117+
return totalCount, nil
118+
}
119+
120+
// For standalone client, execute CLIENT LIST directly
121+
glideClient := client.(interfaces.GlideClientCommands)
122+
result, err := glideClient.CustomCommand(ctx, []string{"CLIENT", "LIST"})
123+
if err != nil {
124+
return 0, err
125+
}
126+
return getClientListOutputCount(result), nil
127+
}
128+
129+
// getExpectedNewConnections returns the expected number of new connections when a lazy client is initialized
130+
func getExpectedNewConnections(ctx context.Context, client interfaces.BaseClientCommands) (int, error) {
131+
if clusterClient, ok := client.(interfaces.GlideClusterClientCommands); ok {
132+
// For cluster, get node count and multiply by 2 (2 connections per node)
133+
result, err := clusterClient.CustomCommand(ctx, []string{"CLUSTER", "NODES"})
134+
if err != nil {
135+
return 0, err
136+
}
137+
138+
var nodesInfo string
139+
switch v := result.SingleValue().(type) {
140+
case []byte:
141+
nodesInfo = string(v)
142+
case string:
143+
nodesInfo = v
144+
default:
145+
nodesInfo = ""
146+
}
147+
148+
if nodesInfo = strings.TrimSpace(nodesInfo); nodesInfo == "" {
149+
return 0, nil
150+
}
151+
152+
return len(strings.Split(nodesInfo, "\n")) * 2, nil
153+
}
154+
155+
// For standalone, always expect 1 new connection
156+
return 1, nil
157+
}
158+
17159
func (suite *GlideTestSuite) TestStandaloneConnect() {
18160
config := config.NewClientConfiguration().
19161
WithAddress(&suite.standaloneHosts[0])
@@ -156,3 +298,60 @@ func (suite *GlideTestSuite) TestConnectionTimeout() {
156298
}
157299
})
158300
}
301+
302+
func (suite *GlideTestSuite) TestLazyConnectionEstablishesOnFirstCommand() {
303+
// Run test for both standalone and cluster modes
304+
suite.runWithTimeoutClients(func(client interfaces.BaseClientCommands) {
305+
ctx := context.Background()
306+
_, isCluster := client.(interfaces.GlideClusterClientCommands)
307+
308+
// Create a monitoring client (eagerly connected)
309+
output, err := newDedicatedValkey(suite, isCluster)
310+
suite.NoError(err)
311+
clusterFolder := extractClusterFolder(suite, output)
312+
addresses := extractAddresses(suite, output)
313+
defer stopDedicatedValkey(suite, clusterFolder)
314+
monitoringClient, err := createDedicatedClient(addresses, isCluster, false)
315+
suite.NoError(err)
316+
defer monitoringClient.Close()
317+
318+
// Get initial client count
319+
clientsBeforeLazyInit, err := getClientCount(ctx, monitoringClient)
320+
suite.NoError(err)
321+
322+
// Create the "lazy" client
323+
lazyClient, err := createDedicatedClient(addresses, isCluster, true)
324+
suite.NoError(err)
325+
defer lazyClient.Close()
326+
327+
// Check count (should not change)
328+
clientsAfterLazyInit, err := getClientCount(ctx, monitoringClient)
329+
suite.NoError(err)
330+
suite.Equal(clientsBeforeLazyInit, clientsAfterLazyInit,
331+
"Lazy client should not connect before the first command")
332+
333+
// Send the first command using the lazy client
334+
var result interface{}
335+
if isCluster {
336+
clusterClient := lazyClient.(interfaces.GlideClusterClientCommands)
337+
result, err = clusterClient.Ping(ctx)
338+
} else {
339+
glideClient := lazyClient.(interfaces.GlideClientCommands)
340+
result, err = glideClient.Ping(ctx)
341+
}
342+
suite.NoError(err)
343+
344+
// Assert PING success for both modes
345+
suite.Equal("PONG", result)
346+
347+
// Check client count after the first command
348+
clientsAfterFirstCommand, err := getClientCount(ctx, monitoringClient)
349+
suite.NoError(err)
350+
351+
expectedNewConnections, err := getExpectedNewConnections(ctx, monitoringClient)
352+
suite.NoError(err)
353+
354+
suite.Equal(clientsBeforeLazyInit+expectedNewConnections, clientsAfterFirstCommand,
355+
"Lazy client should establish expected number of new connections after the first command")
356+
})
357+
}

go/integTest/glide_test_suite_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,28 @@ func parseHosts(suite *GlideTestSuite, addresses string) []config.NodeAddress {
146146
return result
147147
}
148148

149+
func extractClusterFolder(suite *GlideTestSuite, output string) string {
150+
lines := strings.Split(output, "\n")
151+
foundFolder := false
152+
clusterFolder := ""
153+
154+
for _, line := range lines {
155+
if strings.Contains(line, "CLUSTER_FOLDER=") {
156+
parts := strings.SplitN(line, "CLUSTER_FOLDER=", 2)
157+
if len(parts) != 2 {
158+
suite.T().Fatalf("invalid CLUSTER_FOLDER line format: %s", line)
159+
}
160+
clusterFolder = strings.TrimSpace(parts[1])
161+
foundFolder = true
162+
}
163+
}
164+
165+
if !foundFolder {
166+
suite.T().Fatalf("missing required output fields")
167+
}
168+
return clusterFolder
169+
}
170+
149171
func extractAddresses(suite *GlideTestSuite, output string) []config.NodeAddress {
150172
for _, line := range strings.Split(output, "\n") {
151173
if !strings.HasPrefix(line, "CLUSTER_NODES=") {

0 commit comments

Comments
 (0)