Skip to content

Commit 4ae6934

Browse files
committed
KAFKA-18692: Consider to unify KStreamImpl "repartitionRequired" with GraphNode "keyChangingOperation"
1 parent 081deaa commit 4ae6934

File tree

10 files changed

+62
-48
lines changed

10 files changed

+62
-48
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,15 @@ public class BranchedKStreamImpl<K, V> implements BranchedKStream<K, V> {
3333
private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
3434

3535
private final KStreamImpl<K, V> source;
36-
private final boolean repartitionRequired;
3736
private final String splitterName;
3837
private final Map<String, KStream<K, V>> outputBranches = new HashMap<>();
3938

4039
private final List<Predicate<? super K, ? super V>> predicates = new ArrayList<>();
4140
private final List<String> childNames = new ArrayList<>();
4241
private final ProcessorGraphNode<K, V> splitterNode;
4342

44-
BranchedKStreamImpl(final KStreamImpl<K, V> source, final boolean repartitionRequired, final NamedInternal named) {
43+
BranchedKStreamImpl(final KStreamImpl<K, V> source, final NamedInternal named) {
4544
this.source = source;
46-
this.repartitionRequired = repartitionRequired;
4745
this.splitterName = named.orElseGenerateWithPrefix(source.builder, BRANCH_NAME);
4846

4947
// predicates and childNames are passed by reference so when the user adds a branch they get added to
@@ -86,7 +84,7 @@ private void createBranch(final Branched<K, V> branched, final int index) {
8684
source.builder.addGraphNode(splitterNode, branchChildNode);
8785
final KStreamImpl<K, V> branch = new KStreamImpl<>(branchChildName, source.keySerde,
8886
source.valueSerde, source.subTopologySourceNodes,
89-
repartitionRequired, branchChildNode, source.builder);
87+
branchChildNode, source.builder);
9088
process(branch, branchChildName, branchedInternal);
9189
}
9290

streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<
210210
final String queryableName) {
211211
for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {
212212

213-
if (repartitionReqs.repartitionRequired) {
213+
if (repartitionReqs.repartitionRequired()) {
214214

215215
final OptimizableRepartitionNodeBuilder<K, ?> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
216216

streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ class GroupedStreamAggregateBuilder<K, V> {
3636
private final InternalStreamsBuilder builder;
3737
private final Serde<K> keySerde;
3838
private final Serde<V> valueSerde;
39-
private final boolean repartitionRequired;
4039
private final String userProvidedRepartitionTopicName;
4140
private final Set<String> subTopologySourceNodes;
4241
private final String name;
@@ -51,15 +50,13 @@ class GroupedStreamAggregateBuilder<K, V> {
5150

5251
GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
5352
final GroupedInternal<K, V> groupedInternal,
54-
final boolean repartitionRequired,
5553
final Set<String> subTopologySourceNodes,
5654
final String name,
5755
final GraphNode graphNode) {
5856

5957
this.builder = builder;
6058
this.keySerde = groupedInternal.keySerde();
6159
this.valueSerde = groupedInternal.valueSerde();
62-
this.repartitionRequired = repartitionRequired;
6360
this.subTopologySourceNodes = subTopologySourceNodes;
6461
this.name = name;
6562
this.graphNode = graphNode;
@@ -124,7 +121,7 @@ private <KR, VR> KTable<KR, VR> build(final String aggFunctionName,
124121
String sourceName = this.name;
125122
GraphNode parentNode = graphNode;
126123

127-
if (repartitionRequired) {
124+
if (builder.isRepartitionRequired(graphNode)) {
128125
final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
129126

130127
final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeName;

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,14 @@ public <K, V> KStream<K, V> stream(final Collection<String> topics,
100100

101101
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
102102
final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed);
103+
streamSourceNode.repartitionByKey();
103104

104105
addGraphNode(root, streamSourceNode);
105106

106107
return new KStreamImpl<>(name,
107108
consumed.keySerde(),
108109
consumed.valueSerde(),
109110
Collections.singleton(name),
110-
false,
111111
streamSourceNode,
112112
this);
113113
}
@@ -116,14 +116,14 @@ public <K, V> KStream<K, V> stream(final Pattern topicPattern,
116116
final ConsumedInternal<K, V> consumed) {
117117
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
118118
final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
119+
streamPatternSourceNode.repartitionByKey();
119120

120121
addGraphNode(root, streamPatternSourceNode);
121122

122123
return new KStreamImpl<>(name,
123124
consumed.keySerde(),
124125
consumed.valueSerde(),
125126
Collections.singleton(name),
126-
false,
127127
streamPatternSourceNode,
128128
this);
129129
}
@@ -614,6 +614,14 @@ private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) {
614614
return null;
615615
}
616616

617+
protected boolean isRepartitionRequired(final GraphNode node) {
618+
final GraphNode find = findParentNodeMatching(node, GraphNode::canResolveRepartition);
619+
if (find == null) {
620+
return false;
621+
}
622+
return find.isRepartitionRequired();
623+
}
624+
617625
private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
618626
return repartitionNodes.iterator().next().repartitionTopic();
619627
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,28 +45,28 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
4545
static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
4646

4747
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
48-
final boolean repartitionRequired;
4948
final String userProvidedRepartitionTopicName;
5049

5150
KGroupedStreamImpl(final String name,
5251
final Set<String> subTopologySourceNodes,
5352
final GroupedInternal<K, V> groupedInternal,
54-
final boolean repartitionRequired,
5553
final GraphNode graphNode,
5654
final InternalStreamsBuilder builder) {
5755
super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), subTopologySourceNodes, graphNode, builder);
58-
this.repartitionRequired = repartitionRequired;
5956
this.userProvidedRepartitionTopicName = groupedInternal.name();
6057
this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
6158
builder,
6259
groupedInternal,
63-
repartitionRequired,
6460
subTopologySourceNodes,
6561
name,
6662
graphNode
6763
);
6864
}
6965

66+
public boolean repartitionRequired() {
67+
return builder.isRepartitionRequired(graphNode);
68+
}
69+
7070
@Override
7171
public KTable<K, V> reduce(final Reducer<V> reducer) {
7272
return reduce(reducer, Materialized.with(keySerde, valueSerde));

0 commit comments

Comments
 (0)