Commit 38cd299

Add limit of three pipelines per node

1 parent 4f54d65

3 files changed: +203 -8 lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 10 additions & 3 deletions
@@ -49,6 +49,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
         Duration::from_secs(30)
     };
 
+/// That's 80% of a pipeline capacity
+const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
+
 #[derive(Debug, Clone, Default, Serialize)]
 pub struct IndexingSchedulerState {
     pub num_applied_physical_indexing_plan: usize,
@@ -152,7 +155,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
         const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
         NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
     } else {
-        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
+        NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / 4).unwrap()
     }
 }
 
@@ -220,8 +223,12 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
             source_uid,
             source_type: SourceToScheduleType::NonSharded {
                 num_pipelines: source_config.num_pipelines.get() as u32,
-                // FIXME
-                load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
+                // FIXME:
+                // - implementing adaptative load contains the risk of generating
+                //   rebalancing storms for sources like Kafka
+                // - this is coupled with the scheduling logic that misses the notion of
+                //   pipeline
+                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis())
                     .unwrap(),
             },
             params_fingerprint,
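
A quick sanity check on the new constant (a standalone sketch, assuming PIPELINE_FULL_CAPACITY is the 4,000 mCPU constant from quickwit_proto, which this diff does not restate): 3,200 mCPU is indeed 80% of a full pipeline, and the default per-shard load in compute_load_per_shard becomes 3,200 / 4 = 800 mCPU, the same figure the new test below uses for load_per_shard.

    // Sanity-check sketch; the 4_000 mCPU value for PIPELINE_FULL_CAPACITY is an
    // assumption, MAX_LOAD_PER_PIPELINE is the constant added above.
    const ASSUMED_PIPELINE_FULL_CAPACITY_MILLIS: u32 = 4_000;
    const MAX_LOAD_PER_PIPELINE_MILLIS: u32 = 3_200;

    fn main() {
        // 3_200 / 4_000 = 80% of a full pipeline.
        assert_eq!(MAX_LOAD_PER_PIPELINE_MILLIS * 100 / ASSUMED_PIPELINE_FULL_CAPACITY_MILLIS, 80);
        // Default load per shard: 3_200 / 4 = 800 mCPU.
        assert_eq!(MAX_LOAD_PER_PIPELINE_MILLIS / 4, 800);
    }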

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs

Lines changed: 141 additions & 1 deletion
@@ -757,8 +757,8 @@ mod tests {
         convert_scheduling_solution_to_physical_plan_single_node_single_source,
     };
     use crate::indexing_plan::PhysicalIndexingPlan;
-    use crate::indexing_scheduler::get_shard_locality_metrics;
     use crate::indexing_scheduler::scheduling::assign_shards;
+    use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics};
     use crate::model::ShardLocations;
 
     fn source_id() -> SourceUid {
@@ -939,6 +939,146 @@
         }
     }
 
+    #[test]
+    fn test_build_physical_plan_with_pipeline_limit() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let source_uid0 = source_id();
+        let source_uid1 = source_id();
+        let source_0 = SourceToSchedule {
+            source_uid: source_uid0.clone(),
+            source_type: SourceToScheduleType::Sharded {
+                shard_ids: (0..16).map(ShardId::from).collect(),
+                load_per_shard: NonZeroU32::new(800).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let source_1 = SourceToSchedule {
+            source_uid: source_uid1.clone(),
+            source_type: SourceToScheduleType::NonSharded {
+                num_pipelines: 4,
+                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+            },
+            params_fingerprint: 0,
+        };
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &[source_0, source_1],
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);
+
+        let node1_plan = indexing_plan.indexer(&indexer1).unwrap();
+        let node2_plan = indexing_plan.indexer(&indexer2).unwrap();
+
+        let source_0_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        let source_0_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid0.source_id)
+            .count();
+        assert!(source_0_on_node1 <= 3);
+        assert!(source_0_on_node2 <= 3);
+        assert_eq!(source_0_on_node1 + source_0_on_node2, 4);
+
+        let source_1_on_node1 = node1_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        let source_1_on_node2 = node2_plan
+            .iter()
+            .filter(|task| task.source_id == source_uid1.source_id)
+            .count();
+        assert!(source_1_on_node1 <= 3);
+        assert!(source_1_on_node2 <= 3);
+        assert_eq!(source_1_on_node1 + source_1_on_node2, 4);
+    }
+
+    #[test]
+    fn test_build_physical_plan_second_iteration() {
+        let indexer1 = "indexer1".to_string();
+        let indexer2 = "indexer2".to_string();
+        let indexer3 = "indexer3".to_string();
+        let mut sources = Vec::new();
+        for _ in 0..10 {
+            sources.push(SourceToSchedule {
+                source_uid: source_id(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 4,
+                    load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
+                },
+                params_fingerprint: 0,
+            });
+        }
+        let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
+        indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
+        indexer_id_to_cpu_capacities.insert(indexer3.clone(), mcpu(16_000));
+        let shard_locations = ShardLocations::default();
+        let indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            None,
+            &shard_locations,
+        );
+        assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 3);
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&1));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 4);
+        }
+
+        for source in &mut sources {
+            if let SourceToScheduleType::NonSharded { num_pipelines, .. } = &mut source.source_type
+            {
+                *num_pipelines = 5;
+            }
+        }
+
+        let new_indexing_plan = build_physical_indexing_plan(
+            &sources,
+            &indexer_id_to_cpu_capacities,
+            Some(&indexing_plan),
+            &shard_locations,
+        );
+
+        for source in &sources {
+            let pipelines_per_indexer_for_source = new_indexing_plan
+                .indexing_tasks_per_indexer()
+                .values()
+                .map(|tasks| {
+                    tasks
+                        .iter()
+                        .filter(|t| t.source_id == source.source_uid.source_id)
+                        .count()
+                })
+                .collect_vec();
+            assert!(pipelines_per_indexer_for_source.contains(&3));
+            assert!(pipelines_per_indexer_for_source.contains(&2));
+            assert!(pipelines_per_indexer_for_source.contains(&0));
+            assert_eq!(pipelines_per_indexer_for_source.iter().sum::<usize>(), 5);
+        }
+    }
+
     fn make_indexing_tasks(
         source_uid: &SourceUid,
         shards: &[(PipelineUid, &[ShardId])],
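
The distributions the second test expects follow from the new per-source cap: with at most three pipelines of a source per indexer, four pipelines spread over three indexers as 3 + 1 + 0, and five as 3 + 2 + 0. The toy greedy fill below reproduces those counts; it is only an illustration, not the actual solver, which also weighs capacity and affinity.

    // Toy greedy fill: give each indexer up to `cap_per_indexer` pipelines of a
    // source, in order. Illustration of the expected counts only.
    fn greedy_split(mut num_pipelines: u32, num_indexers: u32, cap_per_indexer: u32) -> Vec<u32> {
        (0..num_indexers)
            .map(|_| {
                let placed = num_pipelines.min(cap_per_indexer);
                num_pipelines -= placed;
                placed
            })
            .collect()
    }

    fn main() {
        // First plan: 4 pipelines per source, cap 3 -> {3, 1, 0}.
        assert_eq!(greedy_split(4, 3, 3), vec![3, 1, 0]);
        // After bumping num_pipelines to 5 -> {3, 2, 0}.
        assert_eq!(greedy_split(5, 3, 3), vec![3, 2, 0]);
    }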

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 52 additions & 4 deletions
@@ -19,6 +19,7 @@ use std::collections::btree_map::Entry;
 use quickwit_proto::indexing::CpuCapacity;
 
 use super::scheduling_logic_model::*;
+use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE;
 use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
 
 // ------------------------------------------------------------------------------------
@@ -282,7 +283,12 @@ fn attempt_place_unassigned_shards(
             })
             .collect();
         placements.sort();
-        place_unassigned_shards_single_source(source, &placements, &mut solution)?;
+        place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            &mut solution,
+        )?;
     }
     assert_place_unassigned_shards_post_condition(problem, &solution);
     Ok(solution)
@@ -317,7 +323,12 @@ fn place_unassigned_shards_with_affinity(
             })
             .collect();
         placements.sort();
-        let _ = place_unassigned_shards_single_source(source, &placements, solution);
+        let _ = place_unassigned_shards_single_source(
+            source,
+            &placements,
+            problem.num_indexers(),
+            solution,
+        );
     }
 }
 
@@ -387,17 +398,33 @@ struct NotEnoughCapacity;
 fn place_unassigned_shards_single_source(
     source: &Source,
     sorted_candidates: &[PlacementCandidate],
+    num_indexers: usize,
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
+    // To ensure that merges can keep up, we try not to assign more than 3
+    // pipelines per indexer for a source (except if there aren't enough nodes).
+    let target_limit_num_shards_per_indexer_per_source =
+        3 * MAX_LOAD_PER_PIPELINE.cpu_millis() / source.load_per_shard.get();
+    let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source
+        .max(num_shards.div_ceil(num_indexers as u32));
     for PlacementCandidate {
         indexer_ord,
         available_capacity,
         ..
     } in sorted_candidates
     {
-        let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
-        let num_shards_to_place = num_placable_shards.min(num_shards);
+        let current_num_shards_for_indexer_and_source = *solution.indexer_assignments[*indexer_ord]
+            .num_shards_per_source
+            .get(&source.source_ord)
+            .unwrap_or(&0);
+        let num_placable_shards_for_available_capacity =
+            available_capacity.cpu_millis() / source.load_per_shard;
+        let num_placable_shards_for_limit = limit_num_shards_per_indexer_per_source
+            .saturating_sub(current_num_shards_for_indexer_and_source);
+        let num_shards_to_place = num_shards
+            .min(num_placable_shards_for_available_capacity)
+            .min(num_placable_shards_for_limit);
         // Update the solution, the shard load, and the number of shards to place.
         solution.indexer_assignments[*indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
@@ -624,6 +651,27 @@
         assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
     }
 
+    #[test]
+    fn test_placement_limit_with_affinity() {
+        let mut problem =
+            SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(16_000), mcpu(16_000)]);
+        let max_load_per_pipeline = NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap();
+        problem.add_source(4, max_load_per_pipeline);
+        problem.add_source(4, max_load_per_pipeline);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 1);
+        problem.inc_affinity(0, 0);
+        problem.inc_affinity(1, 0);
+        let mut solution = problem.new_solution();
+        place_unassigned_shards_with_affinity(&problem, &mut solution);
+        assert_eq!(solution.indexer_assignments[0].num_shards(1), 3);
+        assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
+        assert_eq!(solution.indexer_assignments[1].num_shards(0), 3);
+        // one shard was not placed because indexer 0 was full and it had no
+        // affinity with indexer 1
+        assert_eq!(solution.indexer_assignments[1].num_shards(1), 0);
+    }
+
     #[test]
     fn test_place_unassigned_shards_reach_capacity() {
         let mut problem =
