Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ ss::future<> controller::start(
std::ref(_members_table),
std::ref(_partition_balancer),
std::ref(_partition_manager),
std::ref(_partition_leaders),
std::ref(_as));

co_await _members_backend.invoke_on(
Expand Down
112 changes: 70 additions & 42 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "cluster/partition_balancer_backend.h"
#include "cluster/partition_balancer_rpc_service.h"
#include "cluster/partition_balancer_types.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/partition_manager.h"
#include "cluster/shard_table.h"
#include "cluster/topic_table.h"
Expand Down Expand Up @@ -49,6 +50,7 @@ controller_api::controller_api(
ss::sharded<members_table>& members,
ss::sharded<partition_balancer_backend>& partition_balancer,
ss::sharded<partition_manager>& partition_manager,
ss::sharded<partition_leaders_table>& partition_leaders,
ss::sharded<ss::abort_source>& as)
: _self(self)
, _backend(backend)
Expand All @@ -58,6 +60,7 @@ controller_api::controller_api(
, _members(members)
, _partition_balancer(partition_balancer)
, _partition_manager(partition_manager)
, _partition_leaders(partition_leaders)
, _as(as) {}

ss::future<chunked_vector<ntp_reconciliation_state>>
Expand Down Expand Up @@ -283,6 +286,22 @@ controller_api::get_reconciliation_state(
});
}

ss::future<result<ntp_reconciliation_state>>
controller_api::get_partition_leader_reconciliation_state(
model::ntp ntp, model::timeout_clock::time_point timeout) {
auto leader = _partition_leaders.local().get_leader(ntp);
if (!leader) {
vlog(
clusterlog.debug,
"can't get partition leader for ntp {} to get its reconciliation "
"state",
ntp);
co_return result<ntp_reconciliation_state>(errc::no_leader_controller);
}
co_return co_await get_reconciliation_state(
*leader, std::move(ntp), timeout);
}

