[CORE-14957] - Make Shadow Link table updates Copy-on-Write #28956
Pull request overview
This PR refactors the cluster link metadata handling to use shared pointers (ss::lw_shared_ptr<metadata>) instead of direct value copies. This change addresses performance and safety concerns around the previous approach of copying metadata objects across scheduling points.
Key changes:
- Introduced `metadata_ptr` type alias for `ss::lw_shared_ptr<const metadata>`
- Removed `metadata::copy()` and `link_state::copy()` methods
- Added utility functions in `table_utils.h/cc` for async metadata copying
- Updated all interfaces to accept/return `metadata_ptr` instead of `metadata` values or `std::optional<std::reference_wrapper<const metadata>>`
Reviewed changes
Copilot reviewed 37 out of 37 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/v/redpanda/admin/services/shadow_link/tests/converter_test.cc | Updated tests to wrap metadata in lw_shared_ptr and use pointer dereferencing |
| src/v/redpanda/admin/services/shadow_link/shadow_link.cc | Changed to pass metadata_ptr instead of moving metadata |
| src/v/redpanda/admin/services/shadow_link/converter.h | Updated function signatures to accept metadata_ptr |
| src/v/redpanda/admin/services/shadow_link/converter.cc | Refactored to work with metadata_ptr and create copies explicitly |
| src/v/cluster_link/topic_reconciler.cc | Updated to access link metadata through pointer |
| src/v/cluster_link/tests/topic_reconciler_test.cc | Modified to use async metadata copying |
| src/v/cluster_link/tests/topic_properties_syncer_test.cc | Updated link metadata access to use pointer dereferencing |
| src/v/cluster_link/tests/source_topic_syncer_test.cc | Changed metadata access and copying to async pattern |
| src/v/cluster_link/tests/link_test.cc | Updated link config access to use get_config() pointer |
| src/v/cluster_link/tests/deps.h | Updated test fixtures to return metadata_ptr |
| src/v/cluster_link/tests/deps.cc | Modified to use async metadata copying |
| src/v/cluster_link/source_topic_syncer.cc | Updated to access config through pointer |
| src/v/cluster_link/service.h | Changed return types to metadata_ptr |
| src/v/cluster_link/service.cc | Refactored to work with metadata_ptr |
| src/v/cluster_link/security_migrator.cc | Updated to dereference config pointer |
| src/v/cluster_link/model/types.h | Introduced metadata_ptr alias and removed copy() methods |
| src/v/cluster_link/model/types.cc | Removed implementation of copy() methods |
| src/v/cluster_link/manager.h | Updated signatures to use metadata_ptr |
| src/v/cluster_link/manager.cc | Refactored to work with shared pointers |
| src/v/cluster_link/link_status_reconciler.cc | Updated to access metadata through pointer |
| src/v/cluster_link/link_probe.cc | Changed config access to use pointer |
| src/v/cluster_link/link.h | Updated to store and return metadata_ptr |
| src/v/cluster_link/link.cc | Refactored to work with shared config pointer |
| src/v/cluster_link/group_mirroring_task.cc | Updated to dereference config pointer |
| src/v/cluster_link/deps.h | Changed interface to use metadata_ptr |
| src/v/cluster/cluster_link/tests/utils.h | Added copy_metadata() function declaration |
| src/v/cluster/cluster_link/tests/utils.cc | Implemented async copy_metadata() |
| src/v/cluster/cluster_link/tests/table_test.cc | Extensive updates to use metadata_ptr and async copying |
| src/v/cluster/cluster_link/tests/frontend_validation_test.cc | Updated to use async metadata copying |
| src/v/cluster/cluster_link/tests/BUILD | Added dependency on table_utils |
| src/v/cluster/cluster_link/table_utils.h | New utility header for async metadata operations |
| src/v/cluster/cluster_link/table_utils.cc | Implementation of async copy utilities |
| src/v/cluster/cluster_link/table.h | Changed to use metadata_ptr throughout |
| src/v/cluster/cluster_link/table.cc | Major refactoring to async operations with metadata_ptr |
| src/v/cluster/cluster_link/frontend.h | Updated interface to return metadata_ptr |
| src/v/cluster/cluster_link/frontend.cc | Refactored to work with shared pointers |
| src/v/cluster/BUILD | Added table_utils library target |
| cmd, | ||
| [&table, offset, revision]( | ||
| const cluster::cluster_link_upsert_cmd& upsert) { | ||
| const cluster::cluster_link_upsert_cmd& upsert) | ||
| -> ss::future<cluster::cluster_link::errc> { |
There was a problem hiding this comment.
The lambda is now marked with explicit return type ss::future<cluster::cluster_link::errc> for consistency with the async conversion. However, the cmd is passed by const reference to ss::visit instead of moving it. While this works for the const upsert command access, consider documenting why the move was removed from line 236 (previously std::move(cmd)).
|
/dt |
CI test results
test results on build#77779
test results on build#78217
|
dcbfc26 to
a4dfcb9
Compare
| revision](table& table) mutable { | ||
| return ss::visit( | ||
| std::move(cmd), | ||
| cmd, |
There was a problem hiding this comment.
The cmd parameter is no longer being moved here (changed from std::move(cmd) to cmd), but the lambda captures suggest it should be moved to avoid unnecessary copies. The const reference capture in the lambda at line 238 is inconsistent with value semantics expected by the variant visitor.
| if (!config->state.mirror_topics.contains(topic)) { | ||
| topics_no_longer_mirroring.push_back(topic); | ||
| } | ||
| } |
There was a problem hiding this comment.
The assignment of shared_ptr here appears to be a simple copy of the pointer, but the function parameter is already a shared pointer. Consider documenting whether this is intentional reference counting behavior or if the pointer should be copied to a new instance.
| } | |
| } | |
| // Intentionally share ownership of config via shared_ptr; deep copy is not required. |
|
|
||
| auto& link_meta = _link_metadata[link_id.value()]; | ||
| auto it = link_meta.state.mirror_topics.find(cmd.topic); | ||
| auto link_meta = _link_metadata[link_id.value()]; |
There was a problem hiding this comment.
This creates a copy of the shared pointer unnecessarily. Use const auto& to avoid incrementing/decrementing the reference count when only reading the metadata.
| auto link_meta = _link_metadata[link_id.value()]; | |
| const auto& link_meta = _link_metadata[link_id.value()]; |
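To make the reference-count point concrete, here is a minimal sketch. It assumes `std::shared_ptr` as a stand-in for `ss::lw_shared_ptr`, and the `metadata` struct and both helper functions are hypothetical names used only for illustration.

```cpp
#include <map>
#include <memory>

// Hypothetical stand-in for cluster_link::model::metadata.
struct metadata {
    int mirror_topic_count = 0;
};

// std::shared_ptr stands in for ss::lw_shared_ptr in this sketch.
using metadata_ptr = std::shared_ptr<const metadata>;

// Copies the pointer out of the map: the reference count is incremented
// for as long as the local copy is alive.
long use_count_with_copy(const std::map<int, metadata_ptr>& links, int id) {
    auto link_meta = links.at(id);
    return link_meta.use_count();
}

// Binds a reference to the pointer stored in the map: the reference count
// is left untouched.
long use_count_with_reference(const std::map<int, metadata_ptr>& links, int id) {
    const auto& link_meta = links.at(id);
    return link_meta.use_count();
}
```

One trade-off to keep in mind: a reference only stays valid while the map entry is unchanged, so code that reads the metadata across a scheduling point would still want its own copy of the pointer.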
bharathv
left a comment
There was a problem hiding this comment.
Feel free to skip comments, can be done at a later time, waste of another CI run.
| bool snapshot_equals( | ||
| const chunked_hash_map<id_t, metadata>& lhs, const table::map_t& rhs) { | ||
| for (const auto& [id, md] : lhs) { | ||
| auto it = rhs.find(id); | ||
| if (it == rhs.end()) { | ||
| return false; | ||
| } | ||
| EXPECT_EQ(*(it->second), md); | ||
| } | ||
| return true; | ||
| } |
There was a problem hiding this comment.
How is error reporting through this?
Could we not just define this as operator==? Then we should also get a full view of the objects.
| bool snapshot_equals( | |
| const chunked_hash_map<id_t, metadata>& lhs, const table::map_t& rhs) { | |
| for (const auto& [id, md] : lhs) { | |
| auto it = rhs.find(id); | |
| if (it == rhs.end()) { | |
| return false; | |
| } | |
| EXPECT_EQ(*(it->second), md); | |
| } | |
| return true; | |
| } | |
| bool operator==( | |
| const chunked_hash_map<id_t, metadata>& lhs, const table::map_t& rhs) { | |
| for (const auto& [id, md] : lhs) { | |
| auto it = rhs.find(id); | |
| if (it == rhs.end()) { | |
| return false; | |
| } | |
| EXPECT_EQ(*(it->second), md); | |
| } | |
| return true; | |
| } |
There was a problem hiding this comment.
gtest doesn't seem to use the operator==
There was a problem hiding this comment.
I also believe if EXPECT_EQ fails then we will see the test fail
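One possible way to get clearer error reporting without relying on `operator==` is to have the helper return a description of the first mismatch, which a test can then print in its assertion message. This is only a sketch using standard-library stand-ins: `metadata`, `metadata_ptr`, and `first_mismatch` are hypothetical, and the size check is an addition that the `snapshot_equals` loop above does not perform.

```cpp
#include <map>
#include <memory>
#include <optional>
#include <string>

// Hypothetical stand-in for cluster_link::model::metadata.
struct metadata {
    std::string name;
    bool operator==(const metadata& other) const { return name == other.name; }
};

// std::shared_ptr stands in for ss::lw_shared_ptr in this sketch.
using metadata_ptr = std::shared_ptr<const metadata>;

// Returns std::nullopt when the snapshot and the table agree, otherwise a
// short description of the first difference found.
// Assumes every pointer stored in the table is non-null.
std::optional<std::string> first_mismatch(
  const std::map<int, metadata>& snapshot,
  const std::map<int, metadata_ptr>& table) {
    if (snapshot.size() != table.size()) {
        return std::string("size mismatch");
    }
    for (const auto& [id, md] : snapshot) {
        auto it = table.find(id);
        if (it == table.end()) {
            return "missing id " + std::to_string(id);
        }
        if (!(*it->second == md)) {
            return "metadata differs for id " + std::to_string(id);
        }
    }
    return std::nullopt;
}
```

A test could then assert that the result is empty and print the returned string on failure.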
| for (auto& [_, t] : _tasks) { | ||
| vlog(cllog.trace, "Updating config for task {}", t->name()); | ||
| t->update_config(_config); | ||
| t->update_config(*_config); |
There was a problem hiding this comment.
previously, each task was copying across the subset of config that was relevant.
Now that we are having a cow-pointer, would it make sense for the tasks to just grab a copy of the pointer and hold that? Less copying logic to keep track of (but slightly less encapsulation).
There was a problem hiding this comment.
yeah fair point - that may increase the size of the PR slightly but I think that's a good idea
There was a problem hiding this comment.
I think i'll do that in a follow up - it's a good idea but the amount of data being copied there is a lot less than the state
a4dfcb9 to
3089389
Compare
|
Force push:
|
| ssx::async_counter cnt; | ||
| link_state copy; | ||
| copy.status = status; | ||
| copy.mirror_topics.reserve(mirror_topics.size()); | ||
|
|
||
| co_await ssx::async_for_each_counter( | ||
| cnt, mirror_topics, [&](const auto& pair) { | ||
| copy.mirror_topics.emplace(pair.first, pair.second.copy()); | ||
| }); |
There was a problem hiding this comment.
The async_counter variable cnt is created but never used. If the counter is not required for tracking async operations, remove it to avoid unnecessary overhead.
| ssx::async_counter cnt; | |
| link_state copy; | |
| copy.status = status; | |
| copy.mirror_topics.reserve(mirror_topics.size()); | |
| co_await ssx::async_for_each_counter( | |
| cnt, mirror_topics, [&](const auto& pair) { | |
| copy.mirror_topics.emplace(pair.first, pair.second.copy()); | |
| }); | |
| link_state copy; | |
| copy.status = status; | |
| copy.mirror_topics.reserve(mirror_topics.size()); | |
| for (const auto& pair : mirror_topics) { | |
| copy.mirror_topics.emplace(pair.first, pair.second.copy()); | |
| } |
| cmd, | ||
| [&table, offset, revision]( | ||
| const cluster::cluster_link_upsert_cmd& upsert) { | ||
| const cluster::cluster_link_upsert_cmd& upsert) | ||
| -> ss::future<cluster::cluster_link::errc> { |
There was a problem hiding this comment.
The cmd parameter is passed by value to ss::visit, which may result in unnecessary copies. Consider passing by const reference or moving when appropriate to avoid potential performance overhead with large command objects.
There was a problem hiding this comment.
That's fair, ss::visit should have been decltype(auto), but that's a breaking change. Fortunately the fix is easy, even if the rollout isn't.
| @@ -372,10 +374,12 @@ table::upsert_link(id_t id, metadata meta, model::revision_id revision) { | |||
| return it.second == id && !meta.state.mirror_topics.contains(it.first); | |||
| }); | |||
|
|
|||
There was a problem hiding this comment.
Creating a shared pointer immediately after copying the metadata adds an allocation overhead. Consider whether the copy operation could directly return a shared pointer to avoid this extra step, or document why this two-step process is necessary.
| // NOTE: metadata::copy() returns a value, not an lw_shared_ptr. We first | |
| // obtain an independent metadata instance and then wrap it in an | |
| // ss::lw_shared_ptr for storage in _link_metadata. Changing copy() to | |
| // return a shared pointer would require broader API changes outside this | |
| // file, so this two-step process is intentional here. |
| for (const auto& [id, md_ptr] : links) { | ||
| metadata md; | ||
| md.name = md_ptr->name; | ||
| md.uuid = md_ptr->uuid; | ||
| md.connection = md_ptr->connection; | ||
| md.state = co_await md_ptr->state.copy(); | ||
| md.configuration = md_ptr->configuration.copy(); | ||
| copy.emplace(id, std::move(md)); | ||
| } | ||
| co_return copy; | ||
| } | ||
| ss::future<chunked_hash_map<id_t, metadata_ptr>> | ||
| copy_links_from_snapshot(const chunked_hash_map<id_t, metadata>& links) { | ||
| chunked_hash_map<id_t, metadata_ptr> copy; | ||
| copy.reserve(links.size()); | ||
| for (const auto& [id, md] : links) { | ||
| auto metadata_copy = ss::make_lw_shared<metadata>(co_await md.copy()); | ||
|
|
||
| copy.emplace(id, std::move(metadata_copy)); | ||
| } | ||
|
|
There was a problem hiding this comment.
The function awaits each state.copy() sequentially in a loop. For large numbers of links, consider using parallel async operations with ssx::async_for_each or similar to improve performance by overlapping I/O operations.
| for (const auto& [id, md_ptr] : links) { | |
| metadata md; | |
| md.name = md_ptr->name; | |
| md.uuid = md_ptr->uuid; | |
| md.connection = md_ptr->connection; | |
| md.state = co_await md_ptr->state.copy(); | |
| md.configuration = md_ptr->configuration.copy(); | |
| copy.emplace(id, std::move(md)); | |
| } | |
| co_return copy; | |
| } | |
| ss::future<chunked_hash_map<id_t, metadata_ptr>> | |
| copy_links_from_snapshot(const chunked_hash_map<id_t, metadata>& links) { | |
| chunked_hash_map<id_t, metadata_ptr> copy; | |
| copy.reserve(links.size()); | |
| for (const auto& [id, md] : links) { | |
| auto metadata_copy = ss::make_lw_shared<metadata>(co_await md.copy()); | |
| copy.emplace(id, std::move(metadata_copy)); | |
| } | |
| co_await ssx::async_for_each( | |
| links, [&copy](const auto& entry) -> ss::future<> { | |
| const auto& [id, md_ptr] = entry; | |
| metadata md; | |
| md.name = md_ptr->name; | |
| md.uuid = md_ptr->uuid; | |
| md.connection = md_ptr->connection; | |
| md.state = co_await md_ptr->state.copy(); | |
| md.configuration = md_ptr->configuration.copy(); | |
| copy.emplace(id, std::move(md)); | |
| co_return; | |
| }); | |
| co_return copy; | |
| } | |
| ss::future<chunked_hash_map<id_t, metadata_ptr>> | |
| copy_links_from_snapshot(const chunked_hash_map<id_t, metadata>& links) { | |
| chunked_hash_map<id_t, metadata_ptr> copy; | |
| copy.reserve(links.size()); | |
| co_await ssx::async_for_each( | |
| links, [&copy](const auto& entry) -> ss::future<> { | |
| const auto& [id, md] = entry; | |
| auto metadata_copy | |
| = ss::make_lw_shared<metadata>(co_await md.copy()); | |
| copy.emplace(id, std::move(metadata_copy)); | |
| co_return; | |
| }); |
| return _table.invoke_on_all([id, &md]( | ||
| cluster::cluster_link::table& t) { |
There was a problem hiding this comment.
The lambda captures md by reference, but then uses it after an async operation. If md goes out of scope before the async operation completes, this creates a use-after-free bug. The reference should either be captured by value or carefully managed to ensure lifetime.
| return _table.invoke_on_all([id, &md]( | |
| cluster::cluster_link::table& t) { | |
| return _table.invoke_on_all([id, md]( | |
| cluster::cluster_link::table& t) mutable { |
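The lifetime concern can be illustrated with a small sketch in which work runs after the caller's scope has ended. It assumes `std::shared_ptr` in place of `ss::lw_shared_ptr` and a hypothetical `deferred_tasks` queue in place of `invoke_on_all`. Whether the by-reference capture in the test code above is actually unsafe depends on whether the caller awaits the returned future, which this sketch does not model.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for cluster_link::model::metadata.
struct metadata {
    std::string name;
};

// std::shared_ptr stands in for ss::lw_shared_ptr in this sketch.
using metadata_ptr = std::shared_ptr<const metadata>;

// Hypothetical queue of work that runs after the scheduling caller returns.
class deferred_tasks {
public:
    // The pointer is captured by value, so the task shares ownership and the
    // metadata stays alive until the task itself is destroyed.
    void schedule(metadata_ptr md) {
        _tasks.emplace_back([md = std::move(md)] { return md->name; });
    }

    // Runs every queued task and collects the names they read.
    std::vector<std::string> run_all() {
        std::vector<std::string> out;
        for (auto& task : _tasks) {
            out.push_back(task());
        }
        _tasks.clear();
        return out;
    }

private:
    std::vector<std::function<std::string()>> _tasks;
};
```

Capturing a shared pointer by value costs one reference-count increment, which is much cheaper than copying the metadata it points to.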
| [&snap, snap_revision = model::revision_id(offset)]( | ||
| table& table) -> ss::future<> { | ||
| return copy_links_from_snapshot(snap.cluster_links.links) | ||
| .then([&snap, snap_revision, &table](table::map_t new_links) { |
There was a problem hiding this comment.
The lambda captures snap by reference, but the snap parameter may not outlive the async operation. This could lead to a use-after-free if the snapshot is destroyed before the continuation executes. Consider capturing by value or ensuring the snapshot's lifetime.
| [&snap, snap_revision = model::revision_id(offset)]( | |
| table& table) -> ss::future<> { | |
| return copy_links_from_snapshot(snap.cluster_links.links) | |
| .then([&snap, snap_revision, &table](table::map_t new_links) { | |
| [snap, snap_revision = model::revision_id(offset)]( | |
| table& table) -> ss::future<> { | |
| return copy_links_from_snapshot(snap.cluster_links.links) | |
| .then([snap, snap_revision, &table](table::map_t new_links) { |
Adds utility functions to be used by table to copy metadata Signed-off-by: Michael Boquard <michael@redpanda.com>
We have noticed that at scale, Shadow Linking has encountered some issues when copying the state. This is due to having a large number of shadow topics, and it results in reactor stalls and issues with fail over. The solution is to eliminate the copies. To do so, we are going to switch from holding a `cluster_link::model::metadata` object in the table to a lw_shared_ptr holding a const pointer to the metadata. Upon update, the shared pointer in the table will be replaced; however, all current copies will still be valid until their holders are notified of the update. Signed-off-by: Michael Boquard <michael@redpanda.com>
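The copy-on-write behavior described in this commit message can be sketched roughly as follows. This is not the actual `table` implementation: it assumes `std::shared_ptr` in place of `ss::lw_shared_ptr`, uses a hypothetical `metadata` struct with made-up fields, and leaves out the async copying and revision handling.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for cluster_link::model::metadata.
struct metadata {
    std::string name;
    std::map<std::string, int> mirror_topics;
};

// std::shared_ptr stands in for ss::lw_shared_ptr in this sketch.
using metadata_ptr = std::shared_ptr<const metadata>;

class table {
public:
    // Readers get a shared, immutable snapshot; no deep copy is made.
    metadata_ptr find(int id) const {
        auto it = _links.find(id);
        if (it == _links.end()) {
            return nullptr;
        }
        return it->second;
    }

    // Stores a new immutable object for the given id.
    void upsert(int id, metadata md) {
        _links[id] = std::make_shared<const metadata>(std::move(md));
    }

    // Copy-on-write update: copy the current value, modify the copy, then
    // replace the pointer held by the table. Snapshots handed out earlier
    // keep pointing at the old, unmodified object.
    bool add_mirror_topic(int id, const std::string& topic, int state) {
        auto it = _links.find(id);
        if (it == _links.end()) {
            return false;
        }
        metadata next = *it->second; // the only deep copy
        next.mirror_topics[topic] = state;
        it->second = std::make_shared<const metadata>(std::move(next));
        return true;
    }

private:
    std::map<int, metadata_ptr> _links;
};
```

A holder that called `find()` before an update keeps a valid view of the old state until it fetches the pointer again, which matches the "still valid until their holders are notified" wording above.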
3089389 to
258135f
Compare
|
/backport v25.3.x |
This change refactors the internal "cluster_link::table" to hold const shared_ptr instances of metadata. When the table is updated, the data is copied and replaced (making the new behavior copy-on-write). Users of this data will now receive instances of `ss::lw_shared_ptr<const metadata>`. These will remain valid even after the data in the table is changed.
Backports Required
Release Notes
Improvements