Skip to content

Commit 9ba7dd6

Browse files
authored
KAFKA-19668: processValues() must be declared as a value-changing operation (#20470)
With the "merge.repartition.topic" optimization enabled, Kafka Streams tries to push repartition topics upstream so that it can merge multiple repartition topics from different downstream branches. However, it is not safe to push a repartition topic upstream if the parent node is value-changing: because the data types may change, the topology might become invalid and fail with a serde error at runtime. The optimization itself works correctly; however, processValues() is not correctly declared as value-changing, which can lead to invalid topologies. Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy <[email protected]>
1 parent 0a12eaa commit 9ba7dd6

File tree

10 files changed

+265
-23
lines changed

10 files changed

+265
-23
lines changed

streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class StreamsBuilder {
7575
public StreamsBuilder() {
7676
topology = new Topology();
7777
internalTopologyBuilder = topology.internalTopologyBuilder;
78-
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
78+
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false);
7979
}
8080

8181
/**
@@ -87,7 +87,14 @@ public StreamsBuilder() {
8787
public StreamsBuilder(final TopologyConfig topologyConfigs) {
8888
topology = newTopology(topologyConfigs);
8989
internalTopologyBuilder = topology.internalTopologyBuilder;
90-
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
90+
internalStreamsBuilder = new InternalStreamsBuilder(
91+
internalTopologyBuilder,
92+
TopologyConfig.InternalConfig.getBoolean(
93+
topologyConfigs.originals(),
94+
TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX,
95+
false
96+
)
97+
);
9198
}
9299

93100
protected Topology newTopology(final TopologyConfig topologyConfigs) {

streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.slf4j.Logger;
3535
import org.slf4j.LoggerFactory;
3636

37+
import java.util.Map;
3738
import java.util.Optional;
3839
import java.util.Properties;
3940
import java.util.function.Supplier;
@@ -86,6 +87,28 @@
8687
*/
8788
@SuppressWarnings("deprecation")
8889
public final class TopologyConfig extends AbstractConfig {
90+
91+
public static class InternalConfig {
92+
// Cf https://issues.apache.org/jira/browse/KAFKA-19668
93+
public static final String ENABLE_PROCESS_PROCESSVALUE_FIX = "__enable.process.processValue.fix__";
94+
95+
public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
96+
final Object value = configs.getOrDefault(key, defaultValue);
97+
if (value instanceof Boolean) {
98+
return (boolean) value;
99+
} else if (value instanceof String) {
100+
return Boolean.parseBoolean((String) value);
101+
} else {
102+
log.warn(
103+
"Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.",
104+
value,
105+
key
106+
);
107+
return defaultValue;
108+
}
109+
}
110+
}
111+
89112
private static final ConfigDef CONFIG;
90113
static {
91114
CONFIG = new ConfigDef()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
7272
private static final String TABLE_SOURCE_SUFFIX = "-source";
7373

7474
final InternalTopologyBuilder internalTopologyBuilder;
75+
private final boolean processProcessValueFixEnabled;
7576
private final AtomicInteger index = new AtomicInteger(0);
7677

7778
private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
@@ -91,8 +92,10 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
9192
}
9293
};
9394

94-
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
95+
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder,
96+
final boolean processProcessValueFixEnabled) {
9597
this.internalTopologyBuilder = internalTopologyBuilder;
98+
this.processProcessValueFixEnabled = processProcessValueFixEnabled;
9699
}
97100

98101
public <K, V> KStream<K, V> stream(final Collection<String> topics,
@@ -709,4 +712,7 @@ public InternalTopologyBuilder internalTopologyBuilder() {
709712
return internalTopologyBuilder;
710713
}
711714

715+
public boolean processProcessValueFixEnabled() {
716+
return processProcessValueFixEnabled;
717+
}
712718
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,7 +1306,12 @@ public <KOut, VOut> KStream<KOut, VOut> process(
13061306
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
13071307
name,
13081308
new ProcessorParameters<>(processorSupplier, name),
1309-
stateStoreNames);
1309+
stateStoreNames
1310+
);
1311+
if (builder.processProcessValueFixEnabled()) {
1312+
processNode.setKeyChangingOperation(true);
1313+
processNode.setValueChangingOperation(true);
1314+
}
13101315

13111316
builder.addGraphNode(graphNode, processNode);
13121317

@@ -1350,7 +1355,11 @@ public <VOut> KStream<K, VOut> processValues(
13501355
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
13511356
name,
13521357
new ProcessorParameters<>(processorSupplier, name),
1353-
stateStoreNames);
1358+
stateStoreNames
1359+
);
1360+
if (builder.processProcessValueFixEnabled()) {
1361+
processNode.setValueChangingOperation(true);
1362+
}
13541363

13551364
builder.addGraphNode(graphNode, processNode);
13561365
// cannot inherit value serde

streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest {
8181

8282
private static final String APP_ID = "app-id";
8383

84-
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
84+
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
8585
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with(null, null));
8686
private final String storePrefix = "prefix-";
8787
private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
@@ -93,7 +93,7 @@ public void testNewName() {
9393
assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
9494
assertEquals("Z-0000000002", builder.newProcessorName("Z-"));
9595

96-
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
96+
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
9797

9898
assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
9999
assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
@@ -106,7 +106,7 @@ public void testNewStoreName() {
106106
assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
107107
assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
108108

109-
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
109+
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
110110

111111
assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
112112
assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));

streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
9595
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
9696
new TopologyConfig("my-topology", config, topologyOverrides));
9797

98-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
98+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
9999

100100
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
101101
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -113,7 +113,7 @@ public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig(
113113
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
114114
new TopologyConfig("my-topology", config, topologyOverrides));
115115

116-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
116+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
117117

118118
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
119119
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -129,7 +129,7 @@ public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified
129129
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
130130
new TopologyConfig("my-topology", config, topologyOverrides));
131131

132-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
132+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);
133133

134134
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
135135
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);

0 commit comments

Comments
 (0)