diff --git a/redis/cluster.py b/redis/cluster.py index 4491e29f5a..4b327ad79e 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -284,6 +284,7 @@ class RedisCluster(RedisClusterCommands): "READONLY", "READWRITE", "TIME", + "GRAPH.CONFIG", ], DEFAULT_NODE, ), @@ -810,6 +811,10 @@ def lock( thread_local=thread_local, ) + def set_response_callback(self, command, callback): + """Set a custom Response Callback""" + self.cluster_response_callbacks[command] = callback + def _determine_nodes(self, *args, **kwargs): command = args[0] nodes_flag = kwargs.pop("nodes_flag", None) @@ -1181,6 +1186,20 @@ def _process_result(self, command, res, **kwargs): else: return res + def load_external_module( + self, + funcname, + func, + ): + """ + This function can be used to add externally defined redis modules, + and their namespaces to the redis client. + + ``funcname`` - A string containing the name of the function to create + ``func`` - The function, being added to this class. + """ + setattr(self, funcname, func) + class ClusterNode: def __init__(self, host, port, server_type=None, redis_connection=None): @@ -2026,7 +2045,13 @@ def _send_cluster_commands( # turn the response back into a simple flat array that corresponds # to the sequence of commands issued in the stack in pipeline.execute() - response = [c.result for c in sorted(stack, key=lambda x: x.position)] + response = [] + for c in sorted(stack, key=lambda x: x.position): + if c.args[0] in self.cluster_response_callbacks: + c.result = self.cluster_response_callbacks[c.args[0]]( + c.result, **c.options + ) + response.append(c.result) if raise_on_error: self.raise_first_error(stack) @@ -2040,6 +2065,9 @@ def _fail_on_redirect(self, allow_redirections): "ASK & MOVED redirection not allowed in this pipeline" ) + def exists(self, *keys): + return self.execute_command("EXISTS", *keys) + def eval(self): """ """ raise RedisClusterException("method eval() is not implemented") diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 8bdcbbadf6..7342c0c482 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -9,6 +9,7 @@ ScriptCommands, ) from .helpers import list_or_args +from .redismodules import RedisModuleCommands class ClusterMultiKeyCommands: @@ -212,6 +213,7 @@ class RedisClusterCommands( PubSubCommands, ClusterDataAccessCommands, ScriptCommands, + RedisModuleCommands, ): """ A class for all Redis Cluster commands diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index 12c0648722..638e4eb166 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -103,16 +103,34 @@ def pipeline(self, transaction=True, shard_hint=None): pipe.jsonget('foo') pipe.jsonget('notakey') """ - p = Pipeline( - connection_pool=self.client.connection_pool, - response_callbacks=self.MODULE_CALLBACKS, - transaction=transaction, - shard_hint=shard_hint, - ) + if isinstance(self.client, redis.RedisCluster): + p = ClusterPipeline( + nodes_manager=self.client.nodes_manager, + commands_parser=self.client.commands_parser, + startup_nodes=self.client.nodes_manager.startup_nodes, + result_callbacks=self.client.result_callbacks, + cluster_response_callbacks=self.client.cluster_response_callbacks, + cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, + read_from_replicas=self.client.read_from_replicas, + reinitialize_steps=self.client.reinitialize_steps, + ) + + else: + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) + p._encode = self._encode p._decode = self._decode return p +class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline): + """Cluster pipeline for the module.""" + + class Pipeline(JSONCommands, redis.client.Pipeline): """Pipeline for the module.""" diff --git a/redis/commands/parser.py b/redis/commands/parser.py index 2bb0576910..89292ab2d3 100644 --- a/redis/commands/parser.py +++ b/redis/commands/parser.py @@ -17,7 +17,14 @@ def __init__(self, redis_connection): self.initialize(redis_connection) def initialize(self, r): - self.commands = r.execute_command("COMMAND") + commands = r.execute_command("COMMAND") + uppercase_commands = [] + for cmd in commands: + if any(x.isupper() for x in cmd): + uppercase_commands.append(cmd) + for cmd in uppercase_commands: + commands[cmd.lower()] = commands.pop(cmd) + self.commands = commands # As soon as this PR is merged into Redis, we should reimplement # our logic to use COMMAND INFO changes to determine the key positions diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py index 5b1f15114d..4720a430f8 100644 --- a/redis/commands/timeseries/__init__.py +++ b/redis/commands/timeseries/__init__.py @@ -1,4 +1,4 @@ -import redis.client +import redis from ..helpers import parse_to_list from .commands import ( @@ -67,14 +67,31 @@ def pipeline(self, transaction=True, shard_hint=None): pipeline.execute() """ - p = Pipeline( - connection_pool=self.client.connection_pool, - response_callbacks=self.MODULE_CALLBACKS, - transaction=transaction, - shard_hint=shard_hint, - ) + if isinstance(self.client, redis.RedisCluster): + p = ClusterPipeline( + nodes_manager=self.client.nodes_manager, + commands_parser=self.client.commands_parser, + startup_nodes=self.client.nodes_manager.startup_nodes, + result_callbacks=self.client.result_callbacks, + cluster_response_callbacks=self.client.cluster_response_callbacks, + cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, + read_from_replicas=self.client.read_from_replicas, + reinitialize_steps=self.client.reinitialize_steps, + ) + + else: + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) return p +class ClusterPipeline(TimeSeriesCommands, redis.cluster.ClusterPipeline): + """Cluster pipeline for the module.""" + + class Pipeline(TimeSeriesCommands, redis.client.Pipeline): """Pipeline for the module.""" diff --git a/tests/test_bloom.py b/tests/test_bloom.py index 8936584ea8..a3e9e158f4 100644 --- a/tests/test_bloom.py +++ b/tests/test_bloom.py @@ -191,6 +191,7 @@ def test_cms(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_cms_merge(client): assert client.cms().initbydim("A", 1000, 5) assert client.cms().initbydim("B", 1000, 5) diff --git a/tests/test_graph.py b/tests/test_graph.py index 8de63465a0..3a430ed4ab 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -342,6 +342,7 @@ def test_config(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_list_keys(client): result = client.graph().list_keys() assert result == [] diff --git a/tests/test_search.py b/tests/test_search.py index 5ee17a2c36..88d57a921d 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -21,6 +21,9 @@ from .conftest import default_redismod_url, skip_ifmodversion_lt +pytestmark = pytest.mark.onlynoncluster + + WILL_PLAY_TEXT = os.path.abspath( os.path.join(os.path.dirname(__file__), "testdata", "will_play_text.csv.bz2") ) diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index aee37aaa43..421c9d5c04 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -264,6 +264,7 @@ def test_rev_range(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def testMultiRange(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"}) @@ -293,6 +294,7 @@ def testMultiRange(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster @skip_ifmodversion_lt("99.99.99", "timeseries") def test_multi_range_advanced(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) @@ -349,6 +351,7 @@ def test_multi_range_advanced(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster @skip_ifmodversion_lt("99.99.99", "timeseries") def test_multi_reverse_range(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) @@ -442,6 +445,7 @@ def test_get(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_mget(client): client.ts().create(1, labels={"Test": "This"}) client.ts().create(2, labels={"Test": "This", "Taste": "That"}) @@ -483,6 +487,7 @@ def testInfoDuplicatePolicy(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_query_index(client): client.ts().create(1, labels={"Test": "This"}) client.ts().create(2, labels={"Test": "This", "Taste": "That"}) diff --git a/tox.ini b/tox.ini index 82e79d7611..d4f15acd50 100644 --- a/tox.ini +++ b/tox.ini @@ -286,7 +286,7 @@ setenv = commands = standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs} standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} - cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} {posargs} + cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs} cluster-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} [testenv:redis5]