Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 48 additions & 14 deletions src/v/cluster/scheduling/leader_balancer_constraints.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "cluster/scheduling/leader_balancer_constraints.h"

#include "base/vassert.h"
#include "cluster/scheduling/leader_balancer_types.h"
#include "config/leaders_preference.h"
#include "model/metadata.h"

namespace cluster::leader_balancer_types {
Expand Down Expand Up @@ -228,35 +230,67 @@ std::vector<shard_load> even_shard_load_constraint::stats() const {
// Scores reassignment `r` against the leaders preference that applies to the
// topic owning `r.group`. When the topic has no entry in
// `_preference_idx.topic2preference`, `_preference_idx.default_preference` is
// used instead. The score itself is produced by the helper matching the
// preference type.
// NOTE(review): this span is a rendered diff hunk without +/- markers, so
// lines of the previous and of the new function body appear side by side.
double pinning_constraint::evaluate_internal(const reassignment& r) {
int diff = 0;

const leaders_preference* preference = &_preference_idx.default_preference;

// NOTE(review): `.at()` presumably fails for a group that is missing from
// the group -> topic map - confirm against group_id_to_topic_id.
topic_id_t topic_id = _group2topic.get().at(r.group);
auto pref_it = _preference_idx.topic2preference.find(topic_id);
if (pref_it != _preference_idx.topic2preference.end()) {
preference = &pref_it->second;
}

if (preference->racks.empty()) {
// Topic-specific preference if one is registered, default otherwise.
const auto& preference = pref_it == _preference_idx.topic2preference.end()
? _preference_idx.default_preference
: pref_it->second;

// Every case below returns.
// NOTE(review): there is no return after the switch, so a `type_t` value
// other than the three handled here would fall off the end of the function -
// confirm the enum has no further enumerators.
switch (preference.type) {
case config::leaders_preference::type_t::none:
// No preference configured: the score stays at its initial value of 0.
vassert(
preference.racks.empty(),
"no racks should be present if the preference type is none");
return diff;
case config::leaders_preference::type_t::racks:
return do_evaluate_unordered(r, _preference_idx.node2rack, preference);
case config::leaders_preference::type_t::ordered_racks:
return do_evaluate_ordered(r, _preference_idx.node2rack, preference);
}
}

auto from_it = _preference_idx.node2rack.find(r.from.node_id);
// Scores `r` for a preference whose racks are all equally preferred.
// Subtracts 1 when the rack of `r.from`'s node is listed in
// `preference.racks` and adds 1 when the rack of `r.to`'s node is listed, so
// the result is one of -1, 0 or +1. A node without an entry in
// `node_to_rack` contributes nothing.
// NOTE(review): this span is a rendered diff hunk without +/- markers, so
// lines of the previous and of the new condition appear side by side.
double pinning_constraint::do_evaluate_unordered(
const reassignment& r,
const absl::flat_hash_map<model::node_id, model::rack_id>& node_to_rack,
const leaders_preference& preference) {
double diff{0};
// Source side of the reassignment.
auto from_it = node_to_rack.find(r.from.node_id);
if (
from_it != _preference_idx.node2rack.end()
&& preference->racks.contains(from_it->second)) {
from_it != node_to_rack.end()
&& std::ranges::contains(preference.racks, from_it->second)) {
diff -= 1;
}

auto to_it = _preference_idx.node2rack.find(r.to.node_id);
// Destination side of the reassignment.
auto to_it = node_to_rack.find(r.to.node_id);
if (
to_it != _preference_idx.node2rack.end()
&& preference->racks.contains(to_it->second)) {
to_it != node_to_rack.end()
&& std::ranges::contains(preference.racks, to_it->second)) {
diff += 1;
}

return diff;
}

// Scores a reassignment for a preference whose racks are ordered.
// Each side's node is mapped to the position of its rack inside
// `preference.racks`; the result is +1 when the `to` node's rack appears
// earlier in that list than the `from` node's rack, -1 when it appears later
// and 0 when both positions are equal.
// NOTE(review): the parameter is named `reassignment`, shadowing the type of
// the same name inside this function; the declaration calls it `r`.
double pinning_constraint::do_evaluate_ordered(
const reassignment& reassignment,
const absl::flat_hash_map<model::node_id, model::rack_id>& node_to_rack,
const leaders_preference& preference) {
// Returns an iterator into `preference.racks` for the node's rack.
// `end()` is returned both for a node without a rack mapping and for a rack
// that is not listed, so the two cases compare as equal and as later than
// every listed rack.
const auto do_find_priority = [&preference,
&node_to_rack](model::node_id to_evaluate) {
auto it = node_to_rack.find(to_evaluate);
if (it == node_to_rack.end()) {
return preference.racks.end();
}
const auto& rack = it->second;
return std::ranges::find(preference.racks, rack);
};

auto from_priority = do_find_priority(reassignment.from.node_id);
auto to_priority = do_find_priority(reassignment.to.node_id);

// snap to -1, 0, 1, where 'higher is preferable'
// An iterator closer to begin() means the rack comes earlier in the list;
// each comparison yields 0 or 1, so the difference is -1, 0 or +1.
return (to_priority < from_priority) - (from_priority < to_priority);
Comment thread
joe-redpanda marked this conversation as resolved.
}

even_node_load_constraint::even_node_load_constraint(const shard_index& si) {
for (const auto& [bs, leaders] : si.shards()) {
auto& info = _node2info[bs.node_id];
Expand Down
17 changes: 10 additions & 7 deletions src/v/cluster/scheduling/leader_balancer_constraints.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,9 @@
#include "cluster/scheduling/leader_balancer_types.h"
#include "container/chunked_hash_map.h"
#include "model/metadata.h"
#include "raft/fundamental.h"

#include <seastar/core/metrics.hh>
#include <seastar/core/sstring.hh>

#include <boost/range/adaptor/reversed.hpp>

#include <functional>
#include <numeric>
#include <optional>

namespace cluster::leader_balancer_types {

Expand Down Expand Up @@ -284,6 +277,16 @@ class pinning_constraint final : public soft_constraint {
double evaluate_internal(const reassignment& r) override;

private:
// Score for a preference whose racks are all equally preferred: -1 if the
// rack of `r.from`'s node is in `preference.racks`, +1 if the rack of
// `r.to`'s node is; both may apply, giving a result of -1, 0 or +1.
static double do_evaluate_unordered(
const reassignment& r,
const absl::flat_hash_map<model::node_id, model::rack_id>& node_to_rack,
const leaders_preference& preference);

// Score for an ordered rack preference: +1 if the rack of `r.to`'s node
// appears earlier in `preference.racks` than the rack of `r.from`'s node,
// -1 if it appears later, 0 otherwise. Nodes without a rack mapping and
// racks that are not listed rank after every listed rack.
static double do_evaluate_ordered(
const reassignment& r,
const absl::flat_hash_map<model::node_id, model::rack_id>& node_to_rack,
const leaders_preference& preference);

std::reference_wrapper<const group_id_to_topic_id> _group2topic;
preference_index _preference_idx;
};
Expand Down
16 changes: 11 additions & 5 deletions src/v/cluster/scheduling/leader_balancer_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
#pragma once

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "config/leaders_preference.h"
#include "container/chunked_hash_map.h"
#include "model/metadata.h"
#include "raft/fundamental.h"

#include <roaring/roaring64map.hh>

#include <ranges>

namespace cluster::leader_balancer_types {

struct reassignment {
Expand Down Expand Up @@ -51,16 +52,21 @@ template<typename ValueType>
using topic_map = chunked_hash_map<topic_id_t, ValueType>;

struct leaders_preference {
absl::flat_hash_set<model::rack_id> racks;
config::leaders_preference::type_t type{
config::leaders_preference::type_t::none};
Comment thread
joe-redpanda marked this conversation as resolved.
// O(N) find operations, fine so long as the rack preference number is small
std::vector<model::rack_id> racks;

leaders_preference() = default;
// Builds the balancer-side preference from its configuration counterpart:
// `type` is copied from `cfg.type`, and `racks` is filled from `cfg.racks`
// for both the `racks` and the `ordered_racks` type (the former falls through
// to the latter). For `none` the rack list is left empty.
// NOTE(review): this span is a rendered diff hunk without +/- markers, so
// lines of the previous and of the new constructor appear side by side.
explicit leaders_preference(const config::leaders_preference& cfg) {
explicit leaders_preference(const config::leaders_preference& cfg)
: type{cfg.type} {
switch (cfg.type) {
case config::leaders_preference::type_t::none:
break;
case config::leaders_preference::type_t::racks:
racks.reserve(cfg.racks.size());
racks.insert(cfg.racks.begin(), cfg.racks.end());
[[fallthrough]];
case config::leaders_preference::type_t::ordered_racks:
// Element order of `cfg.racks` is carried over into the vector.
racks = std::vector<model::rack_id>(std::from_range, cfg.racks);
break;
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/v/cluster/scheduling/tests/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
load("//bazel:test.bzl", "redpanda_cc_gtest")

# gtest target `leader_pinning_test`, built from leader_pinning_test.cc and
# declared with the "short" test timeout.
redpanda_cc_gtest(
name = "leader_pinning_test",
timeout = "short",
srcs = [
"leader_pinning_test.cc",
],
deps = [
"//src/v/cluster",
"//src/v/config",
"//src/v/model",
"//src/v/raft",
"//src/v/test_utils:gtest",
"@abseil-cpp//absl/container:flat_hash_map",
"@abseil-cpp//absl/container:flat_hash_set",
"@googletest//:gtest",
"@seastar",
],
)
Loading