Skip to content

shadowing/write_at_offset_stm: ensure_truncatable API#29358

Merged
bharathv merged 8 commits into
redpanda-data:devfrom
bharathv:ensure_truncatable
Jan 28, 2026
Merged

shadowing/write_at_offset_stm: ensure_truncatable API#29358
bharathv merged 8 commits into
redpanda-data:devfrom
bharathv:ensure_truncatable

Conversation

@bharathv

@bharathv bharathv commented Jan 21, 2026

Copy link
Copy Markdown
Contributor

If the source start offset has moved past sink HWM, propagating truncation (today) would result in an error as we currently have no way to truncate above HWM.

This PR exposes ensure_truncatable() API that can be used to ensure that start_offset is past the offset that we are trying to truncate.

Fixes: CORE-15308

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x

Release Notes

Bug Fixes

  • Fixes a bug where truncation does not propagate to sink if source start offset has moved ahead of sink high watermark.

@bharathv

Copy link
Copy Markdown
Contributor Author

/dt

@vbotbuildovich

vbotbuildovich commented Jan 21, 2026

Copy link
Copy Markdown
Collaborator

Retry command for Build#79443

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/nodes_decommissioning_test.py::NodesDecommissioningTest.test_decommissioning_and_upgrade
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_deletion_stops_move@{"num_to_upgrade":2}
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_legacy_default_explicit_after_upgrade@{"wipe_cache":false}
tests/rptest/tests/control_character_flag_test.py::ControlCharacterNag.test_validate_nag_message@{"initial_version_as_list":[23,1,1]}
tests/rptest/tests/upgrade_test.py::UpgradeBackToBackTest.test_upgrade_with_all_workloads@{"single_upgrade":true}
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_moving_not_fully_initialized_partition@{"num_to_upgrade":2}
tests/rptest/tests/cluster_bootstrap_test.py::ClusterBootstrapUpgrade.test_change_bootstrap_configs_after_upgrade@{"empty_seed_starts_cluster":false}
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_legacy_default_explicit_before_upgrade@{"wipe_cache":false}
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_legacy_default_explicit_after_upgrade@{"wipe_cache":true}
tests/rptest/tests/upgrade_test.py::UpgradeBackToBackTest.test_upgrade_with_all_workloads@{"single_upgrade":false}
tests/rptest/tests/license_enforcement_test.py::LicenseEnforcementTest.test_license_enforcement@{"clean_node_after_recovery":true,"clean_node_before_recovery":false}
tests/rptest/tests/upgrade_test.py::UpgradeWithWorkloadTest.test_rolling_upgrade
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_removal_of_legacy_default_overriden@{"wipe_cache":false}
tests/rptest/transactions/consumer_offsets_test.py::VerifyConsumerOffsetsThruUpgrades.test_consumer_group_offsets@{"versions_to_upgrade":3}
tests/rptest/tests/upgrade_test.py::UpgradeFromPriorFeatureVersionTest.test_basic_upgrade
tests/rptest/tests/audit_log_test.py::AuditLogUpgradeTest.test_audit_log_upgrade_all_nodes
tests/rptest/tests/license_enforcement_test.py::LicenseEnforcementTest.test_escape_hatch_license_variable@{"clean_node_before_upgrade":true}
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_overlapping_changes@{"num_to_upgrade":2}
tests/rptest/tests/cluster_bootstrap_test.py::ClusterBootstrapUpgrade.test_change_bootstrap_configs_during_upgrade@{"empty_seed_starts_cluster":false}
tests/rptest/tests/cluster_features_test.py::FeaturesMultiNodeUpgradeTest.test_upgrade
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_removal_of_legacy_default_defaulted@{"wipe_cache":false}
tests/rptest/tests/control_character_flag_test.py::ControlCharacterPermittedAfterUpgrade.test_upgrade_from_pre_v23_2@{"initial_version_as_list":[22,3,11]}
tests/rptest/transactions/consumer_offsets_test.py::VerifyConsumerOffsetsThruUpgrades.test_consumer_group_offsets@{"versions_to_upgrade":1}
tests/rptest/tests/data_transforms_test.py::DataTransformsRpcUpgradeTest.test_upgrade_one_node
tests/rptest/tests/partition_movement_upgrade_test.py::PartitionMovementUpgradeTest.test_basic_upgrade
tests/rptest/tests/license_enforcement_test.py::LicenseEnforcementTest.test_license_enforcement@{"clean_node_after_recovery":false,"clean_node_before_recovery":false}
tests/rptest/tests/cluster_bootstrap_test.py::ClusterBootstrapUpgrade.test_change_bootstrap_configs_during_upgrade@{"empty_seed_starts_cluster":true}
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_empty@{"num_to_upgrade":2}
tests/rptest/tests/upgrade_test.py::UpgradeFromSpecificVersion.test_basic_upgrade
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_removal_of_legacy_default_defaulted@{"wipe_cache":true}
tests/rptest/tests/control_character_flag_test.py::ControlCharacterPermittedAfterUpgrade.test_upgrade_from_pre_v23_2@{"initial_version_as_list":[23,1,1]}
tests/rptest/transactions/consumer_offsets_test.py::VerifyConsumerOffsetsThruUpgrades.test_consumer_group_offsets@{"versions_to_upgrade":2}
tests/rptest/tests/read_replica_e2e_test.py::ReadReplicasUpgradeTest.test_upgrades@{"cloud_storage_type":1}
tests/rptest/tests/controller_snapshot_test.py::ControllerSnapshotTest.test_upgrade_compat
tests/rptest/tests/license_enforcement_test.py::LicenseEnforcementTest.test_escape_hatch_license_variable@{"clean_node_before_upgrade":false}
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_dynamic@{"num_to_upgrade":2}
tests/rptest/tests/upgrade_test.py::UpgradeFromPriorFeatureVersionCloudStorageTest.test_rolling_upgrade@{"cloud_storage_type":1}
tests/rptest/tests/cluster_bootstrap_test.py::ClusterBootstrapUpgrade.test_change_bootstrap_configs_after_upgrade@{"empty_seed_starts_cluster":true}
tests/rptest/tests/cluster_features_test.py::FeaturesMultiNodeUpgradeTest.test_rollback
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_legacy_default_explicit_before_upgrade@{"wipe_cache":true}
tests/rptest/tests/control_character_flag_test.py::ControlCharacterPermittedAfterUpgrade.test_upgrade_from_pre_v23_2@{"initial_version_as_list":[22,2,9]}
tests/rptest/tests/license_enforcement_test.py::LicenseEnforcementTest.test_license_enforcement@{"clean_node_after_recovery":false,"clean_node_before_recovery":true}
tests/rptest/tests/upgrade_test.py::UpgradeWithWorkloadTest.test_rolling_upgrade_with_rollback@{"upgrade_after_rollback":false}
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_invalid_destination@{"num_to_upgrade":2}
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_removal_of_legacy_default_overriden@{"wipe_cache":true}
tests/rptest/tests/compaction_recovery_test.py::CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade
tests/rptest/tests/random_node_operations_smoke_test.py::RedpandaNodeOperationsSmokeTest.test_node_ops_smoke_test@{"cloud_storage_type":1,"mixed_versions":true}
tests/rptest/tests/acl_upgrade_test.py::ACLUpgradeTest.test_acl_operations_during_upgrade
tests/rptest/tests/license_enforcement_test.py::LicenseEnforcementTest.test_license_enforcement@{"clean_node_after_recovery":true,"clean_node_before_recovery":true}
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_bootstrapping_after_move@{"num_to_upgrade":2}
tests/rptest/tests/upgrade_test.py::UpgradeWithWorkloadTest.test_rolling_upgrade_with_rollback@{"upgrade_after_rollback":true}
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_static@{"num_to_upgrade":2}
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_legacy_default@{"wipe_cache":false}
tests/rptest/tests/control_character_flag_test.py::ControlCharacterNag.test_validate_nag_message@{"initial_version_as_list":[22,2,9]}
tests/rptest/tests/cluster_features_test.py::FeaturesSingleNodeUpgradeTest.test_upgrade
tests/rptest/tests/partition_movement_test.py::PartitionMovementTest.test_move_consumer_offsets_intranode@{"num_to_upgrade":2}
tests/rptest/tests/cluster_config_test.py::ClusterConfigLegacyDefaultTest.test_legacy_default@{"wipe_cache":true}
tests/rptest/tests/control_character_flag_test.py::ControlCharacterNag.test_validate_nag_message@{"initial_version_as_list":[22,3,11]}
tests/rptest/tests/partition_movement_test.py::SIPartitionMovementTest.test_shadow_indexing@{"cloud_storage_type":2,"num_to_upgrade":2,"with_cloud_topics":false}
tests/rptest/transactions/tx_upgrade_test.py::TxUpgradeTest.upgrade_does_not_change_tx_coordinator_assignment_test
tests/rptest/transactions/tx_coordinator_migration_test.py::TxCoordinatorMigrationTest.test_migrating_tx_manager_coordinator@{"upgrade":true,"with_failures":false}
tests/rptest/tests/controller_upgrade_test.py::ControllerUpgradeTest.test_updating_cluster_when_executing_operations
tests/rptest/tests/partition_movement_test.py::SIPartitionMovementTest.test_cross_shard@{"cloud_storage_type":1,"num_to_upgrade":2,"with_cloud_topics":false}
tests/rptest/tests/raft_recovery_test.py::RaftRecoveryUpgradeTest.test_upgrade
tests/rptest/tests/partition_movement_test.py::SIPartitionMovementTest.test_shadow_indexing@{"cloud_storage_type":1,"num_to_upgrade":2,"with_cloud_topics":false}
tests/rptest/tests/partition_movement_test.py::SIPartitionMovementTest.test_cross_shard@{"cloud_storage_type":2,"num_to_upgrade":2,"with_cloud_topics":false}
tests/rptest/transactions/tx_coordinator_migration_test.py::TxCoordinatorMigrationTest.test_migrating_tx_manager_coordinator@{"upgrade":true,"with_failures":true}
tests/rptest/tests/datalake/datalake_upgrade_test.py::DatalakeUpgradeTest.test_upload_through_upgrade@{"cloud_storage_type":1,"migration_type":"recreate_table","query_engine":"spark"}
tests/rptest/tests/datalake/datalake_upgrade_test.py::DatalakeUpgradeTest.test_upload_through_upgrade@{"cloud_storage_type":1,"migration_type":"rename_columns","query_engine":"spark"}
tests/rptest/tests/write_caching_fi_e2e_test.py::WriteCachingFailureInjectionE2ETest.test_crash_all@{"use_transactions":false}
tests/rptest/tests/upgrade_test.py::UpgradeAndCheckRecoveryReads.test_basic_upgrade
tests/rptest/transactions/tx_upgrade_test.py::TxUpgradeRevertTest.test_snapshot_compatibility

