Skip to content

Limit to 3 pipelines per node per source #5792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: impr-shard-collocation
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
Duration::from_secs(30)
};

/// Maximum CPU load the scheduler assigns to a single pipeline: 3,200 millicpu,
/// i.e. 80% of a full pipeline's capacity (headroom is left so merges can keep up).
const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);

#[derive(Debug, Clone, Default, Serialize)]
pub struct IndexingSchedulerState {
pub num_applied_physical_indexing_plan: usize,
Expand Down Expand Up @@ -152,7 +155,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
} else {
NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / 4).unwrap()
}
}

Expand Down Expand Up @@ -220,8 +223,12 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
source_uid,
source_type: SourceToScheduleType::NonSharded {
num_pipelines: source_config.num_pipelines.get() as u32,
// FIXME
load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
// FIXME:
// - implementing adaptive load contains the risk of generating
// rebalancing storms for sources like Kafka
// - this is coupled with the scheduling logic that misses the notion of
// pipeline
load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis())
.unwrap(),
},
params_fingerprint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,8 @@ mod tests {
convert_scheduling_solution_to_physical_plan_single_node_single_source,
};
use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::get_shard_locality_metrics;
use crate::indexing_scheduler::scheduling::assign_shards;
use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics};
use crate::model::ShardLocations;

fn source_id() -> SourceUid {
Expand Down Expand Up @@ -939,6 +939,146 @@ mod tests {
}
}

#[test]
fn test_build_physical_plan_with_pipeline_limit() {
    // Two identical indexers, one sharded source and one non-sharded source.
    // Each source needs 4 pipelines overall, but no single indexer may run
    // more than 3 pipelines for a given source.
    let indexer_a = "indexer1".to_string();
    let indexer_b = "indexer2".to_string();
    let sharded_source_uid = source_id();
    let non_sharded_source_uid = source_id();
    let sharded_source = SourceToSchedule {
        source_uid: sharded_source_uid.clone(),
        source_type: SourceToScheduleType::Sharded {
            shard_ids: (0..16).map(ShardId::from).collect(),
            load_per_shard: NonZeroU32::new(800).unwrap(),
        },
        params_fingerprint: 0,
    };
    let non_sharded_source = SourceToSchedule {
        source_uid: non_sharded_source_uid.clone(),
        source_type: SourceToScheduleType::NonSharded {
            num_pipelines: 4,
            load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
        },
        params_fingerprint: 0,
    };
    let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
    indexer_id_to_cpu_capacities.insert(indexer_a.clone(), mcpu(16_000));
    indexer_id_to_cpu_capacities.insert(indexer_b.clone(), mcpu(16_000));
    let shard_locations = ShardLocations::default();
    let indexing_plan = build_physical_indexing_plan(
        &[sharded_source, non_sharded_source],
        &indexer_id_to_cpu_capacities,
        None,
        &shard_locations,
    );
    assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);

    // Counts how many pipelines of `source_uid` the plan places on `indexer`.
    let count_pipelines = |indexer: &String, source_uid: &SourceUid| {
        indexing_plan
            .indexer(indexer)
            .unwrap()
            .iter()
            .filter(|task| task.source_id == source_uid.source_id)
            .count()
    };

    for source_uid in [&sharded_source_uid, &non_sharded_source_uid] {
        let on_indexer_a = count_pipelines(&indexer_a, source_uid);
        let on_indexer_b = count_pipelines(&indexer_b, source_uid);
        // The per-node cap holds on both indexers, and all 4 pipelines are placed.
        assert!(on_indexer_a <= 3);
        assert!(on_indexer_b <= 3);
        assert_eq!(on_indexer_a + on_indexer_b, 4);
    }
}

#[test]
fn test_build_physical_plan_second_iteration() {
    // Three identical indexers and 10 non-sharded sources of 4 full-load
    // pipelines each: the per-node cap (3 pipelines per source) should force a
    // 3/1/0 spread, then a 3/2/0 spread once each source grows to 5 pipelines.
    let indexer1 = "indexer1".to_string();
    let indexer2 = "indexer2".to_string();
    let indexer3 = "indexer3".to_string();
    let mut sources: Vec<SourceToSchedule> = (0..10)
        .map(|_| SourceToSchedule {
            source_uid: source_id(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 4,
                load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(),
            },
            params_fingerprint: 0,
        })
        .collect();
    let mut indexer_id_to_cpu_capacities = FnvHashMap::default();
    indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000));
    indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000));
    indexer_id_to_cpu_capacities.insert(indexer3.clone(), mcpu(16_000));
    let shard_locations = ShardLocations::default();

    // Per-indexer pipeline counts of `source` in `plan`.
    let distribution = |plan: &PhysicalIndexingPlan, source: &SourceToSchedule| -> Vec<usize> {
        plan.indexing_tasks_per_indexer()
            .values()
            .map(|tasks| {
                tasks
                    .iter()
                    .filter(|task| task.source_id == source.source_uid.source_id)
                    .count()
            })
            .collect()
    };

    let indexing_plan = build_physical_indexing_plan(
        &sources,
        &indexer_id_to_cpu_capacities,
        None,
        &shard_locations,
    );
    assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 3);

    for source in &sources {
        let pipelines_per_indexer = distribution(&indexing_plan, source);
        assert!(pipelines_per_indexer.contains(&3));
        assert!(pipelines_per_indexer.contains(&1));
        assert!(pipelines_per_indexer.contains(&0));
        assert_eq!(pipelines_per_indexer.iter().sum::<usize>(), 4);
    }

    // Grow every source from 4 to 5 pipelines and re-plan incrementally.
    for source in &mut sources {
        if let SourceToScheduleType::NonSharded { num_pipelines, .. } = &mut source.source_type {
            *num_pipelines = 5;
        }
    }

    let new_indexing_plan = build_physical_indexing_plan(
        &sources,
        &indexer_id_to_cpu_capacities,
        Some(&indexing_plan),
        &shard_locations,
    );

    for source in &sources {
        let pipelines_per_indexer = distribution(&new_indexing_plan, source);
        assert!(pipelines_per_indexer.contains(&3));
        assert!(pipelines_per_indexer.contains(&2));
        assert!(pipelines_per_indexer.contains(&0));
        assert_eq!(pipelines_per_indexer.iter().sum::<usize>(), 5);
    }
}

