diff --git a/src/main/java/org/springframework/data/redis/connection/ClusterMessageListener.java b/src/main/java/org/springframework/data/redis/connection/ClusterMessageListener.java new file mode 100644 index 0000000000..284eb01d0f --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/ClusterMessageListener.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection; + +/** + * Listener of messages published in Redis. + * + * @author Bruce Cloud + * @since 2.2 + */ +public interface ClusterMessageListener extends MessageListener { + + /** + * Boolean flag indicating whether this listener requires keyspace events from Redis + */ + boolean listensForKeyspaceNotifications(); +} diff --git a/src/main/java/org/springframework/data/redis/connection/ConnectionUtils.java b/src/main/java/org/springframework/data/redis/connection/ConnectionUtils.java index 1812fa8ac6..61a297362c 100644 --- a/src/main/java/org/springframework/data/redis/connection/ConnectionUtils.java +++ b/src/main/java/org/springframework/data/redis/connection/ConnectionUtils.java @@ -23,6 +23,7 @@ * * @author Jennifer Hickey * @author Thomas Darimont + * @author Bruce Cloud */ public abstract class ConnectionUtils { @@ -37,4 +38,14 @@ public static boolean isLettuce(RedisConnectionFactory connectionFactory) { public static boolean isJedis(RedisConnectionFactory connectionFactory) { return connectionFactory instanceof JedisConnectionFactory; } + + public static boolean isClusterAware(RedisConnectionFactory connectionFactory) { + if (connectionFactory instanceof LettuceConnectionFactory) { + return ((LettuceConnectionFactory) connectionFactory).isClusterAware(); + } + else if (connectionFactory instanceof JedisConnectionFactory) { + return ((JedisConnectionFactory) connectionFactory).isRedisClusterAware(); + } + return false; + } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java index 902b2c891b..3ebd799a2c 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java @@ -19,6 +19,7 @@ import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; @@ -33,6 +34,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Bruce Cloud * @since 2.0 */ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider { @@ -93,7 +95,8 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien } } - if (connectionType.equals(StatefulRedisPubSubConnection.class)) { + if (connectionType.equals(StatefulRedisPubSubConnection.class) + || connectionType.equals(StatefulRedisClusterPubSubConnection.class)) { return client.connectPubSubAsync(codec) // .thenApply(connectionType::cast); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index 28a114e024..f91acd1167 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -23,6 +23,7 @@ import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisClusterCommands; import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import lombok.RequiredArgsConstructor; import java.time.Duration; @@ -56,6 +57,7 @@ /** * @author Christoph Strobl * @author Mark Paluch + * @author Bruce Cloud * @since 1.7 */ public class LettuceClusterConnection extends LettuceConnection implements DefaultedRedisClusterConnection { @@ -65,6 +67,8 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau private final Log log = LogFactory.getLog(getClass()); private final RedisClusterClient clusterClient; + + private volatile @Nullable LettuceClusterSubscription subscription; private ClusterCommandExecutor clusterCommandExecutor; private ClusterTopologyProvider topologyProvider; @@ -294,6 +298,89 @@ public RedisZSetCommands zSetCommands() { public RedisClusterServerCommands serverCommands() { return new LettuceClusterServerCommands(this); } + + private void checkSubscription() { + if (isSubscribed()) { + throw new RedisSubscribedConnectionException( + "Connection already subscribed; use the connection Subscription to cancel or add new channels"); + } + } + + /** + * {@link #close()} the current connection and open a new pub/sub connection to the Redis server. + * + * @return never {@literal null}. + */ + @SuppressWarnings("unchecked") + protected StatefulRedisClusterPubSubConnection switchToPubSub() { + + close(); + return connectionProvider.getConnection(StatefulRedisClusterPubSubConnection.class); + } + + private LettuceClusterSubscription initSubscription(MessageListener listener) { + return new LettuceClusterSubscription(listener, switchToPubSub(), connectionProvider); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisPubSubCommands#getSubscription() + */ + @Override + public Subscription getSubscription() { + return subscription; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisPubSubCommands#isSubscribed() + */ + @Override + public boolean isSubscribed() { + return (subscription != null && subscription.isAlive()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisPubSubCommands#pSubscribe(org.springframework.data.redis.connection.MessageListener, byte[][]) + */ + @Override + public void pSubscribe(MessageListener listener, byte[]... patterns) { + + checkSubscription(); + + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("Transaction/Pipelining is not supported for Pub/Sub subscriptions!"); + } + + try { + subscription = initSubscription(listener); + subscription.pSubscribe(patterns); + } catch (Exception ex) { + throw convertLettuceAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisPubSubCommands#subscribe(org.springframework.data.redis.connection.MessageListener, byte[][]) + */ + @Override + public void subscribe(MessageListener listener, byte[]... channels) { + + checkSubscription(); + + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("Transaction/Pipelining is not supported for Pub/Sub subscriptions!"); + } + + try { + subscription = initSubscription(listener); + subscription.subscribe(channels); + } catch (Exception ex) { + throw convertLettuceAccessException(ex); + } + } /* * (non-Javadoc) @@ -641,6 +728,13 @@ public void close() throws DataAccessException { log.warn("Cannot properly close cluster command executor", ex); } } + + if (subscription != null) { + if (subscription.isAlive()) { + subscription.doClose(); + } + subscription = null; + } super.close(); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterSubscription.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterSubscription.java new file mode 100644 index 0000000000..fe28f02a46 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterSubscription.java @@ -0,0 +1,127 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.lettuce; + +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; +import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import org.springframework.data.redis.connection.ClusterMessageListener; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.connection.util.AbstractSubscription; + +/** + * Message subscription on top of Lettuce cluster support. + * + * @author Bruce Cloud + * @since 2.2 + */ +class LettuceClusterSubscription extends AbstractSubscription { + + private final StatefulRedisClusterPubSubConnection connection; + private final LettuceMessageListener listener; + private final LettuceConnectionProvider connectionProvider; + private final RedisClusterPubSubCommands pubsub; + private final boolean listensForKeyspaceNotifications; + + LettuceClusterSubscription(MessageListener listener, StatefulRedisClusterPubSubConnection pubsubConnection, + LettuceConnectionProvider connectionProvider) { + + super(listener); + + this.connection = pubsubConnection; + + this.listener = new LettuceMessageListener(listener); + this.listensForKeyspaceNotifications = (listener instanceof ClusterMessageListener + && ((ClusterMessageListener)listener).listensForKeyspaceNotifications()); + if (listensForKeyspaceNotifications) pubsubConnection.setNodeMessagePropagation(true); + + this.connectionProvider = connectionProvider; + this.pubsub = connection.sync(); + + this.connection.addListener(this.listener); + } + + protected StatefulRedisPubSubConnection getNativeConnection() { + return connection; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.util.AbstractSubscription#doClose() + */ + protected void doClose() { + + if (!getChannels().isEmpty()) { + pubsub.unsubscribe(new byte[0]); + } + + if (!getPatterns().isEmpty()) { + pubsub.punsubscribe(new byte[0]); + } + + connection.removeListener(this.listener); + connectionProvider.release(connection); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.util.AbstractSubscription#doPsubscribe(byte[][]) + */ + protected void doPsubscribe(byte[]... patterns) { + + if (listensForKeyspaceNotifications) { + pubsub.masters().commands().psubscribe(patterns); + } + else { + pubsub.psubscribe(patterns); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.util.AbstractSubscription#doPUnsubscribe(boolean, byte[][]) + */ + protected void doPUnsubscribe(boolean all, byte[]... patterns) { + + // ignore `all` flag as Lettuce unsubscribes from all patterns if none provided. + pubsub.punsubscribe(patterns); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.util.AbstractSubscription#doSubscribe(byte[][]) + */ + protected void doSubscribe(byte[]... channels) { + + if (listensForKeyspaceNotifications) { + pubsub.masters().commands().subscribe(channels); + } + else { + pubsub.subscribe(channels); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.util.AbstractSubscription#doUnsubscribe(boolean, byte[][]) + */ + protected void doUnsubscribe(boolean all, byte[]... channels) { + + // ignore `all` flag as Lettuce unsubscribes from all channels if none provided. + pubsub.unsubscribe(channels); + } + +} diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index cc909a798c..ccaf73df32 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -84,6 +84,7 @@ * @author David Liu * @author Mark Paluch * @author Ninad Divadkar + * @author Bruce Cloud */ public class LettuceConnection extends AbstractRedisConnection { @@ -95,8 +96,8 @@ public class LettuceConnection extends AbstractRedisConnection { private final int defaultDbIndex; private int dbIndex; - - private final LettuceConnectionProvider connectionProvider; + + protected final LettuceConnectionProvider connectionProvider; private final @Nullable StatefulConnection asyncSharedConn; private @Nullable StatefulConnection asyncDedicatedConn; diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java index de6c7705d6..4771bd30b0 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceMessageListener.java @@ -16,7 +16,7 @@ package org.springframework.data.redis.connection.lettuce; import io.lettuce.core.pubsub.RedisPubSubListener; - +import org.springframework.data.redis.connection.ClusterMessageListener; import org.springframework.data.redis.connection.DefaultMessage; import org.springframework.data.redis.connection.MessageListener; import org.springframework.util.Assert; @@ -25,6 +25,7 @@ * MessageListener wrapper around Lettuce {@link RedisPubSubListener}. * * @author Costin Leau + * @author Bruce Cloud */ class LettuceMessageListener implements RedisPubSubListener { @@ -34,6 +35,11 @@ class LettuceMessageListener implements RedisPubSubListener { Assert.notNull(listener, "MessageListener must not be null!"); this.listener = listener; } + + LettuceMessageListener(ClusterMessageListener listener) { + Assert.notNull(listener, "ClusterMessageListener must not be null!"); + this.listener = (MessageListener)listener; + } /* * (non-Javadoc) diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index 5c4c90b68e..32a054d67a 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -35,6 +35,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.data.redis.RedisConnectionFailureException; +import org.springframework.data.redis.connection.ClusterMessageListener; import org.springframework.data.redis.connection.ConnectionUtils; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; @@ -69,6 +70,7 @@ * @author Way Joke * @author Thomas Darimont * @author Mark Paluch + * @author Bruce Cloud */ public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle { @@ -113,6 +115,8 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab private volatile boolean listening = false; private volatile boolean manageExecutor = false; + + private volatile boolean listensForKeyspaceNotifications = false; // lookup maps // to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect the equals/hashcode @@ -311,6 +315,13 @@ public void setConnectionFactory(RedisConnectionFactory connectionFactory) { Assert.notNull(connectionFactory, "ConnectionFactory must not be null!"); this.connectionFactory = connectionFactory; } + + /** + * @param boolean The listensForKeyspaceNotifications to set + */ + public void setListensForKeyspaceNotifications(boolean listensForKeyspaceNotifications) { + this.listensForKeyspaceNotifications = listensForKeyspaceNotifications; + } public void setBeanName(String name) { this.beanName = name; @@ -778,6 +789,14 @@ public void run() { private SubscriptionPresentCondition eventuallyPerformSubscription() { SubscriptionPresentCondition condition = null; + DispatchMessageListener listener = null; + + if (ConnectionUtils.isClusterAware(connectionFactory)) { + listener = new ClusterDispatchMessageListener(listensForKeyspaceNotifications); + } + else { + listener = new DispatchMessageListener(); + } if (channelMapping.isEmpty()) { @@ -978,6 +997,20 @@ public void onMessage(Message message, @Nullable byte[] pattern) { } } } + + private class ClusterDispatchMessageListener extends DispatchMessageListener implements ClusterMessageListener { + + boolean listensForKeyspaceNotifications = false; + + public ClusterDispatchMessageListener(boolean listensForKeyspaceNotifications) { + super(); + this.listensForKeyspaceNotifications = listensForKeyspaceNotifications; + } + + public boolean listensForKeyspaceNotifications() { + return listensForKeyspaceNotifications; + } + } private void dispatchMessage(Collection listeners, final Message message, final byte[] pattern) { final byte[] source = (pattern != null ? pattern.clone() : message.getChannel());