Skip to content
Merged
5 changes: 4 additions & 1 deletion src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,8 @@ ss::future<> controller::start(
std::ref(_drain_manager),
std::ref(_feature_table),
std::ref(_partition_leaders),
std::ref(_tp_state));
std::ref(_tp_state),
std::ref(_node_status_table));

_leader_balancer = std::make_unique<leader_balancer>(
_tp_state.local(),
Expand Down Expand Up @@ -804,6 +805,8 @@ ss::future<> controller::start(
std::ref(_members_frontend),
config::shard_local_cfg()
.partition_autobalancing_node_availability_timeout_sec.bind(),
config::shard_local_cfg()
.partition_autobalancing_node_autodecommission_timeout_sec.bind(),
config::shard_local_cfg()
.partition_autobalancing_max_disk_usage_percent.bind(),
config::shard_local_cfg().partition_autobalancing_tick_interval_ms.bind(),
Expand Down
46 changes: 43 additions & 3 deletions src/v/cluster/health_monitor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/node/local_monitor.h"
#include "cluster/node_status_table.h"
#include "cluster/partition_manager.h"
#include "cluster/partition_probe.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/property.h"
#include "container/chunked_hash_map.h"
Expand All @@ -30,6 +32,7 @@
#include "model/metadata.h"
#include "raft/fwd.h"
#include "rpc/connection_cache.h"
#include "rpc/types.h"
#include "ssx/async_algorithm.h"

#include <seastar/core/chunked_fifo.hh>
Expand All @@ -47,6 +50,7 @@
#include <fmt/ranges.h>

#include <algorithm>
#include <chrono>
#include <iterator>
#include <optional>
#include <ranges>
Expand All @@ -67,7 +71,8 @@ health_monitor_backend::health_monitor_backend(
ss::sharded<drain_manager>& drain_manager,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<partition_leaders_table>& partition_leaders_table,
ss::sharded<topic_table>& topic_table)
ss::sharded<topic_table>& topic_table,
ss::sharded<node_status_table>& node_status_table)
: _raft0(std::move(raft0))
, _members(mt)
, _connections(connections)
Expand All @@ -78,6 +83,7 @@ health_monitor_backend::health_monitor_backend(
, _feature_table(feature_table)
, _partition_leaders_table(partition_leaders_table)
, _topic_table(topic_table)
, _node_status_table(node_status_table)
, _reports{ss::make_lw_shared<report_cache_t>()}
, _local_monitor(local_monitor)
, _self(_raft0->self().id()) {}
Expand Down Expand Up @@ -199,7 +205,11 @@ std::optional<node_health_report_ptr> health_monitor_backend::build_node_report(
}

node_health_report ret{
it->second->id, it->second->local_state, {}, it->second->drain_status};
it->second->id,
it->second->local_state,
{},
it->second->drain_status,
it->second->node_liveness_report};
ret.local_state.logical_version
= features::feature_table::get_latest_logical_version();
ret.topics = filter_topic_status(it->second->topics, f.ntp_filters);
Expand Down Expand Up @@ -888,13 +898,18 @@ health_monitor_backend::collect_current_node_health() {

auto drain_status = co_await _drain_manager.local().status();
auto topics = co_await collect_topic_status();
auto node_liveness_report = collect_node_liveness_report();

auto [it, _] = _status.try_emplace(id);
it->second.is_alive = alive::yes;
it->second.last_reply_timestamp = ss::lowres_clock::now();

co_return node_health_report{
id, std::move(local_state), std::move(topics), std::move(drain_status)};
id,
std::move(local_state),
std::move(topics),
std::move(drain_status),
std::move(node_liveness_report)};
}
ss::future<result<node_health_report_ptr>>
health_monitor_backend::get_current_node_health() {
Expand Down Expand Up @@ -1024,6 +1039,7 @@ reports_acc_t reduce_reports_map(reports_acc_t acc, shard_report shard_report) {
return acc;
}
} // namespace