fn make_indexing_tasks(
source_uid: &SourceUid,
shards: &[(PipelineUid, &[ShardId])],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::btree_map::Entry;
use quickwit_proto::indexing::CpuCapacity;

use super::scheduling_logic_model::*;
use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE;
use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;

// ------------------------------------------------------------------------------------
Expand Down Expand Up @@ -288,7 +289,12 @@ fn attempt_place_unassigned_shards(
})
.collect();
placements.sort();
place_unassigned_shards_single_source(source, &placements, &mut solution)?;
place_unassigned_shards_single_source(
source,
&placements,
problem.num_indexers(),
&mut solution,
)?;
}
assert_place_unassigned_shards_post_condition(problem, &solution);
Ok(solution)
Expand Down Expand Up @@ -323,7 +329,12 @@ fn place_unassigned_shards_with_affinity(
})
.collect();
placements.sort();
let _ = place_unassigned_shards_single_source(source, &placements, solution);
let _ = place_unassigned_shards_single_source(
source,
&placements,
problem.num_indexers(),
solution,
);
}
}

Expand Down Expand Up @@ -393,17 +404,30 @@ struct NotEnoughCapacity;
fn place_unassigned_shards_single_source(
source: &Source,
sorted_candidates: &[PlacementCandidate],
num_indexers: usize,
solution: &mut SchedulingSolution,
) -> Result<(), NotEnoughCapacity> {
let mut num_shards = source.num_shards;
// To ensure that merges can keep up, we try not to assign more than 3
// pipelines per indexer for a source (except if there aren't enough nodes).
let target_limit_num_shards_per_indexer_per_source =
3 * MAX_LOAD_PER_PIPELINE.cpu_millis() / source.load_per_shard.get();
Comment on lines +411 to +414
Copy link
Collaborator Author

@rdettai rdettai Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is creating some undesired coupling with the rest of the code:

  • we rely on convert_scheduling_solution_to_physical_plan to use the same MAX_LOAD_PER_PIPELINE to create the right number of pipelines
  • we rely on the default load_per_pipeline for non-ingest sources (e.g. Kafka) to also use MAX_LOAD_PER_PIPELINE.

let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source
.max(num_shards.div_ceil(num_indexers as u32));
for PlacementCandidate {
indexer_ord,
available_capacity,
current_num_shards,
..
} in sorted_candidates
{
let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
let num_shards_to_place = num_placable_shards.min(num_shards);
let num_placable_shards_for_available_capacity =
available_capacity.cpu_millis() / source.load_per_shard;
let num_placable_shards_for_limit =
limit_num_shards_per_indexer_per_source.saturating_sub(*current_num_shards);
let num_shards_to_place = num_shards
.min(num_placable_shards_for_available_capacity)
.min(num_placable_shards_for_limit);
// Update the solution, the shard load, and the number of shards to place.
solution.indexer_assignments[*indexer_ord]
.add_shards(source.source_ord, num_shards_to_place);
Expand Down Expand Up @@ -630,6 +654,27 @@ mod tests {
assert_eq!(solution.indexer_assignments[1].num_shards(0), 4);
}

#[test]
fn test_placement_limit_with_affinity() {
    // Two identical indexers; two sources of 4 shards each, every shard as
    // heavy as a full pipeline, so the 3-per-node cap can bite.
    let mut problem =
        SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(16_000), mcpu(16_000)]);
    let load_per_shard = NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap();
    problem.add_source(4, load_per_shard);
    problem.add_source(4, load_per_shard);
    problem.inc_affinity(0, 1);
    problem.inc_affinity(0, 1);
    problem.inc_affinity(0, 0);
    problem.inc_affinity(1, 0);
    let mut solution = problem.new_solution();
    place_unassigned_shards_with_affinity(&problem, &mut solution);
    let assignments = &solution.indexer_assignments;
    // Indexer 0 takes the capped maximum of 3 shards of source 1 plus 1 shard
    // of source 0; source 0's remaining 3 shards follow its affinity to indexer 1.
    assert_eq!(assignments[0].num_shards(1), 3);
    assert_eq!(assignments[0].num_shards(0), 1);
    assert_eq!(assignments[1].num_shards(0), 3);
    // Source 1's last shard stays unplaced: indexer 0 reached the per-node
    // limit and source 1 has no affinity with indexer 1.
    assert_eq!(assignments[1].num_shards(1), 0);
}

#[test]
fn test_place_unassigned_shards_reach_capacity() {
let mut problem =
Expand Down