ss::future<result<ntp_reconciliation_state>>
controller_api::get_reconciliation_state(
model::node_id id, model::ntp ntp, model::timeout_clock::time_point timeout) {
Expand Down Expand Up @@ -333,51 +352,60 @@ ss::future<std::error_code> controller_api::wait_for_topic(
}

ss::future<result<chunked_vector<partition_reconfiguration_state>>>
controller_api::get_partitions_reconfiguration_state(
controller_api::get_partitions_leader_reconfiguration_state(
const chunked_vector<model::ntp>& partitions,
model::timeout_clock::time_point) {
model::timeout_clock::time_point timeout) {
auto& updates_in_progress = _topics.local().updates_in_progress();

absl::node_hash_map<model::ntp, partition_reconfiguration_state> states;
for (auto& ntp : partitions) {
auto progress_it = updates_in_progress.find(ntp);
if (progress_it == updates_in_progress.end()) {
continue;
}
auto p_as = _topics.local().get_partition_assignment(ntp);
if (!p_as) {
continue;
}
partition_reconfiguration_state state;
state.ntp = ntp;

state.current_assignment = std::move(p_as->replicas);
state.previous_assignment = progress_it->second.get_previous_replicas();
state.state = progress_it->second.get_state();
state.policy = progress_it->second.get_reconfiguration_policy();

auto reconciliation_state = co_await get_reconciliation_state(ntp);
for (auto& operation : reconciliation_state.pending_operations()) {
if (operation.recovery_state) {
state.current_partition_size
= operation.recovery_state->local_size;
for (auto& [id, recovery_state] :
operation.recovery_state->replicas) {
state.replicas.push_back(
replica_bytes{
.node = id,
.bytes_left = recovery_state.bytes_left,
.bytes_transferred = state.current_partition_size
- recovery_state.bytes_left,
.offset = recovery_state.last_offset,
});
co_await ss::max_concurrent_for_each(
partitions,
16,
Comment on lines +360 to +362

Copilot AI Dec 18, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 16 for max concurrency should be extracted as a named constant or configuration parameter to make its purpose clear and allow easier tuning.

Copilot uses AI. Check for mistakes.
[this, &updates_in_progress, &states, timeout](
const model::ntp& ntp) -> ss::future<> {
auto progress_it = updates_in_progress.find(ntp);
if (progress_it == updates_in_progress.end()) {
return ss::now();
}
auto p_as = _topics.local().get_partition_assignment(ntp);
if (!p_as) {
return ss::now();
}
partition_reconfiguration_state state;
state.ntp = ntp;

state.current_assignment = std::move(p_as->replicas);
state.previous_assignment
= progress_it->second.get_previous_replicas();
state.state = progress_it->second.get_state();
state.policy = progress_it->second.get_reconfiguration_policy();

return get_partition_leader_reconciliation_state(ntp, timeout)
.then([&states, &ntp, state = std::move(state)](
result<ntp_reconciliation_state> r) mutable {
if (r.has_value()) {
for (auto& operation : r.value().pending_operations()) {
if (operation.recovery_state) {
state.current_partition_size
= operation.recovery_state->local_size;
for (auto& [id, recovery_state] :
operation.recovery_state->replicas) {
state.replicas.push_back(
replica_bytes{
.node = id,
.bytes_left = recovery_state.bytes_left,
.bytes_transferred
= state.current_partition_size
- recovery_state.bytes_left,
.offset = recovery_state.last_offset,
});
}
}
}
}
}
}

states.emplace(ntp, std::move(state));
}

states.emplace(ntp, std::move(state));
return ss::now();
});
});
chunked_vector<partition_reconfiguration_state> ret;
ret.reserve(states.size());
for (auto& [_, state] : states) {
Expand Down Expand Up @@ -499,7 +527,7 @@ controller_api::get_node_decommission_progress(
// replicas that are moving from decommissioned node are still present on a
// node but their metadata is update, add them explicitly
ret.replicas_left += moving_from_node.size();
auto states = co_await get_partitions_reconfiguration_state(
auto states = co_await get_partitions_leader_reconfiguration_state(
std::move(moving_from_node), timeout);

if (states) {
Expand Down
8 changes: 7 additions & 1 deletion src/v/cluster/controller_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class controller_api {
ss::sharded<members_table>&,
ss::sharded<partition_balancer_backend>&,
ss::sharded<partition_manager>&,
ss::sharded<partition_leaders_table>&,
ss::sharded<ss::abort_source>&);

ss::future<result<chunked_vector<ntp_reconciliation_state>>>
Expand All @@ -65,12 +66,16 @@ class controller_api {
ss::future<result<ntp_reconciliation_state>> get_reconciliation_state(
model::node_id, model::ntp, model::timeout_clock::time_point);

ss::future<result<ntp_reconciliation_state>>
get_partition_leader_reconciliation_state(
model::ntp, model::timeout_clock::time_point);

// high level APIs
ss::future<std::error_code> wait_for_topic(
model::topic_namespace_view, model::timeout_clock::time_point);

ss::future<result<chunked_vector<partition_reconfiguration_state>>>
get_partitions_reconfiguration_state(
get_partitions_leader_reconfiguration_state(
const chunked_vector<model::ntp>&, model::timeout_clock::time_point);
/**
* Returns state of controller backend from each node in the cluster for
Expand Down Expand Up @@ -106,6 +111,7 @@ class controller_api {
ss::sharded<members_table>& _members;
ss::sharded<partition_balancer_backend>& _partition_balancer;
ss::sharded<partition_manager>& _partition_manager;
ss::sharded<partition_leaders_table>& _partition_leaders;
ss::sharded<ss::abort_source>& _as;
};
} // namespace cluster
3 changes: 2 additions & 1 deletion src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,9 @@ std::ostream& operator<<(std::ostream& o, const backend_operation& op) {
std::ostream& operator<<(std::ostream& o, const recovery_state& r) {
fmt::print(
o,
"{{local_last_offset: {}, replicas: {}}}",
"{{local_last_offset: {}, local_size: {}, replicas: {}}}",
r.local_last_offset,
r.local_size,
r.replicas);
return o;
}
Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,7 @@ struct replica_recovery_state
};
struct recovery_state
: serde::
envelope<recovery_state, serde::version<0>, serde::compat_version<0>> {
envelope<recovery_state, serde::version<1>, serde::compat_version<0>> {
model::offset local_last_offset;
size_t local_size;

Expand All @@ -1570,7 +1570,9 @@ struct recovery_state
friend bool operator==(const recovery_state&, const recovery_state&)
= default;

auto serde_fields() { return std::tie(local_last_offset, replicas); }
auto serde_fields() {
return std::tie(local_last_offset, replicas, local_size);
}
};

struct backend_operation
Expand Down
5 changes: 3 additions & 2 deletions src/v/redpanda/admin/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ admin_server::get_reconfigurations_handler(std::unique_ptr<ss::http::request>) {

auto [reconfiguration_states, reconciliations]
= co_await ss::when_all_succeed(
_controller->get_api().local().get_partitions_reconfiguration_state(
ntps, deadline),
_controller->get_api()
.local()
.get_partitions_leader_reconfiguration_state(ntps, deadline),
_controller->get_api().local().get_global_reconciliation_state(
ntps, deadline));

Expand Down
55 changes: 55 additions & 0 deletions tests/rptest/tests/nodes_decommissioning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,61 @@ def test_decommissioning_working_node(
if not delete_topic:
self.verify()

@cluster(num_nodes=6)
def test_decommission_status(self):
self.start_redpanda()
self._create_topics(replication_factors=[3])
self.start_producer()
to_decommission = random.choice(self.redpanda.nodes)
to_decommission_id = self.redpanda.node_id(to_decommission)
self.logger.info(
f"decommissioning node: {to_decommission_id}",
)
# Block recovery to ensure decommissioning takes some time
# and we see some status in the middle of decommissioning
self._set_recovery_rate(0)
self._decommission(to_decommission_id)

def validate_decommission_status():
def check_decommission_status(rpc_node: ClusterNode):
status = self.admin.get_decommission_status(
id=to_decommission_id, node=rpc_node
)
finished = status["finished"]
replicas_left = status["replicas_left"]
partition_meta = []
for p in status.get("partitions", []):
if p["topic"] != self._topic:
# We are only producing data to self._topic
continue
has_valid_meta = (
p["partition_size"] > 0 and p["bytes_left_to_move"] > 0
)
if not has_valid_meta:
self.logger.debug(
f"partition {p} is not reporting valid metadata"
)
partition_meta.append(has_valid_meta)
return (
not finished
and replicas_left > 0
and partition_meta
and all(partition_meta)
)

return all(check_decommission_status(n) for n in self.redpanda.nodes)

self.redpanda.wait_until(
validate_decommission_status,
timeout_sec=30,
backoff_sec=1,
err_msg="Decommission status not reported as in_progress on all nodes",
retry_on_exc=True,
)
self._set_recovery_rate(2 << 30)

Copilot AI Dec 18, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 2 << 30 (2GB) should be extracted as a named constant to clarify that this is setting a high recovery rate to allow decommissioning to complete.

Copilot uses AI. Check for mistakes.

self._wait_for_node_removed(to_decommission_id)

@cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_decommissioning_node_rf_1_replica(self):
self.start_redpanda()
Expand Down
Loading