Member

@mjsax mjsax commented Sep 3, 2025

With the "merge.repartition.topic" optimization enabled, Kafka Streams tries
to push repartition topics upstream so that it can merge multiple
repartition topics from different downstream branches.

However, it is not safe to push a repartition topic above a parent node
that is value-changing: because the data types may change, the
topology might become invalid and fail with a serde error at runtime.

The optimization itself works correctly; however, processValues() is not
correctly declared as value-changing, which can lead to invalid
topologies.

Reviewers: Bill Bejeck [email protected], Lucas Brutschy
[email protected]
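
To illustrate the rule described above, here is a minimal, self-contained sketch. It does not use the Kafka Streams internals; `Node`, `repartitionPlacement`, and the node names are hypothetical and exist only for this example. It assumes a simplified model in which a repartition topic may be moved upstream until it reaches a key-changing or value-changing node.

```java
final class RepartitionPushSketch {

    // Hypothetical, simplified stand-in for a node in the topology graph.
    static final class Node {
        final String name;
        final Node parent;
        final boolean keyChanging;
        final boolean valueChanging;

        Node(final String name, final Node parent, final boolean keyChanging, final boolean valueChanging) {
            this.name = name;
            this.parent = parent;
            this.keyChanging = keyChanging;
            this.valueChanging = valueChanging;
        }
    }

    // Walks upstream from the parent of an operation that needs repartitioning
    // and returns the node directly after which the repartition topic may be placed.
    // The walk stops at the first key-changing or value-changing node (or at the source).
    static Node repartitionPlacement(final Node parentOfAggregation) {
        Node current = parentOfAggregation;
        while (current.parent != null && !current.keyChanging && !current.valueChanging) {
            current = current.parent;
        }
        return current;
    }

    public static void main(final String[] args) {
        final Node source = new Node("source", null, false, false);
        final Node selectKey = new Node("selectKey", source, true, false);

        // processValues() not flagged as value-changing (the behavior this PR addresses):
        // the repartition topic is pushed above it, so it would carry the old value type.
        final Node unflagged = new Node("processValues", selectKey, false, false);
        System.out.println(repartitionPlacement(unflagged).name);

        // processValues() flagged as value-changing: the repartition topic stays below it.
        final Node flagged = new Node("processValues", selectKey, false, true);
        System.out.println(repartitionPlacement(flagged).name);
    }
}
```

In this model, the unflagged case places the repartition topic after `selectKey`, upstream of the value transformation, which is where a serde configured for the transformed value type would no longer match the data.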

@mjsax mjsax force-pushed the kafka-19668 branch 2 times, most recently from e0436ea to 1742ead Compare September 3, 2025 19:05
stateStoreNames);
stateStoreNames
);
if (builder.processProcessValueFixEnabled()) {
Member Author

I also added this here -- it's not strictly necessary at the moment, because process() is key-changing, which we track via the repartitionRequired flag further below, setting it to true via the KStreamImpl constructor.

But it just feels correct to add it anyway, as we have plans to unify the repartitionRequired and keyChanging flags. So we need to make sure we don't miss this one...

Member Author

Cf #18800

cc @appchemist for visibility

Contributor

@appchemist appchemist Sep 4, 2025

Thank you!

@@ -386,20 +392,20 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) {
" Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" +
" --> KSTREAM-PROCESSOR-0000000001\n" +
" Processor: KSTREAM-PROCESSOR-0000000001 (stores: [])\n" +
" --> KSTREAM-FILTER-0000000005\n" +
" --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
Member Author

I am not 100% sure what triggers this renaming, but I believe it only affects in-memory processor names, so it should be ok? -- The structure of the topology does not change, and all stateful things (stores, topics) keep their names.

Member

Yes, it should be, especially since this is a filter we added some time ago before repartitioning to drop records with a null key.

@mjsax mjsax changed the title from "KAFKA-19668: processValue() must be declared as value-chaning operation" to "KAFKA-19668: processValue() must be declared as value-changing operation" Sep 3, 2025
Member

@bbejeck bbejeck left a comment

Thanks for the fix @mjsax - LGTM with one minor comment.

public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof Boolean) {
return (boolean) value;
Member

Is the cast required? Wouldn't the JVM use autoboxing to convert between Boolean and boolean?

Member Author

Yes, because the type of value is Object. We could also cast to Boolean. So some auto-conversion/unboxing happens anyway.
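
The point about the cast can be sketched in isolation. The class name `ConfigBooleans` is hypothetical, and the `String` branch and the final fallback are assumptions added to make the helper complete; only the first four lines of the method appear in the diff above.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigBooleans {

    static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
        // defaultValue is boxed to Boolean here because the map's value type is Object.
        final Object value = configs.getOrDefault(key, defaultValue);
        if (value instanceof Boolean) {
            // The static type of 'value' is Object, so 'return value;' would not compile.
            // Casting to boolean (or to Boolean, which is then unboxed) is required.
            return (boolean) value;
        }
        if (value instanceof String) {
            // Assumption for illustration: accept "true"/"false" strings as well.
            return Boolean.parseBoolean((String) value);
        }
        // Assumption for illustration: fall back to the default for any other type.
        return defaultValue;
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put("flag.as.boolean", true);
        configs.put("flag.as.string", "true");

        System.out.println(getBoolean(configs, "flag.as.boolean", false));
        System.out.println(getBoolean(configs, "flag.as.string", false));
        System.out.println(getBoolean(configs, "flag.missing", false));
    }
}
```

Autoboxing alone does not help in the `instanceof Boolean` branch, since the compiler only knows the variable as `Object`; the `instanceof` check does not narrow its static type.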

Member Author

I actually c&p this from StreamsConfig.InternalConfigs :)

Member

@lucasbru lucasbru left a comment

LGTM

stateStoreNames
);
if (builder.processProcessValueFixEnabled()) {
processNode.setValueChangingOperation(true);
Member

The line!

Member Author

Yep. Basically a one-liner fix.

Member Author

mjsax commented Sep 3, 2025

Thanks both -- I am still wondering what changes we need to test this better, e.g., system tests? We also need more changes for the release notes / upgrade guide, and we might even want to go back to older docs and add information about this there, as it has been a problem since the AK 3.3.0 release.

Thoughts?

Member Author

mjsax commented Sep 4, 2025

@bbejeck @lucasbru -- As discussed offline, I disabled the fix by default. I plan to merge this PR after the build passes.

I will do separate PRs for the doc updates.

Member Author

mjsax commented Sep 5, 2025

Docs update for 4.0.1 release: #20484

@mjsax mjsax merged commit 9ba7dd6 into apache:trunk Sep 6, 2025
22 checks passed
@mjsax mjsax deleted the kafka-19668 branch September 6, 2025 01:00
mjsax added a commit that referenced this pull request Sep 6, 2025
…ion (#20470)

mjsax added a commit that referenced this pull request Sep 6, 2025
…ion (#20470)

Member Author

mjsax commented Sep 6, 2025

Merged to trunk and cherry-picked to 4.1 and 4.0 branches.
