direct_consumer: move offset update logic to fetch_next#28309

Merged
joe-redpanda merged 9 commits into redpanda-data:dev from joe-redpanda:start_offset
Dec 19, 2025

Conversation

@joe-redpanda

@joe-redpanda joe-redpanda commented Oct 31, 2025

Contributor

A fetch is potentially stale until the point in time at which it is returned to the consumer in fetch_next.

This PR moves the offset update so that it happens only after the final subscription epoch filter has been applied, which guarantees the correctness of the provided offsets.

backport: this will get backported but we're going to bake the change before putting it through

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

Bug Fixes

  • firm up start offset update logic

@joe-redpanda joe-redpanda requested review from bharathv, Copilot and michael-redpanda and removed request for Copilot October 31, 2025 21:54
@joe-redpanda joe-redpanda changed the title direct_consumer: move offset update logic to fetch direct_consumer: move offset update logic to fetch_next Oct 31, 2025
@joe-redpanda joe-redpanda force-pushed the start_offset branch 2 times, most recently from 08d481a to 75dad5b Compare October 31, 2025 22:26
Copilot AI review requested due to automatic review settings October 31, 2025 22:26

Copilot AI left a comment

Contributor

Pull Request Overview

This PR moves the offset update logic from the fetch response processing stage to the fetch_next method to ensure correctness. The change addresses a timing issue where fetches could become stale between processing and being returned to the consumer. By deferring offset updates until after the final subscription epoch filter is applied in fetch_next, the PR guarantees that only valid, current offsets are stored.

Key Changes:

  • Removed premature offset updates in process_fetch_response method in fetcher.cc
  • Added update_start_offsets method that updates offsets only after subscription filtering
  • Refactored subscription lookup logic to use helper methods with std::reference_wrapper

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
src/v/kafka/client/direct_consumer/fetcher.cc Removed premature offset updates and maybe_update_source_partition_offsets call from fetch response processing
src/v/kafka/client/direct_consumer/direct_consumer.h Added new helper methods for subscription lookup and renamed offset update method
src/v/kafka/client/direct_consumer/direct_consumer.cc Implemented update_start_offsets with validation logging and refactored subscription lookup methods

Comment thread src/v/kafka/client/direct_consumer/direct_consumer.cc Outdated
Comment thread src/v/kafka/client/direct_consumer/direct_consumer.cc Outdated
@vbotbuildovich

vbotbuildovich commented Nov 1, 2025

Collaborator

Retry command for Build#75448

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":true,"source_cluster_spec":{"cluster_type":"redpanda"}}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingMetricsTests.test_link_metrics
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_auto_prefix_trimming@{"source_cluster_spec":{"cluster_type":"redpanda"},"with_failures":false}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":false,"source_cluster_spec":{"cluster_type":"redpanda"}}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_auto_prefix_trimming@{"source_cluster_spec":{"cluster_type":"redpanda"},"with_failures":true}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":false,"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"}}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":true,"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"}}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_auto_prefix_trimming@{"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"},"with_failures":false}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_with_failures

@vbotbuildovich

vbotbuildovich commented Nov 1, 2025

Collaborator

CI test results

test results on build#75448
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
ShadowLinkConsumeGroupsMirroringTest test_continuous_group_sync {"source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}, "with_failures": false} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a31-46ff-8ee9-d7b800bcbcbd FLAKY 13/21 upstream reliability is '93.79432624113475'. current run reliability is '61.904761904761905'. drift is 31.88956 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkConsumeGroupsMirroringTest&test_method=test_continuous_group_sync
ShadowLinkingMetricsTests test_link_metrics null integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a2e-4740-9258-7a46eb7302d6 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingMetricsTests&test_method=test_link_metrics
ShadowLinkingMetricsTests test_link_metrics null integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b6a-4681-9089-024e1e6a3d76 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingMetricsTests&test_method=test_link_metrics
ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}, "with_failures": false} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a2f-4b0e-9953-7435dad72707 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}, "with_failures": false} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b6c-4874-bb34-2ef562459b73 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "with_failures": false} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a30-4271-aaf4-fb3bf374bb54 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "with_failures": false} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b6d-4647-8590-761d2bdd10ca FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "with_failures": true} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a33-4dcd-872c-6097fa4cc086 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "with_failures": true} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b70-4169-bd9a-d7850ce5f958 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": false, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a34-40cf-ae3b-e35bf2f2eb94 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": false, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b71-4e92-b1f2-f68e570beceb FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": false, "source_cluster_spec": {"cluster_type": "redpanda"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a35-4dd6-a24b-ff00d2b30e89 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": false, "source_cluster_spec": {"cluster_type": "redpanda"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b72-4e64-a66f-9ae3497c14bd FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a37-42cc-b4c7-fc2ca44a65d0 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b74-4602-9fc9-5a948bfc6e7f FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "redpanda"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a38-44b4-b92e-e524db91e0f7 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "redpanda"}} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b67-45a8-9d63-998b5152b9e9 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
ShadowLinkingReplicationTests test_replication_timestamps_match {"source_cluster_spec": {"cluster_type": "redpanda"}, "timestamp_type": "CreateTime"} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a2e-4740-9258-7a46eb7302d6 FLAKY 20/21 upstream reliability is '95.14563106796116'. current run reliability is '95.23809523809523'. drift is -0.09246 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_timestamps_match
ShadowLinkingReplicationTests test_replication_with_failures null integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a31-46ff-8ee9-d7b800bcbcbd FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_with_failures
ShadowLinkingReplicationTests test_replication_with_failures null integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b6f-4d4f-b7f1-6d4eaf25bb36 FAIL 0/21 The test has failed across all retries https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_with_failures
SegmentMsTest test_segment_rolling_with_retention_consumer null integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c85-5a35-4dd6-a24b-ff00d2b30e89 FLAKY 18/21 upstream reliability is '94.56118665018542'. current run reliability is '85.71428571428571'. drift is 8.8469 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=SegmentMsTest&test_method=test_segment_rolling_with_retention_consumer
PartitionMovementTest test_static {"num_to_upgrade": 0} integration https://buildkite.com/redpanda/redpanda/builds/75448#019a3c86-0b68-4129-ab3c-0a4318b48637 FLAKY 16/21 upstream reliability is '87.01923076923077'. current run reliability is '76.19047619047619'. drift is 10.82875 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=PartitionMovementTest&test_method=test_static
test results on build#75826
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
ReplicatedMetastoreTest TestBasicRemoveTopics unit https://buildkite.com/redpanda/redpanda/builds/75826#019a5eca-8850-4ab4-91ab-c03c7d8ca487 FAIL 0/1
ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "redpanda"}} integration https://buildkite.com/redpanda/redpanda/builds/75826#019a5f36-98e8-47c5-9b8b-0749ca2788df FLAKY 19/21 upstream reliability is '98.19148936170212'. current run reliability is '90.47619047619048'. drift is 7.7153 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
DataMigrationsApiTest test_creating_and_listing_migrations null integration https://buildkite.com/redpanda/redpanda/builds/75826#019a5f36-98ef-489b-913d-e1d49d14869d FLAKY 19/21 upstream reliability is '98.13620071684588'. current run reliability is '90.47619047619048'. drift is 7.66001 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=DataMigrationsApiTest&test_method=test_creating_and_listing_migrations
MountUnmountIcebergTest test_simple_remount {"cloud_storage_type": 1} integration https://buildkite.com/redpanda/redpanda/builds/75826#019a5f56-8c48-4a17-8e35-0d209699350c FLAKY 17/21 upstream reliability is '91.97860962566845'. current run reliability is '80.95238095238095'. drift is 11.02623 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=MountUnmountIcebergTest&test_method=test_simple_remount
test results on build#77609
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
ReplicatedMetastoreTest TestBasicRemoveTopics unit https://buildkite.com/redpanda/redpanda/builds/77609#019b0507-1d4f-4e19-b134-ab607e210ee1 FAIL 0/1
ControllerLogLimitMirrorMakerTests test_mirror_maker_with_limits null integration https://buildkite.com/redpanda/redpanda/builds/77609#019b0533-0a85-47bc-bb39-9e9da6e76c03 FLAKY 20/21 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0177, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.0521, p1=0.3432, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ControllerLogLimitMirrorMakerTests&test_method=test_mirror_maker_with_limits
test results on build#77625
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
NodesDecommissioningTest test_decommissioning_rebalancing_node {"shutdown_decommissioned": false} integration https://buildkite.com/redpanda/redpanda/builds/77625#019b05da-30ef-40ac-9406-20a7ededdbc2 FLAKY 9/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.1046, p0=0.6688, reject_threshold=0.0100. adj_baseline=0.2821, p1=0.1792, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_decommissioning_rebalancing_node
NodesDecommissioningTest test_decommissioning_rebalancing_node {"shutdown_decommissioned": true} integration https://buildkite.com/redpanda/redpanda/builds/77625#019b05da-30f0-4374-85d5-c492f17ec5f3 FLAKY 9/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.1046, p0=0.6688, reject_threshold=0.0100. adj_baseline=0.2821, p1=0.1792, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_decommissioning_rebalancing_node
TxAtomicProduceConsumeTest test_basic_tx_consumer_transform_produce {"with_failures": true} integration https://buildkite.com/redpanda/redpanda/builds/77625#019b05da-5cc1-4f97-a8eb-1ff7a1e54cd4 FLAKY 30/31 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0088, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.0262, p1=0.4516, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=TxAtomicProduceConsumeTest&test_method=test_basic_tx_consumer_transform_produce
test results on build#77668
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
NodesDecommissioningTest test_decommissioning_rebalancing_node {"shutdown_decommissioned": true} integration https://buildkite.com/redpanda/redpanda/builds/77668#019b09b0-a7fd-4a94-98f9-6e28a3e61dc6 FLAKY 8/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.1030, p0=0.2757, reject_threshold=0.0100. adj_baseline=0.2784, p1=0.4425, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodesDecommissioningTest&test_method=test_decommissioning_rebalancing_node
test results on build#78229
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
MountUnmountIcebergTest test_simple_remount {"cloud_storage_type": 1} integration https://buildkite.com/redpanda/redpanda/builds/78229#019b3832-437a-427a-a961-8d913f2340df FLAKY 8/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.1799, p0=0.5606, reject_threshold=0.0100. adj_baseline=0.4485, p1=0.1012, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=MountUnmountIcebergTest&test_method=test_simple_remount
WriteCachingFailureInjectionE2ETest test_crash_all {"use_transactions": false} integration https://buildkite.com/redpanda/redpanda/builds/78229#019b3836-152d-441c-9809-4d71fc4d9c89 FLAKY 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0709, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1979, p1=0.1103, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=WriteCachingFailureInjectionE2ETest&test_method=test_crash_all

@bharathv

bharathv commented Nov 3, 2025

Contributor

The failures here seem related to the change?

@joe-redpanda joe-redpanda marked this pull request as draft November 3, 2025 20:10
@joe-redpanda

Contributor Author

/ci-repeat 1
tests/rptest/tests/cluster_linking_e2e_test.py

Comment thread src/v/kafka/client/direct_consumer/api_types.h
@joe-redpanda joe-redpanda force-pushed the start_offset branch 2 times, most recently from 8df5d95 to d24398c Compare November 5, 2025 17:52
@joe-redpanda

Contributor Author

/ci-repeat 1
tests/rptest/tests/cluster_linking_e2e_test.py
tests/rptest/direct_consumer_tests/direct_consumer_test.py

@joe-redpanda

Contributor Author

/ci-repeat 1
tests/rptest/tests/cluster_linking_e2e_test.py
tests/rptest/direct_consumer_tests/direct_consumer_test.py

@joe-redpanda

Contributor Author

/ci-repeat 1
tests/rptest/tests/cluster_linking_e2e_test.py
tests/rptest/direct_consumer_tests/direct_consumer_test.py

@joe-redpanda

Contributor Author

/ci-repeat 1
tests/rptest/tests/cluster_linking_e2e_test.py
tests/rptest/direct_consumer_tests/direct_consumer_test.py

@joe-redpanda

Contributor Author

> my only concern is that this seems like a non-trivial change that is going to be backported?

Agreed. My thought was to leave this to bake for some time before performing the backport.

@joe-redpanda

Contributor Author

> lgtm, mostly minor comments, thanks for the cleanup
>
> > This pr moves the update for offsets to only after the final subscription epoch filter has been applied to guarantee correctness of the provided offsets.
>
> can you detail the conditions under which correctness is compromised (for posterity), AFAICT it is very very rare

Short:
A lot of cached fetches plus a prefix truncation.

The start offset change is visible right away, whereas all other updates become visible only once the queue gets burned down.

Long:
Start offsets get updated at the time a fetch is performed rather than when the fetch passes through the direct consumer, while all other data is made visible at the time of direct_consumer::fetch_next.

so for tp topic/1 you can have
fetch -> start: 0, hwm: 100
fetch -> start: 0, hwm: 200
fetch -> start: 200, hwm: 300

direct_consumer::get_start_offset("topic/1") -> 200
direct_consumer::fetch_next -> start: 0, hwm: 100

In 25.3 the start offset was only used for metrics, so this wasn't really an important issue. We decided the above was fine for the release, but correctness should be fixed so that we can depend on the correct ordering of offsets.
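
For posterity, the sequence above can be replayed with a toy model. This is only a sketch under stated assumptions: `toy_consumer` and `queued_fetch` are hypothetical stand-ins, not the real direct_consumer types, and the flag merely switches between publishing the start offset when a fetch is enqueued (the old behaviour) and publishing it when the fetch is handed out by fetch_next (the behaviour this PR aims for).

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>

// Hypothetical stand-in for one queued fetch result of a single partition.
struct queued_fetch {
    int64_t start_offset;
    int64_t high_watermark;
};

// Toy model, not the real direct_consumer.
class toy_consumer {
public:
    explicit toy_consumer(bool update_on_enqueue)
      : _update_on_enqueue(update_on_enqueue) {}

    // Called when a fetch response arrives and is cached in the queue.
    void enqueue(queued_fetch f) {
        assert(f.start_offset <= f.high_watermark);
        if (_update_on_enqueue) {
            _start_offset = f.start_offset; // old behaviour: visible at once
        }
        _queue.push_back(f);
    }

    // Hands the oldest cached fetch to the caller.
    std::optional<queued_fetch> fetch_next() {
        if (_queue.empty()) {
            return std::nullopt;
        }
        queued_fetch f = _queue.front();
        _queue.pop_front();
        if (!_update_on_enqueue) {
            _start_offset = f.start_offset; // new behaviour: visible with data
        }
        return f;
    }

    int64_t get_start_offset() const { return _start_offset; }

private:
    bool _update_on_enqueue;
    int64_t _start_offset{-1};
    std::deque<queued_fetch> _queue;
};
```

With `update_on_enqueue` set, enqueueing the three fetches listed above leaves `get_start_offset()` at 200 while the first `fetch_next()` still returns start 0 / hwm 100. With it cleared, the start offset and the returned data advance together.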

@joe-redpanda

Contributor Author

dev rebase

Comment thread src/v/kafka/client/direct_consumer/direct_consumer.cc Outdated
@joe-redpanda joe-redpanda marked this pull request as draft December 10, 2025 00:56
Comment thread src/v/kafka/client/direct_consumer/fetcher.cc Outdated
@joe-redpanda joe-redpanda marked this pull request as ready for review December 10, 2025 18:30
@joe-redpanda joe-redpanda requested a review from Copilot December 10, 2025 18:32

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.

Comments suppressed due to low confidence (2)

src/v/kafka/client/direct_consumer/fetcher.h:1

  • Using -1 as an invalid/uninitialized marker for an epoch value may be problematic if -1 could ever be a valid epoch. Consider using std::optional<kafka::leader_epoch> for fields that may not always be set.

src/v/kafka/client/direct_consumer/fetcher.cc:1

  • Corrected spelling of 'monatomic' to 'monotonic'.

Comment thread src/v/kafka/client/direct_consumer/tests/fetcher_test.cc Outdated
Comment thread src/v/kafka/client/direct_consumer/tests/fetcher_test.cc
Comment thread src/v/kafka/client/direct_consumer/tests/fetcher_test.cc
Comment thread src/v/kafka/client/direct_consumer/tests/direct_consumer_test.cc
Comment thread src/v/kafka/client/direct_consumer/direct_consumer.cc Outdated
Comment thread src/v/kafka/client/direct_consumer/fetcher.cc

@bharathv bharathv left a comment

Contributor

lgtm modulo one question around a loop with timer.

as discussed offline, we should probably skip the backport or have some extra baketime before backporting.

Comment on lines +54 to +58
// we'll keep attempting to pluck from the queue until the timeout is
// exhausted
while (ss::lowres_clock::now() < deadline) {
// either the remaining timeout or a small but reasonable minimum
// timeout

Contributor

may I ask why this change is needed? It seems like this timeout should be honored within the data_queue API if nothing is available to be fetched.

Contributor Author

With filtering, it's possible that the next batch of fetch data from the queue is empty. The loop will continuously grab new fetches off the queue until it finds one that isn't entirely stale.

The deadline fiddling done here is so we don't join the cv's waiter queue with an already expired / imminently expiring timeout. If we're going through the work of calling into fetch_next, imo we should give the operation at least a task_quota to work with
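
A rough sketch of the loop shape described here, using only the standard library rather than Seastar. Everything in it is an assumption made for illustration: `toy_queue` does not block, `min_wait` is a made-up value standing in for "at least a task_quota", and `fetch` is just a list of batch ids.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <deque>
#include <optional>
#include <utility>
#include <vector>

using clock_type = std::chrono::steady_clock;
using fetch = std::vector<int>; // hypothetical: a fetch is a list of batch ids

// Assumed minimum wait; the concrete value is invented for this sketch.
constexpr std::chrono::milliseconds min_wait{1};

// Remaining time until the deadline, but never less than min_wait, so that a
// waiter is never queued with an expired or about-to-expire timeout.
inline clock_type::duration
clamp_timeout(clock_type::time_point now, clock_type::time_point deadline) {
    return std::max<clock_type::duration>(deadline - now, min_wait);
}

// Toy queue that never blocks. A real queue would wait for up to `timeout`.
struct toy_queue {
    std::deque<fetch> items;
    std::optional<fetch> pop(clock_type::duration /*timeout*/) {
        if (items.empty()) {
            return std::nullopt;
        }
        fetch f = std::move(items.front());
        items.pop_front();
        return f;
    }
};

// Keeps pulling fetches until one survives filtering or the deadline passes.
template<typename Filter>
std::optional<fetch>
fetch_next(toy_queue& q, clock_type::time_point deadline, Filter drop_stale) {
    while (clock_type::now() < deadline) {
        auto timeout = clamp_timeout(clock_type::now(), deadline);
        assert(timeout > clock_type::duration::zero());
        auto next = q.pop(timeout);
        if (!next) {
            return std::nullopt; // nothing arrived within the timeout
        }
        drop_stale(*next);
        if (!next->empty()) {
            return next; // at least one entry was not stale
        }
        // Entirely stale: try the next queued fetch.
    }
    return std::nullopt;
}
```

The clamp is the part that matters for the review question: without it, the last loop iteration could join the waiter queue with a timeout that has effectively already expired.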

Comment on lines +125 to +126
"offset filtering requires that unassigned subscriptions have "
"already been filtered out");

Contributor

nit: do we need to dump any debug state in the assert output

Contributor Author

I don't think so. This is a paranoid assert. I can only see it firing if a developer changes direct consumer, and the message is fairly clear insofar as I think it indicates
"you goofed up, this used to filter expired subscriptions and now it doesn't"

direct_consumer_fixture_test now only asserts on initial conditions in
one test to reduce redundancy. The initial assert is changed to permit
offset-only updates, but restricts those updates to at most one per
partition.

direct_consumer_test is changed to no longer assert on empty fetches.

Adds utility functions to find subscriptions, returning the reference as
an optional of reference wrapper. Updates usages of these to use the
helpers for cleanliness.
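
A minimal sketch of what such a lookup helper can look like. The names `subscription`, `subscription_map`, `find_subscription` and `advance_fetch_offset` are hypothetical and the map type is an assumption; only the `std::optional<std::reference_wrapper<T>>` return shape comes from the commit description.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>

// Hypothetical, simplified subscription state.
struct subscription {
    int64_t epoch{0};
    int64_t fetch_offset{0};
};

using subscription_map = std::map<std::string, subscription>;

// Returns a reference to the subscription if present, nullopt otherwise.
// std::optional cannot hold a plain reference, hence the reference_wrapper.
inline std::optional<std::reference_wrapper<subscription>>
find_subscription(subscription_map& subs, const std::string& tp) {
    auto it = subs.find(tp);
    if (it == subs.end()) {
        return std::nullopt;
    }
    return std::ref(it->second);
}

// Example caller: one lookup, one nullopt check, then direct mutation.
inline bool advance_fetch_offset(
  subscription_map& subs, const std::string& tp, int64_t next) {
    auto sub = find_subscription(subs, tp);
    if (!sub) {
        return false;
    }
    assert(next >= sub->get().fetch_offset); // offsets only move forward here
    sub->get().fetch_offset = next;
    return true;
}
```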

Add reasonable initializers to fetched_partition_data. This is not a
required correctness change; instead it is meant to preempt a writer
from forgetting to set a value when filling in the fields of
fetched_partition_data.

Adds are_offsets_equal to source_partition_offsets, which checks whether
tracked offsets are the same or different over time.
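
Roughly, the two changes in this commit can be pictured as follows. The field names, the `-1` defaults and the `_sketch` types are assumptions made for illustration; the real structs have their own fields, and whether the comparison is a member or a free function is not stated above.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified offsets tracked per source partition.
struct source_partition_offsets_sketch {
    // Default member initializers: a forgotten field is a recognisable -1
    // rather than an indeterminate value.
    int64_t log_start_offset{-1};
    int64_t high_watermark{-1};
    int64_t last_stable_offset{-1};

    // True when every tracked offset matches.
    bool are_offsets_equal(const source_partition_offsets_sketch& o) const {
        return log_start_offset == o.log_start_offset
               && high_watermark == o.high_watermark
               && last_stable_offset == o.last_stable_offset;
    }
};

// Hypothetical, simplified per-partition fetch result.
struct fetched_partition_data_sketch {
    int32_t partition_id{-1};
    source_partition_offsets_sketch offsets{};
    std::vector<int> batches{}; // batch ids stand in for record batches
};

// True when `next` carries offset information that `prev` did not have.
inline bool has_new_offsets(
  const source_partition_offsets_sketch& prev,
  const source_partition_offsets_sketch& next) {
    // Sketch-level sanity check only.
    assert(next.log_start_offset <= next.high_watermark);
    return !prev.are_offsets_equal(next);
}
```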

This commit does four things:
1. fetch data will now be added to the queue even if it has no batches
2. offsets will be updated at the point in time at which fetch_next is
   called
3. a new filter is applied to remove fetches which contain no new
   information (all offsets are the same)
4. fetch next will retry fetching from the data_queue if the filters
   have removed everything from the resultant fetch
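
Items 2 and 3 can be sketched together as a single filtering step. This is an illustration under assumptions, not the code from the commit: `offsets`, `partition_fetch` and `filter_and_publish` are hypothetical names, and the map of published offsets stands in for whatever state the direct consumer actually keeps.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified offsets for one partition.
struct offsets {
    int64_t start{-1};
    int64_t hwm{-1};
};

inline bool operator==(const offsets& a, const offsets& b) {
    return a.start == b.start && a.hwm == b.hwm;
}

// Hypothetical per-partition entry of a queued fetch.
struct partition_fetch {
    std::string tp;
    offsets offs;
    std::vector<int> batches; // may be empty: an offset-only fetch
};

// Drops entries that carry no batches and whose offsets match what was
// already published, then publishes the offsets of the surviving entries.
// Intended to run at fetch_next time, after stale subscriptions are gone.
inline void filter_and_publish(
  std::vector<partition_fetch>& fetch,
  std::map<std::string, offsets>& published) {
    auto no_new_information = [&published](const partition_fetch& p) {
        auto it = published.find(p.tp);
        return p.batches.empty() && it != published.end()
               && it->second == p.offs;
    };
    fetch.erase(
      std::remove_if(fetch.begin(), fetch.end(), no_new_information),
      fetch.end());
    for (const auto& p : fetch) {
        assert(p.offs.start <= p.offs.hwm);
        published[p.tp] = p.offs;
    }
}
```

If the vector comes back empty, the caller would go back to the queue for another fetch, which is item 4.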

The results of partitioning were incorrectly named in
filter_stale_subscriptions.

Fix the names and additionally pull the iterators from the subspan
rather than doing wasteful (and dangerous) iterator math.
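
As a reminder of which half is which, here is a small sketch with `std::partition` on a plain vector. It is a simplification: the commit works with subspans, while `entry` and `partition_current_first` here are hypothetical. Elements for which the predicate is true end up first, and the returned iterator marks the start of the second group.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical entry tagged with the subscription epoch it was fetched under.
struct entry {
    int id;
    int64_t epoch;
};

// Reorders `v` so entries from `current_epoch` come first.
// Returns the start of the stale range:
//   [v.begin(), result) -> current
//   [result, v.end())   -> stale
inline std::vector<entry>::iterator
partition_current_first(std::vector<entry>& v, int64_t current_epoch) {
    auto is_current = [current_epoch](const entry& e) {
        return e.epoch == current_epoch;
    };
    auto first_stale = std::partition(v.begin(), v.end(), is_current);
    // Paranoid postcondition: nothing current is left in the stale range.
    assert(std::none_of(first_stale, v.end(), is_current));
    return first_stale;
}
```

Naming the result `first_stale` rather than something like `end` keeps the direction of the split explicit at the call site.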

Clarifies why a vassert is firing in direct_consumer:
the direct consumer's filtering is order-dependent to reduce the amount of
code spent checking nullopt.

The fetcher is remarkably error-prone to work on.
To mitigate this, this commit splits the logic for processing fetch
responses into new delegated functions.

1. do_process_partition_response: a sync static method which is
   responsible for taking a given fetch and determining what should be
   done with it
   - retriable errors -> update metadata
   - out of bounds -> reset offsets
   - unknown error -> bubble to caller
   - data fetch -> return data
   - offset only fetch -> return offsets
2. process_partition_response: an async wrapper for
   do_process_partition_response which updates the fetcher local state
   and incorporates the results into the resultant fetch response
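
The five outcomes listed for do_process_partition_response map naturally onto a tagged union. The following is a sketch of that idea only: `error_kind`, `partition_response`, the outcome structs and `classify` are hypothetical names, and the real function's inputs and return type may look quite different.

```cpp
#include <cassert>
#include <cstdint>
#include <variant>
#include <vector>

// Hypothetical, coarse classification of the error code in a response.
enum class error_kind { none, retriable, offset_out_of_range, unknown };

// Hypothetical, simplified partition response.
struct partition_response {
    error_kind err;
    int64_t start;
    int64_t hwm;
    std::vector<int> batches; // batch ids stand in for record batches
};

// One type per outcome from the list above.
struct update_metadata {};
struct reset_offsets {};
struct bubble_error {};
struct data_fetch {
    std::vector<int> batches;
    int64_t start;
    int64_t hwm;
};
struct offsets_only {
    int64_t start;
    int64_t hwm;
};

using decision = std::variant<
  update_metadata, reset_offsets, bubble_error, data_fetch, offsets_only>;

// Synchronous and free of side effects, so it can be unit tested directly.
inline decision classify(const partition_response& r) {
    switch (r.err) {
    case error_kind::retriable:
        return update_metadata{};
    case error_kind::offset_out_of_range:
        return reset_offsets{};
    case error_kind::unknown:
        return bubble_error{};
    case error_kind::none:
        break;
    }
    assert(r.start <= r.hwm); // sketch-level sanity check
    if (r.batches.empty()) {
        return offsets_only{r.start, r.hwm};
    }
    return data_fetch{r.batches, r.start, r.hwm};
}
```

Keeping the decision pure leaves the async wrapper with one job: applying the chosen outcome to fetcher state.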

Adds fetcher unit tests to ensure the decision logic in
do_process_partition_response is per expectations.

Adds clarity to the meaning and implication of consistent partitions in
vasserts.

Namely, the code is written to check whether a partition is consistent
before operating on it, allowing us to skip most checks for end-of-collection
iterators and for nullopts returned from helper getter methods.

This was done to significantly cut down on invalid entry checks.

@bharathv bharathv left a comment

Contributor

lgtm

@joe-redpanda joe-redpanda merged commit a1ffc7b into redpanda-data:dev Dec 19, 2025
19 checks passed

5 participants