From 8b98eabe03524b987f7ea9357fe18e02f4df036d Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Thu, 23 Apr 2026 14:22:39 +0100 Subject: [PATCH 1/3] Adds support for the XNACK command introduced in Redis 8.8 --- commands_test.go | 201 +++++++++++++++++++++++++++++++ doctests/stream_tutorial_test.go | 48 +++++--- stream_commands.go | 59 +++++++++ 3 files changed, 292 insertions(+), 16 deletions(-) diff --git a/commands_test.go b/commands_test.go index 992663b2d1..f510c432a6 100644 --- a/commands_test.go +++ b/commands_test.go @@ -6,6 +6,7 @@ import ( "crypto/sha1" "encoding/json" "fmt" + "math" "reflect" "strconv" "time" @@ -7625,6 +7626,206 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(2))) }) + It("should not XNack with no mode", func() { + SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") + + // Mode is required by Redis; omitting it should return an error. + _, err := client.XNack(ctx, &redis.XNackArgs{ + Stream: "stream", + Group: "group", + IDs: []string{"1-0", "2-0"}, + }).Result() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("mode must be SILENT, FAIL, or FATAL")) + }) + + It("should XNack with SILENT mode", func() { + SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") + + // All 3 messages are pending (delivered by BeforeEach), each with delivery_count=1. + // SILENT: consumer shutting down; delivery counter decremented by 1 (1 → 0). + n, err := client.XNack(ctx, &redis.XNackArgs{ + Stream: "stream", + Group: "group", + Mode: "SILENT", + IDs: []string{"1-0", "2-0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(2))) + + // NACKed messages move to unassigned "nacked" state in the PEL. + // Total PEL count stays 3; only 3-0 remains assigned to consumer. + pendingInfo, err := client.XPending(ctx, "stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(pendingInfo.Count).To(Equal(int64(3))) + Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 1})) + + // Verify the delivery counter was decremented from 1 to 0 for the NACKed messages. + infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: "stream", + Group: "group", + Start: "-", + End: "+", + Count: 10, + }).Result() + Expect(err).NotTo(HaveOccurred()) + for _, e := range infoExt { + if e.ID == "1-0" || e.ID == "2-0" { + Expect(e.RetryCount).To(Equal(int64(0)), "SILENT should decrement delivery counter to 0 for %s", e.ID) + } + } + }) + + It("should XNack with FAIL mode", func() { + SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") + + // FAIL: delivery counter stays the same (1 → 1). + n, err := client.XNack(ctx, &redis.XNackArgs{ + Stream: "stream", + Group: "group", + Mode: "FAIL", + IDs: []string{"1-0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + // NACKed message moves to unassigned "nacked" state. + // Only 2 messages (2-0, 3-0) remain assigned to consumer. + pendingInfo, err := client.XPending(ctx, "stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(pendingInfo.Count).To(Equal(int64(3))) + Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2})) + + // Verify the delivery counter was left unchanged at 1. + infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: "stream", + Group: "group", + Start: "-", + End: "+", + Count: 10, + }).Result() + Expect(err).NotTo(HaveOccurred()) + for _, e := range infoExt { + if e.ID == "1-0" { + Expect(e.RetryCount).To(Equal(int64(1)), "FAIL should leave delivery counter unchanged") + } + } + }) + + It("should XNack with FATAL mode", func() { + SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") + + // FATAL: delivery counter set to MAXINT (for invalid/malicious messages). + n, err := client.XNack(ctx, &redis.XNackArgs{ + Stream: "stream", + Group: "group", + Mode: "FATAL", + IDs: []string{"1-0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + // NACKed message moves to unassigned state; 2-0 and 3-0 remain assigned. + pendingInfo, err := client.XPending(ctx, "stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(pendingInfo.Count).To(Equal(int64(3))) + Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2})) + + // Verify the delivery counter was set to MAXINT (math.MaxInt64). + infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: "stream", + Group: "group", + Start: "-", + End: "+", + Count: 10, + }).Result() + Expect(err).NotTo(HaveOccurred()) + for _, e := range infoExt { + if e.ID == "1-0" { + Expect(e.RetryCount).To(Equal(int64(math.MaxInt64)), "FATAL should set delivery counter to MAXINT") + } + } + }) + + It("should XNack nacked-count reflected in XINFO STREAM FULL", func() { + SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") + + // NACK two messages. + n, err := client.XNack(ctx, &redis.XNackArgs{ + Stream: "stream", + Group: "group", + Mode: "FAIL", + IDs: []string{"1-0", "2-0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(2))) + + // Verify nacked-count in XINFO STREAM FULL. + info, err := client.XInfoStreamFull(ctx, "stream", 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Groups).To(HaveLen(1)) + Expect(info.Groups[0].NackedCount).To(Equal(uint64(2))) + }) + + It("should XNack with RetryCount", func() { + SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") + + retryCount := uint64(5) + n, err := client.XNack(ctx, &redis.XNackArgs{ + Stream: "stream", + Group: "group", + Mode: "FAIL", + IDs: []string{"1-0"}, + RetryCount: &retryCount, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + // Verify the delivery counter was set to the explicit RetryCount value, + // overriding the mode's default adjustment. + infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: "stream", + Group: "group", + Start: "-", + End: "+", + Count: 10, + }).Result() + Expect(err).NotTo(HaveOccurred()) + // Find 1-0 and verify its delivery counter. + var entry redis.XPendingExt + for _, e := range infoExt { + if e.ID == "1-0" { + entry = e + break + } + } + Expect(entry.ID).To(Equal("1-0")) + Expect(entry.RetryCount).To(Equal(int64(retryCount))) + }) + + It("should XNack with Force", func() { + SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") + + // Force creates a new NACKed PEL entry for an ID that was never + // delivered to a consumer via XREADGROUP. Without Force this would + // be a no-op (the ID is not in any consumer's PEL). + n, err := client.XNack(ctx, &redis.XNackArgs{ + Stream: "stream", + Group: "group", + Mode: "FAIL", + IDs: []string{"1-0"}, + Force: true, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + // The entry is now in the group's PEL as an unassigned NACKed entry. + pendingInfo, err := client.XPending(ctx, "stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(pendingInfo.Count).To(Equal(int64(3))) + Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2})) + }) + It("should XReadGroup with CLAIM argument", func() { SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") diff --git a/doctests/stream_tutorial_test.go b/doctests/stream_tutorial_test.go index 3bff9885c6..8a8548f268 100644 --- a/doctests/stream_tutorial_test.go +++ b/doctests/stream_tutorial_test.go @@ -889,38 +889,53 @@ func ExampleClient_raceitaly() { fmt.Println(res31a) // >>> 0-0 // STEP_END + // STEP_START xnack + res32, err := rdb.XNack(ctx, &redis.XNackArgs{ + Stream: "race:italy", + Group: "italy_riders", + Mode: "FAIL", + IDs: []string{"1692632662819-0"}, + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res32) // >>> 1 + // STEP_END + // STEP_START xinfo - res32, err := rdb.XInfoStream(ctx, "race:italy").Result() + res33, err := rdb.XInfoStream(ctx, "race:italy").Result() if err != nil { panic(err) } - fmt.Println(res32.Length) + fmt.Println(res33.Length) // >>> 5 - fmt.Println(res32.FirstEntry) + fmt.Println(res33.FirstEntry) // >>> {1692632639151-0 map[rider:Castilla] 0 0} // STEP_END // STEP_START xinfo_groups - res33, err := rdb.XInfoGroups(ctx, "race:italy").Result() + res34, err := rdb.XInfoGroups(ctx, "race:italy").Result() if err != nil { panic(err) } - fmt.Println(res33) + fmt.Println(res34) // >>> [{italy_riders 3 2 1692632662819-0 3 2}] // STEP_END // STEP_START xinfo_consumers - res34, err := rdb.XInfoConsumers(ctx, "race:italy", "italy_riders").Result() + res35, err := rdb.XInfoConsumers(ctx, "race:italy", "italy_riders").Result() if err != nil { panic(err) } - // fmt.Println(res34) + // fmt.Println(res35) // >>> [{Alice 1 1ms 1ms} {Bob 1 2ms 2ms} {Lora 0 1ms -1ms}] // STEP_END @@ -958,46 +973,46 @@ func ExampleClient_raceitaly() { panic(err) } - res35, err := rdb.XLen(ctx, "race:italy").Result() + res36, err := rdb.XLen(ctx, "race:italy").Result() if err != nil { panic(err) } - fmt.Println(res35) // >>> 2 + fmt.Println(res36) // >>> 2 - res36, err := rdb.XRange(ctx, "race:italy", "-", "+").Result() + res37, err := rdb.XRange(ctx, "race:italy", "-", "+").Result() if err != nil { panic(err) } - // fmt.Println(res36) + // fmt.Println(res37) // >>> [{1726649529170-1 map[rider:Wood] 0 0} {1726649529171-0 map[rider:Henshaw] 0 0}] // STEP_END // STEP_START xtrim - res37, err := rdb.XTrimMaxLen(ctx, "race:italy", 10).Result() + res38, err := rdb.XTrimMaxLen(ctx, "race:italy", 10).Result() if err != nil { panic(err) } - fmt.Println(res37) // >>> 0 + fmt.Println(res38) // >>> 0 // STEP_END // STEP_START xtrim2 - res38, err := rdb.XTrimMaxLenApprox(ctx, "race:italy", 10, 20).Result() + res39, err := rdb.XTrimMaxLenApprox(ctx, "race:italy", 10, 20).Result() if err != nil { panic(err) } - fmt.Println(res38) // >>> 0 + fmt.Println(res39) // >>> 0 // STEP_END // REMOVE_START - UNUSED(res27, res34, res36) + UNUSED(res27, res35, res37) // REMOVE_END // Output: @@ -1012,6 +1027,7 @@ func ExampleClient_raceitaly() { // 1692632662819-0 // [] // 0-0 + // 1 // 5 // {1692632639151-0 map[rider:Castilla] 0 0} // [{italy_riders 3 2 1692632662819-0 3 2}] diff --git a/stream_commands.go b/stream_commands.go index 89ae6a1b2d..58237b3ca8 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -29,6 +29,7 @@ type StreamCmdable interface { XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd + XNack(ctx context.Context, a *XNackArgs) *IntCmd XPending(ctx context.Context, stream, group string) *XPendingCmd XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd @@ -359,6 +360,64 @@ func (c cmdable) XAck(ctx context.Context, stream, group string, ids ...string) return cmd } +// XNackArgs represents the arguments for the XNACK command (Redis >= 8.8). +// +// XNACK negatively acknowledges one or more messages in a consumer group's +// Pending Entries List (PEL), releasing them back to the group so they can be +// redelivered to another consumer via XREADGROUP. +type XNackArgs struct { + Stream string + Group string + + // Mode controls how the delivery counter is adjusted for each NACKed entry. + // Must be one of: + // - "SILENT": the consumer is shutting down or experiencing internal errors + // unrelated to the message. The delivery counter is decremented by 1, + // undoing the increment that happened when the message was delivered. + // - "FAIL": the consumer could not process the message (e.g. insufficient + // memory), but another consumer might succeed. The delivery counter is + // left unchanged. + // - "FATAL": the message is invalid or suspected malicious. The delivery + // counter is set to MAXINT, which will immediately move the message to + // the Dead Letter Queue (DLQ) if one is configured for the group. + Mode string + + // IDs is the list of message IDs to NACK. All IDs must already be in the + // group's PEL (i.e. previously delivered via XREADGROUP), unless Force is set. + IDs []string + + // RetryCount sets the delivery counter to an explicit value, overriding the + // counter adjustment that would otherwise be applied by Mode. + // Leave nil to let Mode control the counter (the common case). + RetryCount *uint64 + + // Force allows NACKing message IDs that are not yet in the group's PEL, + // creating new unowned NACKed PEL entries for them directly. + // This is analogous to the FORCE flag in XCLAIM. + // Primarily used internally by Redis during AOF rewrite to reconstruct + // NACKed entries, but can also be used to manually inject entries. + Force bool +} + +// XNack executes the XNACK command. See [XNackArgs] for the full argument documentation. +// Requires Redis >= 8.8. +func (c cmdable) XNack(ctx context.Context, a *XNackArgs) *IntCmd { + args := make([]interface{}, 0, 6+len(a.IDs)) + args = append(args, "xnack", a.Stream, a.Group, a.Mode, "ids", len(a.IDs)) + for _, id := range a.IDs { + args = append(args, id) + } + if a.RetryCount != nil { + args = append(args, "retrycount", *a.RetryCount) + } + if a.Force { + args = append(args, "force") + } + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) XPending(ctx context.Context, stream, group string) *XPendingCmd { cmd := NewXPendingCmd(ctx, "xpending", stream, group) _ = c(ctx, cmd) From 6a75ee7f012d11cc133c66b3650782e967902aec Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Mon, 18 May 2026 11:38:08 +0300 Subject: [PATCH 2/3] validate Mode client-side and add mode constants --- stream_commands.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/stream_commands.go b/stream_commands.go index f99e3c513a..854c568c35 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -2,6 +2,7 @@ package redis import ( "context" + "errors" "strconv" "strings" "time" @@ -361,6 +362,13 @@ func (c cmdable) XAck(ctx context.Context, stream, group string, ids ...string) return cmd } +// XNACK modes. See [XNackArgs.Mode]. +const ( + XNackModeSilent = "SILENT" + XNackModeFail = "FAIL" + XNackModeFatal = "FATAL" +) + // XNackArgs represents the arguments for the XNACK command (Redis >= 8.8). // // XNACK negatively acknowledges one or more messages in a consumer group's @@ -371,14 +379,14 @@ type XNackArgs struct { Group string // Mode controls how the delivery counter is adjusted for each NACKed entry. - // Must be one of: - // - "SILENT": the consumer is shutting down or experiencing internal errors + // Must be one of [XNackModeSilent], [XNackModeFail], or [XNackModeFatal]: + // - SILENT: the consumer is shutting down or experiencing internal errors // unrelated to the message. The delivery counter is decremented by 1, // undoing the increment that happened when the message was delivered. - // - "FAIL": the consumer could not process the message (e.g. insufficient + // - FAIL: the consumer could not process the message (e.g. insufficient // memory), but another consumer might succeed. The delivery counter is // left unchanged. - // - "FATAL": the message is invalid or suspected malicious. The delivery + // - FATAL: the message is invalid or suspected malicious. The delivery // counter is set to MAXINT, which will immediately move the message to // the Dead Letter Queue (DLQ) if one is configured for the group. Mode string @@ -403,7 +411,7 @@ type XNackArgs struct { // XNack executes the XNACK command. See [XNackArgs] for the full argument documentation. // Requires Redis >= 8.8. func (c cmdable) XNack(ctx context.Context, a *XNackArgs) *IntCmd { - args := make([]interface{}, 0, 6+len(a.IDs)) + args := make([]interface{}, 0, 9+len(a.IDs)) args = append(args, "xnack", a.Stream, a.Group, a.Mode, "ids", len(a.IDs)) for _, id := range a.IDs { args = append(args, id) @@ -415,6 +423,12 @@ func (c cmdable) XNack(ctx context.Context, a *XNackArgs) *IntCmd { args = append(args, "force") } cmd := NewIntCmd(ctx, args...) + switch a.Mode { + case XNackModeSilent, XNackModeFail, XNackModeFatal: + default: + cmd.SetErr(errors.New("redis: XNACK mode must be SILENT, FAIL, or FATAL")) + return cmd + } _ = c(ctx, cmd) return cmd } From b9edc80208049970367b1818e45b15be97937065 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Thu, 21 May 2026 13:54:05 +0300 Subject: [PATCH 3/3] Address comments --- commands_test.go | 3 +-- stream_commands.go | 7 ------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/commands_test.go b/commands_test.go index 4e724c7629..a861864dc9 100644 --- a/commands_test.go +++ b/commands_test.go @@ -7765,14 +7765,13 @@ var _ = Describe("Commands", func() { It("should not XNack with no mode", func() { SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+") - // Mode is required by Redis; omitting it should return an error. + // Mode is required by Redis; omitting it should return a server-side error. _, err := client.XNack(ctx, &redis.XNackArgs{ Stream: "stream", Group: "group", IDs: []string{"1-0", "2-0"}, }).Result() Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("mode must be SILENT, FAIL, or FATAL")) }) It("should XNack with SILENT mode", func() { diff --git a/stream_commands.go b/stream_commands.go index 854c568c35..71191aec4b 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -2,7 +2,6 @@ package redis import ( "context" - "errors" "strconv" "strings" "time" @@ -423,12 +422,6 @@ func (c cmdable) XNack(ctx context.Context, a *XNackArgs) *IntCmd { args = append(args, "force") } cmd := NewIntCmd(ctx, args...) - switch a.Mode { - case XNackModeSilent, XNackModeFail, XNackModeFatal: - default: - cmd.SetErr(errors.New("redis: XNACK mode must be SILENT, FAIL, or FATAL")) - return cmd - } _ = c(ctx, cmd) return cmd }