Skip to content
132 changes: 129 additions & 3 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
CmdTypeXPending
CmdTypeXPendingExt
CmdTypeXAutoClaim
CmdTypeXAutoClaimWithDeleted
CmdTypeXAutoClaimJustID
CmdTypeXInfoConsumers
CmdTypeXInfoGroups
Expand Down Expand Up @@ -163,6 +164,12 @@ type (
start string
}

CmdTypeXAutoClaimWithDeletedValue struct {
messages []XMessage
start string
deletedIDs []string
}

CmdTypeXAutoClaimJustIDValue struct {
ids []string
start string
Expand Down Expand Up @@ -2668,9 +2675,7 @@ func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error {
}

if n >= 3 {
if err := rd.DiscardNext(); err != nil {
Comment thread
Khukharr marked this conversation as resolved.
return err
}
return rd.DiscardNext()
}

return nil
Expand Down Expand Up @@ -2701,6 +2706,119 @@ func (cmd *XAutoClaimCmd) Clone() Cmder {

//------------------------------------------------------------------------------

type XAutoClaimWithDeletedCmd struct {
baseCmd

start string
val []XMessage
deletedIDs []string
}

var _ Cmder = (*XAutoClaimWithDeletedCmd)(nil)

func NewXAutoClaimWithDeletedCmd(ctx context.Context, args ...interface{}) *XAutoClaimWithDeletedCmd {
return &XAutoClaimWithDeletedCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
cmdType: CmdTypeXAutoClaimWithDeleted,
},
}
}

func (cmd *XAutoClaimWithDeletedCmd) SetVal(val []XMessage, start string, deletedIDs []string) {
cmd.val = val
cmd.start = start
cmd.deletedIDs = deletedIDs
}

func (cmd *XAutoClaimWithDeletedCmd) Val() (messages []XMessage, start string, deletedIDs []string) {
return cmd.val, cmd.start, cmd.deletedIDs
}

func (cmd *XAutoClaimWithDeletedCmd) Result() (messages []XMessage, start string, deletedIDs []string, err error) {
return cmd.val, cmd.start, cmd.deletedIDs, cmd.err
}

func (cmd *XAutoClaimWithDeletedCmd) String() string {
return cmdString(cmd, cmd.val)
}

func (cmd *XAutoClaimWithDeletedCmd) readReply(rd *proto.Reader) error {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}

switch n {
case 2, // Redis 6
3: // Redis 7:
// ok
default:
return fmt.Errorf("redis: got %d elements in XAutoClaim reply, wanted 2/3", n)
}

cmd.start, err = rd.ReadString()
if err != nil {
return err
}

cmd.val, err = readXMessageSlice(rd)
if err != nil {
return err
}

if n < 3 {
return nil
}

nn, err := rd.ReadArrayLen()
if err != nil {
return err
}

cmd.deletedIDs = make([]string, nn)
for i := 0; i < nn; i++ {
cmd.deletedIDs[i], err = rd.ReadString()
if err != nil {
Comment thread
cursor[bot] marked this conversation as resolved.
return err
}
}

return nil
}

func (cmd *XAutoClaimWithDeletedCmd) Clone() Cmder {
var val []XMessage
if cmd.val != nil {
val = make([]XMessage, len(cmd.val))
for i, msg := range cmd.val {
val[i] = XMessage{
ID: msg.ID,
}
if msg.Values != nil {
val[i].Values = make(map[string]interface{}, len(msg.Values))
for k, v := range msg.Values {
val[i].Values[k] = v
}
}
}
}
var deletedIDs []string
if cmd.deletedIDs != nil {
deletedIDs = make([]string, len(cmd.deletedIDs))
copy(deletedIDs, cmd.deletedIDs)
}
return &XAutoClaimWithDeletedCmd{
baseCmd: cmd.cloneBaseCmd(),
start: cmd.start,
val: val,
deletedIDs: deletedIDs,
}
}

//------------------------------------------------------------------------------

