@@ -999,6 +999,84 @@ mod tests {
999
999
assert_eq ! ( source_1_on_node1 + source_1_on_node2, 4 ) ;
1000
1000
}
1001
1001
1002
+ #[ test]
1003
+ fn test_build_physical_plan_second_iteration ( ) {
1004
+ let indexer1 = "indexer1" . to_string ( ) ;
1005
+ let indexer2 = "indexer2" . to_string ( ) ;
1006
+ let indexer3 = "indexer3" . to_string ( ) ;
1007
+ let mut sources = Vec :: new ( ) ;
1008
+ for _ in 0 ..10 {
1009
+ sources. push ( SourceToSchedule {
1010
+ source_uid : source_id ( ) ,
1011
+ source_type : SourceToScheduleType :: NonSharded {
1012
+ num_pipelines : 4 ,
1013
+ load_per_pipeline : NonZeroU32 :: new ( MAX_LOAD_PER_PIPELINE . cpu_millis ( ) ) . unwrap ( ) ,
1014
+ } ,
1015
+ params_fingerprint : 0 ,
1016
+ } ) ;
1017
+ }
1018
+ let mut indexer_id_to_cpu_capacities = FnvHashMap :: default ( ) ;
1019
+ indexer_id_to_cpu_capacities. insert ( indexer1. clone ( ) , mcpu ( 16_000 ) ) ;
1020
+ indexer_id_to_cpu_capacities. insert ( indexer2. clone ( ) , mcpu ( 16_000 ) ) ;
1021
+ indexer_id_to_cpu_capacities. insert ( indexer3. clone ( ) , mcpu ( 16_000 ) ) ;
1022
+ let shard_locations = ShardLocations :: default ( ) ;
1023
+ let indexing_plan = build_physical_indexing_plan (
1024
+ & sources,
1025
+ & indexer_id_to_cpu_capacities,
1026
+ None ,
1027
+ & shard_locations,
1028
+ ) ;
1029
+ assert_eq ! ( indexing_plan. indexing_tasks_per_indexer( ) . len( ) , 3 ) ;
1030
+
1031
+ for source in & sources {
1032
+ let pipelines_per_indexer_for_source = indexing_plan
1033
+ . indexing_tasks_per_indexer ( )
1034
+ . values ( )
1035
+ . map ( |tasks| {
1036
+ tasks
1037
+ . iter ( )
1038
+ . filter ( |t| t. source_id == source. source_uid . source_id )
1039
+ . count ( )
1040
+ } )
1041
+ . collect_vec ( ) ;
1042
+ assert ! ( pipelines_per_indexer_for_source. contains( & 3 ) ) ;
1043
+ assert ! ( pipelines_per_indexer_for_source. contains( & 1 ) ) ;
1044
+ assert ! ( pipelines_per_indexer_for_source. contains( & 0 ) ) ;
1045
+ assert_eq ! ( pipelines_per_indexer_for_source. iter( ) . sum:: <usize >( ) , 4 ) ;
1046
+ }
1047
+
1048
+ for source in & mut sources {
1049
+ if let SourceToScheduleType :: NonSharded { num_pipelines, .. } = & mut source. source_type
1050
+ {
1051
+ * num_pipelines = 5 ;
1052
+ }
1053
+ }
1054
+
1055
+ let new_indexing_plan = build_physical_indexing_plan (
1056
+ & sources,
1057
+ & indexer_id_to_cpu_capacities,
1058
+ Some ( & indexing_plan) ,
1059
+ & shard_locations,
1060
+ ) ;
1061
+
1062
+ for source in & sources {
1063
+ let pipelines_per_indexer_for_source = new_indexing_plan
1064
+ . indexing_tasks_per_indexer ( )
1065
+ . values ( )
1066
+ . map ( |tasks| {
1067
+ tasks
1068
+ . iter ( )
1069
+ . filter ( |t| t. source_id == source. source_uid . source_id )
1070
+ . count ( )
1071
+ } )
1072
+ . collect_vec ( ) ;
1073
+ assert ! ( pipelines_per_indexer_for_source. contains( & 3 ) ) ;
1074
+ assert ! ( pipelines_per_indexer_for_source. contains( & 2 ) ) ;
1075
+ assert ! ( pipelines_per_indexer_for_source. contains( & 0 ) ) ;
1076
+ assert_eq ! ( pipelines_per_indexer_for_source. iter( ) . sum:: <usize >( ) , 5 ) ;
1077
+ }
1078
+ }
1079
+
1002
1080
fn make_indexing_tasks (
1003
1081
source_uid : & SourceUid ,
1004
1082
shards : & [ ( PipelineUid , & [ ShardId ] ) ] ,
0 commit comments