-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Introduce PartitionedChannel
#8617
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce PartitionedChannel
#8617
Conversation
* Implement a `PartitionedChannel` as an extension of the `AbstractExecutorChannel` * Supply this channel with a `PartitionedDispatcher` which is an extension of the `AbstractDispatcher` * The target partition is essentially a `UnicastingDispatcher` with a single thead executor
If this is OK, I'll add some docs and DSL configuration. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some suggestions; otherwise LGTM.
* An {@link AbstractExecutorChannel} implementation for partitioned messages dispatching. | ||
* Requires a number of partitions where each of them is backed by a dedicated thread. | ||
* The {@code partitionKeyFunction} is used to determine to which partition the message | ||
* has to be dispatched. | ||
* <p> | ||
* The actual dispatching and threading logic in implemented in the {@link PartitionedDispatcher}. | ||
* <p> | ||
* The default {@link ThreadFactory} is based on a bean name of this channel plus {@code -partition-thread-}. | ||
* <p> | ||
* The rest of the logic is similar to the {@link ExecutorChannel}, which includes: | ||
* - load balancing for subscribers; | ||
* - fail-over and error handling; | ||
* - channel operations intercepting. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* An {@link AbstractExecutorChannel} implementation for partitioned messages dispatching. | |
* Requires a number of partitions where each of them is backed by a dedicated thread. | |
* The {@code partitionKeyFunction} is used to determine to which partition the message | |
* has to be dispatched. | |
* <p> | |
* The actual dispatching and threading logic in implemented in the {@link PartitionedDispatcher}. | |
* <p> | |
* The default {@link ThreadFactory} is based on a bean name of this channel plus {@code -partition-thread-}. | |
* <p> | |
* The rest of the logic is similar to the {@link ExecutorChannel}, which includes: | |
* - load balancing for subscribers; | |
* - fail-over and error handling; | |
* - channel operations intercepting. | |
* An {@link AbstractExecutorChannel} implementation for partitioned message dispatching. | |
* Requires a number of partitions where each of them is backed by a dedicated thread. | |
* The {@code partitionKeyFunction} is used to determine to which partition the message | |
* has to be dispatched. | |
* <p> | |
* The actual dispatching and threading logic in implemented in the {@link PartitionedDispatcher}. | |
* <p> | |
* The default {@link ThreadFactory} is based on the bean name of this channel plus {@code -partition-thread-}. | |
* <p> | |
* The rest of the logic is similar to the {@link ExecutorChannel}, which includes: | |
* - load balancing for subscribers; | |
* - fail-over and error handling; | |
* - channel operations intercepting. |
...tegration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java
Show resolved
Hide resolved
...tegration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java
Show resolved
Hide resolved
...tegration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java
Show resolved
Hide resolved
@Override | ||
public boolean dispatch(Message<?> message) { | ||
int partition = this.partitionKeyFunction.apply(message).hashCode() % this.partitionCount; | ||
UnicastingDispatcher partitionDispatcher = this.partitions.computeIfAbsent(partition, (__) -> newPartition()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should pre-populate - otherwise the thread names won't necessarily match the partition #.
assertThat(messagesInPartition.get(0).getHeaders().get("partitionKey")) | ||
.isEqualTo(messagesInPartition.get(1).getHeaders().get("partitionKey")); | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we match the thread name to the partition, we could assert that the records went to the right partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it is not always true, since we don't use a partition key as is, but rather its hashCode()
.
So, while all the same keys goes to the same partition, it does not mean that 1
for key will always determine the partition 0
.
* Add docs for `PartitionedChannel` into `channel.adoc` * Pre-populate partitions in the `PartitionedDispatcher` to ensure a thread number reflection of the partition it is used for (for default `ThreadFactory`) * Add Javadocs to `public` methods * Add Java DSL `PartitionedChannelSpec` and respective factory methods into `Channels` * Use `IntegrationMessageHeaderAccessor.CORRELATION_ID` header as a default partition key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just doc polishing.
return partitionDispatcher.dispatch(message); | ||
} | ||
|
||
private void prePopulatedPartitionsIfAny() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void prePopulatedPartitionsIfAny() { | |
private synchronized void prePopulatedPartitionsIfAny() { |
src/reference/asciidoc/channel.adoc
Outdated
Starting with version 6.1, a `PartitionedChannel` implementation is provided. | ||
This is an extension of `AbstractExecutorChannel` and represents a point-to-point dispatching logic where an actual consumption happens on a specific thread determined by the partition key evaluated from a message sent to this channel. | ||
This channel is similar to the `ExecutorChannel` mentioned above, but with a difference that messages with the same partition key are always handled in the same thread preserving ordering. | ||
It does not require an external `TaskExecutor`, but can be configured with a custom `ThreadFactory` (e.g. `Thread.ofVirtual().name("partition-", 0).factory()`). | ||
This factory then used to populate single-thread executors into `MessageDispatcher` delegates per partition. | ||
By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` message header is used for partition key. | ||
This channel can be configured as a simple bean: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Starting with version 6.1, a `PartitionedChannel` implementation is provided. | |
This is an extension of `AbstractExecutorChannel` and represents a point-to-point dispatching logic where an actual consumption happens on a specific thread determined by the partition key evaluated from a message sent to this channel. | |
This channel is similar to the `ExecutorChannel` mentioned above, but with a difference that messages with the same partition key are always handled in the same thread preserving ordering. | |
It does not require an external `TaskExecutor`, but can be configured with a custom `ThreadFactory` (e.g. `Thread.ofVirtual().name("partition-", 0).factory()`). | |
This factory then used to populate single-thread executors into `MessageDispatcher` delegates per partition. | |
By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` message header is used for partition key. | |
This channel can be configured as a simple bean: | |
Starting with version 6.1, a `PartitionedChannel` implementation is provided. | |
This is an extension of `AbstractExecutorChannel` and represents point-to-point dispatching logic where the actual consumption is processed on a specific thread, determined by the partition key evaluated from a message sent to this channel. | |
This channel is similar to the `ExecutorChannel` mentioned above, but with the difference that messages with the same partition key are always handled in the same thread, preserving ordering. | |
It does not require an external `TaskExecutor`, but can be configured with a custom `ThreadFactory` (e.g. `Thread.ofVirtual().name("partition-", 0).factory()`). | |
This factory is used to populate single-thread executors into a `MessageDispatcher` delegate, per partition. | |
By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` message header is used as the partition key. | |
This channel can be configured as a simple bean: |
src/reference/asciidoc/channel.adoc
Outdated
---- | ||
==== | ||
|
||
The channel will have `3` partitions - dedicated threads; will use a `partitionKey` header to determine in which partition the message must be handled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel will have `3` partitions - dedicated threads; will use a `partitionKey` header to determine in which partition the message must be handled. | |
The channel will have `3` partitions - dedicated threads; will use the `partitionKey` header to determine in which partition the message must be handled. |
src/reference/asciidoc/channel.adoc
Outdated
---- | ||
==== | ||
|
||
The channel will have `3` partitions - dedicated threads; will use a `partitionKey` header to determine in which partition the message must be handled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel will have `3` partitions - dedicated threads; will use a `partitionKey` header to determine in which partition the message must be handled. | |
The channel will have `3` partitions - dedicated threads; will use the `partitionKey` header to determine in which partition the message will be handled. |
* Fix Checkstyle violations * Mark `PartitionedDispatcher.prePopulatedPartitions()` with `synchronized`
@@ -145,13 +145,15 @@ public void shutdown() { | |||
|
|||
@Override | |||
public boolean dispatch(Message<?> message) { | |||
prePopulatedPartitionsIfAny(); | |||
if (this.partitions.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't work, thread 2 might proceed before thread 1 has populated all partition execs, and NPE on the dispatcher returned by the get()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about now?
See the latest commit.
and when a custom `ThreadFactory` is set
public void setThreadFactory(ThreadFactory threadFactory) { | ||
Assert.notNull(threadFactory, "'threadFactory' must not be null"); | ||
this.threadFactory = threadFactory; | ||
populatedPartitions(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to shut down and clear existing executors.
Perhaps call shutdown()
first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... Nothing to shutdown I guess: those executors have nit been used yet.
But we indeed have to this.executors.clear()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't know that; they could call this after we started processing messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And if not, it's a no-op; so why worry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doh... We cannon populate them from the ctor: some other options are required for delegate dispatchers.
So, coming back to the synchronized
solution 😄
PartitionedChannel
as an extension of theAbstractExecutorChannel
PartitionedDispatcher
which is an extension of theAbstractDispatcher
UnicastingDispatcher
with a single thead executor