diff --git a/pom.xml b/pom.xml index 51fbec2551..bc5b9acefd 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.4.0-SNAPSHOT + 2.4.0-DATAREDIS-1150-SNAPSHOT Spring Data Redis diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveClusterCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveClusterCommands.java new file mode 100644 index 0000000000..10a5ee4b4d --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveClusterCommands.java @@ -0,0 +1,189 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; + +import org.springframework.data.redis.connection.RedisClusterNode.SlotRange; + +/** + * Interface for the {@literal cluster} commands supported by Redis executed using reactive infrastructure.. A + * {@link RedisClusterNode} can be obtained from {@link #clusterGetNodes()} or it can be constructed using either + * {@link RedisClusterNode#getHost() host} and {@link RedisClusterNode#getPort()} or the {@link RedisClusterNode#getId() + * node Id}. + * + * @author Mark Paluch + * @since 2.3.1 + */ +public interface ReactiveClusterCommands { + + /** + * Retrieve cluster node information such as {@literal id}, {@literal host}, {@literal port} and {@literal slots}. + * + * @return never {@literal null}. + * @see Redis Documentation: CLUSTER NODES + */ + Flux clusterGetNodes(); + + /** + * Retrieve information about connected slaves for given master node. + * + * @param master must not be {@literal null}. + * @return never {@literal null}. + * @see Redis Documentation: CLUSTER SLAVES + */ + Flux clusterGetSlaves(RedisClusterNode master); + + /** + * Retrieve information about masters and their connected slaves. + * + * @return never {@literal null}. + * @see Redis Documentation: CLUSTER SLAVES + */ + Mono>> clusterGetMasterSlaveMap(); + + /** + * Find the slot for a given {@code key}. + * + * @param key must not be {@literal null}. + * @return + * @see Redis Documentation: CLUSTER KEYSLOT + */ + Mono clusterGetSlotForKey(ByteBuffer key); + + /** + * Find the {@link RedisClusterNode} serving given {@literal slot}. + * + * @param slot + * @return + */ + Mono clusterGetNodeForSlot(int slot); + + /** + * Find the {@link RedisClusterNode} serving given {@literal key}. + * + * @param key must not be {@literal null}. + * @return + */ + Mono clusterGetNodeForKey(ByteBuffer key); + + /** + * Get cluster information. + * + * @return + * @see Redis Documentation: CLUSTER INFO + */ + Mono clusterGetClusterInfo(); + + /** + * Assign slots to given {@link RedisClusterNode}. + * + * @param node must not be {@literal null}. + * @param slots + * @see Redis Documentation: CLUSTER ADDSLOTS + */ + Mono clusterAddSlots(RedisClusterNode node, int... slots); + + /** + * Assign {@link SlotRange#getSlotsArray()} to given {@link RedisClusterNode}. + * + * @param node must not be {@literal null}. + * @param range must not be {@literal null}. + * @see Redis Documentation: CLUSTER ADDSLOTS + */ + Mono clusterAddSlots(RedisClusterNode node, SlotRange range); + + /** + * Count the number of keys assigned to one {@literal slot}. + * + * @param slot + * @return + * @see Redis Documentation: CLUSTER COUNTKEYSINSLOT + */ + Mono clusterCountKeysInSlot(int slot); + + /** + * Remove slots from {@link RedisClusterNode}. + * + * @param node must not be {@literal null}. + * @param slots + * @see Redis Documentation: CLUSTER DELSLOTS + */ + Mono clusterDeleteSlots(RedisClusterNode node, int... slots); + + /** + * Removes {@link SlotRange#getSlotsArray()} from given {@link RedisClusterNode}. + * + * @param node must not be {@literal null}. + * @param range must not be {@literal null}. + * @see Redis Documentation: CLUSTER DELSLOTS + */ + Mono clusterDeleteSlotsInRange(RedisClusterNode node, SlotRange range); + + /** + * Remove given {@literal node} from cluster. + * + * @param node must not be {@literal null}. + * @see Redis Documentation: CLUSTER FORGET + */ + Mono clusterForget(RedisClusterNode node); + + /** + * Add given {@literal node} to cluster. + * + * @param node must contain {@link RedisClusterNode#getHost() host} and {@link RedisClusterNode#getPort()} and must + * not be {@literal null}. + * @see Redis Documentation: CLUSTER MEET + */ + Mono clusterMeet(RedisClusterNode node); + + /** + * @param node must not be {@literal null}. + * @param slot + * @param mode must not be{@literal null}. + * @see Redis Documentation: CLUSTER SETSLOT + */ + Mono clusterSetSlot(RedisClusterNode node, int slot, AddSlots mode); + + /** + * Get {@literal keys} served by slot. + * + * @param slot + * @param count must not be {@literal null}. + * @return + * @see Redis Documentation: CLUSTER GETKEYSINSLOT + */ + Flux clusterGetKeysInSlot(int slot, int count); + + /** + * Assign a {@literal slave} to given {@literal master}. + * + * @param master must not be {@literal null}. + * @param replica must not be {@literal null}. + * @see Redis Documentation: CLUSTER REPLICATE + */ + Mono clusterReplicate(RedisClusterNode master, RedisClusterNode replica); + + enum AddSlots { + MIGRATING, IMPORTING, STABLE, NODE + } + +} diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java index 781f4b8929..8f42234468 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveRedisClusterConnection.java @@ -22,7 +22,7 @@ * @author Mark Paluch * @since 2.0 */ -public interface ReactiveRedisClusterConnection extends ReactiveRedisConnection { +public interface ReactiveRedisClusterConnection extends ReactiveRedisConnection, ReactiveClusterCommands { @Override ReactiveClusterKeyCommands keyCommands(); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index afd4143af8..6ce11a3906 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -284,6 +284,44 @@ public RedisClusterServerCommands serverCommands() { return new LettuceClusterServerCommands(this); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.lettuce.LettuceConnection#ping() + */ + @Override + public String ping() { + Collection ping = clusterCommandExecutor + .executeCommandOnAllNodes((LettuceClusterCommandCallback) BaseRedisCommands::ping).resultsAsList(); + + for (String result : ping) { + if (!ObjectUtils.nullSafeEquals("PONG", result)) { + return ""; + } + } + + return "PONG"; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisClusterConnection#ping(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public String ping(RedisClusterNode node) { + + return clusterCommandExecutor + .executeCommandOnSingleNode((LettuceClusterCommandCallback) BaseRedisCommands::ping, node).getValue(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisClusterCommands#getClusterNodes() + */ + @Override + public List clusterGetNodes() { + return new ArrayList<>(topologyProvider.getTopology().getNodes()); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisClusterCommands#getClusterSlaves(org.springframework.data.redis.connection.RedisClusterNode) @@ -301,6 +339,27 @@ public Set clusterGetSlaves(RedisClusterNode master) { .getValue(); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisClusterCommands#clusterGetMasterSlaveMap() + */ + @Override + public Map> clusterGetMasterSlaveMap() { + + List>> nodeResults = clusterCommandExecutor.executeCommandAsyncOnNodes( + (LettuceClusterCommandCallback>) client -> Converters + .toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId())), + topologyProvider.getTopology().getActiveMasterNodes()).getResults(); + + Map> result = new LinkedHashMap<>(); + + for (NodeResult> nodeResult : nodeResults) { + result.put(nodeResult.getNode(), nodeResult.getValue()); + } + + return result; + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisClusterCommands#getClusterSlotForKey(byte[]) @@ -369,6 +428,20 @@ public void clusterAddSlots(RedisClusterNode node, SlotRange range) { clusterAddSlots(node, range.getSlotsArray()); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisClusterCommands#countKeys(int) + */ + @Override + public Long clusterCountKeysInSlot(int slot) { + + try { + return getConnection().clusterCountKeysInSlot(slot); + } catch (Exception ex) { + throw exceptionConverter.translate(ex); + } + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisClusterCommands#deleteSlots(org.springframework.data.redis.connection.RedisClusterNode, int[]) @@ -464,20 +537,6 @@ public List clusterGetKeysInSlot(int slot, Integer count) { } } - /* - * (non-Javadoc) - * @see org.springframework.data.redis.connection.RedisClusterCommands#countKeys(int) - */ - @Override - public Long clusterCountKeysInSlot(int slot) { - - try { - return getConnection().clusterCountKeysInSlot(slot); - } catch (Exception ex) { - throw exceptionConverter.translate(ex); - } - } - /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisClusterCommands#clusterReplicate(org.springframework.data.redis.connection.RedisClusterNode, org.springframework.data.redis.connection.RedisClusterNode) @@ -490,35 +549,6 @@ public void clusterReplicate(RedisClusterNode master, RedisClusterNode replica) (LettuceClusterCommandCallback) client -> client.clusterReplicate(masterNode.getId()), replica); } - /* - * (non-Javadoc) - * @see org.springframework.data.redis.connection.lettuce.LettuceConnection#ping() - */ - @Override - public String ping() { - Collection ping = clusterCommandExecutor - .executeCommandOnAllNodes((LettuceClusterCommandCallback) BaseRedisCommands::ping).resultsAsList(); - - for (String result : ping) { - if (!ObjectUtils.nullSafeEquals("PONG", result)) { - return ""; - } - } - - return "PONG"; - } - - /* - * (non-Javadoc) - * @see org.springframework.data.redis.connection.RedisClusterConnection#ping(org.springframework.data.redis.connection.RedisClusterNode) - */ - @Override - public String ping(RedisClusterNode node) { - - return clusterCommandExecutor - .executeCommandOnSingleNode((LettuceClusterCommandCallback) BaseRedisCommands::ping, node).getValue(); - } - /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisClusterConnection#keys(org.springframework.data.redis.connection.RedisClusterNode, byte[]) @@ -559,15 +589,6 @@ public void select(int dbIndex) { // --> cluster node stuff - /* - * (non-Javadoc) - * @see org.springframework.data.redis.connection.RedisClusterCommands#getClusterNodes() - */ - @Override - public List clusterGetNodes() { - return new ArrayList<>(topologyProvider.getTopology().getNodes()); - } - /* * (non-Javadoc) * @see org.springframework.data.redis.connection.lettuce.LettuceConnection#watch(byte[][]) @@ -595,26 +616,6 @@ public void multi() { throw new InvalidDataAccessApiUsageException("MULTI is currently not supported in cluster mode."); } - /* - * (non-Javadoc) - * @see org.springframework.data.redis.connection.RedisClusterCommands#clusterGetMasterSlaveMap() - */ - @Override - public Map> clusterGetMasterSlaveMap() { - - List>> nodeResults = clusterCommandExecutor.executeCommandAsyncOnNodes( - (LettuceClusterCommandCallback>) client -> Converters - .toSetOfRedisClusterNodes(client.clusterSlaves(client.clusterMyId())), - topologyProvider.getTopology().getActiveMasterNodes()).getResults(); - - Map> result = new LinkedHashMap<>(); - - for (NodeResult> nodeResult : nodeResults) { - result.put(nodeResult.getNode(), nodeResult.getValue()); - } - - return result; - } public ClusterCommandExecutor getClusterCommandExecutor() { return clusterCommandExecutor; diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java index ffc34abc59..0aebacccc6 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisClusterConnection.java @@ -20,17 +20,29 @@ import io.lettuce.core.api.reactive.BaseRedisReactiveCommands; import io.lettuce.core.api.reactive.RedisReactiveCommands; import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.SlotHash; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; import java.nio.ByteBuffer; - +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +import org.springframework.dao.InvalidDataAccessApiUsageException; +import org.springframework.data.redis.connection.ClusterInfo; import org.springframework.data.redis.connection.ClusterTopologyProvider; import org.springframework.data.redis.connection.ReactiveRedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisNode; +import org.springframework.data.redis.connection.convert.Converters; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -188,6 +200,15 @@ public LettuceReactiveClusterStreamCommands streamCommands() { return new LettuceReactiveClusterStreamCommands(this); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterConnection#ping() + */ + @Override + public Mono ping() { + return clusterGetNodes().flatMap(node -> execute(node, BaseRedisReactiveCommands::ping)).last(); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveRedisClusterConnection#ping(org.springframework.data.redis.connection.RedisClusterNode) @@ -197,7 +218,242 @@ public Mono ping(RedisClusterNode node) { return execute(node, BaseRedisReactiveCommands::ping).next(); } + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetNodes() + */ + @Override + public Flux clusterGetNodes() { + return Flux.fromIterable(doGetActiveNodes()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetSlaves(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Flux clusterGetSlaves(RedisClusterNode master) { + + Assert.notNull(master, "Master must not be null!"); + + RedisClusterNode nodeToUse = lookup(master); + + return execute(nodeToUse, cmd -> cmd.clusterSlaves(nodeToUse.getId()) // + .flatMapIterable(LettuceConverters::toSetOfRedisClusterNodes)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetMasterSlaveMap() + */ + @Override + public Mono>> clusterGetMasterSlaveMap() { + + return Flux.fromIterable(topologyProvider.getTopology().getActiveMasterNodes()) // + .flatMap(node -> { + return Mono.just(node).zipWith(execute(node, cmd -> cmd.clusterSlaves(node.getId())) // + .collectList() // + .map(Converters::toSetOfRedisClusterNodes)); + }).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetSlotForKey(java.nio.ByteBuffer) + */ + @Override + public Mono clusterGetSlotForKey(ByteBuffer key) { + return Mono.fromSupplier(() -> SlotHash.getSlot(key)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetNodeForSlot(int) + */ + @Override + public Mono clusterGetNodeForSlot(int slot) { + + Set nodes = topologyProvider.getTopology().getSlotServingNodes(slot); + return nodes.isEmpty() ? Mono.empty() : Flux.fromIterable(nodes).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetNodeForKey(java.nio.ByteBuffer) + */ + @Override + public Mono clusterGetNodeForKey(ByteBuffer key) { + + Assert.notNull(key, "Key must not be null."); + + return clusterGetSlotForKey(key).flatMap(this::clusterGetNodeForSlot); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetClusterInfo() + */ + @Override + public Mono clusterGetClusterInfo() { + return executeCommandOnArbitraryNode(RedisClusterReactiveCommands::clusterInfo) // + .map(LettuceConverters::toProperties) // + .map(ClusterInfo::new) // + .single(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterAddSlots(org.springframework.data.redis.connection.RedisClusterNode, int[]) + */ + @Override + public Mono clusterAddSlots(RedisClusterNode node, int... slots) { + return execute(node, cmd -> cmd.clusterAddSlots(slots)).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterAddSlots(org.springframework.data.redis.connection.RedisClusterNode, org.springframework.data.redis.connection.RedisClusterNode.SlotRange) + */ + @Override + public Mono clusterAddSlots(RedisClusterNode node, RedisClusterNode.SlotRange range) { + + Assert.notNull(range, "Range must not be null."); + + return execute(node, cmd -> cmd.clusterAddSlots(range.getSlotsArray())).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterCountKeysInSlot(int) + */ + @Override + public Mono clusterCountKeysInSlot(int slot) { + return execute(cmd -> cmd.clusterCountKeysInSlot(slot)).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterDeleteSlots(org.springframework.data.redis.connection.RedisClusterNode, int[]) + */ + @Override + public Mono clusterDeleteSlots(RedisClusterNode node, int... slots) { + return execute(node, cmd -> cmd.clusterDelSlots(slots)).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterDeleteSlotsInRange(org.springframework.data.redis.connection.RedisClusterNode, org.springframework.data.redis.connection.RedisClusterNode.SlotRange) + */ + @Override + public Mono clusterDeleteSlotsInRange(RedisClusterNode node, RedisClusterNode.SlotRange range) { + + Assert.notNull(range, "Range must not be null."); + + return execute(node, cmd -> cmd.clusterDelSlots(range.getSlotsArray())).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterForget(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono clusterForget(RedisClusterNode node) { + + List nodes = new ArrayList<>(doGetActiveNodes()); + RedisClusterNode nodeToRemove = lookup(node); + nodes.remove(nodeToRemove); + + return Flux.fromIterable(nodes).flatMap(actualNode -> execute(node, cmd -> cmd.clusterForget(nodeToRemove.getId()))) + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterMeet(org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono clusterMeet(RedisClusterNode node) { + + Assert.notNull(node, "Cluster node must not be null for CLUSTER MEET command!"); + Assert.hasText(node.getHost(), "Node to meet cluster must have a host!"); + Assert.isTrue(node.getPort() != null && node.getPort() > 0, "Node to meet cluster must have a port greater 0!"); + + return clusterGetNodes() + .flatMap(actualNode -> execute(node, cmd -> cmd.clusterMeet(node.getHost(), node.getPort()))).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterSetSlot(org.springframework.data.redis.connection.RedisClusterNode, int, org.springframework.data.redis.connection.ReactiveRedisClusterCommands.AddSlots) + */ + @Override + public Mono clusterSetSlot(RedisClusterNode node, int slot, AddSlots mode) { + + Assert.notNull(node, "Node must not be null."); + Assert.notNull(mode, "AddSlots mode must not be null."); + + RedisClusterNode nodeToUse = lookup(node); + String nodeId = nodeToUse.getId(); + + return execute(node, cmd -> { + + switch (mode) { + case MIGRATING: + return cmd.clusterSetSlotMigrating(slot, nodeId); + case IMPORTING: + return cmd.clusterSetSlotImporting(slot, nodeId); + case NODE: + return cmd.clusterSetSlotNode(slot, nodeId); + case STABLE: + return cmd.clusterSetSlotStable(slot); + default: + throw new InvalidDataAccessApiUsageException("Invalid import mode for cluster slot: " + slot); + } + + }).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterGetKeysInSlot(int, int) + */ + @Override + public Flux clusterGetKeysInSlot(int slot, int count) { + return execute(cmd -> cmd.clusterGetKeysInSlot(slot, count)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.ReactiveRedisClusterCommands#clusterReplicate(org.springframework.data.redis.connection.RedisClusterNode, org.springframework.data.redis.connection.RedisClusterNode) + */ + @Override + public Mono clusterReplicate(RedisClusterNode master, RedisClusterNode replica) { + + RedisClusterNode masterToUse = lookup(master); + + return execute(replica, cmd -> cmd.clusterReplicate(masterToUse.getId())).then(); + } + + /** + * Run {@link LettuceReactiveCallback} on a random node. + * + * @param callback must not be {@literal null}. + * @throws IllegalArgumentException when {@code node} or {@code callback} is {@literal null}. + * @return {@link Flux} emitting execution results. + */ + public Flux executeCommandOnArbitraryNode(LettuceReactiveCallback callback) { + + Assert.notNull(callback, "ReactiveCallback must not be null!"); + + List nodes = new ArrayList<>(doGetActiveNodes()); + int random = new Random().nextInt(nodes.size()); + + return execute(nodes.get(random), callback); + } + /** + * Run {@link LettuceReactiveCallback} on given {@link RedisClusterNode}. + * * @param node must not be {@literal null}. * @param callback must not be {@literal null}. * @throws IllegalArgumentException when {@code node} or {@code callback} is {@literal null}. @@ -205,12 +461,8 @@ public Mono ping(RedisClusterNode node) { */ public Flux execute(RedisNode node, LettuceReactiveCallback callback) { - try { - Assert.notNull(node, "RedisClusterNode must not be null!"); - Assert.notNull(callback, "ReactiveCallback must not be null!"); - } catch (IllegalArgumentException e) { - return Flux.error(e); - } + Assert.notNull(node, "RedisClusterNode must not be null!"); + Assert.notNull(callback, "ReactiveCallback must not be null!"); return getCommands(node).flatMapMany(callback::doWithCommands).onErrorMap(translateException()); } @@ -244,4 +496,19 @@ protected Mono> getCommands(RedisN return getConnection().flatMap(it -> Mono.fromCompletionStage(it.getConnectionAsync(node.getHost(), node.getPort())) .map(StatefulRedisConnection::reactive)); } + + /** + * Lookup a {@link RedisClusterNode} by using either ids {@link RedisClusterNode#getId() node id} or host and port to + * obtain the full node details from the underlying {@link ClusterTopologyProvider}. + * + * @param nodeToLookup the node to lookup. + * @return the {@link RedisClusterNode} from the topology lookup. + */ + private RedisClusterNode lookup(RedisClusterNode nodeToLookup) { + return topologyProvider.getTopology().lookup(nodeToLookup); + } + + private Set doGetActiveNodes() { + return topologyProvider.getTopology().getActiveNodes(); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterCommandsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterCommandsTests.java new file mode 100644 index 0000000000..aec89586e0 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterCommandsTests.java @@ -0,0 +1,116 @@ +/* + * Copyright 2016-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import static org.assertj.core.api.Assertions.*; + +import reactor.test.StepVerifier; + +import java.nio.ByteBuffer; + +import org.junit.Test; + +import org.springframework.data.redis.connection.ReactiveClusterCommands; +import org.springframework.data.redis.connection.RedisClusterNode; + +/** + * Integration tests for {@link LettuceReactiveRedisClusterConnection} via {@link ReactiveClusterCommands}. + *

+ * Some assertions check against node 1 and node 4 (ports 7379/7382) as sometimes a failover happens and node 4 becomes + * the master node. + * + * @author Mark Paluch + */ +public class LettuceReactiveClusterCommandsTests extends LettuceReactiveClusterCommandsTestsBase { + + @Test // DATAREDIS-1150 + public void clusterGetNodesShouldReturnNodes() { + + connection.clusterGetNodes().collectList() // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + assertThat(actual).hasSizeGreaterThan(3); + }).verifyComplete(); + } + + @Test // DATAREDIS-1150 + public void clusterGetSlavesShouldReturnNodes() { + + connection.clusterGetNodes().filter(RedisClusterNode::isMaster) + .filter(node -> (node.getPort() == 7379 || node.getPort() == 7382)) + .flatMap(it -> connection.clusterGetSlaves(it)) // + .collectList() // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + assertThat(actual).hasSize(1); + }).verifyComplete(); + } + + @Test // DATAREDIS-1150 + public void clusterGetMasterSlaveMapShouldReportTopology() { + + connection.clusterGetMasterSlaveMap() // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + assertThat(actual).hasSize(3); + }).verifyComplete(); + } + + @Test // DATAREDIS-1150 + public void clusterGetSlotForKeyShouldResolveSlot() { + + connection.clusterGetSlotForKey(ByteBuffer.wrap("hello".getBytes())) // + .as(StepVerifier::create) // + .expectNext(866) // + .verifyComplete(); + } + + @Test // DATAREDIS-1150 + public void clusterGetNodeForSlotShouldReportNode() { + + connection.clusterGetNodeForSlot(866) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + assertThat(actual.getPort()).isIn(7379, 7382); + }).verifyComplete(); + } + + @Test // DATAREDIS-1150 + public void clusterGetNodeForKeyShouldReportNode() { + + connection.clusterGetNodeForKey(ByteBuffer.wrap("hello".getBytes())) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + assertThat(actual.getPort()).isIn(7379, 7382); + }).verifyComplete(); + } + + @Test // DATAREDIS-1150 + public void clusterGetClusterInfoShouldReportState() { + + connection.clusterGetClusterInfo() // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + assertThat(actual.getSlotsAssigned()).isEqualTo(16384); + }).verifyComplete(); + } +}