Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/sha1"
"encoding/json"
"fmt"
"math"
"reflect"
"strconv"
"time"
Expand Down Expand Up @@ -7761,6 +7762,205 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(2)))
})

It("should not XNack with no mode", func() {
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")

// Mode is required by Redis; omitting it should return a server-side error.
_, err := client.XNack(ctx, &redis.XNackArgs{
Stream: "stream",
Group: "group",
IDs: []string{"1-0", "2-0"},
}).Result()
Expect(err).To(HaveOccurred())
})

It("should XNack with SILENT mode", func() {
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")

// All 3 messages are pending (delivered by BeforeEach), each with delivery_count=1.
// SILENT: consumer shutting down; delivery counter decremented by 1 (1 → 0).
n, err := client.XNack(ctx, &redis.XNackArgs{
Stream: "stream",
Group: "group",
Mode: "SILENT",
IDs: []string{"1-0", "2-0"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(2)))

// NACKed messages move to unassigned "nacked" state in the PEL.
// Total PEL count stays 3; only 3-0 remains assigned to consumer.
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
Expect(err).NotTo(HaveOccurred())
Expect(pendingInfo.Count).To(Equal(int64(3)))
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 1}))

// Verify the delivery counter was decremented from 1 to 0 for the NACKed messages.
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: "stream",
Group: "group",
Start: "-",
End: "+",
Count: 10,
}).Result()
Expect(err).NotTo(HaveOccurred())
for _, e := range infoExt {
if e.ID == "1-0" || e.ID == "2-0" {
Expect(e.RetryCount).To(Equal(int64(0)), "SILENT should decrement delivery counter to 0 for %s", e.ID)
}
}
})

It("should XNack with FAIL mode", func() {
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")

// FAIL: delivery counter stays the same (1 → 1).
n, err := client.XNack(ctx, &redis.XNackArgs{
Stream: "stream",
Group: "group",
Mode: "FAIL",
IDs: []string{"1-0"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))

// NACKed message moves to unassigned "nacked" state.
// Only 2 messages (2-0, 3-0) remain assigned to consumer.
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
Expect(err).NotTo(HaveOccurred())
Expect(pendingInfo.Count).To(Equal(int64(3)))
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2}))

// Verify the delivery counter was left unchanged at 1.
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: "stream",
Group: "group",
Start: "-",
End: "+",
Count: 10,
}).Result()
Expect(err).NotTo(HaveOccurred())
for _, e := range infoExt {
if e.ID == "1-0" {
Expect(e.RetryCount).To(Equal(int64(1)), "FAIL should leave delivery counter unchanged")
}
}
})

It("should XNack with FATAL mode", func() {
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")

// FATAL: delivery counter set to MAXINT (for invalid/malicious messages).
n, err := client.XNack(ctx, &redis.XNackArgs{
Stream: "stream",
Group: "group",
Mode: "FATAL",
IDs: []string{"1-0"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))

// NACKed message moves to unassigned state; 2-0 and 3-0 remain assigned.
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
Expect(err).NotTo(HaveOccurred())
Expect(pendingInfo.Count).To(Equal(int64(3)))
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2}))

// Verify the delivery counter was set to MAXINT (math.MaxInt64).
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: "stream",
Group: "group",
Start: "-",
End: "+",
Count: 10,
}).Result()
Expect(err).NotTo(HaveOccurred())
for _, e := range infoExt {
if e.ID == "1-0" {
Expect(e.RetryCount).To(Equal(int64(math.MaxInt64)), "FATAL should set delivery counter to MAXINT")
}
}
})

It("should XNack nacked-count reflected in XINFO STREAM FULL", func() {
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")

// NACK two messages.
n, err := client.XNack(ctx, &redis.XNackArgs{
Stream: "stream",
Group: "group",
Mode: "FAIL",
IDs: []string{"1-0", "2-0"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(2)))

// Verify nacked-count in XINFO STREAM FULL.
info, err := client.XInfoStreamFull(ctx, "stream", 10).Result()
Expect(err).NotTo(HaveOccurred())
Expect(info.Groups).To(HaveLen(1))
Expect(info.Groups[0].NackedCount).To(Equal(uint64(2)))
})

It("should XNack with RetryCount", func() {
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")

retryCount := uint64(5)
n, err := client.XNack(ctx, &redis.XNackArgs{
Stream: "stream",
Group: "group",
Mode: "FAIL",
IDs: []string{"1-0"},
RetryCount: &retryCount,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))

// Verify the delivery counter was set to the explicit RetryCount value,
// overriding the mode's default adjustment.
infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: "stream",
Group: "group",
Start: "-",
End: "+",
Count: 10,
}).Result()
Expect(err).NotTo(HaveOccurred())
// Find 1-0 and verify its delivery counter.
var entry redis.XPendingExt
for _, e := range infoExt {
if e.ID == "1-0" {
entry = e
break
}
}
Expect(entry.ID).To(Equal("1-0"))
Expect(entry.RetryCount).To(Equal(int64(retryCount)))
})