@bharathv bharathv marked this pull request as ready for review January 22, 2026 03:35

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a bug where truncation does not propagate to a sink if the source start offset has moved ahead of the sink's high watermark. The solution introduces an ensure_truncatable() API that guarantees the log can be safely truncated by filling gaps with placeholder batches as needed.

Changes:

  • Introduces ensure_truncatable() API to write_at_offset_stm that ensures truncation safety by filling gaps with compaction placeholder batches
  • Refactors ghost batch creation functions into generic template functions that support both ghost batches and compaction placeholder batches
  • Integrates the new API into the cluster link service to handle truncation propagation

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
src/v/model/batch_utils.h Refactored batch creation functions into templates supporting both ghost and compaction placeholder batches
src/v/model/batch_utils.cc Moved function implementations to header as templates and added explicit instantiations
src/v/storage/log_reader.cc Updated function call to use namespace-qualified model::make_ghost_batch
src/v/kafka/server/write_at_offset_stm.h Added ensure_truncatable() method declaration and new error codes
src/v/kafka/server/write_at_offset_stm.cc Implemented ensure_truncatable() method and custom formatter for error codes
src/v/kafka/server/tests/write_at_offset_stm_test.cc Added comprehensive tests for the new ensure_truncatable() functionality
src/v/cluster_link/service.cc Integrated ensure_truncatable() call before prefix truncation in cluster link service

