Skip to content
1 change: 1 addition & 0 deletions redis/_parsers/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ def string_keys_to_dict(key_string, callback):
"FUNCTION RESTORE": bool_ok,
"GEODIST": float_or_none,
"HSCAN": parse_hscan,
"HOTKEYS GET": lambda r: pairs_to_dict(r, decode_keys=True),
"INFO": parse_info,
"LASTSAVE": timestamp_to_datetime,
"MEMORY PURGE": bool_ok,
Expand Down
4 changes: 4 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,10 @@ def determine_slot(self, *args) -> Optional[int]:
if len(eval_keys) == 0:
return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
keys = eval_keys
elif command.upper().startswith("HOTKEYS"):
# HOTKEYS commands don't have keys
# so we can just return a random slot
return None
else:
keys = self._get_command_keys(*args)
if keys is None or len(keys) == 0:
Expand Down
84 changes: 84 additions & 0 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ def acl_whoami(self, **kwargs) -> ResponseT:
AsyncACLCommands = ACLCommands


class HotkeysMetricsTypes(Enum):
CPU = "CPU"
NET = "NET"


class ManagementCommands(CommandsProtocol):
"""
Redis management commands
Expand Down Expand Up @@ -1406,6 +1411,85 @@ def failover(self):
"FAILOVER is intentionally not implemented in the client."
)

def hotkeys_start(
self,
count: Optional[int] = None,
metrics: Optional[List[HotkeysMetricsTypes]] = None,
duration: Optional[int] = None,
sample_ratio: Optional[int] = None,
slots: Optional[List[int]] = None,
**kwargs,
) -> ResponseT:
"""
Start collecting hotkeys data.
Returns an error if there is an ongoing collection session.

Args:
count: The number of keys to collect in each criteria (CPU and network consumption)
metrics: List of metrics to track. Supported values: [HotkeysMetricsTypes.CPU, HotkeysMetricsTypes.NET]
duration: Automatically stop the collection after `duration` seconds
sample_ratio: Commands are sampled with probability 1/ratio (1 means no sampling)
slots: Only track keys on the specified hash slots

For more information, see https://redis.io/commands/hotkeys-start
"""
args: List[Union[str, int]] = ["HOTKEYS", "START"]

# Add METRICS
if metrics:
args.append("METRICS")
args.append(len(metrics))
args.extend([str(m.value) for m in metrics])

# Add COUNT
if count is not None:
args.extend(["COUNT", count])

# Add optional DURATION
if duration is not None:
args.extend(["DURATION", duration])

# Add optional SAMPLE ratio
if sample_ratio is not None:
args.extend(["SAMPLE", sample_ratio])

# Add optional SLOTS
if slots is not None:
args.append("SLOTS")
args.append(len(slots))
args.extend(slots)

return self.execute_command(*args, **kwargs)

def hotkeys_stop(self, **kwargs) -> ResponseT:
"""
Stop the ongoing hotkeys collection session (if any).
The results of the last collection session are kept for consumption with HOTKEYS GET.

For more information, see https://redis.io/commands/hotkeys-stop
"""
return self.execute_command("HOTKEYS STOP", **kwargs)

def hotkeys_reset(self, **kwargs) -> ResponseT:
"""
Discard the last hotkeys collection session results (in order to save memory).
Error if there is an ongoing collection session.

For more information, see https://redis.io/commands/hotkeys-reset
"""
return self.execute_command("HOTKEYS RESET", **kwargs)

def hotkeys_get(self, **kwargs) -> ResponseT:
"""
Retrieve the result of the ongoing collection session (if any),
or the last collection session (if any).

Returns a dictionary with the returned fields detailed in the Redis documentation.

For more information, see https://redis.io/commands/hotkeys-get
"""
return self.execute_command("HOTKEYS GET", **kwargs)


class AsyncManagementCommands(ManagementCommands):
async def command_info(self, **kwargs) -> None:
Expand Down
37 changes: 37 additions & 0 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
LoadBalancingStrategy,
get_node_name,
)
from redis.commands.core import HotkeysMetricsTypes
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
Expand Down Expand Up @@ -2465,6 +2466,42 @@ async def test_acl_log(

await user_client.aclose()

@skip_if_server_version_lt("8.5.240")
async def test_hotkeys_cluster(self, r: RedisCluster) -> None:
"""Test all HOTKEYS commands in cluster mode targeting a specific node"""
# Get a primary node to target
node = r.get_primaries()[0]

# Clean up any existing session
try:
await r.hotkeys_stop(target_nodes=node)
except Exception:
pass

# Test HOTKEYS START
result = await r.hotkeys_start(
count=10, metrics=[HotkeysMetricsTypes.CPU], target_nodes=node
)
assert result == b"OK"

# Test HOTKEYS GET during ongoing session
result = await r.hotkeys_get(target_nodes=node)
assert isinstance(result, dict)
assert result["tracking-active"] == 1

# Test HOTKEYS STOP
result = await r.hotkeys_stop(target_nodes=node)
assert result == b"OK"

# Test HOTKEYS GET after stopping
result = await r.hotkeys_get(target_nodes=node)
assert isinstance(result, dict)
assert result["tracking-active"] == 0

# Test HOTKEYS RESET
result = await r.hotkeys_reset(target_nodes=node)
assert result == b"OK"


class TestNodesManager:
"""
Expand Down
Loading
Loading