Skip to content

Commit b1453da

Browse files
sujl95mp911de
authored andcommitted
Correctly handle null listener in RedisMessageListenerContainer.remove(…).
Closes #3009
1 parent 20ceb4c commit b1453da

File tree

2 files changed

+57
-26
lines changed

2 files changed

+57
-26
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

+25-22
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
* @author Thomas Darimont
102102
* @author Mark Paluch
103103
* @author John Blum
104+
* @author SEONGJUN LEE
104105
* @see MessageListener
105106
* @see SubscriptionListener
106107
*/
@@ -770,33 +771,35 @@ else if (isListening()) {
770771
}
771772

772773
private void remove(@Nullable MessageListener listener, Topic topic, ByteArrayWrapper holder,
773-
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {
774+
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {
774775

775776
Collection<MessageListener> listeners = mapping.get(holder);
776-
Collection<MessageListener> listenersToRemove = null;
777-
778-
if (listeners != null) {
779-
// remove only one listener
780-
listeners.remove(listener);
781-
listenersToRemove = Collections.singletonList(listener);
782-
783-
// start removing listeners
784-
for (MessageListener messageListener : listenersToRemove) {
785-
Set<Topic> topics = listenerTopics.get(messageListener);
786-
if (topics != null) {
787-
topics.remove(topic);
788-
}
789-
if (CollectionUtils.isEmpty(topics)) {
790-
listenerTopics.remove(messageListener);
791-
}
792-
}
777+
if (listeners == null || listeners.isEmpty()) {
778+
return;
779+
}
793780

794-
// if we removed everything, remove the empty holder collection
795-
if (listeners.isEmpty()) {
796-
mapping.remove(holder);
797-
topicToRemove.add(holder.getArray());
781+
Collection<MessageListener> listenersToRemove = (listener == null) ? new ArrayList<>(listeners)
782+
: Collections.singletonList(listener);
783+
784+
// Remove the specified listener(s) from the original collection
785+
listeners.removeAll(listenersToRemove);
786+
787+
// Start removing listeners
788+
for (MessageListener messageListener : listenersToRemove) {
789+
Set<Topic> topics = listenerTopics.get(messageListener);
790+
if (topics != null) {
791+
topics.remove(topic);
792+
}
793+
if (CollectionUtils.isEmpty(topics)) {
794+
listenerTopics.remove(messageListener);
798795
}
799796
}
797+
798+
// If all listeners were removed, clean up the mapping and the holder
799+
if (listeners.isEmpty()) {
800+
mapping.remove(holder);
801+
topicToRemove.add(holder.getArray());
802+
}
800803
}
801804

802805
private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

+32-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.nio.charset.StandardCharsets;
22+
import java.util.Collections;
2223
import java.util.concurrent.CountDownLatch;
2324
import java.util.concurrent.Executor;
2425
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,10 +31,7 @@
3031
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3132
import org.springframework.core.task.SyncTaskExecutor;
3233
import org.springframework.data.redis.RedisConnectionFailureException;
33-
import org.springframework.data.redis.connection.RedisConnection;
34-
import org.springframework.data.redis.connection.RedisConnectionFactory;
35-
import org.springframework.data.redis.connection.Subscription;
36-
import org.springframework.data.redis.connection.SubscriptionListener;
34+
import org.springframework.data.redis.connection.*;
3735
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
3836
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
3937
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
@@ -221,4 +219,34 @@ void shouldRecoverFromConnectionFailure() throws Exception {
221219
void failsOnDuplicateInit() {
222220
assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet());
223221
}
222+
223+
@Test
224+
void shouldRemoveSpecificListenerFromMappingAndListenerTopics() {
225+
MessageListener listener1 = mock(MessageListener.class);
226+
MessageListener listener2 = mock(MessageListener.class);
227+
Topic topic = new ChannelTopic("topic1");
228+
229+
container.addMessageListener(listener1, Collections.singletonList(topic));
230+
container.addMessageListener(listener2, Collections.singletonList(topic));
231+
232+
container.removeMessageListener(listener1, Collections.singletonList(topic));
233+
234+
container.addMessageListener(listener2, Collections.singletonList(topic));
235+
verify(listener1, never()).onMessage(any(), any());
236+
}
237+
238+
@Test
239+
void shouldRemoveAllListenersWhenListenerIsNull() {
240+
MessageListener listener1 = mock(MessageListener.class);
241+
MessageListener listener2 = mock(MessageListener.class);
242+
Topic topic = new ChannelTopic("topic1");
243+
244+
container.addMessageListener(listener1, Collections.singletonList(topic));
245+
container.addMessageListener(listener2, Collections.singletonList(topic));
246+
247+
container.removeMessageListener(null, Collections.singletonList(topic));
248+
249+
verify(listener1, never()).onMessage(any(), any());
250+
verify(listener2, never()).onMessage(any(), any());
251+
}
224252
}

0 commit comments

Comments
 (0)