Skip to content

Commit 2b2f2cf

Browse files
Adds support for the new Idempotent Producers feature in Redis 8.6 (#3926)
* Adds `IDMPAUTO` and `IDMP` Signed-off-by: Elena Kolevska <elena@kolevska.com> * Adds xcfgset Signed-off-by: Elena Kolevska <elena@kolevska.com> * Adds tests for extended response in the XINFO command Signed-off-by: Elena Kolevska <elena@kolevska.com> * Fixes linter Signed-off-by: Elena Kolevska <elena@kolevska.com> Small cleanups after Copilot review Signed-off-by: Elena Kolevska <elena@kolevska.com> One more linter fix Signed-off-by: Elena Kolevska <elena@kolevska.com> Reformat Signed-off-by: Elena Kolevska <elena@kolevska.com> * Temporarily update the version, so that the tests can run with the RC image Signed-off-by: Elena Kolevska <elena@kolevska.com> * Fixing tests --------- Signed-off-by: Elena Kolevska <elena@kolevska.com> Co-authored-by: petyaslavova <petya.slavova@redis.com>
1 parent 90e69a8 commit 2b2f2cf

3 files changed

Lines changed: 527 additions & 4 deletions

File tree

redis/commands/core.py

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3777,6 +3777,8 @@ def xadd(
37773777
minid: Union[StreamIdT, None] = None,
37783778
limit: Optional[int] = None,
37793779
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
3780+
idmpauto: Optional[str] = None,
3781+
idmp: Optional[tuple[str, bytes]] = None,
37803782
) -> ResponseT:
37813783
"""
37823784
Add to a stream.
@@ -3794,16 +3796,45 @@ def xadd(
37943796
- KEEPREF (default): When trimming, preserves references in consumer groups' PEL
37953797
- DELREF: When trimming, removes all references from consumer groups' PEL
37963798
- ACKED: When trimming, only removes entries acknowledged by all consumer groups
3799+
idmpauto: Producer ID for automatic idempotent ID calculation.
3800+
Automatically calculates an idempotent ID based on entry content to prevent
3801+
duplicate entries. Can only be used with id='*'. Creates an IDMP map if it
3802+
doesn't exist yet. The producer ID must be unique per producer and consistent
3803+
across restarts.
3804+
idmp: Tuple of (producer_id, idempotent_id) for explicit idempotent ID.
3805+
Uses a specific idempotent ID to prevent duplicate entries. Can only be used
3806+
with id='*'. The producer ID must be unique per producer and consistent across
3807+
restarts. The idempotent ID must be unique per message and per producer.
3808+
Shorter idempotent IDs require less memory and allow faster processing.
3809+
Creates an IDMP map if it doesn't exist yet.
37973810
37983811
For more information, see https://redis.io/commands/xadd
37993812
"""
38003813
pieces: list[EncodableT] = []
38013814
if maxlen is not None and minid is not None:
38023815
raise DataError("Only one of ```maxlen``` or ```minid``` may be specified")
38033816

3817+
if idmpauto is not None and idmp is not None:
3818+
raise DataError("Only one of ```idmpauto``` or ```idmp``` may be specified")
3819+
3820+
if (idmpauto is not None or idmp is not None) and id != "*":
3821+
raise DataError("IDMPAUTO and IDMP can only be used with id='*'")
3822+
38043823
if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
38053824
raise DataError("XADD ref_policy must be one of: KEEPREF, DELREF, ACKED")
38063825

3826+
if nomkstream:
3827+
pieces.append(b"NOMKSTREAM")
3828+
if ref_policy is not None:
3829+
pieces.append(ref_policy)
3830+
if idmpauto is not None:
3831+
pieces.extend([b"IDMPAUTO", idmpauto])
3832+
if idmp is not None:
3833+
if not isinstance(idmp, tuple) or len(idmp) != 2:
3834+
raise DataError(
3835+
"XADD idmp must be a tuple of (producer_id, idempotent_id)"
3836+
)
3837+
pieces.extend([b"IDMP", idmp[0], idmp[1]])
38073838
if maxlen is not None:
38083839
if not isinstance(maxlen, int) or maxlen < 0:
38093840
raise DataError("XADD maxlen must be non-negative integer")
@@ -3818,17 +3849,77 @@ def xadd(
38183849
pieces.append(minid)
38193850
if limit is not None:
38203851
pieces.extend([b"LIMIT", limit])
3821-
if nomkstream:
3822-
pieces.append(b"NOMKSTREAM")
3823-
if ref_policy is not None:
3824-
pieces.append(ref_policy)
38253852
pieces.append(id)
38263853
if not isinstance(fields, dict) or len(fields) == 0:
38273854
raise DataError("XADD fields must be a non-empty dict")
38283855
for pair in fields.items():
38293856
pieces.extend(pair)
38303857
return self.execute_command("XADD", name, *pieces)
38313858

3859+
def xcfgset(
3860+
self,
3861+
name: KeyT,
3862+
idmp_duration: Optional[int] = None,
3863+
idmp_maxsize: Optional[int] = None,
3864+
) -> ResponseT:
3865+
"""
3866+
Configure the idempotency parameters for a stream's IDMP map.
3867+
3868+
Sets how long Redis remembers each idempotent ID (iid) and the maximum
3869+
number of iids to track. This command clears the existing IDMP map
3870+
(Redis forgets all previously stored iids), but only if the configuration
3871+
value actually changes.
3872+
3873+
Args:
3874+
name: The name of the stream.
3875+
idmp_duration: How long Redis remembers each iid in seconds.
3876+
Default: 100 seconds (or value set by stream-idmp-duration config).
3877+
Minimum: 1 second, Maximum: 300 seconds.
3878+
Redis won't forget an iid for this duration (unless maxsize is reached).
3879+
Should accommodate application crash recovery time.
3880+
idmp_maxsize: Maximum number of iids Redis remembers per producer ID (pid).
3881+
Default: 100 iids (or value set by stream-idmp-maxsize config).
3882+
Minimum: 1 iid, Maximum: 1,000,000 (1M) iids.
3883+
Should be set to: mark-delay [in msec] × (messages/msec) + margin.
3884+
Example: 10K msgs/sec (10 msgs/msec), 80 msec mark-delay
3885+
→ maxsize = 10 × 80 + margin = 1000 iids.
3886+
3887+
Returns:
3888+
OK on success.
3889+
3890+
For more information, see https://redis.io/commands/xcfgset
3891+
"""
3892+
if idmp_duration is None and idmp_maxsize is None:
3893+
raise DataError(
3894+
"XCFGSET requires at least one of idmp_duration or idmp_maxsize"
3895+
)
3896+
3897+
pieces: list[EncodableT] = []
3898+
3899+
if idmp_duration is not None:
3900+
if (
3901+
not isinstance(idmp_duration, int)
3902+
or idmp_duration < 1
3903+
or idmp_duration > 300
3904+
):
3905+
raise DataError(
3906+
"XCFGSET idmp_duration must be an integer between 1 and 300"
3907+
)
3908+
pieces.extend([b"IDMP-DURATION", idmp_duration])
3909+
3910+
if idmp_maxsize is not None:
3911+
if (
3912+
not isinstance(idmp_maxsize, int)
3913+
or idmp_maxsize < 1
3914+
or idmp_maxsize > 1000000
3915+
):
3916+
raise DataError(
3917+
"XCFGSET idmp_maxsize must be an integer between 1 and 1,000,000"
3918+
)
3919+
pieces.extend([b"IDMP-MAXSIZE", idmp_maxsize])
3920+
3921+
return self.execute_command("XCFGSET", name, *pieces)
3922+
38323923
def xautoclaim(
38333924
self,
38343925
name: KeyT,

tests/test_asyncio/test_commands.py

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3932,6 +3932,58 @@ async def test_xinfo_stream_full(self, r: redis.Redis):
39323932
consumer = info["groups"][0]["consumers"][0]
39333933
assert isinstance(consumer, dict)
39343934

3935+
@skip_if_server_version_lt("8.5.0")
3936+
async def test_xinfo_stream_idempotent_fields(self, r: redis.Redis):
3937+
stream = "stream"
3938+
3939+
# Create stream with regular entry
3940+
await r.xadd(stream, {"foo": "bar"})
3941+
info = await r.xinfo_stream(stream)
3942+
3943+
# Verify new idempotent producer fields are present with default values
3944+
assert "idmp-duration" in info
3945+
assert "idmp-maxsize" in info
3946+
assert "pids-tracked" in info
3947+
assert "iids-tracked" in info
3948+
assert "iids-added" in info
3949+
assert "iids-duplicates" in info
3950+
3951+
# Default values (before any idempotent entries)
3952+
assert info["pids-tracked"] == 0
3953+
assert info["iids-tracked"] == 0
3954+
assert info["iids-added"] == 0
3955+
assert info["iids-duplicates"] == 0
3956+
3957+
# Add idempotent entry
3958+
await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
3959+
info = await r.xinfo_stream(stream)
3960+
3961+
# After adding idempotent entry
3962+
assert info["pids-tracked"] == 1 # One producer tracked
3963+
assert info["iids-tracked"] == 1 # One iid tracked
3964+
assert info["iids-added"] == 1 # One idempotent entry added
3965+
assert info["iids-duplicates"] == 0 # No duplicates yet
3966+
3967+
# Add duplicate entry
3968+
await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
3969+
info = await r.xinfo_stream(stream)
3970+
3971+
# After duplicate
3972+
assert info["pids-tracked"] == 1 # Still one producer
3973+
assert info["iids-tracked"] == 1 # Still one iid (duplicate doesn't add new)
3974+
assert info["iids-added"] == 1 # Still one unique entry
3975+
assert info["iids-duplicates"] == 1 # One duplicate detected
3976+
3977+
# Add entry from different producer
3978+
await r.xadd(stream, {"field2": "value2"}, idmpauto="producer2")
3979+
info = await r.xinfo_stream(stream)
3980+
3981+
# After second producer
3982+
assert info["pids-tracked"] == 2 # Two producers tracked
3983+
assert info["iids-tracked"] == 2 # Two iids tracked
3984+
assert info["iids-added"] == 2 # Two unique entries
3985+
assert info["iids-duplicates"] == 1 # Still one duplicate
3986+
39353987
@skip_if_server_version_lt("5.0.0")
39363988
async def test_xlen(self, r: redis.Redis):
39373989
stream = "stream"
@@ -4660,6 +4712,175 @@ async def test_xadd_with_options(self, r: redis.Redis):
46604712
with pytest.raises(redis.DataError):
46614713
await r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID")
46624714

4715+
@skip_if_server_version_lt("8.5.0")
4716+
async def test_xadd_idmpauto(self, r: redis.Redis):
4717+
stream = "stream"
4718+
4719+
# XADD with IDMPAUTO - first write
4720+
message_id1 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
4721+
4722+
# Test XADD with IDMPAUTO - duplicate write returns same ID
4723+
message_id2 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
4724+
assert message_id1 == message_id2
4725+
4726+
# Test XADD with IDMPAUTO - different content creates new entry
4727+
message_id3 = await r.xadd(stream, {"field1": "value2"}, idmpauto="producer1")
4728+
assert message_id3 != message_id1
4729+
4730+
# Test XADD with IDMPAUTO - different producer creates new entry
4731+
message_id4 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer2")
4732+
assert message_id4 != message_id1
4733+
4734+
# Verify stream has 3 entries (2 unique from producer1, 1 from producer2)
4735+
assert await r.xlen(stream) == 3
4736+
4737+
@skip_if_server_version_lt("8.5.0")
4738+
async def test_xadd_idmp(self, r: redis.Redis):
4739+
stream = "stream"
4740+
4741+
# Test XADD with IDMP - first write
4742+
message_id1 = await r.xadd(
4743+
stream, {"field1": "value1"}, idmp=("producer1", b"msg1")
4744+
)
4745+
4746+
# Test XADD with IDMP - duplicate write returns same ID
4747+
message_id2 = await r.xadd(
4748+
stream, {"field1": "value1"}, idmp=("producer1", b"msg1")
4749+
)
4750+
assert message_id1 == message_id2
4751+
4752+
# Test XADD with IDMP - different iid creates new entry
4753+
message_id3 = await r.xadd(
4754+
stream, {"field1": "value1"}, idmp=("producer1", b"msg2")
4755+
)
4756+
assert message_id3 != message_id1
4757+
4758+
# Test XADD with IDMP - different producer creates new entry
4759+
message_id4 = await r.xadd(
4760+
stream, {"field1": "value1"}, idmp=("producer2", b"msg1")
4761+
)
4762+
assert message_id4 != message_id1
4763+
4764+
# Test XADD with IDMP - shorter binary iid
4765+
await r.xadd(stream, {"field1": "value1"}, idmp=("producer1", b"\x01"))
4766+
4767+
# Verify stream has 4 entries
4768+
assert await r.xlen(stream) == 4
4769+
4770+
@skip_if_server_version_lt("8.5.0")
4771+
async def test_xadd_idmp_validation(self, r: redis.Redis):
4772+
stream = "stream"
4773+
4774+
# Test error: both idmpauto and idmp specified
4775+
with pytest.raises(redis.DataError):
4776+
await r.xadd(
4777+
stream,
4778+
{"foo": "bar"},
4779+
idmpauto="producer1",
4780+
idmp=("producer1", b"msg1"),
4781+
)
4782+
4783+
# Test error: idmpauto with explicit id
4784+
with pytest.raises(redis.DataError):
4785+
await r.xadd(
4786+
stream, {"foo": "bar"}, id="1234567890-0", idmpauto="producer1"
4787+
)
4788+
4789+
# Test error: idmp with explicit id
4790+
with pytest.raises(redis.DataError):
4791+
await r.xadd(
4792+
stream, {"foo": "bar"}, id="1234567890-0", idmp=("producer1", b"msg1")
4793+
)
4794+
4795+
# Test error: idmp not a tuple
4796+
with pytest.raises(redis.DataError):
4797+
await r.xadd(stream, {"foo": "bar"}, idmp="invalid")
4798+
4799+
# Test error: idmp tuple with wrong number of elements
4800+
with pytest.raises(redis.DataError):
4801+
await r.xadd(stream, {"foo": "bar"}, idmp=("producer1",))
4802+
4803+
# Test error: idmp tuple with wrong number of elements
4804+
with pytest.raises(redis.DataError):
4805+
await r.xadd(stream, {"foo": "bar"}, idmp=("producer1", b"msg1", "extra"))
4806+
4807+
@skip_if_server_version_lt("8.5.0")
4808+
async def test_xcfgset_idmp_duration(self, r: redis.Redis):
4809+
stream = "stream"
4810+
4811+
# Create stream first
4812+
await r.xadd(stream, {"foo": "bar"})
4813+
4814+
# Test XCFGSET with IDMP-DURATION only
4815+
assert await r.xcfgset(stream, idmp_duration=120) == b"OK"
4816+
4817+
# Test with minimum value
4818+
assert await r.xcfgset(stream, idmp_duration=1) == b"OK"
4819+
4820+
# Test with maximum value
4821+
assert await r.xcfgset(stream, idmp_duration=300) == b"OK"
4822+
4823+
@skip_if_server_version_lt("8.5.0")
4824+
async def test_xcfgset_idmp_maxsize(self, r: redis.Redis):
4825+
stream = "stream"
4826+
4827+
# Create stream first
4828+
await r.xadd(stream, {"foo": "bar"})
4829+
4830+
# Test XCFGSET with IDMP-MAXSIZE only
4831+
assert await r.xcfgset(stream, idmp_maxsize=5000) == b"OK"
4832+
4833+
# Test with minimum value
4834+
assert await r.xcfgset(stream, idmp_maxsize=1) == b"OK"
4835+
4836+
# Test with maximum value
4837+
assert await r.xcfgset(stream, idmp_maxsize=10000) == b"OK"
4838+
4839+
@skip_if_server_version_lt("8.5.0")
4840+
async def test_xcfgset_both_parameters(self, r: redis.Redis):
4841+
stream = "stream"
4842+
4843+
# Create stream first
4844+
await r.xadd(stream, {"foo": "bar"})
4845+
4846+
# Test XCFGSET with both IDMP-DURATION and IDMP-MAXSIZE
4847+
assert await r.xcfgset(stream, idmp_duration=120, idmp_maxsize=5000) == b"OK"
4848+
4849+
# Test with different values
4850+
assert await r.xcfgset(stream, idmp_duration=60, idmp_maxsize=10000) == b"OK"
4851+
4852+
@skip_if_server_version_lt("8.5.0")
4853+
async def test_xcfgset_validation(self, r: redis.Redis):
4854+
stream = "stream"
4855+
4856+
# Test error: no parameters provided
4857+
with pytest.raises(redis.DataError):
4858+
await r.xcfgset(stream)
4859+
4860+
# Test error: idmp_duration too small
4861+
with pytest.raises(redis.DataError):
4862+
await r.xcfgset(stream, idmp_duration=0)
4863+
4864+
# Test error: idmp_duration too large
4865+
with pytest.raises(redis.DataError):
4866+
await r.xcfgset(stream, idmp_duration=301)
4867+
4868+
# Test error: idmp_duration not an integer
4869+
with pytest.raises(redis.DataError):
4870+
await r.xcfgset(stream, idmp_duration="invalid")
4871+
4872+
# Test error: idmp_maxsize too small
4873+
with pytest.raises(redis.DataError):
4874+
await r.xcfgset(stream, idmp_maxsize=0)
4875+
4876+
# Test error: idmp_maxsize too large
4877+
with pytest.raises(redis.DataError):
4878+
await r.xcfgset(stream, idmp_maxsize=1000001)
4879+
4880+
# Test error: idmp_maxsize not an integer
4881+
with pytest.raises(redis.DataError):
4882+
await r.xcfgset(stream, idmp_maxsize="invalid")
4883+
46634884
@pytest.mark.onlynoncluster
46644885
async def test_bitfield_operations(self, r: redis.Redis):
46654886
# comments show affected bits

0 commit comments

Comments
 (0)