ss::future<chunked_vector<topic_status>>
health_monitor_backend::collect_topic_status() {
auto reports_map = co_await _partition_manager.map_reduce0(
Expand All @@ -1040,6 +1056,30 @@ health_monitor_backend::collect_topic_status() {
co_return topics;
}

node_liveness_report health_monitor_backend::collect_node_liveness_report() {
const auto now = rpc::clock_type::now();
absl::flat_hash_map<model::node_id, rpc::clock_type::duration>
node_to_last_seen{};
auto node_status_range
= _members.local().node_ids()
| std::ranges::views::transform([this](model::node_id node_id) {
return _node_status_table.local().get_node_status(node_id);
})
| std::ranges::views::filter(
[](auto maybe_node_status) { return maybe_node_status.has_value(); })
| std::ranges::views::transform(
[](auto maybe_node_status) { return *maybe_node_status; });

std::ranges::for_each(
std::move(node_status_range),
[&node_to_last_seen, &now](node_status node_status) {
node_to_last_seen.emplace(
node_status.node_id, now - node_status.last_seen);
});
return node_liveness_report{
{.node_id_to_last_seen = std::move(node_to_last_seen)}};
}

std::chrono::milliseconds health_monitor_backend::max_metadata_age() {
return config::shard_local_cfg().health_monitor_max_metadata_age();
}
Expand Down
9 changes: 8 additions & 1 deletion src/v/cluster/health_monitor_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
#include "cluster/fwd.h"
#include "cluster/health_monitor_types.h"
#include "cluster/node/local_monitor.h"
#include "cluster/node_status_table.h"
#include "cluster/notification.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "rpc/fwd.h"
#include "ssx/mutex.h"
Expand Down Expand Up @@ -71,7 +73,8 @@ class health_monitor_backend {
ss::sharded<drain_manager>&,
ss::sharded<features::feature_table>&,
ss::sharded<partition_leaders_table>&,
ss::sharded<topic_table>&);
ss::sharded<topic_table>&,
ss::sharded<node_status_table>&);

ss::future<> stop();

Expand Down Expand Up @@ -165,6 +168,9 @@ class health_monitor_backend {

ss::future<chunked_vector<topic_status>> collect_topic_status();

// get the status info of all nodes which are past auto decommission timeout
node_liveness_report collect_node_liveness_report();

result<node_health_report>
process_node_reply(model::node_id, result<get_node_health_reply>);

Expand Down Expand Up @@ -232,6 +238,7 @@ class health_monitor_backend {
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<partition_leaders_table>& _partition_leaders_table;
ss::sharded<topic_table>& _topic_table;
ss::sharded<node_status_table>& _node_status_table;

ss::lowres_clock::time_point _last_refresh;
ss::lw_shared_ptr<abortable_refresh_request> _refresh_request;
Expand Down
34 changes: 27 additions & 7 deletions src/v/cluster/health_monitor_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,25 @@ std::ostream& operator<<(std::ostream& o, const node_state& s) {
return o;
}

std::ostream& operator<<(std::ostream& o, const node_liveness_report& nls) {
fmt::print(o, "{{node_id_to_last_seen: {}}}", nls.node_id_to_last_seen);
return o;
}

bool operator==(const node_liveness_report& a, const node_liveness_report& b) {
return std::ranges::equal(a.node_id_to_last_seen, b.node_id_to_last_seen);
}

node_health_report::node_health_report(
model::node_id id,
node::local_state local_state,
chunked_vector<topic_status> topics_vec,
std::optional<drain_manager::drain_status> drain_status)
std::optional<drain_manager::drain_status> drain_status,
struct node_liveness_report node_liveness_report)
: id(id)
, local_state(std::move(local_state))
, drain_status(drain_status) {
, drain_status(drain_status)
, node_liveness_report(std::move(node_liveness_report)) {
topics.reserve(topics_vec.size());
for (auto& topic : topics_vec) {
topics.emplace(
Expand All @@ -91,7 +102,8 @@ node_health_report::node_health_report(
}

node_health_report node_health_report::copy() const {
node_health_report ret{id, local_state, {}, drain_status};
node_health_report ret{
id, local_state, {}, drain_status, node_liveness_report};
ret.topics.reserve(topics.bucket_count());
for (const auto& [tp_ns, partitions] : topics) {
ret.topics.emplace(tp_ns, copy_partition_statuses(partitions));
Expand All @@ -104,7 +116,12 @@ std::ostream& operator<<(std::ostream& o, const node_health_report& r) {
}

node_health_report_serde::node_health_report_serde(const node_health_report& hr)
: node_health_report_serde(hr.id, hr.local_state, {}, hr.drain_status) {
: node_health_report_serde(
hr.id,
hr.local_state,
/* topics */ {},
hr.drain_status,
hr.node_liveness_report) {
topics.reserve(hr.topics.size());
for (const auto& [tp_ns, partitions] : hr.topics) {
topics.emplace_back(tp_ns, copy_to_vector(partitions));
Expand Down Expand Up @@ -154,11 +171,13 @@ partition_statuses_map_t copy_to_map(const partition_statuses_t& ps_vec) {
std::ostream& operator<<(std::ostream& o, const node_health_report_serde& r) {
fmt::print(
o,
"{{id: {}, topics: {}, local_state: {}, drain_status: {}}}",
"{{id: {}, topics: {}, local_state: {}, drain_status: {}, "
"node_liveness_report {}}}",
r.id,
r.topics,
r.local_state,
r.drain_status);
r.drain_status,
r.node_liveness_report);
return o;
}

Expand All @@ -171,7 +190,8 @@ bool operator==(
a.topics.cbegin(),
a.topics.cend(),
b.topics.cbegin(),
b.topics.cend());
b.topics.cend())
&& a.node_liveness_report == b.node_liveness_report;
}

std::ostream& operator<<(std::ostream& o, const cluster_health_report& r) {
Expand Down
60 changes: 53 additions & 7 deletions src/v/cluster/health_monitor_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#include "cluster/drain_manager.h"
#include "cluster/errc.h"
#include "cluster/node/types.h"
#include "cluster/types.h"
#include "container/chunked_hash_map.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "rpc/types.h"
#include "serde/async.h"
#include "serde/rw/bool_class.h"
#include "serde/rw/envelope.h"
Expand Down Expand Up @@ -196,6 +198,42 @@ struct topic_status
auto serde_fields() { return std::tie(tp_ns, partitions); }
};

/**
* Status for the automatic decommissioning of dead nodes
*/

struct node_liveness_report_data {
Comment thread
joe-redpanda marked this conversation as resolved.
// map between a given node id and how long ago that node was last seen
// no entry will be present for a given node if it has not been seen
absl::flat_hash_map<model::node_id, rpc::clock_type::duration>
Comment thread
joe-redpanda marked this conversation as resolved.
node_id_to_last_seen;
};

struct node_liveness_report
: serde::envelope<
node_liveness_report,
serde::version<0>,
serde::compat_version<0>>
, node_liveness_report_data {
static constexpr int8_t current_version = 0;

using node_liveness_report_data::node_liveness_report_data;

// NOLINTNEXTLINE hicpp-explicit-conversions
node_liveness_report(node_liveness_report_data data) noexcept
: node_liveness_report_data{std::move(data)} {
static_assert(
sizeof(node_liveness_report_data) == sizeof(node_liveness_report));
}

friend std::ostream& operator<<(std::ostream&, const node_liveness_report&);

auto serde_fields() { return std::tie(node_id_to_last_seen); }

friend bool
operator==(const node_liveness_report& a, const node_liveness_report& b);
};

/**
* Node health report is collected built based on node local state at given
* instance of time
Expand All @@ -211,12 +249,14 @@ struct node_health_report {
node::local_state local_state;
topics_t topics;
std::optional<drain_manager::drain_status> drain_status;
node_liveness_report node_liveness_report;

node_health_report(
model::node_id,
node::local_state,
chunked_vector<topic_status>,
std::optional<drain_manager::drain_status>);
std::optional<drain_manager::drain_status>,
struct node_liveness_report);

node_health_report copy() const;

Expand All @@ -234,15 +274,17 @@ using node_health_report_ptr
struct node_health_report_serde
: serde::envelope<
node_health_report_serde,
serde::version<0>,
serde::version<1>,
Comment thread
joe-redpanda marked this conversation as resolved.
serde::compat_version<0>> {
model::node_id id;
node::local_state local_state;
chunked_vector<topic_status> topics;
std::optional<drain_manager::drain_status> drain_status;
node_liveness_report node_liveness_report;

auto serde_fields() {
return std::tie(id, local_state, topics, drain_status);
return std::tie(
id, local_state, topics, drain_status, node_liveness_report);
}

node_health_report_serde() = default;
Expand All @@ -251,14 +293,17 @@ struct node_health_report_serde
model::node_id id,
node::local_state local_state,
chunked_vector<topic_status> topics,
std::optional<drain_manager::drain_status> drain_status)
std::optional<drain_manager::drain_status> drain_status,
struct node_liveness_report node_liveness_report)
: id(id)
, local_state(std::move(local_state))
, topics(std::move(topics))
, drain_status(drain_status) {}
, drain_status(drain_status)
, node_liveness_report(std::move(node_liveness_report)) {}

node_health_report_serde copy() const {
return {id, local_state, topics.copy(), drain_status};
return {
id, local_state, topics.copy(), drain_status, node_liveness_report};
}

explicit node_health_report_serde(const node_health_report& hr);
Expand All @@ -268,7 +313,8 @@ struct node_health_report_serde
id,
std::move(local_state),
std::move(topics),
std::move(drain_status)};
drain_status,
std::move(node_liveness_report)};
}

friend std::ostream&
Expand Down
Loading