Skip to content

Introduce PartitionedChannel #8617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,15 @@ public void shutdown() {

@Override
public boolean dispatch(Message<?> message) {
prePopulatedPartitionsIfAny();
if (this.partitions.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work, thread 2 might proceed before thread 1 has populated all partition execs, and NPE on the dispatcher returned by the get().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about now?
See the latest commit.

prePopulatedPartitions();
}
int partition = Math.abs(this.partitionKeyFunction.apply(message).hashCode()) % this.partitionCount;
UnicastingDispatcher partitionDispatcher = this.partitions.get(partition);
return partitionDispatcher.dispatch(message);
}

private void prePopulatedPartitionsIfAny() {
private synchronized void prePopulatedPartitions() {
if (this.partitions.isEmpty()) {
for (int i = 0; i < this.partitionCount; i++) {
this.partitions.put(i, newPartition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

import org.springframework.integration.channel.AbstractExecutorChannel;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.PartitionedChannel;
import org.springframework.integration.dispatcher.PartitionedDispatcher;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

Expand Down Expand Up @@ -61,10 +57,10 @@ public PartitionedChannelSpec threadFactory(ThreadFactory threadFactory) {
@Override
protected PartitionedChannel doGet() {
if (this.partitionKeyFunction != null) {
this.channel = new PartitionedChannel(partitionCount, this.partitionKeyFunction);
this.channel = new PartitionedChannel(this.partitionCount, this.partitionKeyFunction);
}
else {
this.channel = new PartitionedChannel(partitionCount);
this.channel = new PartitionedChannel(this.partitionCount);
}
this.channel.setLoadBalancingStrategy(this.loadBalancingStrategy);
if (this.failover != null) {
Expand Down
10 changes: 5 additions & 5 deletions src/reference/asciidoc/channel.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ Since that situation would only occur in a non-predictable way, you should not r
===== `PartitionedChannel`

Starting with version 6.1, a `PartitionedChannel` implementation is provided.
This is an extension of `AbstractExecutorChannel` and represents a point-to-point dispatching logic where an actual consumption happens on a specific thread determined by the partition key evaluated from a message sent to this channel.
This channel is similar to the `ExecutorChannel` mentioned above, but with a difference that messages with the same partition key are always handled in the same thread preserving ordering.
This is an extension of `AbstractExecutorChannel` and represents point-to-point dispatching logic where the actual consumption is processed on a specific thread, determined by the partition key evaluated from a message sent to this channel.
This channel is similar to the `ExecutorChannel` mentioned above, but with the difference that messages with the same partition key are always handled in the same thread, preserving ordering.
It does not require an external `TaskExecutor`, but can be configured with a custom `ThreadFactory` (e.g. `Thread.ofVirtual().name("partition-", 0).factory()`).
This factory then used to populate single-thread executors into `MessageDispatcher` delegates per partition.
By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` message header is used for partition key.
This factory is used to populate single-thread executors into a `MessageDispatcher` delegate, per partition.
By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` message header is used as the partition key.
This channel can be configured as a simple bean:

====
Expand All @@ -224,7 +224,7 @@ PartitionedChannel somePartitionedChannel() {
----
====

The channel will have `3` partitions - dedicated threads; will use a `partitionKey` header to determine in which partition the message must be handled.
The channel will have `3` partitions - dedicated threads; will use the `partitionKey` header to determine in which partition the message will be handled.
See `PartitionedChannel` class Javadocs for more information.

[[flux-message-channel]]
Expand Down