It("should XNack with Force", func() {
SkipBeforeRedisVersion(8.8, "XNACK requires Redis 8.8+")

// Force creates a new NACKed PEL entry for an ID that was never
// delivered to a consumer via XREADGROUP. Without Force this would
// be a no-op (the ID is not in any consumer's PEL).
n, err := client.XNack(ctx, &redis.XNackArgs{
Stream: "stream",
Group: "group",
Mode: "FAIL",
IDs: []string{"1-0"},
Force: true,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))

// The entry is now in the group's PEL as an unassigned NACKed entry.
pendingInfo, err := client.XPending(ctx, "stream", "group").Result()
Expect(err).NotTo(HaveOccurred())
Expect(pendingInfo.Count).To(Equal(int64(3)))
Expect(pendingInfo.Consumers).To(Equal(map[string]int64{"consumer": 2}))
})

It("should XReadGroup with CLAIM argument", func() {
SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+")

Expand Down
48 changes: 32 additions & 16 deletions doctests/stream_tutorial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,38 +889,53 @@ func ExampleClient_raceitaly() {
fmt.Println(res31a) // >>> 0-0
// STEP_END

// STEP_START xnack
Comment thread
ndyakov marked this conversation as resolved.
res32, err := rdb.XNack(ctx, &redis.XNackArgs{
Stream: "race:italy",
Group: "italy_riders",
Mode: "FAIL",
IDs: []string{"1692632662819-0"},
}).Result()

if err != nil {
panic(err)
}

fmt.Println(res32) // >>> 1
// STEP_END

// STEP_START xinfo
res32, err := rdb.XInfoStream(ctx, "race:italy").Result()
res33, err := rdb.XInfoStream(ctx, "race:italy").Result()

if err != nil {
panic(err)
}

fmt.Println(res32.Length)
fmt.Println(res33.Length)
// >>> 5
fmt.Println(res32.FirstEntry)
fmt.Println(res33.FirstEntry)
// >>> {1692632639151-0 map[rider:Castilla] 0 0}
// STEP_END

// STEP_START xinfo_groups
res33, err := rdb.XInfoGroups(ctx, "race:italy").Result()
res34, err := rdb.XInfoGroups(ctx, "race:italy").Result()

if err != nil {
panic(err)
}

fmt.Println(res33)
fmt.Println(res34)
// >>> [{italy_riders 3 2 1692632662819-0 3 2}]
// STEP_END

// STEP_START xinfo_consumers
res34, err := rdb.XInfoConsumers(ctx, "race:italy", "italy_riders").Result()
res35, err := rdb.XInfoConsumers(ctx, "race:italy", "italy_riders").Result()

if err != nil {
panic(err)
}

// fmt.Println(res34)
// fmt.Println(res35)
// >>> [{Alice 1 1ms 1ms} {Bob 1 2ms 2ms} {Lora 0 1ms -1ms}]
// STEP_END

Expand Down Expand Up @@ -958,46 +973,46 @@ func ExampleClient_raceitaly() {
panic(err)
}

res35, err := rdb.XLen(ctx, "race:italy").Result()
res36, err := rdb.XLen(ctx, "race:italy").Result()

if err != nil {
panic(err)
}

fmt.Println(res35) // >>> 2
fmt.Println(res36) // >>> 2

res36, err := rdb.XRange(ctx, "race:italy", "-", "+").Result()
res37, err := rdb.XRange(ctx, "race:italy", "-", "+").Result()

if err != nil {
panic(err)
}

// fmt.Println(res36)
// fmt.Println(res37)
// >>> [{1726649529170-1 map[rider:Wood] 0 0} {1726649529171-0 map[rider:Henshaw] 0 0}]
// STEP_END

// STEP_START xtrim
res37, err := rdb.XTrimMaxLen(ctx, "race:italy", 10).Result()
res38, err := rdb.XTrimMaxLen(ctx, "race:italy", 10).Result()

if err != nil {
panic(err)
}

fmt.Println(res37) // >>> 0
fmt.Println(res38) // >>> 0
// STEP_END

// STEP_START xtrim2
res38, err := rdb.XTrimMaxLenApprox(ctx, "race:italy", 10, 20).Result()
res39, err := rdb.XTrimMaxLenApprox(ctx, "race:italy", 10, 20).Result()

if err != nil {
panic(err)
}

fmt.Println(res38) // >>> 0
fmt.Println(res39) // >>> 0
// STEP_END

// REMOVE_START
UNUSED(res27, res34, res36)
UNUSED(res27, res35, res37)
// REMOVE_END

// Output:
Expand All @@ -1012,6 +1027,7 @@ func ExampleClient_raceitaly() {
// 1692632662819-0
// []
// 0-0
// 1
// 5
// {1692632639151-0 map[rider:Castilla] 0 0}
// [{italy_riders 3 2 1692632662819-0 3 2}]
Expand Down
Loading
Loading