Skip to content

Commit 1742ead

Browse files
committed
KAFKA-19668: processValues() must be declared as value-changing operation
With "merge.repartition.topic" optimization enabled, Kafka Streams tries to push repartition topics upstream, to be able to merge multiple repartition topics from different downstream branches together. However, it is not safe to push a repartition topic if the parent node is value-changing: because of potentially changing data types, the topology might become invalid, and fail with a serde error at runtime. The optimization itself works correctly; however, processValues() is not correctly declared as value-changing, which can lead to invalid topologies.
1 parent 2ba30cc commit 1742ead

File tree

11 files changed

+109
-27
lines changed

11 files changed

+109
-27
lines changed

streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class StreamsBuilder {
7575
public StreamsBuilder() {
7676
topology = new Topology();
7777
internalTopologyBuilder = topology.internalTopologyBuilder;
78-
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
78+
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, true);
7979
}
8080

8181
/**
@@ -87,7 +87,14 @@ public StreamsBuilder() {
8787
public StreamsBuilder(final TopologyConfig topologyConfigs) {
8888
topology = newTopology(topologyConfigs);
8989
internalTopologyBuilder = topology.internalTopologyBuilder;
90-
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
90+
internalStreamsBuilder = new InternalStreamsBuilder(
91+
internalTopologyBuilder,
92+
!TopologyConfig.InternalConfig.getBoolean(
93+
topologyConfigs.originals(),
94+
TopologyConfig.InternalConfig.DISABLE_PROCESS_PROCESSVALUE_FIX,
95+
false
96+
)
97+
);
9198
}
9299

93100
protected Topology newTopology(final TopologyConfig topologyConfigs) {

streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.slf4j.Logger;
3535
import org.slf4j.LoggerFactory;
3636

37+
import java.util.Map;
3738
import java.util.Optional;
3839
import java.util.Properties;
3940
import java.util.function.Supplier;
@@ -86,6 +87,28 @@
8687
*/
8788
@SuppressWarnings("deprecation")
8889
public final class TopologyConfig extends AbstractConfig {
90+
91+
public static class InternalConfig {
92+
// Cf https://issues.apache.org/jira/browse/KAFKA-19668
93+
public static final String DISABLE_PROCESS_PROCESSVALUE_FIX = "__disable.process.processValue.fix__";
94+
95+
public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
96+
final Object value = configs.getOrDefault(key, defaultValue);
97+
if (value instanceof Boolean) {
98+
return (boolean) value;
99+
} else if (value instanceof String) {
100+
return Boolean.parseBoolean((String) value);
101+
} else {
102+
log.warn(
103+
"Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.",
104+
value,
105+
key
106+
);
107+
return defaultValue;
108+
}
109+
}
110+
}
111+
89112
private static final ConfigDef CONFIG;
90113
static {
91114
CONFIG = new ConfigDef()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
7272
private static final String TABLE_SOURCE_SUFFIX = "-source";
7373

7474
final InternalTopologyBuilder internalTopologyBuilder;
75+
private final boolean processProcessValueFixEnabled;
7576
private final AtomicInteger index = new AtomicInteger(0);
7677

7778
private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
@@ -91,8 +92,10 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
9192
}
9293
};
9394

94-
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
95+
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder,
96+
final boolean processProcessValueFixEnabled) {
9597
this.internalTopologyBuilder = internalTopologyBuilder;
98+
this.processProcessValueFixEnabled = processProcessValueFixEnabled;
9699
}
97100

98101
public <K, V> KStream<K, V> stream(final Collection<String> topics,
@@ -709,4 +712,7 @@ public InternalTopologyBuilder internalTopologyBuilder() {
709712
return internalTopologyBuilder;
710713
}
711714

715+
public boolean processProcessValueFixEnabled() {
716+
return processProcessValueFixEnabled;
717+
}
712718
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,7 +1306,12 @@ public <KOut, VOut> KStream<KOut, VOut> process(
13061306
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
13071307
name,
13081308
new ProcessorParameters<>(processorSupplier, name),
1309-
stateStoreNames);
1309+
stateStoreNames
1310+
);
1311+
if (builder.processProcessValueFixEnabled()) {
1312+
processNode.setKeyChangingOperation(true);
1313+
processNode.setValueChangingOperation(true);
1314+
}
13101315

13111316
builder.addGraphNode(graphNode, processNode);
13121317

@@ -1350,7 +1355,11 @@ public <VOut> KStream<K, VOut> processValues(
13501355
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
13511356
name,
13521357
new ProcessorParameters<>(processorSupplier, name),
1353-
stateStoreNames);
1358+
stateStoreNames
1359+
);
1360+
if (builder.processProcessValueFixEnabled()) {
1361+
processNode.setValueChangingOperation(true);
1362+
}
13541363

13551364
builder.addGraphNode(graphNode, processNode);
13561365
// cannot inherit value serde

streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public class RepartitionTopicNamingTest {
5959

6060
@Test
6161
public void shouldReuseFirstRepartitionTopicNameWhenOptimizing() {
62-
6362
final String optimizedTopology = buildTopology(StreamsConfig.OPTIMIZE).describe().toString();
6463
final String unOptimizedTopology = buildTopology(StreamsConfig.NO_OPTIMIZATION).describe().toString();
6564

streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest {
8181

8282
private static final String APP_ID = "app-id";
8383

84-
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
84+
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), true);
8585
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with(null, null));
8686
private final String storePrefix = "prefix-";
8787
private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
@@ -93,7 +93,7 @@ public void testNewName() {
9393
assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
9494
assertEquals("Z-0000000002", builder.newProcessorName("Z-"));
9595

96-
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
96+
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), true);
9797

9898
assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
9999
assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
@@ -106,7 +106,7 @@ public void testNewStoreName() {
106106
assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
107107
assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
108108

109-
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
109+
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), true);
110110

111111
assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
112112
assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));

streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
9595
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
9696
new TopologyConfig("my-topology", config, topologyOverrides));
9797

98-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
98+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, true);
9999

100100
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
101101
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -113,7 +113,7 @@ public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig(
113113
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
114114
new TopologyConfig("my-topology", config, topologyOverrides));
115115

116-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
116+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, true);
117117

118118
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
119119
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
@@ -129,7 +129,7 @@ public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified
129129
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
130130
new TopologyConfig("my-topology", config, topologyOverrides));
131131

132-
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
132+
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, true);
133133

134134
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
135135
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);

streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.kafka.streams.kstream.Suppressed;
3737
import org.apache.kafka.streams.kstream.TimeWindows;
3838
import org.apache.kafka.streams.kstream.ValueJoiner;
39+
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
40+
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
3941
import org.apache.kafka.streams.processor.api.Processor;
4042
import org.apache.kafka.streams.processor.api.ProcessorContext;
4143
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -127,7 +129,7 @@ public void shouldNotThrowNPEWithMergeNodes() {
127129
initializer = () -> "";
128130
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
129131
final ProcessorSupplier<String, String, String, String> processorSupplier =
130-
() -> new Processor<String, String, String, String>() {
132+
() -> new Processor<>() {
131133
private ProcessorContext<String, String> context;
132134

133135
@Override
@@ -186,13 +188,12 @@ public void process(final Record<String, String> record) {
186188

187189
@Test
188190
public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
189-
190191
final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
191192
final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION);
192193

193194
assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
194-
assertEquals(2, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
195-
assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
195+
assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
196+
assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
196197
}
197198

198199
@Test
@@ -228,7 +229,6 @@ public void shouldNotOptimizeWhenRepartitionOperationIsDone() {
228229
}
229230

230231
private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) {
231-
232232
final StreamsBuilder builder = new StreamsBuilder();
233233
final Properties properties = new Properties();
234234
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
@@ -238,9 +238,15 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti
238238

239239
mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
240240
mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
241+
mappedKeyStream.processValues(
242+
() -> new ContextualFixedKeyProcessor<>() {
243+
@Override
244+
public void process(FixedKeyRecord<String, String> record) {
245+
context().forward(record.withValue(record.value().toUpperCase(Locale.getDefault())));
246+
}
247+
}).groupByKey().count().toStream().to("output");
241248

242249
return builder.build(properties);
243-
244250
}
245251

246252
private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
@@ -386,20 +392,20 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) {
386392
" Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" +
387393
" --> KSTREAM-PROCESSOR-0000000001\n" +
388394
" Processor: KSTREAM-PROCESSOR-0000000001 (stores: [])\n" +
389-
" --> KSTREAM-FILTER-0000000005\n" +
395+
" --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
390396
" <-- KSTREAM-SOURCE-0000000000\n" +
391-
" Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" +
392-
" --> KSTREAM-SINK-0000000004\n" +
397+
" Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n" +
398+
" --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n" +
393399
" <-- KSTREAM-PROCESSOR-0000000001\n" +
394-
" Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
395-
" <-- KSTREAM-FILTER-0000000005\n" +
400+
" Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
401+
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
396402
"\n" +
397403
" Sub-topology: 1\n" +
398-
" Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
404+
" Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
399405
" --> KSTREAM-AGGREGATE-0000000003\n" +
400406
" Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
401407
" --> KTABLE-SUPPRESS-0000000007\n" +
402-
" <-- KSTREAM-SOURCE-0000000006\n" +
408+
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n" +
403409
" Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" +
404410
" --> KSTREAM-PEEK-0000000020\n" +
405411
" Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" +

streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.apache.kafka.streams.kstream.Produced;
4343
import org.apache.kafka.streams.kstream.Reducer;
4444
import org.apache.kafka.streams.kstream.StreamJoined;
45+
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
46+
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
4547
import org.apache.kafka.streams.processor.api.Processor;
4648
import org.apache.kafka.streams.processor.api.Record;
4749
import org.apache.kafka.streams.state.Stores;
@@ -220,6 +222,36 @@ private void runTest(final String optimizationConfig, final int expectedNumberRe
220222
assertThat(joinedOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedJoinKeyValues)));
221223
}
222224

225+
@Test
226+
public void shouldNotPushRepartitionAcrossValueChangingOperation() {
227+
final StreamsBuilder builder = new StreamsBuilder();
228+
229+
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream"))
230+
.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v))
231+
.processValues(() -> new ContextualFixedKeyProcessor<String, String, Integer>() {
232+
@Override
233+
public void process(FixedKeyRecord<String, String> record) {
234+
context().forward(record.withValue(record.value().length()));
235+
}
236+
})
237+
.groupByKey(Grouped.valueSerde(new Serdes.IntegerSerde()))
238+
.reduce(Integer::sum)
239+
.toStream()
240+
.to(AGGREGATION_TOPIC);
241+
242+
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
243+
final Topology topology = builder.build(streamsConfiguration);
244+
245+
topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
246+
247+
final TestInputTopic<String, String> inputTopic = topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, stringSerializer);
248+
final TestOutputTopic<String, Integer> outputTopic = topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new IntegerDeserializer());
249+
250+
inputTopic.pipeKeyValueList(getKeyValues());
251+
252+
assertThat(outputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedAggKeyValues)));
253+
}
254+
223255
private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) {
224256
final Map<K, V> map = new HashMap<>();
225257
for (final KeyValue<K, V> pair : keyValuePairs) {

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public class StreamThreadTest {
206206
private final ChangelogReader changelogReader = new MockChangelogReader();
207207
private StateDirectory stateDirectory = null;
208208
private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
209-
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
209+
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, true);
210210

211211
private StreamThread thread = null;
212212

0 commit comments

Comments
 (0)