Skip to content
This repository was archived by the owner on Mar 30, 2023. It is now read-only.

Commit 3272861

Browse files
authored
GH-296: DSL: Support inline pub/sub subscriptions
Resolves #296 * * Add generic type to specsto avoid cast to `BroadcastCapableChannel`. * * Remove unneeded fields in the abstract spec * Restore log4j config
1 parent 879e7a7 commit 3272861

12 files changed

Lines changed: 183 additions & 67 deletions

src/main/java/org/springframework/integration/kafka/channel/AbstractKafkaChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected boolean doSend(Message<?> message, long timeout) {
7575
this.template.send(MessageBuilder.fromMessage(message)
7676
.setHeader(KafkaHeaders.TOPIC, this.topic)
7777
.build())
78-
.get(timeout, TimeUnit.MILLISECONDS);
78+
.get(timeout < 0 ? Long.MAX_VALUE : timeout, TimeUnit.MILLISECONDS);
7979
}
8080
catch (@SuppressWarnings("unused") InterruptedException e) {
8181
Thread.currentThread().interrupt();
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.kafka.channel;
18+
19+
import org.springframework.integration.channel.BroadcastCapableChannel;
20+
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
21+
import org.springframework.integration.dispatcher.MessageDispatcher;
22+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
23+
import org.springframework.kafka.core.KafkaOperations;
24+
25+
/**
26+
* Publish/subscribe channel backed by a Kafka topic.
27+
*
28+
* @author Gary Russell
29+
* @since 3.3
30+
*
31+
*/
32+
public class PublishSubscribeKafkaChannel extends SubscribableKafkaChannel implements BroadcastCapableChannel {
33+
34+
/**
35+
* Construct an instance with the provided parameters.
36+
* @param template template for sending.
37+
* @param factory factory for creating a container for receiving.
38+
* @param channelTopic the topic.
39+
*/
40+
public PublishSubscribeKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerContainerFactory<?> factory,
41+
String channelTopic) {
42+
super(template, factory, channelTopic);
43+
}
44+
45+
@Override
46+
protected MessageDispatcher createDispatcher() {
47+
BroadcastingDispatcher broadcastingDispatcher = new BroadcastingDispatcher(true);
48+
broadcastingDispatcher.setBeanFactory(getBeanFactory());
49+
return broadcastingDispatcher;
50+
}
51+
52+
}

src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.springframework.context.Phased;
2323
import org.springframework.context.SmartLifecycle;
24-
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
2524
import org.springframework.integration.dispatcher.MessageDispatcher;
2625
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2726
import org.springframework.integration.dispatcher.UnicastingDispatcher;
@@ -47,8 +46,6 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su
4746

4847
private final KafkaListenerContainerFactory<?> factory;
4948

50-
private final boolean pubSub;
51-
5249
private MessageDispatcher dispatcher;
5350

5451
private MessageListenerContainer container;
@@ -68,23 +65,9 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su
6865
public SubscribableKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerContainerFactory<?> factory,
6966
String channelTopic) {
7067

71-
this(template, factory, channelTopic, false);
72-
}
73-
74-
/**
75-
* Construct an instance with the provided parameters.
76-
* @param template template for sending.
77-
* @param factory factory for creating a container for receiving.
78-
* @param channelTopic the topic.
79-
* @param pubSub true for a publish/subscribe channel.
80-
*/
81-
public SubscribableKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerContainerFactory<?> factory,
82-
String channelTopic, boolean pubSub) {
83-
8468
super(template, channelTopic);
8569
Assert.notNull(factory, "'factory' cannot be null");
8670
this.factory = factory;
87-
this.pubSub = pubSub;
8871
}
8972

9073
@Override
@@ -122,16 +105,7 @@ public boolean isAutoStartup() {
122105

123106
@Override
124107
protected void onInit() {
125-
if (this.pubSub) {
126-
BroadcastingDispatcher broadcastingDispatcher = new BroadcastingDispatcher(true);
127-
broadcastingDispatcher.setBeanFactory(this.getBeanFactory());
128-
this.dispatcher = broadcastingDispatcher;
129-
}
130-
else {
131-
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher();
132-
unicastingDispatcher.setLoadBalancingStrategy(new RoundRobinLoadBalancingStrategy());
133-
this.dispatcher = unicastingDispatcher;
134-
}
108+
this.dispatcher = createDispatcher();
135109
this.container = this.factory.createContainer(this.topic);
136110
String groupId = getGroupId();
137111
this.container.getContainerProperties().setGroupId(groupId != null ? groupId : getBeanName());
@@ -149,6 +123,12 @@ public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment ackn
149123
});
150124
}
151125

126+
protected MessageDispatcher createDispatcher() {
127+
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher();
128+
unicastingDispatcher.setLoadBalancingStrategy(new RoundRobinLoadBalancingStrategy());
129+
return unicastingDispatcher;
130+
}
131+
152132
@Override
153133
public void start() {
154134
this.container.start();

src/main/java/org/springframework/integration/kafka/config/xml/KafkaChannelParser.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.springframework.integration.config.xml.AbstractChannelParser;
2424
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
2525
import org.springframework.integration.kafka.channel.PollableKafkaChannel;
26+
import org.springframework.integration.kafka.channel.PublishSubscribeKafkaChannel;
2627
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
2728
import org.springframework.util.StringUtils;
2829

@@ -46,11 +47,12 @@ protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserConte
4647
String topic = element.getAttribute("topic");
4748
boolean pubSub = "publish-subscribe-channel".equals(element.getLocalName());
4849
if (hasFactory) {
49-
builder = BeanDefinitionBuilder.genericBeanDefinition(SubscribableKafkaChannel.class);
50+
builder = BeanDefinitionBuilder.genericBeanDefinition(pubSub
51+
? PublishSubscribeKafkaChannel.class
52+
: SubscribableKafkaChannel.class);
5053
builder.addConstructorArgReference(template);
5154
builder.addConstructorArgReference(factory);
5255
builder.addConstructorArgValue(topic);
53-
builder.addConstructorArgValue(pubSub);
5456
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup");
5557
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "phase");
5658
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "role");

src/main/java/org/springframework/integration/kafka/dsl/AbstractKafkaChannelSpec.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,23 @@
1818

1919
import org.springframework.integration.dsl.MessageChannelSpec;
2020
import org.springframework.integration.kafka.channel.AbstractKafkaChannel;
21-
import org.springframework.kafka.core.KafkaTemplate;
2221

2322
/**
2423
*
2524
* Spec for a message channel backed by a Kafka topic.
2625
*
2726
* @param <S> the spec type.
27+
* @param <C> the channel type.
2828
*
2929
* @author Gary Russell
3030
* @since 3.3
3131
*
3232
*/
33-
public abstract class AbstractKafkaChannelSpec<S extends AbstractKafkaChannelSpec<S>>
34-
extends MessageChannelSpec<S, AbstractKafkaChannel> {
35-
36-
protected final KafkaTemplate<?, ?> template; // NOSONAR final
37-
38-
protected final String topic; // NOSONAR final
33+
public abstract class AbstractKafkaChannelSpec<S extends AbstractKafkaChannelSpec<S, C>, C extends AbstractKafkaChannel>
34+
extends MessageChannelSpec<S, C> {
3935

4036
protected String groupId; // NOSONAR
4137

42-
protected AbstractKafkaChannelSpec(KafkaTemplate<?, ?> template, String topic) {
43-
this.template = template;
44-
this.topic = topic;
45-
}
46-
4738
@Override
4839
public S id(String idToSet) { // NOSONAR - increase visibility
4940
return super.id(idToSet);

src/main/java/org/springframework/integration/kafka/dsl/Kafka.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -545,10 +545,10 @@ public static <K, V, R> KafkaInboundGatewaySpec.KafkaInboundGatewayListenerConta
545545
* @return the spec.
546546
* @since 3.3
547547
*/
548-
public static KafkaSubscribableChannelSpec channel(KafkaTemplate<?, ?> template,
548+
public static KafkaPointToPointChannelSpec channel(KafkaTemplate<?, ?> template,
549549
KafkaListenerContainerFactory<?> containerFactory, String topic) {
550550

551-
return new KafkaSubscribableChannelSpec(template, containerFactory, topic, false);
551+
return new KafkaPointToPointChannelSpec(template, containerFactory, topic);
552552
}
553553

554554
/**
@@ -559,10 +559,10 @@ public static KafkaSubscribableChannelSpec channel(KafkaTemplate<?, ?> template,
559559
* @return the spec.
560560
* @since 3.3
561561
*/
562-
public static KafkaSubscribableChannelSpec publishSubscribeChannel(KafkaTemplate<?, ?> template,
562+
public static KafkaPublishSubscribeChannelSpec publishSubscribeChannel(KafkaTemplate<?, ?> template,
563563
KafkaListenerContainerFactory<?> containerFactory, String topic) {
564564

565-
return new KafkaSubscribableChannelSpec(template, containerFactory, topic, true);
565+
return new KafkaPublishSubscribeChannelSpec(template, containerFactory, topic);
566566
}
567567

568568
/**
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.kafka.dsl;
18+
19+
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
20+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
21+
import org.springframework.kafka.core.KafkaTemplate;
22+
23+
/**
24+
* Spec for a point to point channel backed by a Kafka topic.
25+
*
26+
* @author Gary Russell
27+
* @since 3.3
28+
*
29+
*/
30+
public class KafkaPointToPointChannelSpec extends KafkaSubscribableChannelSpec<SubscribableKafkaChannel> {
31+
32+
protected KafkaPointToPointChannelSpec(KafkaTemplate<?, ?> template, KafkaListenerContainerFactory<?> factory,
33+
String topic) {
34+
35+
this.channel = new SubscribableKafkaChannel(template, factory, topic);
36+
}
37+
38+
}

src/main/java/org/springframework/integration/kafka/dsl/KafkaPollableChannelSpec.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@
2727
* @since 3.3
2828
*
2929
*/
30-
public class KafkaPollableChannelSpec extends AbstractKafkaChannelSpec<KafkaPollableChannelSpec> {
30+
public class KafkaPollableChannelSpec extends AbstractKafkaChannelSpec<KafkaPollableChannelSpec, PollableKafkaChannel> {
3131

3232
protected KafkaPollableChannelSpec(KafkaTemplate<?, ?> template, KafkaMessageSource<?, ?> source) {
33-
super(null, null);
3433
this.channel = new PollableKafkaChannel(template, source);
3534
}
3635

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.kafka.dsl;
18+
19+
import org.springframework.integration.kafka.channel.PublishSubscribeKafkaChannel;
20+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
21+
import org.springframework.kafka.core.KafkaTemplate;
22+
23+
/**
24+
* Spec for a publish/subscribe channel backed by a Kafka topic.
25+
*
26+
* @author Gary Russell
27+
* @since 3.3
28+
*
29+
*/
30+
public class KafkaPublishSubscribeChannelSpec
31+
extends KafkaSubscribableChannelSpec<PublishSubscribeKafkaChannel> {
32+
33+
protected KafkaPublishSubscribeChannelSpec(KafkaTemplate<?, ?> template, KafkaListenerContainerFactory<?> factory,
34+
String topic) {
35+
36+
this.channel = new PublishSubscribeKafkaChannel(template, factory, topic);
37+
}
38+
39+
}

src/main/java/org/springframework/integration/kafka/dsl/KafkaSubscribableChannelSpec.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,17 @@
1717
package org.springframework.integration.kafka.dsl;
1818

1919
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
20-
import org.springframework.kafka.config.KafkaListenerContainerFactory;
21-
import org.springframework.kafka.core.KafkaTemplate;
2220

2321
/**
2422
* Spec for a subscribable channel.
2523
*
24+
* @param <C> the channel type.
25+
*
2626
* @author Gary Russell
2727
* @since 3.3
2828
*
2929
*/
30-
public class KafkaSubscribableChannelSpec extends AbstractKafkaChannelSpec<KafkaSubscribableChannelSpec> {
31-
32-
protected KafkaSubscribableChannelSpec(KafkaTemplate<?, ?> template, KafkaListenerContainerFactory<?> factory,
33-
String topic, boolean pubSub) {
34-
35-
super(template, topic);
36-
this.channel = new SubscribableKafkaChannel(template, factory, topic, pubSub);
37-
}
30+
public abstract class KafkaSubscribableChannelSpec<C extends SubscribableKafkaChannel>
31+
extends AbstractKafkaChannelSpec<KafkaSubscribableChannelSpec<C>, C> {
3832

3933
}

0 commit comments

Comments
 (0)