From b0ada86f06fd62db626d3e0a69b7abb6bd58c4e4 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Wed, 2 Mar 2022 13:41:52 +0200 Subject: [PATCH 1/9] json-cluster --- redis/cluster.py | 15 ++++++++++++++- redis/commands/cluster.py | 2 ++ redis/commands/json/__init__.py | 30 ++++++++++++++++++++++++------ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 3b30a6e0a8..3308dd2b7f 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -809,6 +809,10 @@ def lock( thread_local=thread_local, ) + def set_response_callback(self, command, callback): + """Set a custom Response Callback""" + self.cluster_response_callbacks[command] = callback + def _determine_nodes(self, *args, **kwargs): command = args[0] nodes_flag = kwargs.pop("nodes_flag", None) @@ -2025,7 +2029,13 @@ def _send_cluster_commands( # turn the response back into a simple flat array that corresponds # to the sequence of commands issued in the stack in pipeline.execute() - response = [c.result for c in sorted(stack, key=lambda x: x.position)] + response = [] + for c in sorted(stack, key=lambda x: x.position): + if c.args[0] in self.cluster_response_callbacks: + c.result = self.cluster_response_callbacks[c.args[0]]( + c.result, **c.options + ) + response.append(c.result) if raise_on_error: self.raise_first_error(stack) @@ -2039,6 +2049,9 @@ def _fail_on_redirect(self, allow_redirections): "ASK & MOVED redirection not allowed in this pipeline" ) + def exists(self, *keys): + return self.execute_command("EXISTS", *keys) + def eval(self): """ """ raise RedisClusterException("method eval() is not implemented") diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 8bdcbbadf6..7342c0c482 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -9,6 +9,7 @@ ScriptCommands, ) from .helpers import list_or_args +from .redismodules import RedisModuleCommands class ClusterMultiKeyCommands: @@ -212,6 +213,7 @@ class RedisClusterCommands( PubSubCommands, ClusterDataAccessCommands, ScriptCommands, + RedisModuleCommands, ): """ A class for all Redis Cluster commands diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index 12c0648722..638e4eb166 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -103,16 +103,34 @@ def pipeline(self, transaction=True, shard_hint=None): pipe.jsonget('foo') pipe.jsonget('notakey') """ - p = Pipeline( - connection_pool=self.client.connection_pool, - response_callbacks=self.MODULE_CALLBACKS, - transaction=transaction, - shard_hint=shard_hint, - ) + if isinstance(self.client, redis.RedisCluster): + p = ClusterPipeline( + nodes_manager=self.client.nodes_manager, + commands_parser=self.client.commands_parser, + startup_nodes=self.client.nodes_manager.startup_nodes, + result_callbacks=self.client.result_callbacks, + cluster_response_callbacks=self.client.cluster_response_callbacks, + cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, + read_from_replicas=self.client.read_from_replicas, + reinitialize_steps=self.client.reinitialize_steps, + ) + + else: + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) + p._encode = self._encode p._decode = self._decode return p +class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline): + """Cluster pipeline for the module.""" + + class Pipeline(JSONCommands, redis.client.Pipeline): """Pipeline for the module.""" From cc7cf96aa3eee7af6f85a91e0f776d1ad9f23090 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Wed, 2 Mar 2022 15:06:11 +0200 Subject: [PATCH 2/9] edit testenv --- tests/test_bloom.py | 1 + tests/test_graph.py | 1 + tests/test_search.py | 1 + tests/test_timeseries.py | 1 + tox.ini | 4 ++-- 5 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_bloom.py b/tests/test_bloom.py index 8936584ea8..243a3ab307 100644 --- a/tests/test_bloom.py +++ b/tests/test_bloom.py @@ -8,6 +8,7 @@ def intlist(obj): return [int(v) for v in obj] +@pytest.mark.onlynoncluster @pytest.fixture def client(modclient): diff --git a/tests/test_graph.py b/tests/test_graph.py index 8de63465a0..240e4bb2a8 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -4,6 +4,7 @@ from redis.commands.graph.execution_plan import Operation from redis.exceptions import ResponseError +@pytest.mark.onlynoncluster @pytest.fixture def client(modclient): diff --git a/tests/test_search.py b/tests/test_search.py index 5ee17a2c36..20d8ce6228 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -29,6 +29,7 @@ os.path.join(os.path.dirname(__file__), "testdata", "titles.csv") ) +@pytest.mark.onlynoncluster def waitForIndex(env, idx, timeout=None): delay = 0.1 diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index aee37aaa43..ef04203f28 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -5,6 +5,7 @@ from .conftest import skip_ifmodversion_lt +@pytest.mark.onlynoncluster @pytest.fixture def client(modclient): diff --git a/tox.ini b/tox.ini index 82e79d7611..281bb974f4 100644 --- a/tox.ini +++ b/tox.ini @@ -282,11 +282,11 @@ extras = hiredis: hiredis ocsp: cryptography, pyopenssl, requests setenv = - CLUSTER_URL = "redis://localhost:16379/0" + CLUSTER_URL = "redis://localhost:46379/0" commands = standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs} standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} - cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} {posargs} + cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs} cluster-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} [testenv:redis5] From 1e063677296827f52170880594bec09fb5877c14 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Wed, 2 Mar 2022 15:16:25 +0200 Subject: [PATCH 3/9] linters --- tests/test_bloom.py | 4 +++- tests/test_graph.py | 4 +++- tests/test_search.py | 5 ++++- tests/test_timeseries.py | 4 +++- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/test_bloom.py b/tests/test_bloom.py index 243a3ab307..124cd20a77 100644 --- a/tests/test_bloom.py +++ b/tests/test_bloom.py @@ -5,10 +5,12 @@ from redis.utils import HIREDIS_AVAILABLE +pytestmark = pytest.mark.onlynoncluster + + def intlist(obj): return [int(v) for v in obj] -@pytest.mark.onlynoncluster @pytest.fixture def client(modclient): diff --git a/tests/test_graph.py b/tests/test_graph.py index 240e4bb2a8..ad7acca0d7 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -4,7 +4,9 @@ from redis.commands.graph.execution_plan import Operation from redis.exceptions import ResponseError -@pytest.mark.onlynoncluster + +pytestmark = pytest.mark.onlynoncluster + @pytest.fixture def client(modclient): diff --git a/tests/test_search.py b/tests/test_search.py index 20d8ce6228..aa71dd8cab 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -21,6 +21,10 @@ from .conftest import default_redismod_url, skip_ifmodversion_lt + +pytestmark = pytest.mark.onlynoncluster + + WILL_PLAY_TEXT = os.path.abspath( os.path.join(os.path.dirname(__file__), "testdata", "will_play_text.csv.bz2") ) @@ -29,7 +33,6 @@ os.path.join(os.path.dirname(__file__), "testdata", "titles.csv") ) -@pytest.mark.onlynoncluster def waitForIndex(env, idx, timeout=None): delay = 0.1 diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index ef04203f28..ae9d25fced 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -5,7 +5,9 @@ from .conftest import skip_ifmodversion_lt -@pytest.mark.onlynoncluster + +pytestmark = pytest.mark.onlynoncluster + @pytest.fixture def client(modclient): From 8475401be419834928ca2416da11d32aea8e9341 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Wed, 2 Mar 2022 15:17:32 +0200 Subject: [PATCH 4/9] linters --- tests/test_bloom.py | 1 - tests/test_graph.py | 1 - tests/test_search.py | 1 - tests/test_timeseries.py | 1 - 4 files changed, 4 deletions(-) diff --git a/tests/test_bloom.py b/tests/test_bloom.py index 124cd20a77..becb383a5e 100644 --- a/tests/test_bloom.py +++ b/tests/test_bloom.py @@ -4,7 +4,6 @@ from redis.exceptions import ModuleError, RedisError from redis.utils import HIREDIS_AVAILABLE - pytestmark = pytest.mark.onlynoncluster diff --git a/tests/test_graph.py b/tests/test_graph.py index ad7acca0d7..368c138e0e 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -4,7 +4,6 @@ from redis.commands.graph.execution_plan import Operation from redis.exceptions import ResponseError - pytestmark = pytest.mark.onlynoncluster diff --git a/tests/test_search.py b/tests/test_search.py index aa71dd8cab..88d57a921d 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -21,7 +21,6 @@ from .conftest import default_redismod_url, skip_ifmodversion_lt - pytestmark = pytest.mark.onlynoncluster diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index ae9d25fced..ac45dd41de 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -5,7 +5,6 @@ from .conftest import skip_ifmodversion_lt - pytestmark = pytest.mark.onlynoncluster From ead7e46f6da15e0f9ee595c2b45ef5e20bb785ef Mon Sep 17 00:00:00 2001 From: dvora-h Date: Fri, 4 Mar 2022 02:16:45 +0200 Subject: [PATCH 5/9] graph, timeseries, bloom --- redis/cluster.py | 1 + redis/commands/parser.py | 9 +++++++- redis/commands/timeseries/__init__.py | 31 +++++++++++++++++++++------ tests/test_bloom.py | 3 +-- tests/test_graph.py | 3 +-- tests/test_timeseries.py | 7 ++++-- 6 files changed, 40 insertions(+), 14 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index ab7bedd2e5..df52f1f1df 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -284,6 +284,7 @@ class RedisCluster(RedisClusterCommands): "READONLY", "READWRITE", "TIME", + "GRAPH.CONFIG", ], DEFAULT_NODE, ), diff --git a/redis/commands/parser.py b/redis/commands/parser.py index 2bb0576910..89292ab2d3 100644 --- a/redis/commands/parser.py +++ b/redis/commands/parser.py @@ -17,7 +17,14 @@ def __init__(self, redis_connection): self.initialize(redis_connection) def initialize(self, r): - self.commands = r.execute_command("COMMAND") + commands = r.execute_command("COMMAND") + uppercase_commands = [] + for cmd in commands: + if any(x.isupper() for x in cmd): + uppercase_commands.append(cmd) + for cmd in uppercase_commands: + commands[cmd.lower()] = commands.pop(cmd) + self.commands = commands # As soon as this PR is merged into Redis, we should reimplement # our logic to use COMMAND INFO changes to determine the key positions diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py index 5b1f15114d..4720a430f8 100644 --- a/redis/commands/timeseries/__init__.py +++ b/redis/commands/timeseries/__init__.py @@ -1,4 +1,4 @@ -import redis.client +import redis from ..helpers import parse_to_list from .commands import ( @@ -67,14 +67,31 @@ def pipeline(self, transaction=True, shard_hint=None): pipeline.execute() """ - p = Pipeline( - connection_pool=self.client.connection_pool, - response_callbacks=self.MODULE_CALLBACKS, - transaction=transaction, - shard_hint=shard_hint, - ) + if isinstance(self.client, redis.RedisCluster): + p = ClusterPipeline( + nodes_manager=self.client.nodes_manager, + commands_parser=self.client.commands_parser, + startup_nodes=self.client.nodes_manager.startup_nodes, + result_callbacks=self.client.result_callbacks, + cluster_response_callbacks=self.client.cluster_response_callbacks, + cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, + read_from_replicas=self.client.read_from_replicas, + reinitialize_steps=self.client.reinitialize_steps, + ) + + else: + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) return p +class ClusterPipeline(TimeSeriesCommands, redis.cluster.ClusterPipeline): + """Cluster pipeline for the module.""" + + class Pipeline(TimeSeriesCommands, redis.client.Pipeline): """Pipeline for the module.""" diff --git a/tests/test_bloom.py b/tests/test_bloom.py index becb383a5e..a3e9e158f4 100644 --- a/tests/test_bloom.py +++ b/tests/test_bloom.py @@ -4,8 +4,6 @@ from redis.exceptions import ModuleError, RedisError from redis.utils import HIREDIS_AVAILABLE -pytestmark = pytest.mark.onlynoncluster - def intlist(obj): return [int(v) for v in obj] @@ -193,6 +191,7 @@ def test_cms(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_cms_merge(client): assert client.cms().initbydim("A", 1000, 5) assert client.cms().initbydim("B", 1000, 5) diff --git a/tests/test_graph.py b/tests/test_graph.py index 368c138e0e..3a430ed4ab 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -4,8 +4,6 @@ from redis.commands.graph.execution_plan import Operation from redis.exceptions import ResponseError -pytestmark = pytest.mark.onlynoncluster - @pytest.fixture def client(modclient): @@ -344,6 +342,7 @@ def test_config(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_list_keys(client): result = client.graph().list_keys() assert result == [] diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index ac45dd41de..421c9d5c04 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -5,8 +5,6 @@ from .conftest import skip_ifmodversion_lt -pytestmark = pytest.mark.onlynoncluster - @pytest.fixture def client(modclient): @@ -266,6 +264,7 @@ def test_rev_range(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def testMultiRange(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"}) @@ -295,6 +294,7 @@ def testMultiRange(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster @skip_ifmodversion_lt("99.99.99", "timeseries") def test_multi_range_advanced(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) @@ -351,6 +351,7 @@ def test_multi_range_advanced(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster @skip_ifmodversion_lt("99.99.99", "timeseries") def test_multi_reverse_range(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) @@ -444,6 +445,7 @@ def test_get(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_mget(client): client.ts().create(1, labels={"Test": "This"}) client.ts().create(2, labels={"Test": "This", "Taste": "That"}) @@ -485,6 +487,7 @@ def testInfoDuplicatePolicy(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_query_index(client): client.ts().create(1, labels={"Test": "This"}) client.ts().create(2, labels={"Test": "This", "Taste": "That"}) From 35102b9a9eace1fcf28207214c38989aa5e0baf8 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Fri, 4 Mar 2022 02:35:20 +0200 Subject: [PATCH 6/9] load_external_module --- redis/cluster.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/redis/cluster.py b/redis/cluster.py index df52f1f1df..ef78dccbfe 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1185,6 +1185,20 @@ def _process_result(self, command, res, **kwargs): return list(res.values())[0] else: return res + + def load_external_module( + self, + funcname, + func, + ): + """ + This function can be used to add externally defined redis modules, + and their namespaces to the redis client. + + ``funcname`` - A string containing the name of the function to create + ``func`` - The function, being added to this class. + """ + setattr(self, funcname, func) class ClusterNode: From fb457c6da1c9d90230b3b27969ef69ca3c691bae Mon Sep 17 00:00:00 2001 From: dvora-h Date: Fri, 4 Mar 2022 02:42:31 +0200 Subject: [PATCH 7/9] linters --- redis/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/cluster.py b/redis/cluster.py index ef78dccbfe..4b327ad79e 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1185,7 +1185,7 @@ def _process_result(self, command, res, **kwargs): return list(res.values())[0] else: return res - + def load_external_module( self, funcname, From 301b6ef6afa78d9e85eb20f800489cb5df1c67e3 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Sun, 6 Mar 2022 12:05:56 +0200 Subject: [PATCH 8/9] skip redismod --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 281bb974f4..c0a4b5bd88 100644 --- a/tox.ini +++ b/tox.ini @@ -286,7 +286,7 @@ setenv = commands = standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs} standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} - cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs} + cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs} cluster-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} [testenv:redis5] From 6fcfbaf87d0ef50c1b1b04d584522a97333b4d57 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Sun, 6 Mar 2022 12:08:58 +0200 Subject: [PATCH 9/9] cluster url --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index c0a4b5bd88..d4f15acd50 100644 --- a/tox.ini +++ b/tox.ini @@ -282,7 +282,7 @@ extras = hiredis: hiredis ocsp: cryptography, pyopenssl, requests setenv = - CLUSTER_URL = "redis://localhost:46379/0" + CLUSTER_URL = "redis://localhost:16379/0" commands = standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs} standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs}