Dekaf: namespace group IDs and invalidate topic state when binding is backfilled or collection is reset #2093
Add `advanced.namespaced_ids` to Dekaf endpoint config

This flag will cause topics and groups to be namespaced when reported to/from upstream Kafka. This fixes a few problems described in #2060.
Specifically:
`token` to `to_upstream_topic_name`, as the net effect was breaking your consumer group if you changed your token.

Post-deploy steps:
Setting `advanced.namespaced_ids` to true will cause all of your committed offset state to be effectively reset, so it's set to false by default if not specified. After this change is deployed, we'll want to manually go and publish all Dekaf materializations with this field explicitly set to false. Then we can change its default value to true, and the net effect will be that all new Dekaf materializations will have this turned on by default, leaving existing ones alone.

Other changes
We currently have a couple of consumers of materializations where the token changed after the group/topics had offsets committed. This is causing sessions to panic, and while we're actually handling that correctly because they're each an isolated Tokio task, I realized that we are "leaking" `dekaf_total_connections` -- we increment the counter when the connection is established, but panic before we can decrement it. So I refactored the way we report that metric to handle panic'd sessions.

I also fixed the panic by removing a couple of `unwrap()`s which I thought wouldn't fail, but do in this circumstance.

Fixes #2083, fixes #2060
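For readers unfamiliar with how a counter can survive a panic, here is a minimal sketch of the drop-guard pattern. It is not the code from this PR: `TOTAL_CONNECTIONS`, `ConnectionGuard`, and `run_session` are hypothetical names, and a plain atomic stands in for the real `dekaf_total_connections` metric. It relies on Rust running `Drop` during unwinding, which holds under the default `panic = "unwind"` setting.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Hypothetical stand-in for the `dekaf_total_connections` gauge.
pub static TOTAL_CONNECTIONS: AtomicI64 = AtomicI64::new(0);

/// Hypothetical guard: increments the gauge when created and
/// decrements it when dropped, including during a panic unwind.
pub struct ConnectionGuard;

impl ConnectionGuard {
    pub fn new() -> Self {
        TOTAL_CONNECTIONS.fetch_add(1, Ordering::SeqCst);
        ConnectionGuard
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        TOTAL_CONNECTIONS.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Simulated session body (an assumption for illustration only).
/// The guard lives for the whole session, so the decrement happens
/// on every exit path, whether the session returns or panics.
pub fn run_session(should_panic: bool) {
    let _guard = ConnectionGuard::new();
    if should_panic {
        panic!("simulated session failure");
    }
}
```

Tying the decrement to a value's lifetime, rather than to a line of code at the end of the session, means a panic partway through cannot skip it.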
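The rollout described under the post-deploy steps depends on the difference between an unspecified field and one explicitly set to false. A rough sketch of that distinction follows; `AdvancedConfig` and `effective_namespaced_ids` are hypothetical names for illustration, not the actual Dekaf config types.

```rust
/// Hypothetical stand-in for the `advanced` section of a Dekaf endpoint config.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AdvancedConfig {
    /// `None` means the field was not specified in the published spec.
    pub namespaced_ids: Option<bool>,
}

/// Resolve the effective setting: an explicit value always wins,
/// otherwise fall back to the service-wide default.
pub fn effective_namespaced_ids(cfg: &AdvancedConfig, default: bool) -> bool {
    cfg.namespaced_ids.unwrap_or(default)
}
```

Under this sketch, a materialization published with the field explicitly set to false keeps resolving to false after the default flips to true, while one that leaves the field unspecified picks up the new default.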