Comment thread src/v/kafka/server/write_at_offset_stm.cc Outdated
No logical changes, templatized the ghost batch utility on batch type
@vbotbuildovich

vbotbuildovich commented Jan 22, 2026

Copy link
Copy Markdown
Collaborator

Retry command for Build#79523

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_start_offset_catch_up@{"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"},"with_failures":false}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_start_offset_catch_up@{"source_cluster_spec":{"cluster_type":"redpanda"},"with_failures":true}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_start_offset_catch_up@{"source_cluster_spec":{"cluster_type":"redpanda"},"with_failures":false}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkCustomStartOffsetSelectionTests.test_starting_offset@{"failures":false,"source_cluster_spec":{"cluster_type":"redpanda"},"starting_offset":"latest"}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkCustomStartOffsetSelectionTests.test_starting_offset@{"failures":false,"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"},"starting_offset":"latest"}

@bharathv bharathv force-pushed the ensure_truncatable branch 2 times, most recently from c236c57 to 677f039 Compare January 23, 2026 18:59
@vbotbuildovich

vbotbuildovich commented Jan 23, 2026

Copy link
Copy Markdown
Collaborator

Retry command for Build#79567

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkCustomStartOffsetSelectionTests.test_starting_offset@{"failures":false,"source_cluster_spec":{"cluster_type":"redpanda"},"starting_offset":"latest"}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkCustomStartOffsetSelectionTests.test_starting_offset@{"failures":false,"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"},"starting_offset":"latest"}
tests/rptest/tests/random_node_operations_smoke_test.py::RedpandaNodeOperationsSmokeTest.test_node_ops_smoke_test@{"cloud_storage_type":1,"mixed_versions":false}

