Skip to content
99 changes: 95 additions & 4 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3777,6 +3777,8 @@ def xadd(
minid: Union[StreamIdT, None] = None,
limit: Optional[int] = None,
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
idmpauto: Optional[str] = None,
idmp: Optional[tuple[str, bytes]] = None,
) -> ResponseT:
"""
Add to a stream.
Expand All @@ -3794,16 +3796,45 @@ def xadd(
- KEEPREF (default): When trimming, preserves references in consumer groups' PEL
- DELREF: When trimming, removes all references from consumer groups' PEL
- ACKED: When trimming, only removes entries acknowledged by all consumer groups
idmpauto: Producer ID for automatic idempotent ID calculation.
Automatically calculates an idempotent ID based on entry content to prevent
duplicate entries. Can only be used with id='*'. Creates an IDMP map if it
doesn't exist yet. The producer ID must be unique per producer and consistent
across restarts.
idmp: Tuple of (producer_id, idempotent_id) for explicit idempotent ID.
Uses a specific idempotent ID to prevent duplicate entries. Can only be used
with id='*'. The producer ID must be unique per producer and consistent across
restarts. The idempotent ID must be unique per message and per producer.
Shorter idempotent IDs require less memory and allow faster processing.
Creates an IDMP map if it doesn't exist yet.

For more information, see https://redis.io/commands/xadd
"""
pieces: list[EncodableT] = []
if maxlen is not None and minid is not None:
raise DataError("Only one of ```maxlen``` or ```minid``` may be specified")

if idmpauto is not None and idmp is not None:
raise DataError("Only one of ```idmpauto``` or ```idmp``` may be specified")

if (idmpauto is not None or idmp is not None) and id != "*":
raise DataError("IDMPAUTO and IDMP can only be used with id='*'")

if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
raise DataError("XADD ref_policy must be one of: KEEPREF, DELREF, ACKED")

