Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class StreamsBuilder {
public StreamsBuilder() {
topology = new Topology();
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false);
}

/**
Expand All @@ -87,7 +87,14 @@ public StreamsBuilder() {
public StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = newTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
internalStreamsBuilder = new InternalStreamsBuilder(
internalTopologyBuilder,
TopologyConfig.InternalConfig.getBoolean(
topologyConfigs.originals(),
TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX,
false
)
);
}

protected Topology newTopology(final TopologyConfig topologyConfigs) {
Expand Down
23 changes: 23 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
Expand Down Expand Up @@ -86,6 +87,28 @@
*/
@SuppressWarnings("deprecation")
public final class TopologyConfig extends AbstractConfig {

public static class InternalConfig {
// Cf https://issues.apache.org/jira/browse/KAFKA-19668
public static final String ENABLE_PROCESS_PROCESSVALUE_FIX = "__enable.process.processValue.fix__";

public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof Boolean) {
return (boolean) value;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the cast required, wouldn't the JVM use autoboxing to covert between Boolean and boolean

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because the type of value is Object. We could also cast to Boolean. So some auto-conversion/unboxing happens anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually c&p this from StreamsConfig.InternalConfigs :)

} else if (value instanceof String) {
return Boolean.parseBoolean((String) value);
} else {
log.warn(
"Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.",
value,
key
);
return defaultValue;
}
}
}

private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
private static final String TABLE_SOURCE_SUFFIX = "-source";

final InternalTopologyBuilder internalTopologyBuilder;
private final boolean processProcessValueFixEnabled;
private final AtomicInteger index = new AtomicInteger(0);

private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
Expand All @@ -91,8 +92,10 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
}
};

public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder,
final boolean processProcessValueFixEnabled) {
this.internalTopologyBuilder = internalTopologyBuilder;
this.processProcessValueFixEnabled = processProcessValueFixEnabled;
}

public <K, V> KStream<K, V> stream(final Collection<String> topics,
Expand Down Expand Up @@ -709,4 +712,7 @@ public InternalTopologyBuilder internalTopologyBuilder() {
return internalTopologyBuilder;
}

public boolean processProcessValueFixEnabled() {
return processProcessValueFixEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,12 @@ public <KOut, VOut> KStream<KOut, VOut> process(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
stateStoreNames
);
if (builder.processProcessValueFixEnabled()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also added this here -- it's atm not strictly necessary, because process() is key-changing, what we track via repartitionRequired flag further below, setting it to true via KStreamImpl constructor.

but it just feel correct to add anyway, as we have plans to actually unify repartitionRequired, and keyChanging flags. So we need to make sure we don't miss this one...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cf #18800

\cc @appchemist for visibility

Copy link
Contributor

@appchemist appchemist Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

processNode.setKeyChangingOperation(true);
processNode.setValueChangingOperation(true);
}

builder.addGraphNode(graphNode, processNode);

Expand Down Expand Up @@ -1350,7 +1355,11 @@ public <VOut> KStream<K, VOut> processValues(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
stateStoreNames
);
if (builder.processProcessValueFixEnabled()) {
processNode.setValueChangingOperation(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Basically a one-liner fix.

}

builder.addGraphNode(graphNode, processNode);
// cannot inherit value serde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest {

private static final String APP_ID = "app-id";

private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with(null, null));
private final String storePrefix = "prefix-";
private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
Expand All @@ -93,7 +93,7 @@ public void testNewName() {
assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
assertEquals("Z-0000000002", builder.newProcessorName("Z-"));

final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);

assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
Expand All @@ -106,7 +106,7 @@ public void testNewStoreName() {
assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));

final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false);

assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));

final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);

final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
Expand All @@ -113,7 +113,7 @@ public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig(
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));

final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);

final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
Expand All @@ -129,7 +129,7 @@ public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));

final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false);

final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
Expand Down
Loading