Skip to content

Commit 5029094

Browse files
committed
Refine Topic creation for ChannelTopic and PatternTopic.
We now expose factory methods to construct ChannelTopic and PatternTopic from the Topic interface. See spring-projects#3131
1 parent 0de3af0 commit 5029094

File tree

5 files changed

+44
-23
lines changed

5 files changed

+44
-23
lines changed

src/main/antora/modules/ROOT/pages/redis/pubsub.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ XML::
162162
----
163163
======
164164

165-
NOTE: The listener topic can be either a channel (for example, `topic="chatroom"`) or a pattern (for example, `topic="*room"`). For channels, you should use the `ChannelTopic` class, and for patterns, use the `PatternTopic` class.
165+
NOTE: The listener topic can be either a channel (for example, `topic="chatroom"` respective `Topic.channel("chatroom")`) or a pattern (for example, `topic="*room"` respective `Topic.pattern("*room")`).
166166

167167
The preceding example uses the Redis namespace to declare the message listener container and automatically register the POJOs as listeners. The full-blown beans definition follows:
168168

src/main/java/org/springframework/data/redis/listener/Topic.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,37 @@
1919
* Topic for a Redis message. Acts a high-level abstraction on top of Redis low-level channels or patterns.
2020
*
2121
* @author Costin Leau
22+
* @author Mark Paluch
2223
*/
2324
public interface Topic {
2425

26+
/**
27+
* Create a new {@link ChannelTopic} for channel subscriptions.
28+
*
29+
* @param channelName {@link String name} of the Redis channel; must not be {@literal null}.
30+
* @return the {@link ChannelTopic} for the given {@code channelName}.
31+
* @since 3.5
32+
*/
33+
static ChannelTopic channel(String channelName) {
34+
return ChannelTopic.of(channelName);
35+
}
36+
37+
/**
38+
* Create a new {@link PatternTopic} for channel subscriptions based on a {@code pattern}.
39+
*
40+
* @param pattern {@link String pattern} used to match channels; must not be {@literal null} or empty.
41+
* @return the {@link PatternTopic} for the given {@code pattern}.
42+
* @since 3.5
43+
*/
44+
static PatternTopic pattern(String pattern) {
45+
return PatternTopic.of(pattern);
46+
}
47+
2548
/**
2649
* Returns the topic (as a String).
2750
*
2851
* @return the topic
2952
*/
3053
String getTopic();
54+
3155
}

src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@
2323

2424
import java.nio.ByteBuffer;
2525
import java.time.Duration;
26-
import java.util.ArrayList;
2726
import java.util.Arrays;
2827
import java.util.Collection;
2928
import java.util.Collections;
30-
import java.util.List;
31-
import java.util.Queue;
3229
import java.util.concurrent.BlockingQueue;
3330
import java.util.concurrent.CompletableFuture;
3431
import java.util.concurrent.LinkedBlockingDeque;
@@ -110,7 +107,7 @@ void shouldReceiveChannelMessages() {
110107

111108
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory);
112109

113-
container.receiveLater(ChannelTopic.of(CHANNEL1)) //
110+
container.receiveLater(Topic.channel(CHANNEL1)) //
114111
.doOnNext(it -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) //
115112
.flatMapMany(Function.identity()) //
116113
.as(StepVerifier::create) //
@@ -153,7 +150,7 @@ public void onChannelUnsubscribed(byte[] channel, long count) {
153150
}
154151
};
155152

156-
container.receive(Collections.singletonList(ChannelTopic.of(CHANNEL1)), listener) //
153+
container.receive(Collections.singletonList(Topic.channel(CHANNEL1)), listener) //
157154
.as(StepVerifier::create) //
158155
.then(awaitSubscription(container::getActiveSubscriptions))
159156
.then(() -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) //
@@ -220,7 +217,7 @@ public void onPatternUnsubscribed(byte[] pattern, long count) {
220217
}
221218
};
222219

223-
container.receive(Collections.singletonList(PatternTopic.of(PATTERN1)), listener) //
220+
container.receive(Collections.singletonList(Topic.pattern(PATTERN1)), listener) //
224221
.cast(PatternMessage.class) //
225222
.as(StepVerifier::create) //
226223
.then(awaitSubscription(container::getActiveSubscriptions))
@@ -314,10 +311,10 @@ void multipleListenShouldTrackSubscriptions() throws Exception {
314311

315312
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory);
316313

