Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2110,10 +2110,18 @@ func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...s
}
}

err = node.Client.Watch(ctx, fn, keys...)
// Track callback errors separately to avoid retrying user failures through cluster retry classification.
var fnErr error
Comment thread
obiyang marked this conversation as resolved.
err = node.Client.Watch(ctx, func(tx *Tx) error {
fnErr = fn(tx)
return fnErr
}, keys...)
if err == nil {
break
}
if fnErr != nil {
return fnErr
}
Comment thread
ndyakov marked this conversation as resolved.

moved, ask, addr := isMovedError(err)
if moved || ask {
Expand Down
22 changes: 22 additions & 0 deletions osscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"slices"
"strconv"
Expand Down Expand Up @@ -1355,6 +1356,27 @@ var _ = Describe("ClusterClient", func() {
Expect(info.Val()).Should(ContainSubstring("tcp_port:16601"))
})

It("should not retry Watch callback errors wrapping io.EOF", func() {
opt := redisClusterOptions()
opt.MaxRedirects = 1
opt.MinRetryBackoff = -1
opt.MaxRetryBackoff = -1

client := cluster.newClusterClient(ctx, opt)
defer func() {
Expect(client.Close()).NotTo(HaveOccurred())
}()

calls := 0
err := client.Watch(ctx, func(tx *redis.Tx) error {
calls++
return fmt.Errorf("external call failed: %w", io.EOF)
}, "watch-callback-net-error")

Expect(errors.Is(err, io.EOF)).To(BeTrue())
Expect(calls).To(Equal(1))
})

assertClusterClient()
})

Expand Down
Loading