Skip to content

GH-3192: pub-sub DSL for broker-backed channels #3193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,22 +25,25 @@
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dispatcher.AbstractDispatcher;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;

/**
* The {@link AbstractSubscribableAmqpChannel} extension for pub-sub semantics based on the {@link FanoutExchange}.
*
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*/
public class PublishSubscribeAmqpChannel extends AbstractSubscribableAmqpChannel {

private volatile FanoutExchange exchange;
public class PublishSubscribeAmqpChannel extends AbstractSubscribableAmqpChannel implements BroadcastCapableChannel {

private final Queue queue = new AnonymousQueue();

private volatile FanoutExchange exchange;

private volatile Binding binding;

/**
Expand All @@ -53,6 +56,7 @@ public class PublishSubscribeAmqpChannel extends AbstractSubscribableAmqpChannel
*/
public PublishSubscribeAmqpChannel(String channelName, AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate) {

super(channelName, container, amqpTemplate, true);
}

Expand All @@ -69,6 +73,7 @@ public PublishSubscribeAmqpChannel(String channelName, AbstractMessageListenerCo
*/
public PublishSubscribeAmqpChannel(String channelName, AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {

super(channelName, container, amqpTemplate, true, outboundMapper, inboundMapper);
}

Expand Down Expand Up @@ -104,7 +109,7 @@ protected String obtainQueueName(String channelName) {
@Override
protected AbstractDispatcher createDispatcher() {
BroadcastingDispatcher broadcastingDispatcher = new BroadcastingDispatcher(true);
broadcastingDispatcher.setBeanFactory(this.getBeanFactory());
broadcastingDispatcher.setBeanFactory(getBeanFactory());
return broadcastingDispatcher;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.channel;

import org.springframework.messaging.SubscribableChannel;

/**
* A {@link SubscribableChannel} variant for implementations with broadcasting capabilities.
*
* @author Artem Bilan
*
* @since 5.3
*/
public interface BroadcastCapableChannel extends SubscribableChannel {

/**
* Return a state of this channel in regards of broadcasting capabilities.
* @return the state of this channel in regards of broadcasting capabilities.
*/
default boolean isBroadcast() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* @author Gary Russell
* @author Artem Bilan
*/
public class PublishSubscribeChannel extends AbstractExecutorChannel {
public class PublishSubscribeChannel extends AbstractExecutorChannel implements BroadcastCapableChannel {

private ErrorHandler errorHandler;

Expand All @@ -45,6 +45,14 @@ public class PublishSubscribeChannel extends AbstractExecutorChannel {

private int minSubscribers;

/**
* Create a PublishSubscribeChannel that will invoke the handlers in the
* message sender's thread.
*/
public PublishSubscribeChannel() {
this(null);
}

/**
* Create a PublishSubscribeChannel that will use an {@link Executor}
* to invoke the handlers. If this is null, each invocation will occur in
Expand All @@ -56,14 +64,6 @@ public PublishSubscribeChannel(@Nullable Executor executor) {
this.dispatcher = new BroadcastingDispatcher(executor);
}

/**
* Create a PublishSubscribeChannel that will invoke the handlers in the
* message sender's thread.
*/
public PublishSubscribeChannel() {
this(null);
}


@Override
public String getComponentType() {
Expand Down Expand Up @@ -135,7 +135,7 @@ public void setMinSubscribers(int minSubscribers) {
@Override
public final void onInit() {
super.onInit();
BeanFactory beanFactory = this.getBeanFactory();
BeanFactory beanFactory = getBeanFactory();
BroadcastingDispatcher dispatcherToUse = getDispatcher();
if (this.executor != null) {
Assert.state(dispatcherToUse.getHandlerCount() == 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.channel.FluxMessageChannel;
Expand Down Expand Up @@ -294,6 +295,25 @@ public B publishSubscribeChannel(Executor executor,
return addComponents(spec.getComponentsToRegister()).channel(spec);
}

/**
* The {@link BroadcastCapableChannel} {@link #channel}
* method specific implementation to allow the use of the 'subflow' subscriber capability.
* @param broadcastCapableChannel the {@link BroadcastCapableChannel} to subscriber sub-flows to.
* @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
* {@link BroadcastPublishSubscribeSpec} 'subflow' definitions.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 5.3
*/
public B publishSubscribeChannel(BroadcastCapableChannel broadcastCapableChannel,
Consumer<BroadcastPublishSubscribeSpec> publishSubscribeChannelConfigurer) {

Assert.notNull(publishSubscribeChannelConfigurer, "'publishSubscribeChannelConfigurer' must not be null");
BroadcastPublishSubscribeSpec spec = new BroadcastPublishSubscribeSpec(broadcastCapableChannel);
publishSubscribeChannelConfigurer.accept(spec);
return addComponents(spec.getComponentsToRegister())
.channel(broadcastCapableChannel);
}

/**
* Populate the {@code Wire Tap} EI Pattern specific
* {@link org.springframework.messaging.support.ChannelInterceptor} implementation
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.dsl;

import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;

/**
* An {@link IntegrationComponentSpec} for configuring sub-flow subscribers on the
* provided {@link BroadcastCapableChannel}.
*
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.3
*/
public class BroadcastPublishSubscribeSpec
extends IntegrationComponentSpec<BroadcastPublishSubscribeSpec, BroadcastCapableChannel>
implements ComponentsRegistration {

private final Map<Object, String> subscriberFlows = new LinkedHashMap<>();

private int order;

protected BroadcastPublishSubscribeSpec(BroadcastCapableChannel channel) {
Assert.state(channel.isBroadcast(),
() -> "the " + channel +
" must be in the 'broadcast' state for using from this 'BroadcastPublishSubscribeSpec'");
this.target = channel;
}

/**
* Configure a {@link IntegrationFlow} to configure as a subscriber
* for the current {@link BroadcastCapableChannel}.
* @param subFlow the {@link IntegrationFlow} to configure as a subscriber
* for the current {@link BroadcastCapableChannel}.
* @return the current spec
*/
public BroadcastPublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
Assert.notNull(subFlow, "'subFlow' must not be null");

IntegrationFlowBuilder flowBuilder =
IntegrationFlows.from(this.target)
.bridge(consumer -> consumer.order(this.order++));

MessageChannel subFlowInput = subFlow.getInputChannel();

if (subFlowInput == null) {
subFlow.configure(flowBuilder);
}
else {
flowBuilder.channel(subFlowInput);
}
this.subscriberFlows.put(flowBuilder.get(), null);
return _this();
}

@Override
public Map<Object, String> getComponentsToRegister() {
return this.subscriberFlows;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,27 @@
import java.util.concurrent.Executor;

import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;

/**
* The {@link PublishSubscribeChannelSpec} extension to configure as a general flow callback for sub-flows
* as subscribers.
*
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.0
*/
public class PublishSubscribeSpec extends PublishSubscribeChannelSpec<PublishSubscribeSpec> {

private final Map<Object, String> subscriberFlows = new LinkedHashMap<>();

private int order;
private final BroadcastPublishSubscribeSpec delegate;

protected PublishSubscribeSpec() {
this.delegate = new BroadcastPublishSubscribeSpec(this.channel);
}

protected PublishSubscribeSpec(@Nullable Executor executor) {
super(executor);
this.delegate = new BroadcastPublishSubscribeSpec(this.channel);
}

@Override
Expand All @@ -49,29 +50,15 @@ public PublishSubscribeSpec id(String id) { // NOSONAR - not useless, increases
}

public PublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
Assert.notNull(subFlow, "'subFlow' must not be null");

IntegrationFlowBuilder flowBuilder =
IntegrationFlows.from(this.channel)
.bridge(consumer -> consumer.order(this.order++));

MessageChannel subFlowInput = subFlow.getInputChannel();

if (subFlowInput == null) {
subFlow.configure(flowBuilder);
}
else {
flowBuilder.channel(subFlowInput);
}
this.subscriberFlows.put(flowBuilder.get(), null);
this.delegate.subscribe(subFlow);
return _this();
}

@Override
public Map<Object, String> getComponentsToRegister() {
Map<Object, String> objects = new LinkedHashMap<>();
objects.putAll(super.getComponentsToRegister());
objects.putAll(this.subscriberFlows);
objects.putAll(this.delegate.getComponentsToRegister());
return objects;
}

Expand Down
Loading