-
Notifications
You must be signed in to change notification settings - Fork 14.6k
DRAFT KAFKA-18692: Consider to unify KStreamImpl "repartitionRequired" with GraphNode "keyChangingOperation" #18800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Hi, @mjsax |
Thanks for this draft -- might take a few days until I find time to take a look. |
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Outdated
Show resolved
Hide resolved
Hi, @mjsax After Analyzing the cause, I'll request the review again However, I would appreciate it if you could take a look at the following comment when you have time. |
@ mjsax If the POC is worth applying, I'll resolve the conflict. |
@mjsax kindly ping |
@appchemist -- Sorry for dropping the ball on this work. -- I would have time now, to support you with with PR, if you are still interested. |
@mjsax Of course, I'm still interested. |
… GraphNode "keyChangingOperation"
37f5762
to
4ae6934
Compare
I've resolve the conflict. |
… GraphNode "keyChangingOperation" - refactoring
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Finally made a pass. Hope the comments/question make sense and help.
|
||
private enum Repartition { | ||
NOT_REQUIRED, | ||
BY_KEY_ONLY, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what this means?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The meaning of BY_KEY
is that repartitioning is determined based on the keyChangingOperation of the GraphNode.
@@ -100,14 +100,14 @@ public <K, V> KStream<K, V> stream(final Collection<String> topics, | |||
|
|||
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); | |||
final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed); | |||
streamSourceNode.requireRepartitionByKey(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what this means? Ie, the name and semantics are not clear to me. When we add a new KStream
from a topic, the assumption is, that the KStream
is partitioned by key. So this operator does not "require" any repartitioning (it just reads from a topic), and it does also not change the key (so downstream repartitioning is also not required as it's not a key changing operation).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it.
The original intent was streamSourceNode.requireNotRepartition()
, but it was incorrectly modified during the refactoring process.
@@ -116,14 +116,14 @@ public <K, V> KStream<K, V> stream(final Pattern topicPattern, | |||
final ConsumedInternal<K, V> consumed) { | |||
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); | |||
final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed); | |||
streamPatternSourceNode.requireRepartitionByKey(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same answer as above.
@@ -614,6 +614,14 @@ private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) { | |||
return null; | |||
} | |||
|
|||
protected boolean isRepartitionRequired(final GraphNode node) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to know, if node
requires repartitioning, why do we add this method here? It seems to belong to GraphNode
class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree.
I overlooked it while focusing on reusing the InternalStreamsBuilder.findParentNodeMatching
.
I also moved InternalStreamsBuilder.findParentNodeMatching
to GraphNode, as it seemed more appropriate there.
subTopologySourceNodes, | ||
name, | ||
graphNode | ||
); | ||
} | ||
|
||
public boolean repartitionRequired() { | ||
return builder.isRepartitionRequired(graphNode); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cf my commend above. This could be graphNode.repartitionRequired()
if we move the method. Or do I miss something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I Agree.
Fixed it
@@ -93,6 +98,14 @@ public String nodeName() { | |||
return nodeName; | |||
} | |||
|
|||
public boolean canResolveRepartition() { | |||
return keyChangingOperation || repartition != Repartition.NOT_REQUIRED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of this method and it's logic is not clear to me? What does "resolve" actually mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It indicates whether the node is able to determine if repartitioning is needed.
I renamed it to canDetermineRepartition
for better clarity.
Also, the original intent was keyChangingOperation || repartition != Repartition.BY_KEY
, but it was incorrectly modified during the refactoring process.
return keyChangingOperation || repartition != Repartition.NOT_REQUIRED; | ||
} | ||
|
||
public boolean isRepartitionRequired() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to repartitioningRequired()
or requiresRepartitioning()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed the method to repartitioningRequired()
} | ||
|
||
public boolean isRepartitionRequired() { | ||
return keyChangingOperation || repartition == Repartition.ALWAYS_REQUIRED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A key-changing operation by itself does not require repartitioning so not clear about the "or" condition. Can you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can find the explanation for why Repartition.REQUIRED
is necessary in the linked reference.
@@ -105,6 +118,14 @@ public boolean isMergeNode() { | |||
return mergeNode; | |||
} | |||
|
|||
public void requireRepartitionByKey() { | |||
this.repartition = Repartition.BY_KEY_ONLY; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: avoid unnecessary this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this
} | ||
|
||
public void requireRepartitionAlways() { | ||
this.repartition = Repartition.ALWAYS_REQUIRED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: avoid unnecessary this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this
… GraphNode "keyChangingOperation" - Corrected values in Repartition enum - Refactoring
That PR is a POC.
For now, I only focused on modifying
KStreamImpl.repartitionRequired
to replace it withGraphNode.keyChangingOperation
.So I'll be working on it further to make sure it's the right fix and need to write some test code.
Committer Checklist (excluded from commit message)