Skip to content

Commit 761073e

Browse files
committed
Limit using load instead of shard number
1 parent 8baa632 commit 761073e

File tree

3 files changed

+91
-9
lines changed

3 files changed

+91
-9
lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
4949
Duration::from_secs(30)
5050
};
5151

52+
/// That's 80% of a pipeline capacity
53+
const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
54+
5255
#[derive(Debug, Clone, Default, Serialize)]
5356
pub struct IndexingSchedulerState {
5457
pub num_applied_physical_indexing_plan: usize,
@@ -152,7 +155,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
152155
const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
153156
NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
154157
} else {
155-
NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
158+
NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / 4).unwrap()
156159
}
157160
}
158161

@@ -221,7 +224,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
221224
source_type: SourceToScheduleType::NonSharded {
222225
num_pipelines: source_config.num_pipelines.get() as u32,
223226
// FIXME
224-
load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
227+
load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis())
225228
.unwrap(),
226229
},
227230
params_fingerprint,

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use scheduling_logic_model::{IndexerOrd, SourceOrd};
2626
use tracing::{error, warn};
2727

2828
use crate::indexing_plan::PhysicalIndexingPlan;
29+
use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE;
2930
use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
3031
IndexerAssignment, SchedulingProblem, SchedulingSolution,
3132
};
@@ -44,9 +45,6 @@ use crate::model::ShardLocations;
4445
/// of 30%. Which translates into an overall load of 60%.
4546
const CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD: CpuCapacity = CpuCapacity::from_cpu_millis(1_200);
4647

47-
/// That's 80% of a period
48-
const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
49-
5048
fn populate_problem(
5149
source: &SourceToSchedule,
5250
problem: &mut SchedulingProblem,
@@ -757,8 +755,8 @@ mod tests {
757755
convert_scheduling_solution_to_physical_plan_single_node_single_source,
758756
};
759757
use crate::indexing_plan::PhysicalIndexingPlan;
760-
use crate::indexing_scheduler::get_shard_locality_metrics;
761758
use crate::indexing_scheduler::scheduling::assign_shards;
759+
use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics};
762760
use crate::model::ShardLocations;
763761

764762
fn source_id() -> SourceUid {
@@ -939,6 +937,68 @@ mod tests {
939937
}
940938
}
941939

940+
#[test]
941+
fn test_build_physical_plan_with_pipeline_limit() {
942+
let indexer1 = "indexer1".to_string();
943+
let indexer2 = "indexer2".to_string();
944+
let source_uid0 = source_id();
945+
let source_uid1 = source_id();
946+
let source_0 = SourceToSchedule {
947+
source_uid: source_uid0.clone(),
948+
source_type: SourceToScheduleType::Sharded {
949+
shard_ids: (0..16).map(ShardId::from).collect(),
950+
load_per_shard: NonZeroU32::new(800).unwrap(),
951+
},
952+
params_fingerprint: 0,
953+
};
954+
let source_1 = SourceToSchedule {
955+
source_uid: source_uid1.clone(),
956+
source_type: SourceToScheduleType::NonSharded {
957+
num_pipelines: 4,
958+
load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
959+
},
960+
params_fingerprint: 0,
961+
};
962+
let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
963+
indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
964+
indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
965+
let shard_locations = ShardLocations::default();
966+
let indexing_plan = build_physical_indexing_plan(
967+
&[source_0, source_1],
968+
&indexer_id_to_cpu_capacities,
969+
None,
970+
&shard_locations,
971+
);
972+
assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);
973+
974+
let node1_plan = indexing_plan.indexer(&indexer1).unwrap();
975+
let node2_plan = indexing_plan.indexer(&indexer2).unwrap();
976+
977+
let source_0_on_node1 = node1_plan
978+
.iter()
979+
.filter(|task| task.source_id == source_uid0.source_id)
980+
.count();
981+
let source_0_on_node2 = node2_plan
982+
.iter()
983+
.filter(|task| task.source_id == source_uid0.source_id)
984+
.count();
985+
assert!(source_0_on_node1 <= 3);
986+
assert!(source_0_on_node2 <= 3);
987+
assert_eq!(source_0_on_node1 + source_0_on_node2, 4);
988+
989+
let source_1_on_node1 = node1_plan
990+
.iter()
991+
.filter(|task| task.source_id == source_uid1.source_id)
992+
.count();
993+
let source_1_on_node2 = node2_plan
994+
.iter()
995+
.filter(|task| task.source_id == source_uid1.source_id)
996+
.count();
997+
assert!(source_1_on_node1 <= 3);
998+
assert!(source_1_on_node2 <= 3);
999+
assert_eq!(source_1_on_node1 + source_1_on_node2, 4);
1000+
}
1001+
9421002
fn make_indexing_tasks(
9431003
source_uid: &SourceUid,
9441004
shards: &[(PipelineUid, &[ShardId])],

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use itertools::Itertools;
2020
use quickwit_proto::indexing::CpuCapacity;
2121

2222
use super::scheduling_logic_model::*;
23+
use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE;
2324
use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
2425

2526
// ------------------------------------------------------------------------------------
@@ -355,9 +356,12 @@ fn place_unassigned_shards_single_source(
355356
solution: &mut SchedulingSolution,
356357
) -> Result<(), NotEnoughCapacity> {
357358
let mut num_shards = source.num_shards;
358-
// To ensure that merges can keep up, try not to assign more than 3
359-
// shards per indexer for a source (except if there aren't enough nodes)
360-
let limit_num_shards_per_indexer_per_source = 3.max(num_shards.div_ceil(num_indexers as u32));
359+
// To ensure that merges can keep up, we try not to assign more than 3
360+
// pipelines per indexer for a source (except if there aren't enough nodes).
361+
let target_limit_num_shards_per_indexer_per_source =
362+
3 * MAX_LOAD_PER_PIPELINE.cpu_millis() / source.load_per_shard.get();
363+
let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source
364+
.max(num_shards.div_ceil(num_indexers as u32));
361365
while num_shards > 0 {
362366
let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
363367
return Err(NotEnoughCapacity);
@@ -608,6 +612,21 @@ mod tests {
608612
problem.add_source(4, NonZeroU32::new(1_000).unwrap());
609613
problem.add_source(4, NonZeroU32::new(1_000).unwrap());
610614
problem.inc_affinity(0, 1);
615+
problem.inc_affinity(1, 0);
616+
let mut solution = problem.new_solution();
617+
place_unassigned_shards_with_affinity(&problem, &mut solution);
618+
assert_eq!(solution.indexer_assignments[0].num_shards(1), 4);
619+
assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
620+
}
621+
622+
#[test]
623+
fn test_limit_placement_to_three_pipelines() {
624+
let mut problem =
625+
SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(16_000), mcpu(16_000)]);
626+
let max_load_per_pipeline = NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap();
627+
problem.add_source(4, max_load_per_pipeline);
628+
problem.add_source(4, max_load_per_pipeline);
629+
problem.inc_affinity(0, 1);
611630
problem.inc_affinity(0, 1);
612631
problem.inc_affinity(0, 0);
613632
problem.inc_affinity(1, 0);

0 commit comments

Comments
 (0)