@michael-redpanda michael-redpanda left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Comment thread src/v/cluster_link/service.cc
Comment thread src/v/kafka/server/write_at_offset_stm.cc
Comment thread src/v/kafka/server/write_at_offset_stm.cc
Comment thread src/v/kafka/server/write_at_offset_stm.cc Outdated
co_return;
}

auto truncate_offset = std::max(_start_offset, source_start_offset);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vbotbuildovich

Copy link
Copy Markdown
Collaborator

Retry command for Build#79672

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/shadow_linking_rnot_test.py::ShadowLinkingRandomOpsTest.test_node_operations@{"failures":true}

Without this the sink may be having a wrong last_replicated_offset from
the last inflight replication which has eventually failed. We will end
up reset-ing source to wrong offset.
Prefix truncation is a rare op, it is ok to do it inline with
replication. Now that we have the ability to do offset only fetches in
the client, we don't need a periodic timer loop to do it.
The client was returning an empty batch when the start offset changed.
These were getting filtered out in the data_queue, so the notification
never reached the replicator and truncation didn’t happen.
With this change, an empty batch is treated as valid data. It still gets
filtered out by the replicator, but now it triggers prefix truncation

This issue was masked until now because there was a periodic truncation
async loop that invoked prefix_truncation on a timer. Now that the
previous commit removed it, this issue surfaced.
@bharathv bharathv requested a review from Copilot January 27, 2026 18:21

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 1 comment.

Comment on lines +65 to +68
if (!batches.empty()) [[likely]] {
_next = kafka::next_offset(
model::offset_cast(batches.back().last_offset()));
auto total_bytes = std::accumulate(

Copilot AI Jan 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _next offset is updated even when batches is empty (after the check). However, when batches is empty, the previous behavior didn't update _next. Consider whether this change in behavior is intentional. If batches is empty, _next should remain unchanged to maintain consistency with the previous implementation.

Copilot uses AI. Check for mistakes.
co_return errc::success;
}
vlog(
_log.debug,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe info level ?

@mmaslankaprv mmaslankaprv left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, just nits

@redpanda-data redpanda-data deleted a comment from vbotbuildovich Jan 28, 2026
@redpanda-data redpanda-data deleted a comment from vbotbuildovich Jan 28, 2026
@vbotbuildovich

Copy link
Copy Markdown
Collaborator

CI test results

test results on build#79705
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
ControllerLogLimitMirrorMakerTests test_mirror_maker_with_limits null integration https://buildkite.com/redpanda/redpanda/builds/79705#019c004b-28c4-4792-9fa6-6b79203ffe23 FLAKY 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0124, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ControllerLogLimitMirrorMakerTests&test_method=test_mirror_maker_with_limits
LogCompactionTxRemovalUpgradeFrom25_3_1_Test test_tx_control_batch_removal_with_upgrade_and_recovery null integration https://buildkite.com/redpanda/redpanda/builds/79705#019c004b-28ba-49a8-a01a-938ee3e8df06 FLAKY 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0026, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=LogCompactionTxRemovalUpgradeFrom25_3_1_Test&test_method=test_tx_control_batch_removal_with_upgrade_and_recovery

@bharathv bharathv merged commit 0ed2e21 into redpanda-data:dev Jan 28, 2026
21 checks passed
@bharathv bharathv deleted the ensure_truncatable branch January 28, 2026 19:00
@vbotbuildovich

Copy link
Copy Markdown
Collaborator

/backport v25.3.x

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants