Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class StreamsBuilder {
public StreamsBuilder() {
topology = new Topology();
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, true);
}

/**
Expand All @@ -87,7 +87,14 @@ public StreamsBuilder() {
public StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = newTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
internalStreamsBuilder = new InternalStreamsBuilder(
internalTopologyBuilder,
!TopologyConfig.InternalConfig.getBoolean(
topologyConfigs.originals(),
TopologyConfig.InternalConfig.DISABLE_PROCESS_PROCESSVALUE_FIX,
false
)
);
}

protected Topology newTopology(final TopologyConfig topologyConfigs) {
Expand Down
23 changes: 23 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
Expand Down Expand Up @@ -86,6 +87,28 @@
*/
@SuppressWarnings("deprecation")
public final class TopologyConfig extends AbstractConfig {

public static class InternalConfig {
// Cf https://issues.apache.org/jira/browse/KAFKA-19668
public static final String DISABLE_PROCESS_PROCESSVALUE_FIX = "__disable.process.processValue.fix__";

public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof Boolean) {
return (boolean) value;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the cast required, wouldn't the JVM use autoboxing to covert between Boolean and boolean

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because the type of value is Object. We could also cast to Boolean. So some auto-conversion/unboxing happens anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually c&p this from StreamsConfig.InternalConfigs :)

} else if (value instanceof String) {
return Boolean.parseBoolean((String) value);
} else {
log.warn(
"Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.",
value,
key
);
return defaultValue;
}
}
}

private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
private static final String TABLE_SOURCE_SUFFIX = "-source";

final InternalTopologyBuilder internalTopologyBuilder;
private final boolean processProcessValueFixEnabled;
private final AtomicInteger index = new AtomicInteger(0);

private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
Expand All @@ -91,8 +92,10 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
}
};

public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder,
final boolean processProcessValueFixEnabled) {
this.internalTopologyBuilder = internalTopologyBuilder;
this.processProcessValueFixEnabled = processProcessValueFixEnabled;
}

public <K, V> KStream<K, V> stream(final Collection<String> topics,
Expand Down Expand Up @@ -709,4 +712,7 @@ public InternalTopologyBuilder internalTopologyBuilder() {
return internalTopologyBuilder;
}

public boolean processProcessValueFixEnabled() {
return processProcessValueFixEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,12 @@ public <KOut, VOut> KStream<KOut, VOut> process(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
stateStoreNames
);
if (builder.processProcessValueFixEnabled()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also added this here -- it's atm not strictly necessary, because process() is key-changing, what we track via repartitionRequired flag further below, setting it to true via KStreamImpl constructor.

but it just feel correct to add anyway, as we have plans to actually unify repartitionRequired, and keyChanging flags. So we need to make sure we don't miss this one...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cf #18800

\cc @appchemist for visibility

Copy link
Contributor

@appchemist appchemist Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

processNode.setKeyChangingOperation(true);
processNode.setValueChangingOperation(true);
}

builder.addGraphNode(graphNode, processNode);

Expand Down Expand Up @@ -1350,7 +1355,11 @@ public <VOut> KStream<K, VOut> processValues(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
stateStoreNames
);
if (builder.processProcessValueFixEnabled()) {
processNode.setValueChangingOperation(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Basically a one-liner fix.

}

builder.addGraphNode(graphNode, processNode);
// cannot inherit value serde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest {

private static final String APP_ID = "app-id";

private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), true);
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with(null, null));
private final String storePrefix = "prefix-";
private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
Expand All @@ -93,7 +93,7 @@ public void testNewName() {
assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
assertEquals("Z-0000000002", builder.newProcessorName("Z-"));

final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), true);

assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
Expand All @@ -106,7 +106,7 @@ public void testNewStoreName() {
assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));

final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), true);

assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));

final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, true);

final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
Expand All @@ -113,7 +113,7 @@ public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig(
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));

final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, true);

final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
Expand All @@ -129,7 +129,7 @@ public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));

final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, true);

final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
Expand Down Expand Up @@ -127,7 +129,7 @@ public void shouldNotThrowNPEWithMergeNodes() {
initializer = () -> "";
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
final ProcessorSupplier<String, String, String, String> processorSupplier =
() -> new Processor<String, String, String, String>() {
() -> new Processor<>() {
private ProcessorContext<String, String> context;

@Override
Expand Down Expand Up @@ -186,13 +188,12 @@ public void process(final Record<String, String> record) {

@Test
public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {

final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION);

assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
assertEquals(2, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
}

@Test
Expand Down Expand Up @@ -228,7 +229,6 @@ public void shouldNotOptimizeWhenRepartitionOperationIsDone() {
}

private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) {

final StreamsBuilder builder = new StreamsBuilder();
final Properties properties = new Properties();
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
Expand All @@ -238,9 +238,15 @@ private Topology getTopologyWithChangingValuesAfterChangingKey(final String opti

mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
mappedKeyStream.processValues(
() -> new ContextualFixedKeyProcessor<>() {
@Override
public void process(final FixedKeyRecord<String, String> record) {
context().forward(record.withValue(record.value().toUpperCase(Locale.getDefault())));
}
}).groupByKey().count().toStream().to("output");

return builder.build(properties);

}

private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
Expand Down Expand Up @@ -386,20 +392,20 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) {
" Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" +
" --> KSTREAM-PROCESSOR-0000000001\n" +
" Processor: KSTREAM-PROCESSOR-0000000001 (stores: [])\n" +
" --> KSTREAM-FILTER-0000000005\n" +
" --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% sure, what triggers this re-naming, but I believe it's only in-memory processor names, so I believe it's ok? -- The structure of the topology does not change, and all stateful things (stores, topics) keep their names.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it should be, especially since this is a filter we added some time ago a filter before repartitioning to filter records with a null key

" <-- KSTREAM-SOURCE-0000000000\n" +
" Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" +
" --> KSTREAM-SINK-0000000004\n" +
" Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n" +
" --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n" +
" <-- KSTREAM-PROCESSOR-0000000001\n" +
" Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
" <-- KSTREAM-FILTER-0000000005\n" +
" Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
"\n" +
" Sub-topology: 1\n" +
" Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
" Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
" --> KSTREAM-AGGREGATE-0000000003\n" +
" Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
" --> KTABLE-SUPPRESS-0000000007\n" +
" <-- KSTREAM-SOURCE-0000000006\n" +
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n" +
" Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" +
" --> KSTREAM-PEEK-0000000020\n" +
" Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
Expand Down Expand Up @@ -220,6 +222,36 @@ private void runTest(final String optimizationConfig, final int expectedNumberRe
assertThat(joinedOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedJoinKeyValues)));
}

@Test
public void shouldNotPushRepartitionAcrossValueChangingOperation() {
final StreamsBuilder builder = new StreamsBuilder();

builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream"))
.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v))
.processValues(() -> new ContextualFixedKeyProcessor<String, String, Integer>() {
@Override
public void process(final FixedKeyRecord<String, String> record) {
context().forward(record.withValue(record.value().length()));
}
})
.groupByKey(Grouped.valueSerde(new Serdes.IntegerSerde()))
.reduce(Integer::sum)
.toStream()
.to(AGGREGATION_TOPIC);

streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final Topology topology = builder.build(streamsConfiguration);

topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);

final TestInputTopic<String, String> inputTopic = topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, stringSerializer);
final TestOutputTopic<String, Integer> outputTopic = topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new IntegerDeserializer());

inputTopic.pipeKeyValueList(getKeyValues());

assertThat(outputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedAggKeyValues)));
}

private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) {
final Map<K, V> map = new HashMap<>();
for (final KeyValue<K, V> pair : keyValuePairs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public class StreamThreadTest {
private final ChangelogReader changelogReader = new MockChangelogReader();
private StateDirectory stateDirectory = null;
private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, true);

private StreamThread thread = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2622,7 +2622,7 @@ public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount(
builder = new CorruptedInternalTopologyBuilder();
topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps(parameterizedConfig)));

final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder);
final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder, true);

final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>(Consumed.with(null, null)));
final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(Consumed.with(null, null)), new MaterializedInternal<>(Materialized.as("store")));
Expand Down