Skip to content

Commit 5f1e12e

Browse files
authored
Introduce PartitionedChannel (#8617)
* Introduce `PartitionedChannel` * Implement a `PartitionedChannel` as an extension of the `AbstractExecutorChannel` * Supply this channel with a `PartitionedDispatcher` which is an extension of the `AbstractDispatcher` * The target partition is essentially a `UnicastingDispatcher` with a single thead executor * * Fix language in Javadocs * Add docs for `PartitionedChannel` into `channel.adoc` * Pre-populate partitions in the `PartitionedDispatcher` to ensure a thread number reflection of the partition it is used for (for default `ThreadFactory`) * Add Javadocs to `public` methods * Add Java DSL `PartitionedChannelSpec` and respective factory methods into `Channels` * Use `IntegrationMessageHeaderAccessor.CORRELATION_ID` header as a default partition key * * Fix language in Javadocs * Fix Checkstyle violations * Mark `PartitionedDispatcher.prePopulatedPartitions()` with `synchronized` * * Populate partitions from `PartitionedDispatcher` ctor and when a custom `ThreadFactory` is set * * Clear `executors` in `populatedPartitions()` * * Bring back `synchronized populatedPartitions()` and use it in the `dispatch()`
1 parent 32b6d82 commit 5f1e12e

File tree

8 files changed

+651
-2
lines changed

8 files changed

+651
-2
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.channel;
18+
19+
import java.util.concurrent.ThreadFactory;
20+
import java.util.function.Function;
21+
22+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
23+
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
24+
import org.springframework.integration.dispatcher.PartitionedDispatcher;
25+
import org.springframework.lang.Nullable;
26+
import org.springframework.messaging.Message;
27+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* An {@link AbstractExecutorChannel} implementation for partitioned message dispatching.
32+
* Requires a number of partitions where each of them is backed by a dedicated thread.
33+
* The {@code partitionKeyFunction} is used to determine to which partition the message
34+
* has to be dispatched.
35+
* By default, the {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} message header is used
36+
* for partition key.
37+
* <p>
38+
* The actual dispatching and threading logic is implemented in the {@link PartitionedDispatcher}.
39+
* <p>
40+
* The default {@link ThreadFactory} is based on the bean name of this channel plus {@code -partition-thread-}.
41+
* Thus, every thread name will reflect a partition it belongs to.
42+
* <p>
43+
* The rest of the logic is similar to the {@link ExecutorChannel}, which includes:
44+
* - load balancing for subscribers;
45+
* - fail-over and error handling;
46+
* - channel operations intercepting.
47+
*
48+
* @author Artem Bilan
49+
*
50+
* @since 6.1
51+
*
52+
* @see PartitionedDispatcher
53+
*/
54+
public class PartitionedChannel extends AbstractExecutorChannel {
55+
56+
@Nullable
57+
private ThreadFactory threadFactory;
58+
59+
/**
60+
* Instantiate based on a provided number of partitions and function resolving a partition key from
61+
* the {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} message header.
62+
* @param partitionCount the number of partitions in this channel.
63+
* sent to this channel.
64+
*/
65+
public PartitionedChannel(int partitionCount) {
66+
this(partitionCount, (message) -> message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID));
67+
}
68+
69+
/**
70+
* Instantiate based on a provided number of partitions and function for partition key against
71+
* the message.
72+
* @param partitionCount the number of partitions in this channel.
73+
* @param partitionKeyFunction the function to resolve a partition key against the message
74+
* sent to this channel.
75+
*/
76+
public PartitionedChannel(int partitionCount, Function<Message<?>, Object> partitionKeyFunction) {
77+
super(null);
78+
this.dispatcher = new PartitionedDispatcher(partitionCount, partitionKeyFunction);
79+
}
80+
81+
/**
82+
* Set a {@link ThreadFactory} for executors per partitions.
83+
* Propagated down to the {@link PartitionedDispatcher}.
84+
* Defaults to the {@link CustomizableThreadFactory} based on the bean name
85+
* of this channel plus {@code -partition-thread-}.
86+
* @param threadFactory the {@link ThreadFactory} to use.
87+
*/
88+
public void setThreadFactory(ThreadFactory threadFactory) {
89+
Assert.notNull(threadFactory, "'threadFactory' must not be null");
90+
this.threadFactory = threadFactory;
91+
}
92+
93+
/**
94+
* Specify whether the channel's dispatcher should have failover enabled.
95+
* By default, it will. Set this value to 'false' to disable it.
96+
* @param failover The failover boolean.
97+
*/
98+
public void setFailover(boolean failover) {
99+
getDispatcher().setFailover(failover);
100+
}
101+
102+
/**
103+
* Provide a {@link LoadBalancingStrategy} for the {@link PartitionedDispatcher}.
104+
* @param loadBalancingStrategy The load balancing strategy implementation.
105+
*/
106+
public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
107+
getDispatcher().setLoadBalancingStrategy(loadBalancingStrategy);
108+
}
109+
110+
@Override
111+
protected PartitionedDispatcher getDispatcher() {
112+
return (PartitionedDispatcher) this.dispatcher;
113+
}
114+
115+
@Override
116+
protected void onInit() {
117+
super.onInit();
118+
119+
if (this.threadFactory == null) {
120+
this.threadFactory = new CustomizableThreadFactory(getComponentName() + "-partition-thread-");
121+
}
122+
PartitionedDispatcher partitionedDispatcher = getDispatcher();
123+
partitionedDispatcher.setThreadFactory(this.threadFactory);
124+
125+
if (this.maxSubscribers == null) {
126+
partitionedDispatcher.setMaxSubscribers(getIntegrationProperties().getChannelsMaxUnicastSubscribers());
127+
}
128+
129+
partitionedDispatcher.setErrorHandler(ChannelUtils.getErrorHandler(getBeanFactory()));
130+
131+
partitionedDispatcher.setMessageHandlingTaskDecorator(task -> {
132+
if (this.executorInterceptorsSize > 0) {
133+
return new MessageHandlingTask(task);
134+
}
135+
else {
136+
return task;
137+
}
138+
});
139+
140+
}
141+
142+
@Override
143+
public void destroy() {
144+
super.destroy();
145+
getDispatcher().shutdown();
146+
}
147+
148+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dispatcher;
18+
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Set;
24+
import java.util.concurrent.Executor;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ThreadFactory;
28+
import java.util.function.Function;
29+
30+
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
31+
import org.springframework.lang.Nullable;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.MessageHandler;
34+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
35+
import org.springframework.util.Assert;
36+
import org.springframework.util.ErrorHandler;
37+
38+
/**
39+
* An {@link AbstractDispatcher} implementation for distributing messages to
40+
* dedicated threads according to the key determined by the provided function against
41+
* the message to dispatch.
42+
* <p>
43+
* Every partition, created by this class, is a {@link UnicastingDispatcher}
44+
* delegate based on a single thread {@link Executor}.
45+
* <p>
46+
* The number of partitions should be a reasonable value for the application environment
47+
* since every partition is based on a dedicated thread for message processing.
48+
* <p>
49+
* The rest of the logic is similar to {@link UnicastingDispatcher} behavior.
50+
*
51+
* @author Artem Bilan
52+
*
53+
* @since 6.1
54+
*/
55+
public class PartitionedDispatcher extends AbstractDispatcher {
56+
57+
private final Map<Integer, UnicastingDispatcher> partitions = new HashMap<>();
58+
59+
private final List<ExecutorService> executors = new ArrayList<>();
60+
61+
private final int partitionCount;
62+
63+
private final Function<Message<?>, Object> partitionKeyFunction;
64+
65+
private ThreadFactory threadFactory = new CustomizableThreadFactory("partition-thread-");
66+
67+
private boolean failover = true;
68+
69+
@Nullable
70+
private LoadBalancingStrategy loadBalancingStrategy;
71+
72+
private ErrorHandler errorHandler;
73+
74+
private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
75+
76+
/**
77+
* Instantiate based on a provided number of partitions and function for partition key against
78+
* the message to dispatch.
79+
* @param partitionCount the number of partitions in this channel.
80+
* @param partitionKeyFunction the function to resolve a partition key against the message
81+
* to dispatch.
82+
*/
83+
public PartitionedDispatcher(int partitionCount, Function<Message<?>, Object> partitionKeyFunction) {
84+
Assert.isTrue(partitionCount > 0, "'partitionCount' must be greater than 0");
85+
Assert.notNull(partitionKeyFunction, "'partitionKeyFunction' must not be null");
86+
this.partitionKeyFunction = partitionKeyFunction;
87+
this.partitionCount = partitionCount;
88+
}
89+
90+
/**
91+
* Set a {@link ThreadFactory} for executors per partitions.
92+
* Defaults to the {@link CustomizableThreadFactory} based on a {@code partition-thread-} prefix.
93+
* @param threadFactory the {@link ThreadFactory} to use.
94+
*/
95+
public void setThreadFactory(ThreadFactory threadFactory) {
96+
Assert.notNull(threadFactory, "'threadFactory' must not be null");
97+
this.threadFactory = threadFactory;
98+
}
99+
100+
/**
101+
* Specify whether partition dispatchers should have failover enabled.
102+
* By default, it will. Set this value to 'false' to disable it.
103+
* @param failover The failover boolean.
104+
*/
105+
public void setFailover(boolean failover) {
106+
this.failover = failover;
107+
}
108+
109+
/**
110+
* Provide a {@link LoadBalancingStrategy} for partition dispatchers.
111+
* @param loadBalancingStrategy The load balancing strategy implementation.
112+
*/
113+
public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
114+
this.loadBalancingStrategy = loadBalancingStrategy;
115+
}
116+
117+
/**
118+
* Provide a {@link ErrorHandler} for wrapping partition {@link Executor}
119+
* to the {@link ErrorHandlingTaskExecutor}.
120+
* @param errorHandler the {@link ErrorHandler} to use.
121+
*/
122+
public void setErrorHandler(ErrorHandler errorHandler) {
123+
this.errorHandler = errorHandler;
124+
}
125+
126+
/**
127+
* Set a {@link MessageHandlingTaskDecorator} to wrap a message handling task into some
128+
* addition logic, e.g. message channel may provide an interception for its operations.
129+
* @param messageHandlingTaskDecorator the {@link MessageHandlingTaskDecorator} to use.
130+
*/
131+
public void setMessageHandlingTaskDecorator(MessageHandlingTaskDecorator messageHandlingTaskDecorator) {
132+
Assert.notNull(messageHandlingTaskDecorator, "'messageHandlingTaskDecorator' must not be null.");
133+
this.messageHandlingTaskDecorator = messageHandlingTaskDecorator;
134+
}
135+
136+
/**
137+
* Shutdown this dispatcher on application close.
138+
* The partition executors are shutdown and internal state of this instance is cleared.
139+
*/
140+
public void shutdown() {
141+
this.executors.forEach(ExecutorService::shutdown);
142+
this.executors.clear();
143+
this.partitions.clear();
144+
}
145+
146+
@Override
147+
public boolean dispatch(Message<?> message) {
148+
populatedPartitions();
149+
int partition = Math.abs(this.partitionKeyFunction.apply(message).hashCode()) % this.partitionCount;
150+
UnicastingDispatcher partitionDispatcher = this.partitions.get(partition);
151+
return partitionDispatcher.dispatch(message);
152+
}
153+
154+
private synchronized void populatedPartitions() {
155+
if (this.partitions.isEmpty()) {
156+
for (int i = 0; i < this.partitionCount; i++) {
157+
this.partitions.put(i, newPartition());
158+
}
159+
}
160+
}
161+
162+
private UnicastingDispatcher newPartition() {
163+
ExecutorService executor = Executors.newSingleThreadExecutor(this.threadFactory);
164+
this.executors.add(executor);
165+
DelegateDispatcher delegateDispatcher =
166+
new DelegateDispatcher(new ErrorHandlingTaskExecutor(executor, this.errorHandler));
167+
delegateDispatcher.setFailover(this.failover);
168+
delegateDispatcher.setLoadBalancingStrategy(this.loadBalancingStrategy);
169+
delegateDispatcher.setMessageHandlingTaskDecorator(this.messageHandlingTaskDecorator);
170+
return delegateDispatcher;
171+
}
172+
173+
private final class DelegateDispatcher extends UnicastingDispatcher {
174+
175+
DelegateDispatcher(Executor executor) {
176+
super(executor);
177+
}
178+
179+
@Override
180+
protected Set<MessageHandler> getHandlers() {
181+
return PartitionedDispatcher.this.getHandlers();
182+
}
183+
184+
@Override
185+
protected boolean tryOptimizedDispatch(Message<?> message) {
186+
return PartitionedDispatcher.this.tryOptimizedDispatch(message);
187+
}
188+
189+
}
190+
191+
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -147,6 +147,27 @@ public FluxMessageChannelSpec flux(String id) {
147147
return MessageChannels.flux(id);
148148
}
149149

150+
/**
151+
* Create a {@link PartitionedChannelSpec}.
152+
* @param partitionCount the number of partitions in the channel.
153+
* @return the {@link PartitionedChannelSpec}.
154+
* @since 6.1
155+
*/
156+
public PartitionedChannelSpec partitioned(int partitionCount) {
157+
return MessageChannels.partitioned(partitionCount);
158+
}
159+
160+
/**
161+
* Create a {@link PartitionedChannelSpec}.
162+
* @param id the bean name for the channel.
163+
* @param partitionCount the number of partitions in the channel.
164+
* @return the {@link PartitionedChannelSpec}.
165+
* @since 6.1
166+
*/
167+
public PartitionedChannelSpec partitioned(String id, int partitionCount) {
168+
return MessageChannels.partitioned(id, partitionCount);
169+
}
170+
150171
private Channels() {
151172
}
152173

0 commit comments

Comments
 (0)