type XAutoClaimJustIDCmd struct {
baseCmd

Expand Down Expand Up @@ -8026,6 +8144,14 @@ func ExtractCommandValue(cmd interface{}) (interface{}, error) {
messages, start := xAutoClaimCmd.Val()
return CmdTypeXAutoClaimValue{messages: messages, start: start}, xAutoClaimCmd.Err()
}
case CmdTypeXAutoClaimWithDeleted:
if xAutoClaimWithDeletedCmd, ok := cmd.(interface {
Val() ([]XMessage, string, []string)
Err() error
}); ok {
messages, start, deletedIDs := xAutoClaimWithDeletedCmd.Val()
return CmdTypeXAutoClaimWithDeletedValue{messages: messages, start: start, deletedIDs: deletedIDs}, xAutoClaimWithDeletedCmd.Err()
}
case CmdTypeXAutoClaimJustID:
if xAutoClaimJustIDCmd, ok := cmd.(interface {
Val() ([]string, string)
Expand Down
25 changes: 25 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7695,6 +7695,31 @@ var _ = Describe("Commands", func() {
Expect(ids).To(Equal([]string{"3-0"}))
})

It("should XAutoClaim return deletedIDs", func() {
xca := &redis.XAutoClaimArgs{
Stream: "stream",
Group: "group",
Consumer: "consumer",
Start: "-",
Count: 3,
}
err := client.XDel(ctx, "stream", "2-0").Err()
Expect(err).NotTo(HaveOccurred())

msgs, start, ids, err := client.XAutoClaimWithDeleted(ctx, xca).Result()
Expect(err).NotTo(HaveOccurred())
Expect(start).To(Equal("0-0"))
Expect(msgs).To(Equal([]redis.XMessage{{
ID: "1-0",
Values: map[string]interface{}{"uno": "un"},
}, {
ID: "3-0",
Values: map[string]interface{}{"tres": "troix"},
}}))
Expect(ids).To(Equal([]string{"2-0"}))

})

It("should XClaim", func() {
msgs, err := client.XClaim(ctx, &redis.XClaimArgs{
Stream: "stream",
Expand Down
8 changes: 8 additions & 0 deletions osscluster_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ func createCommandByType(ctx context.Context, cmdType CmdType, args ...interface
return NewXPendingExtCmd(ctx, args...)
case CmdTypeXAutoClaim:
return NewXAutoClaimCmd(ctx, args...)
case CmdTypeXAutoClaimWithDeleted:
return NewXAutoClaimWithDeletedCmd(ctx, args...)
case CmdTypeXAutoClaimJustID:
return NewXAutoClaimJustIDCmd(ctx, args...)
case CmdTypeXInfoStreamFull:
Expand Down Expand Up @@ -668,6 +670,12 @@ func (c *ClusterClient) setCommandValue(cmd Cmder, value interface{}) error {
c.SetVal(v.messages, v.start)
}
}
case CmdTypeXAutoClaimWithDeleted:
if c, ok := cmd.(*XAutoClaimWithDeletedCmd); ok {
if v, ok := value.(CmdTypeXAutoClaimWithDeletedValue); ok {
c.SetVal(v.messages, v.start, v.deletedIDs)
}
}
case CmdTypeXAutoClaimJustID:
if c, ok := cmd.(*XAutoClaimJustIDCmd); ok {
if v, ok := value.(CmdTypeXAutoClaimJustIDValue); ok {
Expand Down
8 changes: 8 additions & 0 deletions stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type StreamCmdable interface {
XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd
XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd
XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd
XAutoClaimWithDeleted(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimWithDeletedCmd
XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
Expand Down Expand Up @@ -406,6 +407,13 @@ func (c cmdable) XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimC
return cmd
}

func (c cmdable) XAutoClaimWithDeleted(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimWithDeletedCmd {
args := xAutoClaimArgs(ctx, a)
cmd := NewXAutoClaimWithDeletedCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd {
args := xAutoClaimArgs(ctx, a)
args = append(args, "justid")
Expand Down
Loading