317-
Flux<? extends ReactiveSubscription.Message<String, String>> c1 = container.receiveLater(ChannelTopic.of(CHANNEL1))
314+
Flux<? extends ReactiveSubscription.Message<String, String>> c1 = container.receiveLater(Topic.channel(CHANNEL1))
318315
.block();
319316
Flux<? extends ReactiveSubscription.Message<String, String>> c1p1 = container
320-
.receiveLater(Arrays.asList(ChannelTopic.of(CHANNEL1), PatternTopic.of(PATTERN1)),
317+
.receiveLater(Arrays.asList(Topic.channel(CHANNEL1), PatternTopic.of(PATTERN1)),
321318
SerializationPair.fromSerializer(RedisSerializer.string()),
322319
SerializationPair.fromSerializer(RedisSerializer.string()))
323320
.block();

src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerUnitTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ void shouldSubscribeToPattern() {
7979

8080
container = createContainer();
8181

82-
container.receive(PatternTopic.of("foo*")).as(StepVerifier::create).thenAwait().thenCancel().verify();
82+
container.receive(Topic.pattern("foo*")).as(StepVerifier::create).thenAwait().thenCancel().verify();
8383

8484
verify(subscriptionMock).pSubscribe(getByteBuffer("foo*"));
8585
}
@@ -90,7 +90,7 @@ void shouldSubscribeToMultiplePatterns() {
9090
when(subscriptionMock.receive()).thenReturn(Flux.never());
9191
container = createContainer();
9292

93-
container.receive(PatternTopic.of("foo*"), PatternTopic.of("bar*")).as(StepVerifier::create).thenRequest(1)
93+
container.receive(Topic.pattern("foo*"), Topic.pattern("bar*")).as(StepVerifier::create).thenRequest(1)
9494
.thenAwait().thenCancel().verify();
9595

9696
verify(subscriptionMock).pSubscribe(getByteBuffer("foo*"), getByteBuffer("bar*"));
@@ -102,7 +102,7 @@ void shouldSubscribeToChannel() {
102102
when(subscriptionMock.receive()).thenReturn(Flux.never());
103103
container = createContainer();
104104

105-
container.receive(ChannelTopic.of("foo")).as(StepVerifier::create).thenAwait().thenCancel().verify();
105+
container.receive(Topic.channel("foo")).as(StepVerifier::create).thenAwait().thenCancel().verify();
106106

107107
verify(subscriptionMock).subscribe(getByteBuffer("foo"));
108108
}
@@ -113,7 +113,7 @@ void shouldSubscribeToMultipleChannels() {
113113
when(subscriptionMock.receive()).thenReturn(Flux.never());
114114
container = createContainer();
115115

116-
container.receive(ChannelTopic.of("foo"), ChannelTopic.of("bar")).as(StepVerifier::create).thenAwait().thenCancel()
116+
container.receive(Topic.channel("foo"), Topic.channel("bar")).as(StepVerifier::create).thenAwait().thenCancel()
117117
.verify();
118118

119119
verify(subscriptionMock).subscribe(getByteBuffer("foo"), getByteBuffer("bar"));
@@ -127,7 +127,7 @@ void shouldEmitChannelMessage() {
127127
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
128128
container = createContainer();
129129

130-
Flux<Message<String, String>> messageStream = container.receive(ChannelTopic.of("foo"));
130+
Flux<Message<String, String>> messageStream = container.receive(Topic.channel("foo"));
131131

132132
messageStream.as(StepVerifier::create).then(() -> {
133133
sink.tryEmitNext(createChannelMessage("foo", "message"));
@@ -146,7 +146,7 @@ void shouldEmitPatternMessage() {
146146
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
147147
container = createContainer();
148148

149-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
149+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
150150

151151
messageStream.as(StepVerifier::create).then(() -> {
152152
sink.tryEmitNext(createPatternMessage("foo*", "foo", "message"));
@@ -171,7 +171,7 @@ void shouldRegisterSubscription() {
171171
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
172172
container = createContainer();
173173

174-
Flux<Message<String, String>> messageStream = container.receive(ChannelTopic.of("foo*"));
174+
Flux<Message<String, String>> messageStream = container.receive(Topic.channel("foo*"));
175175

176176
Disposable subscription = messageStream.subscribe();
177177

@@ -193,7 +193,7 @@ void shouldRegisterSubscriptionMultipleSubscribers() {
193193
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
194194
container = createContainer();
195195

196-
Flux<Message<String, String>> messageStream = container.receive(new ChannelTopic("foo*"));
196+
Flux<Message<String, String>> messageStream = container.receive(Topic.channel("foo*"));
197197

198198
Disposable first = messageStream.subscribe();
199199
Disposable second = messageStream.subscribe();
@@ -216,7 +216,7 @@ void shouldUnsubscribeOnCancel() {
216216
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
217217
container = createContainer();
218218

219-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
219+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
220220

221221
messageStream.as(StepVerifier::create).then(() -> {
222222

@@ -240,7 +240,7 @@ void shouldTerminateSubscriptionsOnShutdown() {
240240
}));
241241
container = createContainer();
242242

243-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
243+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
244244

245245
messageStream.as(StepVerifier::create).then(() -> {
246246
container.destroy();
@@ -255,7 +255,7 @@ void shouldCleanupDownstream() {
255255
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
256256
container = createContainer();
257257

258-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
258+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
259259

260260
messageStream.as(StepVerifier::create).then(() -> {
261261
assertThat(sink.currentSubscriberCount()).isGreaterThan(0);

src/test/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensionsUnitTests.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test
2626
import org.springframework.data.redis.connection.DataType
2727
import org.springframework.data.redis.connection.ReactiveSubscription
2828
import org.springframework.data.redis.core.script.RedisScript
29-
import org.springframework.data.redis.listener.ChannelTopic
29+
import org.springframework.data.redis.listener.Topic
3030
import org.springframework.data.redis.serializer.RedisElementReader
3131
import org.springframework.data.redis.serializer.RedisElementWriter
3232
import reactor.core.publisher.Flux
@@ -167,8 +167,8 @@ class ReactiveRedisOperationsExtensionsUnitTests {
167167
@Test // DATAREDIS-1033
168168
fun listenTo() {
169169

170-
val topic1 = ChannelTopic.of("foo")
171-
val topic2 = ChannelTopic.of("bar")
170+
val topic1 = Topic.channel("foo")
171+
val topic2 = Topic.channel("bar")
172172
val message = ReactiveSubscription.ChannelMessage("a", "b")
173173
val operations = mockk<ReactiveRedisOperations<String, String>>()
174174
every { operations.listenTo(any(), any()) } returns Flux.just(message)

0 commit comments

Comments
 (0)