Skip to content

Commit d3b3aa5

Browse files
committed
KAFKA-19668: processValues() must be declared as a value-changing operation (#20470)
With the "merge.repartition.topic" optimization enabled, Kafka Streams tries to push repartition topics upstream, so that it can merge multiple repartition topics from different downstream branches together. However, it is not safe to push a repartition topic upstream if the parent node is value-changing: because the data types may change, the topology might become invalid and fail with a serde error at runtime. The optimization itself works correctly; however, processValues() is not correctly declared as value-changing, which can lead to invalid topologies. Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy <[email protected]>
1 parent c5169ca commit d3b3aa5

File tree

10 files changed

+265
-23
lines changed

10 files changed

+265
-23
lines changed

streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class StreamsBuilder {
7575
public StreamsBuilder() {
7676
topology = new Topology();
7777
internalTopologyBuilder = topology.internalTopologyBuilder;
78-
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
78+
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false);
7979
}
8080

8181
/**
@@ -87,7 +87,14 @@ public StreamsBuilder() {
8787
public StreamsBuilder(final TopologyConfig topologyConfigs) {
8888
topology = newTopology(topologyConfigs);
8989
internalTopologyBuilder = topology.internalTopologyBuilder;
90-
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
90+
internalStreamsBuilder = new InternalStreamsBuilder(
91+
internalTopologyBuilder,
92+
TopologyConfig.InternalConfig.getBoolean(
93+
topologyConfigs.originals(),
94+
TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX,
95+
false
96+
)
97+
);
9198
}
9299

93100
protected Topology newTopology(final TopologyConfig topologyConfigs) {

streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.slf4j.Logger;
3535
import org.slf4j.LoggerFactory;
3636

37+
import java.util.Map;
3738
import java.util.Optional;
3839
import java.util.Properties;
3940
import java.util.function.Supplier;
@@ -84,6 +85,28 @@
8485
*/
8586
@SuppressWarnings("deprecation")
8687
public final class TopologyConfig extends AbstractConfig {
88+
89+
public static class InternalConfig {
90+
// Cf https://issues.apache.org/jira/browse/KAFKA-19668
91+
public static final String ENABLE_PROCESS_PROCESSVALUE_FIX = "__enable.process.processValue.fix__";
92+
93+
public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
94+
final Object value = configs.getOrDefault(key, defaultValue);
95+
if (value instanceof Boolean) {
96+
return (boolean) value;
97+
} else if (value instanceof String) {
98+
return Boolean.parseBoolean((String) value);
99+
} else {
100+
log.warn(
101+
"Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.",
102+
value,
103+
key
104+
);
105+
return defaultValue;
106+
}
107+
}
108+
}
109+
87110
private static final ConfigDef CONFIG;
88111
static {
89112
CONFIG = new ConfigDef()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
7272
private static final String TABLE_SOURCE_SUFFIX = "-source";
7373

7474
final InternalTopologyBuilder internalTopologyBuilder;
75+
private final boolean processProcessValueFixEnabled;
7576
private final AtomicInteger index = new AtomicInteger(0);
7677

7778
private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
@@ -91,8 +92,10 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
9192
}
9293
};
9394

94-
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
95+
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder,
96+
final boolean processProcessValueFixEnabled) {
9597
this.internalTopologyBuilder = internalTopologyBuilder;
98+
this.processProcessValueFixEnabled = processProcessValueFixEnabled;
9699
}
97100

98101
public <K, V> KStream<K, V> stream(final Collection<String> topics,
@@ -706,4 +709,7 @@ public InternalTopologyBuilder internalTopologyBuilder() {
706709
return internalTopologyBuilder;
707710
}
708711

712+
public boolean processProcessValueFixEnabled() {
713+
return processProcessValueFixEnabled;
714+
}
709715
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,7 +1235,12 @@ public <KOut, VOut> KStream<KOut, VOut> process(
12351235
ProcessorToStateConnectorNode<>(
12361236
name,
12371237
new ProcessorParameters<>(processorSupplier, name),
1238-
stateStoreNames);
1238+
stateStoreNames
1239+
);
1240+
if (builder.processProcessValueFixEnabled()) {
1241+
processNode.keyChangingOperation(true);
1242+
processNode.setValueChangingOperation(true);
1243+
}
12391244

12401245
builder.addGraphNode(graphNode, processNode);
12411246

@@ -1280,7 +1285,11 @@ public <VOut> KStream<K, VOut> processValues(
12801285
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
12811286
name,
12821287
new ProcessorParameters<>(processorSupplier, name),
1283-
stateStoreNames);
1288+
stateStoreNames
1289+
);
1290+
if (builder.processProcessValueFixEnabled()) {
1291+
processNode.setValueChangingOperation(true);
1292+
}
12841293

12851294
builder.addGraphNode(graphNode, processNode);
12861295
// cannot inherit value serde

streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest {
8181

8282
private static final String APP_ID = "app-id";
8383

84-
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
84+
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
8585
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with(null, null));
8686
private final String storePrefix = "prefix-";
8787
private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
@@ -93,7 +93,7 @@ public void testNewName() {
9393
assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
9494
assertEquals("Z-0000000002", builder.newProcessorName("Z-"));
9595

96-
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
96+
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
9797

9898
assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
9999
assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
@@ -106,7 +106,7 @@ public void testNewStoreName() {
106106
assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
107107
assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
108108

109-
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
109+
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
110110

111111
assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
112112
assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));

streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
9595
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
9696
new TopologyConfig("my-topology", config, topologyOverrides));
9797

98-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
98+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
9999

100100
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
101101
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -113,7 +113,7 @@ public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig(
113113
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
114114
new TopologyConfig("my-topology", config, topologyOverrides));
115115

116-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
116+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
117117

118118
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
119119
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -129,7 +129,7 @@ public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified
129129
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
130130
new TopologyConfig("my-topology", config, topologyOverrides));
131131

132-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
132+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
133133

134134
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
135135
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);

0 commit comments

Comments
 (0)