if nomkstream:
pieces.append(b"NOMKSTREAM")
if ref_policy is not None:
pieces.append(ref_policy)
if idmpauto is not None:
pieces.extend([b"IDMPAUTO", idmpauto])
if idmp is not None:
if not isinstance(idmp, tuple) or len(idmp) != 2:
raise DataError(
"XADD idmp must be a tuple of (producer_id, idempotent_id)"
)
pieces.extend([b"IDMP", idmp[0], idmp[1]])
if maxlen is not None:
if not isinstance(maxlen, int) or maxlen < 0:
raise DataError("XADD maxlen must be non-negative integer")
Expand All @@ -3818,17 +3849,77 @@ def xadd(
pieces.append(minid)
if limit is not None:
pieces.extend([b"LIMIT", limit])
if nomkstream:
pieces.append(b"NOMKSTREAM")
if ref_policy is not None:
pieces.append(ref_policy)
pieces.append(id)
if not isinstance(fields, dict) or len(fields) == 0:
raise DataError("XADD fields must be a non-empty dict")
for pair in fields.items():
pieces.extend(pair)
return self.execute_command("XADD", name, *pieces)

def xcfgset(
self,
name: KeyT,
idmp_duration: Optional[int] = None,
idmp_maxsize: Optional[int] = None,
) -> ResponseT:
"""
Configure the idempotency parameters for a stream's IDMP map.

Sets how long Redis remembers each idempotent ID (iid) and the maximum
number of iids to track. This command clears the existing IDMP map
(Redis forgets all previously stored iids), but only if the configuration
value actually changes.

Args:
name: The name of the stream.
idmp_duration: How long Redis remembers each iid in seconds.
Default: 100 seconds (or value set by stream-idmp-duration config).
Minimum: 1 second, Maximum: 300 seconds.
Redis won't forget an iid for this duration (unless maxsize is reached).
Should accommodate application crash recovery time.
idmp_maxsize: Maximum number of iids Redis remembers per producer ID (pid).
Default: 100 iids (or value set by stream-idmp-maxsize config).
Minimum: 1 iid, Maximum: 1,000,000 (1M) iids.
Should be set to: mark-delay [in msec] × (messages/msec) + margin.
Example: 10K msgs/sec (10 msgs/msec), 80 msec mark-delay
→ maxsize = 10 × 80 + margin = 1000 iids.

Returns:
OK on success.

For more information, see https://redis.io/commands/xcfgset
"""
if idmp_duration is None and idmp_maxsize is None:
raise DataError(
"XCFGSET requires at least one of idmp_duration or idmp_maxsize"
)

pieces: list[EncodableT] = []

if idmp_duration is not None:
if (
not isinstance(idmp_duration, int)
or idmp_duration < 1
or idmp_duration > 300
):
raise DataError(
"XCFGSET idmp_duration must be an integer between 1 and 300"
)
pieces.extend([b"IDMP-DURATION", idmp_duration])

if idmp_maxsize is not None:
if (
not isinstance(idmp_maxsize, int)
or idmp_maxsize < 1
or idmp_maxsize > 1000000
):
raise DataError(
"XCFGSET idmp_maxsize must be an integer between 1 and 1,000,000"
)
pieces.extend([b"IDMP-MAXSIZE", idmp_maxsize])

return self.execute_command("XCFGSET", name, *pieces)

def xautoclaim(
self,
name: KeyT,
Expand Down
221 changes: 221 additions & 0 deletions tests/test_asyncio/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3932,6 +3932,58 @@ async def test_xinfo_stream_full(self, r: redis.Redis):
consumer = info["groups"][0]["consumers"][0]
assert isinstance(consumer, dict)

@skip_if_server_version_lt("8.5.0")
async def test_xinfo_stream_idempotent_fields(self, r: redis.Redis):
stream = "stream"

# Create stream with regular entry
await r.xadd(stream, {"foo": "bar"})
info = await r.xinfo_stream(stream)

# Verify new idempotent producer fields are present with default values
assert "idmp-duration" in info
assert "idmp-maxsize" in info
assert "pids-tracked" in info
assert "iids-tracked" in info
assert "iids-added" in info
assert "iids-duplicates" in info

# Default values (before any idempotent entries)
assert info["pids-tracked"] == 0
assert info["iids-tracked"] == 0
assert info["iids-added"] == 0
assert info["iids-duplicates"] == 0

# Add idempotent entry
await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
info = await r.xinfo_stream(stream)

# After adding idempotent entry
assert info["pids-tracked"] == 1 # One producer tracked
assert info["iids-tracked"] == 1 # One iid tracked
assert info["iids-added"] == 1 # One idempotent entry added
assert info["iids-duplicates"] == 0 # No duplicates yet

# Add duplicate entry
await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
info = await r.xinfo_stream(stream)

# After duplicate
assert info["pids-tracked"] == 1 # Still one producer
assert info["iids-tracked"] == 1 # Still one iid (duplicate doesn't add new)
assert info["iids-added"] == 1 # Still one unique entry
assert info["iids-duplicates"] == 1 # One duplicate detected

# Add entry from different producer
await r.xadd(stream, {"field2": "value2"}, idmpauto="producer2")
info = await r.xinfo_stream(stream)

# After second producer
assert info["pids-tracked"] == 2 # Two producers tracked
assert info["iids-tracked"] == 2 # Two iids tracked
assert info["iids-added"] == 2 # Two unique entries
assert info["iids-duplicates"] == 1 # Still one duplicate

@skip_if_server_version_lt("5.0.0")
async def test_xlen(self, r: redis.Redis):
stream = "stream"
Expand Down Expand Up @@ -4660,6 +4712,175 @@ async def test_xadd_with_options(self, r: redis.Redis):
with pytest.raises(redis.DataError):
await r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID")

@skip_if_server_version_lt("8.5.0")
async def test_xadd_idmpauto(self, r: redis.Redis):
stream = "stream"

# XADD with IDMPAUTO - first write
message_id1 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")

# Test XADD with IDMPAUTO - duplicate write returns same ID
message_id2 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
assert message_id1 == message_id2

# Test XADD with IDMPAUTO - different content creates new entry
message_id3 = await r.xadd(stream, {"field1": "value2"}, idmpauto="producer1")
assert message_id3 != message_id1

# Test XADD with IDMPAUTO - different producer creates new entry
message_id4 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer2")
assert message_id4 != message_id1

# Verify stream has 3 entries (2 unique from producer1, 1 from producer2)
assert await r.xlen(stream) == 3

@skip_if_server_version_lt("8.5.0")
async def test_xadd_idmp(self, r: redis.Redis):
stream = "stream"

# Test XADD with IDMP - first write
message_id1 = await r.xadd(
stream, {"field1": "value1"}, idmp=("producer1", b"msg1")
)

# Test XADD with IDMP - duplicate write returns same ID
message_id2 = await r.xadd(
stream, {"field1": "value1"}, idmp=("producer1", b"msg1")
)
assert message_id1 == message_id2

# Test XADD with IDMP - different iid creates new entry
message_id3 = await r.xadd(
stream, {"field1": "value1"}, idmp=("producer1", b"msg2")
)
assert message_id3 != message_id1

# Test XADD with IDMP - different producer creates new entry
message_id4 = await r.xadd(
stream, {"field1": "value1"}, idmp=("producer2", b"msg1")
)
assert message_id4 != message_id1

# Test XADD with IDMP - shorter binary iid
await r.xadd(stream, {"field1": "value1"}, idmp=("producer1", b"\x01"))

# Verify stream has 4 entries
assert await r.xlen(stream) == 4

@skip_if_server_version_lt("8.5.0")
async def test_xadd_idmp_validation(self, r: redis.Redis):
stream = "stream"

# Test error: both idmpauto and idmp specified
with pytest.raises(redis.DataError):
await r.xadd(
stream,
{"foo": "bar"},
idmpauto="producer1",
idmp=("producer1", b"msg1"),
)

# Test error: idmpauto with explicit id
with pytest.raises(redis.DataError):
await r.xadd(
stream, {"foo": "bar"}, id="1234567890-0", idmpauto="producer1"
)

# Test error: idmp with explicit id
with pytest.raises(redis.DataError):
await r.xadd(
stream, {"foo": "bar"}, id="1234567890-0", idmp=("producer1", b"msg1")
)

# Test error: idmp not a tuple
with pytest.raises(redis.DataError):
await r.xadd(stream, {"foo": "bar"}, idmp="invalid")

# Test error: idmp tuple with wrong number of elements
with pytest.raises(redis.DataError):
await r.xadd(stream, {"foo": "bar"}, idmp=("producer1",))

# Test error: idmp tuple with wrong number of elements
with pytest.raises(redis.DataError):
await r.xadd(stream, {"foo": "bar"}, idmp=("producer1", b"msg1", "extra"))

@skip_if_server_version_lt("8.5.0")
async def test_xcfgset_idmp_duration(self, r: redis.Redis):
stream = "stream"

# Create stream first
await r.xadd(stream, {"foo": "bar"})

# Test XCFGSET with IDMP-DURATION only
assert await r.xcfgset(stream, idmp_duration=120) == b"OK"

# Test with minimum value
assert await r.xcfgset(stream, idmp_duration=1) == b"OK"

# Test with maximum value
assert await r.xcfgset(stream, idmp_duration=300) == b"OK"

@skip_if_server_version_lt("8.5.0")
async def test_xcfgset_idmp_maxsize(self, r: redis.Redis):
stream = "stream"

# Create stream first
await r.xadd(stream, {"foo": "bar"})

# Test XCFGSET with IDMP-MAXSIZE only
assert await r.xcfgset(stream, idmp_maxsize=5000) == b"OK"

# Test with minimum value
assert await r.xcfgset(stream, idmp_maxsize=1) == b"OK"

# Test with maximum value
assert await r.xcfgset(stream, idmp_maxsize=10000) == b"OK"

@skip_if_server_version_lt("8.5.0")
async def test_xcfgset_both_parameters(self, r: redis.Redis):
stream = "stream"

# Create stream first
await r.xadd(stream, {"foo": "bar"})

# Test XCFGSET with both IDMP-DURATION and IDMP-MAXSIZE
assert await r.xcfgset(stream, idmp_duration=120, idmp_maxsize=5000) == b"OK"

# Test with different values
assert await r.xcfgset(stream, idmp_duration=60, idmp_maxsize=10000) == b"OK"

@skip_if_server_version_lt("8.5.0")
async def test_xcfgset_validation(self, r: redis.Redis):
stream = "stream"

# Test error: no parameters provided
with pytest.raises(redis.DataError):
await r.xcfgset(stream)

# Test error: idmp_duration too small
with pytest.raises(redis.DataError):
await r.xcfgset(stream, idmp_duration=0)

# Test error: idmp_duration too large
with pytest.raises(redis.DataError):
await r.xcfgset(stream, idmp_duration=301)

# Test error: idmp_duration not an integer
with pytest.raises(redis.DataError):
await r.xcfgset(stream, idmp_duration="invalid")

# Test error: idmp_maxsize too small
with pytest.raises(redis.DataError):
await r.xcfgset(stream, idmp_maxsize=0)

# Test error: idmp_maxsize too large
with pytest.raises(redis.DataError):
await r.xcfgset(stream, idmp_maxsize=1000001)

# Test error: idmp_maxsize not an integer
with pytest.raises(redis.DataError):
await r.xcfgset(stream, idmp_maxsize="invalid")

@pytest.mark.onlynoncluster
async def test_bitfield_operations(self, r: redis.Redis):
# comments show affected bits
Expand Down
Loading