diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java
new file mode 100644
index 00000000000..f8bc59ea98b
--- /dev/null
+++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.channel;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Function;
+
+import org.springframework.integration.IntegrationMessageHeaderAccessor;
+import org.springframework.integration.dispatcher.LoadBalancingStrategy;
+import org.springframework.integration.dispatcher.PartitionedDispatcher;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
+import org.springframework.util.Assert;
+
+/**
+ * An {@link AbstractExecutorChannel} implementation for partitioned message dispatching.
+ * Requires a number of partitions where each of them is backed by a dedicated thread.
+ * The {@code partitionKeyFunction} is used to determine to which partition the message
+ * has to be dispatched.
+ * By default, the {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} message header is used
+ * for partition key.
+ *
+ * The actual dispatching and threading logic is implemented in the {@link PartitionedDispatcher}.
+ *
+ * The default {@link ThreadFactory} is based on the bean name of this channel plus {@code -partition-thread-}.
+ * Thus, every thread name will reflect a partition it belongs to.
+ *
+ * The rest of the logic is similar to the {@link ExecutorChannel}, which includes:
+ * - load balancing for subscribers;
+ * - fail-over and error handling;
+ * - channel operations intercepting.
+ *
+ * @author Artem Bilan
+ *
+ * @since 6.1
+ *
+ * @see PartitionedDispatcher
+ */
+public class PartitionedChannel extends AbstractExecutorChannel {
+
+ @Nullable
+ private ThreadFactory threadFactory;
+
+ /**
+ * Instantiate based on a provided number of partitions and function resolving a partition key from
+ * the {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} message header.
+ * @param partitionCount the number of partitions in this channel.
+ * sent to this channel.
+ */
+ public PartitionedChannel(int partitionCount) {
+ this(partitionCount, (message) -> message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID));
+ }
+
+ /**
+ * Instantiate based on a provided number of partitions and function for partition key against
+ * the message.
+ * @param partitionCount the number of partitions in this channel.
+ * @param partitionKeyFunction the function to resolve a partition key against the message
+ * sent to this channel.
+ */
+ public PartitionedChannel(int partitionCount, Function, Object> partitionKeyFunction) {
+ super(null);
+ this.dispatcher = new PartitionedDispatcher(partitionCount, partitionKeyFunction);
+ }
+
+ /**
+ * Set a {@link ThreadFactory} for executors per partitions.
+ * Propagated down to the {@link PartitionedDispatcher}.
+ * Defaults to the {@link CustomizableThreadFactory} based on the bean name
+ * of this channel plus {@code -partition-thread-}.
+ * @param threadFactory the {@link ThreadFactory} to use.
+ */
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ Assert.notNull(threadFactory, "'threadFactory' must not be null");
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Specify whether the channel's dispatcher should have failover enabled.
+ * By default, it will. Set this value to 'false' to disable it.
+ * @param failover The failover boolean.
+ */
+ public void setFailover(boolean failover) {
+ getDispatcher().setFailover(failover);
+ }
+
+ /**
+ * Provide a {@link LoadBalancingStrategy} for the {@link PartitionedDispatcher}.
+ * @param loadBalancingStrategy The load balancing strategy implementation.
+ */
+ public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
+ getDispatcher().setLoadBalancingStrategy(loadBalancingStrategy);
+ }
+
+ @Override
+ protected PartitionedDispatcher getDispatcher() {
+ return (PartitionedDispatcher) this.dispatcher;
+ }
+
+ @Override
+ protected void onInit() {
+ super.onInit();
+
+ if (this.threadFactory == null) {
+ this.threadFactory = new CustomizableThreadFactory(getComponentName() + "-partition-thread-");
+ }
+ PartitionedDispatcher partitionedDispatcher = getDispatcher();
+ partitionedDispatcher.setThreadFactory(this.threadFactory);
+
+ if (this.maxSubscribers == null) {
+ partitionedDispatcher.setMaxSubscribers(getIntegrationProperties().getChannelsMaxUnicastSubscribers());
+ }
+
+ partitionedDispatcher.setErrorHandler(ChannelUtils.getErrorHandler(getBeanFactory()));
+
+ partitionedDispatcher.setMessageHandlingTaskDecorator(task -> {
+ if (this.executorInterceptorsSize > 0) {
+ return new MessageHandlingTask(task);
+ }
+ else {
+ return task;
+ }
+ });
+
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ getDispatcher().shutdown();
+ }
+
+}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java
new file mode 100644
index 00000000000..2b727ac61ad
--- /dev/null
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.dispatcher;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Function;
+
+import org.springframework.integration.util.ErrorHandlingTaskExecutor;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
+import org.springframework.util.Assert;
+import org.springframework.util.ErrorHandler;
+
+/**
+ * An {@link AbstractDispatcher} implementation for distributing messages to
+ * dedicated threads according to the key determined by the provided function against
+ * the message to dispatch.
+ *
+ * Every partition, created by this class, is a {@link UnicastingDispatcher}
+ * delegate based on a single thread {@link Executor}.
+ *
+ * The number of partitions should be a reasonable value for the application environment
+ * since every partition is based on a dedicated thread for message processing.
+ *
+ * The rest of the logic is similar to {@link UnicastingDispatcher} behavior.
+ *
+ * @author Artem Bilan
+ *
+ * @since 6.1
+ */
+public class PartitionedDispatcher extends AbstractDispatcher {
+
+ private final Map partitions = new HashMap<>();
+
+ private final List executors = new ArrayList<>();
+
+ private final int partitionCount;
+
+ private final Function, Object> partitionKeyFunction;
+
+ private ThreadFactory threadFactory = new CustomizableThreadFactory("partition-thread-");
+
+ private boolean failover = true;
+
+ @Nullable
+ private LoadBalancingStrategy loadBalancingStrategy;
+
+ private ErrorHandler errorHandler;
+
+ private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
+
+ /**
+ * Instantiate based on a provided number of partitions and function for partition key against
+ * the message to dispatch.
+ * @param partitionCount the number of partitions in this channel.
+ * @param partitionKeyFunction the function to resolve a partition key against the message
+ * to dispatch.
+ */
+ public PartitionedDispatcher(int partitionCount, Function, Object> partitionKeyFunction) {
+ Assert.isTrue(partitionCount > 0, "'partitionCount' must be greater than 0");
+ Assert.notNull(partitionKeyFunction, "'partitionKeyFunction' must not be null");
+ this.partitionKeyFunction = partitionKeyFunction;
+ this.partitionCount = partitionCount;
+ }
+
+ /**
+ * Set a {@link ThreadFactory} for executors per partitions.
+ * Defaults to the {@link CustomizableThreadFactory} based on a {@code partition-thread-} prefix.
+ * @param threadFactory the {@link ThreadFactory} to use.
+ */
+ public void setThreadFactory(ThreadFactory threadFactory) {
+ Assert.notNull(threadFactory, "'threadFactory' must not be null");
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Specify whether partition dispatchers should have failover enabled.
+ * By default, it will. Set this value to 'false' to disable it.
+ * @param failover The failover boolean.
+ */
+ public void setFailover(boolean failover) {
+ this.failover = failover;
+ }
+
+ /**
+ * Provide a {@link LoadBalancingStrategy} for partition dispatchers.
+ * @param loadBalancingStrategy The load balancing strategy implementation.
+ */
+ public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
+ this.loadBalancingStrategy = loadBalancingStrategy;
+ }
+
+ /**
+ * Provide a {@link ErrorHandler} for wrapping partition {@link Executor}
+ * to the {@link ErrorHandlingTaskExecutor}.
+ * @param errorHandler the {@link ErrorHandler} to use.
+ */
+ public void setErrorHandler(ErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
+ /**
+ * Set a {@link MessageHandlingTaskDecorator} to wrap a message handling task into some
+ * addition logic, e.g. message channel may provide an interception for its operations.
+ * @param messageHandlingTaskDecorator the {@link MessageHandlingTaskDecorator} to use.
+ */
+ public void setMessageHandlingTaskDecorator(MessageHandlingTaskDecorator messageHandlingTaskDecorator) {
+ Assert.notNull(messageHandlingTaskDecorator, "'messageHandlingTaskDecorator' must not be null.");
+ this.messageHandlingTaskDecorator = messageHandlingTaskDecorator;
+ }
+
+ /**
+ * Shutdown this dispatcher on application close.
+ * The partition executors are shutdown and internal state of this instance is cleared.
+ */
+ public void shutdown() {
+ this.executors.forEach(ExecutorService::shutdown);
+ this.executors.clear();
+ this.partitions.clear();
+ }
+
+ @Override
+ public boolean dispatch(Message> message) {
+ populatedPartitions();
+ int partition = Math.abs(this.partitionKeyFunction.apply(message).hashCode()) % this.partitionCount;
+ UnicastingDispatcher partitionDispatcher = this.partitions.get(partition);
+ return partitionDispatcher.dispatch(message);
+ }
+
+ private synchronized void populatedPartitions() {
+ if (this.partitions.isEmpty()) {
+ for (int i = 0; i < this.partitionCount; i++) {
+ this.partitions.put(i, newPartition());
+ }
+ }
+ }
+
+ private UnicastingDispatcher newPartition() {
+ ExecutorService executor = Executors.newSingleThreadExecutor(this.threadFactory);
+ this.executors.add(executor);
+ DelegateDispatcher delegateDispatcher =
+ new DelegateDispatcher(new ErrorHandlingTaskExecutor(executor, this.errorHandler));
+ delegateDispatcher.setFailover(this.failover);
+ delegateDispatcher.setLoadBalancingStrategy(this.loadBalancingStrategy);
+ delegateDispatcher.setMessageHandlingTaskDecorator(this.messageHandlingTaskDecorator);
+ return delegateDispatcher;
+ }
+
+ private final class DelegateDispatcher extends UnicastingDispatcher {
+
+ DelegateDispatcher(Executor executor) {
+ super(executor);
+ }
+
+ @Override
+ protected Set getHandlers() {
+ return PartitionedDispatcher.this.getHandlers();
+ }
+
+ @Override
+ protected boolean tryOptimizedDispatch(Message> message) {
+ return PartitionedDispatcher.this.tryOptimizedDispatch(message);
+ }
+
+ }
+
+}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java
index b3c9770ba0d..d26b62d624e 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2021 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -147,6 +147,27 @@ public FluxMessageChannelSpec flux(String id) {
return MessageChannels.flux(id);
}
+ /**
+ * Create a {@link PartitionedChannelSpec}.
+ * @param partitionCount the number of partitions in the channel.
+ * @return the {@link PartitionedChannelSpec}.
+ * @since 6.1
+ */
+ public PartitionedChannelSpec partitioned(int partitionCount) {
+ return MessageChannels.partitioned(partitionCount);
+ }
+
+ /**
+ * Create a {@link PartitionedChannelSpec}.
+ * @param id the bean name for the channel.
+ * @param partitionCount the number of partitions in the channel.
+ * @return the {@link PartitionedChannelSpec}.
+ * @since 6.1
+ */
+ public PartitionedChannelSpec partitioned(String id, int partitionCount) {
+ return MessageChannels.partitioned(id, partitionCount);
+ }
+
private Channels() {
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannels.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannels.java
index 790a502f9c4..59a7616d0a2 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannels.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannels.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2021 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -149,6 +149,27 @@ public static FluxMessageChannelSpec flux(String id) {
return flux().id(id);
}
+ /**
+ * Create a {@link PartitionedChannelSpec}.
+ * @param partitionCount the number of partitions in the channel.
+ * @return the {@link PartitionedChannelSpec}.
+ * @since 6.1
+ */
+ public static PartitionedChannelSpec partitioned(int partitionCount) {
+ return new PartitionedChannelSpec(partitionCount);
+ }
+
+ /**
+ * Create a {@link PartitionedChannelSpec}.
+ * @param id the bean name for the channel.
+ * @param partitionCount the number of partitions in the channel.
+ * @return the {@link PartitionedChannelSpec}.
+ * @since 6.1
+ */
+ public static PartitionedChannelSpec partitioned(String id, int partitionCount) {
+ return new PartitionedChannelSpec(partitionCount).id(id);
+ }
+
private MessageChannels() {
}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java
new file mode 100644
index 00000000000..0e8d31ee834
--- /dev/null
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.dsl;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Function;
+
+import org.springframework.integration.channel.PartitionedChannel;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+
+/**
+ * A {@link LoadBalancingChannelSpec} implementation for the {@link PartitionedChannel}.
+ *
+ * @author Artem Bilan
+ *
+ * @since 6.1
+ */
+public class PartitionedChannelSpec extends LoadBalancingChannelSpec {
+
+ private final int partitionCount;
+
+ @Nullable
+ private Function, Object> partitionKeyFunction;
+
+ @Nullable
+ private ThreadFactory threadFactory;
+
+ protected PartitionedChannelSpec(int partitionCount) {
+ this.partitionCount = partitionCount;
+ }
+
+ public PartitionedChannelSpec partitionKey(Function, Object> partitionKeyFunction) {
+ this.partitionKeyFunction = partitionKeyFunction;
+ return this;
+ }
+
+ public PartitionedChannelSpec threadFactory(ThreadFactory threadFactory) {
+ this.threadFactory = threadFactory;
+ return this;
+ }
+
+ @Override
+ protected PartitionedChannel doGet() {
+ if (this.partitionKeyFunction != null) {
+ this.channel = new PartitionedChannel(this.partitionCount, this.partitionKeyFunction);
+ }
+ else {
+ this.channel = new PartitionedChannel(this.partitionCount);
+ }
+ this.channel.setLoadBalancingStrategy(this.loadBalancingStrategy);
+ if (this.failover != null) {
+ this.channel.setFailover(this.failover);
+ }
+ if (this.maxSubscribers != null) {
+ this.channel.setMaxSubscribers(this.maxSubscribers);
+ }
+ if (this.threadFactory != null) {
+ this.channel.setThreadFactory(this.threadFactory);
+ }
+ return super.doGet();
+ }
+
+}
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java
new file mode 100644
index 00000000000..77c2471952e
--- /dev/null
+++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.channel;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.PollableChannel;
+import org.springframework.messaging.support.ExecutorChannelInterceptor;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * @author Artem Bilan
+ *
+ * @since 6.1
+ */
+@SpringJUnitConfig
+public class PartitionedChannelTests {
+
+ @Test
+ void messagesAreProperlyPartitioned() throws InterruptedException {
+ PartitionedChannel partitionedChannel =
+ new PartitionedChannel(2, (message) -> message.getHeaders().get("partitionKey"));
+ partitionedChannel.setBeanFactory(mock(BeanFactory.class));
+ partitionedChannel.setBeanName("testPartitionedChannel");
+
+ CountDownLatch handleLatch = new CountDownLatch(4);
+
+ partitionedChannel.addInterceptor(new ExecutorChannelInterceptor() {
+
+ @Override
+ public void afterMessageHandled(Message> message, MessageChannel ch, MessageHandler h, Exception ex) {
+ handleLatch.countDown();
+ }
+
+ });
+ partitionedChannel.afterPropertiesSet();
+
+ MultiValueMap> partitionedMessages = new LinkedMultiValueMap<>();
+
+ partitionedChannel.subscribe((message) -> partitionedMessages.add(Thread.currentThread().getName(), message));
+
+ partitionedChannel.send(MessageBuilder.withPayload("test1").setHeader("partitionKey", "1").build());
+ partitionedChannel.send(MessageBuilder.withPayload("test2").setHeader("partitionKey", "2").build());
+ partitionedChannel.send(MessageBuilder.withPayload("test3").setHeader("partitionKey", "2").build());
+ partitionedChannel.send(MessageBuilder.withPayload("test4").setHeader("partitionKey", "1").build());
+
+ assertThat(handleLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ assertThat(partitionedMessages).hasSize(2);
+ partitionedMessages.values()
+ .forEach(messagesInPartition -> {
+ assertThat(messagesInPartition).hasSize(2);
+ assertThat(messagesInPartition.get(0).getHeaders().get("partitionKey"))
+ .isEqualTo(messagesInPartition.get(1).getHeaders().get("partitionKey"));
+ });
+
+
+ HashSet allocatedPartitions = new HashSet<>(partitionedMessages.keySet());
+ partitionedMessages.clear();
+
+ CountDownLatch anotherHandleLatch = new CountDownLatch(1);
+
+ partitionedChannel.addInterceptor(new ExecutorChannelInterceptor() {
+
+ @Override
+ public void afterMessageHandled(Message> message, MessageChannel ch, MessageHandler h, Exception ex) {
+ anotherHandleLatch.countDown();
+ }
+
+ });
+
+ partitionedChannel.send(MessageBuilder.withPayload("test4").setHeader("partitionKey", "3").build());
+
+ assertThat(anotherHandleLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ assertThat(partitionedMessages).hasSize(1);
+ String partitionForLastMessage = partitionedMessages.keySet().iterator().next();
+ assertThat(partitionForLastMessage).isIn(allocatedPartitions);
+
+ partitionedChannel.destroy();
+ }
+
+ @Autowired
+ @Qualifier("someFlow.input")
+ MessageChannel inputChannel;
+
+ @Autowired
+ PollableChannel resultChannel;
+
+ @Test
+ void messagesArePartitionedByCorrelationId() {
+ this.inputChannel.send(new GenericMessage<>(IntStream.range(0, 5).toArray()));
+
+ Message> receive = this.resultChannel.receive(10_000);
+
+ assertThat(receive).isNotNull()
+ .extracting(Message::getPayload)
+ .asList()
+ .hasSize(5);
+
+ @SuppressWarnings("unchecked")
+ Set strings = new HashSet<>((Collection extends String>) receive.getPayload());
+ assertThat(strings).hasSize(1)
+ .allMatch(value -> value.startsWith("testChannel-partition-thread-"));
+ }
+
+ @Configuration
+ @EnableIntegration
+ public static class TestConfiguration {
+
+ @Bean
+ IntegrationFlow someFlow() {
+ return f -> f
+ .split()
+ .channel(c -> c.partitioned("testChannel", 10))
+ .transform(p -> Thread.currentThread().getName())
+ .aggregate()
+ .channel(c -> c.queue("resultChannel"));
+ }
+
+ }
+
+}
diff --git a/src/reference/asciidoc/channel.adoc b/src/reference/asciidoc/channel.adoc
index 9c3cc92b908..61e6da4d484 100644
--- a/src/reference/asciidoc/channel.adoc
+++ b/src/reference/asciidoc/channel.adoc
@@ -203,6 +203,30 @@ CAUTION: The sender can sometimes block.
For example, when using a `TaskExecutor` with a rejection policy that throttles the client (such as the `ThreadPoolExecutor.CallerRunsPolicy`), the sender's thread can execute the method any time the thread pool is at its maximum capacity and the executor's work queue is full.
Since that situation would only occur in a non-predictable way, you should not rely upon it for transactions.
+[[partitioned-channel]]
+===== `PartitionedChannel`
+
+Starting with version 6.1, a `PartitionedChannel` implementation is provided.
+This is an extension of `AbstractExecutorChannel` and represents point-to-point dispatching logic where the actual consumption is processed on a specific thread, determined by the partition key evaluated from a message sent to this channel.
+This channel is similar to the `ExecutorChannel` mentioned above, but with the difference that messages with the same partition key are always handled in the same thread, preserving ordering.
+It does not require an external `TaskExecutor`, but can be configured with a custom `ThreadFactory` (e.g. `Thread.ofVirtual().name("partition-", 0).factory()`).
+This factory is used to populate single-thread executors into a `MessageDispatcher` delegate, per partition.
+By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` message header is used as the partition key.
+This channel can be configured as a simple bean:
+
+====
+[source,java]
+----
+@Bean
+PartitionedChannel somePartitionedChannel() {
+ return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
+}
+----
+====
+
+The channel will have `3` partitions - dedicated threads; will use the `partitionKey` header to determine in which partition the message will be handled.
+See `PartitionedChannel` class Javadocs for more information.
+
[[flux-message-channel]]
===== `FluxMessageChannel`
diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc
index 0ea1a793362..bb7cfb7db8e 100644
--- a/src/reference/asciidoc/whats-new.adoc
+++ b/src/reference/asciidoc/whats-new.adoc
@@ -34,6 +34,11 @@ See <<./handler-advice.adoc#context-holder-advice, Context Holder Advice>> for m
The `IntegrationFlow` can now end with a convenient `handleReactive(ReactiveMessageHandler)` operator.
See <<./reactive-streams.adoc#reactive-message-handler, `ReactiveMessageHandler`>> for more information.
+[[x6.1-partitioned-channel]]
+==== `PartitionedChannel`
+A new `PartitionedChannel` has been introduced to process messages with the same partition key in the same thread.
+See <<./channel.adoc#partitioned-channel, `PartitionedChannel`>> for more information.
+
[[x6.1-general]]
=== General Changes