@@ -70,6 +70,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
70
70
private final float avgPrimaryShardsPerNode ;
71
71
private final BalancedShardsAllocator .NodeSorter sorter ;
72
72
private final Set <RoutingNode > inEligibleTargetNode ;
73
+ private int totalShardCount = 0 ;
73
74
74
75
public LocalShardsBalancer (
75
76
Logger logger ,
@@ -127,8 +128,7 @@ public float avgPrimaryShardsPerNode() {
127
128
*/
128
129
@ Override
129
130
public float avgShardsPerNode () {
130
- float totalShards = nodes .values ().stream ().map (BalancedShardsAllocator .ModelNode ::numShards ).reduce (0 , Integer ::sum );
131
- return totalShards / nodes .size ();
131
+ return totalShardCount / nodes .size ();
132
132
}
133
133
134
134
/**
@@ -600,13 +600,15 @@ void moveShards() {
600
600
final BalancedShardsAllocator .ModelNode sourceNode = nodes .get (shardRouting .currentNodeId ());
601
601
final BalancedShardsAllocator .ModelNode targetNode = nodes .get (moveDecision .getTargetNode ().getId ());
602
602
sourceNode .removeShard (shardRouting );
603
+ --totalShardCount ;
603
604
Tuple <ShardRouting , ShardRouting > relocatingShards = routingNodes .relocateShard (
604
605
shardRouting ,
605
606
targetNode .getNodeId (),
606
607
allocation .clusterInfo ().getShardSize (shardRouting , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE ),
607
608
allocation .changes ()
608
609
);
609
610
targetNode .addShard (relocatingShards .v2 ());
611
+ ++totalShardCount ;
610
612
if (logger .isTraceEnabled ()) {
611
613
logger .trace ("Moved shard [{}] to node [{}]" , shardRouting , targetNode .getRoutingNode ());
612
614
}
@@ -726,6 +728,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
726
728
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
727
729
if (RoutingPool .LOCAL_ONLY .equals (RoutingPool .getShardPool (shard , allocation )) && shard .state () != RELOCATING ) {
728
730
node .addShard (shard );
731
+ ++totalShardCount ;
729
732
if (logger .isTraceEnabled ()) {
730
733
logger .trace ("Assigned shard [{}] to node [{}]" , shard , node .getNodeId ());
731
734
}
@@ -816,6 +819,7 @@ void allocateUnassigned() {
816
819
);
817
820
shard = routingNodes .initializeShard (shard , minNode .getNodeId (), null , shardSize , allocation .changes ());
818
821
minNode .addShard (shard );
822
+ ++totalShardCount ;
819
823
if (!shard .primary ()) {
820
824
// copy over the same replica shards to the secondary array so they will get allocated
821
825
// in a subsequent iteration, allowing replicas of other shards to be allocated first
@@ -845,6 +849,7 @@ void allocateUnassigned() {
845
849
allocation .routingTable ()
846
850
);
847
851
minNode .addShard (shard .initialize (minNode .getNodeId (), null , shardSize ));
852
+ ++totalShardCount ;
848
853
} else {
849
854
if (logger .isTraceEnabled ()) {
850
855
logger .trace ("No Node found to assign shard [{}]" , shard );
@@ -1012,18 +1017,21 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala
1012
1017
}
1013
1018
final Decision decision = new Decision .Multi ().add (allocationDecision ).add (rebalanceDecision );
1014
1019
maxNode .removeShard (shard );
1020
+ --totalShardCount ;
1015
1021
long shardSize = allocation .clusterInfo ().getShardSize (shard , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE );
1016
1022
1017
1023
if (decision .type () == Decision .Type .YES ) {
1018
1024
/* only allocate on the cluster if we are not throttled */
1019
1025
logger .debug ("Relocate [{}] from [{}] to [{}]" , shard , maxNode .getNodeId (), minNode .getNodeId ());
1020
1026
minNode .addShard (routingNodes .relocateShard (shard , minNode .getNodeId (), shardSize , allocation .changes ()).v1 ());
1027
+ ++totalShardCount ;
1021
1028
return true ;
1022
1029
} else {
1023
1030
/* allocate on the model even if throttled */
1024
1031
logger .debug ("Simulate relocation of [{}] from [{}] to [{}]" , shard , maxNode .getNodeId (), minNode .getNodeId ());
1025
1032
assert decision .type () == Decision .Type .THROTTLE ;
1026
1033
minNode .addShard (shard .relocate (minNode .getNodeId (), shardSize ));
1034
+ ++totalShardCount ;
1027
1035
return false ;
1028
1036
}
1029
1037
}
0 commit comments