diff --git a/CHANGELOG.md b/CHANGELOG.md index 5383965916b68..429a6af7beb79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix the visit of sub queries for HasParentQuery and HasChildQuery ([#18621](https://github.com/opensearch-project/OpenSearch/pull/18621)) - Fix the backward compatibility regression with COMPLEMENT for Regexp queries introduced in OpenSearch 3.0 ([#18640](https://github.com/opensearch-project/OpenSearch/pull/18640)) - Fix Replication lag computation ([#18602](https://github.com/opensearch-project/OpenSearch/pull/18602)) +- Fixed Staggered merge - load average replace with AverageTrackers, some Default thresholds modified ([#18666](https://github.com/opensearch-project/OpenSearch/pull/18666)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java index bd17d82c4d46e..142e2da95653e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java @@ -45,11 +45,6 @@ public class AutoForceMergeManagerIT extends RemoteStoreBaseIntegTestCase { private static final String MERGE_DELAY = "1s"; private static final Integer SEGMENT_COUNT = 1; - @Override - protected boolean addMockIndexStorePlugin() { - return false; - } - @Override protected Settings nodeSettings(int nodeOrdinal) { ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); @@ -158,8 +153,8 @@ public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception { SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false); waitUntil(() -> shard.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false); - // assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount()); - // assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount()); + assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount()); + assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount()); assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); } @@ -221,11 +216,11 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false); SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false); SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false); - AtomicLong totalSegments = new AtomicLong( + AtomicLong totalSegmentsBefore = new AtomicLong( segmentsStatsForShard1Before.getCount() + segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount() + segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount() ); - assertTrue(totalSegments.get() > 5); + assertTrue(totalSegmentsBefore.get() > 5); waitUntil(() -> shard1.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); waitUntil(() -> shard2.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); waitUntil(() -> shard3.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); @@ -236,11 +231,11 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false); SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false); SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false); - totalSegments.set( + AtomicLong totalSegmentsAfter = new AtomicLong( segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount() + segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount() ); - // assertEquals(5, totalSegments.get()); + assertTrue(totalSegmentsBefore.get() > totalSegmentsAfter.get()); assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_2).get()); } diff --git a/server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java b/server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java index f79b0e72c683a..ae7f98a138407 100644 --- a/server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java +++ b/server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java @@ -66,10 +66,11 @@ public class AutoForceMergeManager extends AbstractLifecycleComponent { private ConfigurationValidator configurationValidator; private NodeValidator nodeValidator; private ShardValidator shardValidator; + private Integer allocatedProcessors; + private ResourceTrackerProvider.ResourceTrackers resourceTrackers; private final ForceMergeManagerSettings forceMergeManagerSettings; private final CommonStatsFlags flags = new CommonStatsFlags(CommonStatsFlags.Flag.Segments, CommonStatsFlags.Flag.Translog); private final Set mergingShards; - private Integer allocatedProcessors; private static final Logger logger = LogManager.getLogger(AutoForceMergeManager.class); @@ -96,6 +97,7 @@ protected void doStart() { this.nodeValidator = new NodeValidator(); this.shardValidator = new ShardValidator(); this.allocatedProcessors = OpenSearchExecutors.allocatedProcessors(clusterService.getSettings()); + this.resourceTrackers = ResourceTrackerProvider.create(threadPool); } @Override @@ -117,43 +119,65 @@ private void modifySchedulerInterval(TimeValue schedulerInterval) { } private void triggerForceMerge() { + if (isValidForForceMerge() == false) { + return; + } + executeForceMergeOnShards(); + } + + private boolean isValidForForceMerge() { if (configurationValidator.hasWarmNodes() == false) { + resourceTrackers.stop(); logger.debug("No warm nodes found. Skipping Auto Force merge."); - return; + return false; } if (nodeValidator.validate().isAllowed() == false) { logger.debug("Node capacity constraints are not allowing to trigger auto ForceMerge"); - return; + return false; } - int iteration = nodeValidator.getMaxConcurrentForceMerges(); + return true; + } + + private void executeForceMergeOnShards() { + int remainingIterations = nodeValidator.getMaxConcurrentForceMerges(); for (IndexShard shard : getShardsBasedOnSorting(indicesService)) { - if (iteration == 0) { + if (remainingIterations == 0 || !nodeValidator.validate().isAllowed()) { + if (remainingIterations > 0) { + logger.debug("Node conditions no longer suitable for force merge."); + } break; } - if (nodeValidator.validate().isAllowed() == false) { - logger.debug("Node conditions no longer suitable for force merge."); + remainingIterations--; + executeForceMergeForShard(shard); + if (!waitBetweenShards()) { break; } - iteration--; - CompletableFuture.runAsync(() -> { - try { - mergingShards.add(shard.shardId().getId()); - shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount())); - logger.debug("Merging is completed successfully for the shard {}", shard.shardId()); - } catch (Exception e) { - logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e); - } finally { - mergingShards.remove(shard.shardId().getId()); - } - }, threadPool.executor(ThreadPool.Names.FORCE_MERGE)); - logger.info("Successfully triggered force merge for shard {}", shard.shardId()); + } + } + + private void executeForceMergeForShard(IndexShard shard) { + CompletableFuture.runAsync(() -> { try { - Thread.sleep(forceMergeManagerSettings.getForcemergeDelay().getMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Timer was interrupted while waiting between shards", e); - break; + mergingShards.add(shard.shardId().getId()); + shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount())); + logger.debug("Merging is completed successfully for the shard {}", shard.shardId()); + } catch (Exception e) { + logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e); + } finally { + mergingShards.remove(shard.shardId().getId()); } + }, threadPool.executor(ThreadPool.Names.FORCE_MERGE)); + logger.info("Successfully triggered force merge for shard {}", shard.shardId()); + } + + private boolean waitBetweenShards() { + try { + Thread.sleep(forceMergeManagerSettings.getForcemergeDelay().getMillis()); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Timer was interrupted while waiting between shards", e); + return false; } } @@ -264,15 +288,14 @@ protected class NodeValidator implements ValidationStrategy { @Override public ValidationResult validate() { + resourceTrackers.start(); if (isCpuUsageOverThreshold()) { return new ValidationResult(false); } if (isDiskUsageOverThreshold()) { return new ValidationResult(false); } - double jvmUsedPercent = jvmService.stats().getMem().getHeapUsedPercent(); - if (jvmUsedPercent >= forceMergeManagerSettings.getJvmThreshold()) { - logger.debug("JVM memory: {}% breached the threshold: {}", jvmUsedPercent, forceMergeManagerSettings.getJvmThreshold()); + if (isJvmUsageOverThreshold()) { return new ValidationResult(false); } if (areForceMergeThreadsAvailable() == false) { @@ -291,24 +314,34 @@ private boolean areForceMergeThreadsAvailable() { return false; } + private boolean isJvmUsageOverThreshold() { + double jvmAverage = resourceTrackers.jvmFiveMinute.getAverage(); + if (jvmAverage >= forceMergeManagerSettings.getJvmThreshold()) { + logger.debug("JVM Average: 5m({}%) breached the threshold: {}", jvmAverage, forceMergeManagerSettings.getJvmThreshold()); + return true; + } + jvmAverage = resourceTrackers.jvmOneMinute.getAverage(); + if (jvmAverage >= forceMergeManagerSettings.getJvmThreshold()) { + logger.debug("JVM Average: 1m({}%) breached the threshold: {}", jvmAverage, forceMergeManagerSettings.getJvmThreshold()); + return true; + } + double jvmUsedPercent = jvmService.stats().getMem().getHeapUsedPercent(); + if (jvmUsedPercent >= forceMergeManagerSettings.getJvmThreshold()) { + logger.debug("JVM memory: {}% breached the threshold: {}", jvmUsedPercent, forceMergeManagerSettings.getJvmThreshold()); + return true; + } + return false; + } + private boolean isCpuUsageOverThreshold() { - double[] loadAverage = osService.stats().getCpu().getLoadAverage(); - double loadAverage5m = (loadAverage[1] / (double) allocatedProcessors) * 100; - if (loadAverage5m >= forceMergeManagerSettings.getCpuThreshold()) { - logger.debug( - "Load Average: 5m({}%) breached the threshold: {}", - loadAverage5m, - forceMergeManagerSettings.getCpuThreshold() - ); + double cpuAverage = resourceTrackers.cpuFiveMinute.getAverage(); + if (cpuAverage >= forceMergeManagerSettings.getCpuThreshold()) { + logger.debug("CPU Average: 5m({}%) breached the threshold: {}", cpuAverage, forceMergeManagerSettings.getCpuThreshold()); return true; } - double loadAverage1m = (loadAverage[0] / (double) allocatedProcessors) * 100; - if (loadAverage1m >= forceMergeManagerSettings.getCpuThreshold()) { - logger.debug( - "Load Average: 1m({}%) breached the threshold: {}", - loadAverage1m, - forceMergeManagerSettings.getCpuThreshold() - ); + cpuAverage = resourceTrackers.cpuOneMinute.getAverage(); + if (cpuAverage >= forceMergeManagerSettings.getCpuThreshold()) { + logger.debug("CPU Average: 1m({}%) breached the threshold: {}", cpuAverage, forceMergeManagerSettings.getCpuThreshold()); return true; } double cpuPercent = osService.stats().getCpu().getPercent(); @@ -445,6 +478,7 @@ protected boolean mustReschedule() { @Override protected void runInternal() { if (configurationValidator.validate().isAllowed() == false) { + resourceTrackers.stop(); return; } triggerForceMerge(); diff --git a/server/src/main/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettings.java b/server/src/main/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettings.java index b1d9ccc77988c..4077cd5768574 100644 --- a/server/src/main/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettings.java +++ b/server/src/main/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettings.java @@ -56,11 +56,11 @@ public class ForceMergeManagerSettings { ); /** - * Setting for wait time between force merge operations (default: 10s). + * Setting for wait time between force merge operations (default: 15s). */ public static final Setting MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE = Setting.timeSetting( "node.auto_force_merge.merge_delay", - TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(15), TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(60), Setting.Property.Dynamic, @@ -92,11 +92,11 @@ public class ForceMergeManagerSettings { ); /** - * Setting for cpu threshold. (default: 80) + * Setting for cpu threshold. (default: 75) */ public static final Setting CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting( "node.auto_force_merge.cpu.threshold", - 80.0, + 75.0, 10, 100, Setting.Property.Dynamic, @@ -104,11 +104,11 @@ public class ForceMergeManagerSettings { ); /** - * Setting for memory threshold. (default: 90) + * Setting for disk threshold. (default: 85) */ public static final Setting DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting( "node.auto_force_merge.disk.threshold", - 90.0, + 85.0, 10, 100, Setting.Property.Dynamic, diff --git a/server/src/main/java/org/opensearch/index/autoforcemerge/ResourceTrackerProvider.java b/server/src/main/java/org/opensearch/index/autoforcemerge/ResourceTrackerProvider.java new file mode 100644 index 0000000000000..6db76a6359ea9 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/autoforcemerge/ResourceTrackerProvider.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.autoforcemerge; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.resource.tracker.AverageCpuUsageTracker; +import org.opensearch.node.resource.tracker.AverageMemoryUsageTracker; +import org.opensearch.threadpool.ThreadPool; + +/** + * Provider for creating resource usage trackers used in auto force merge operations. + * + * @opensearch.internal + */ +public class ResourceTrackerProvider { + + public static final TimeValue SHORT_POLL_INTERVAL = TimeValue.timeValueSeconds(6); + public static final TimeValue LONG_POLL_INTERVAL = TimeValue.timeValueSeconds(30); + public static final TimeValue SHORT_AVERAGE_WINDOW = TimeValue.timeValueMinutes(1); + public static final TimeValue LONG_AVERAGE_WINDOW = TimeValue.timeValueMinutes(5); + + public static ResourceTrackers resourceTrackers; + + public static ResourceTrackers create(ThreadPool threadPool) { + return resourceTrackers = new ResourceTrackers( + new AverageCpuUsageTracker(threadPool, SHORT_POLL_INTERVAL, SHORT_AVERAGE_WINDOW), + new AverageCpuUsageTracker(threadPool, LONG_POLL_INTERVAL, LONG_AVERAGE_WINDOW), + new AverageMemoryUsageTracker(threadPool, SHORT_POLL_INTERVAL, SHORT_AVERAGE_WINDOW), + new AverageMemoryUsageTracker(threadPool, LONG_POLL_INTERVAL, LONG_AVERAGE_WINDOW) + ); + } + + /** + * Container for resource usage trackers used in auto force merge operations. + * Provides access to CPU and JVM memory usage trackers with different time windows. + * + * @opensearch.internal + */ + public static class ResourceTrackers { + public final AverageCpuUsageTracker cpuOneMinute; + public final AverageCpuUsageTracker cpuFiveMinute; + public final AverageMemoryUsageTracker jvmOneMinute; + public final AverageMemoryUsageTracker jvmFiveMinute; + + /** + * Creates a new ResourceTrackers instance. + * + * @param cpuOneMinute CPU tracker with 1-minute window + * @param cpuFiveMinute CPU tracker with 5-minute window + * @param jvmOneMinute JVM memory tracker with 1-minute window + * @param jvmFiveMinute JVM memory tracker with 5-minute window + */ + ResourceTrackers( + AverageCpuUsageTracker cpuOneMinute, + AverageCpuUsageTracker cpuFiveMinute, + AverageMemoryUsageTracker jvmOneMinute, + AverageMemoryUsageTracker jvmFiveMinute + ) { + this.cpuOneMinute = cpuOneMinute; + this.cpuFiveMinute = cpuFiveMinute; + this.jvmOneMinute = jvmOneMinute; + this.jvmFiveMinute = jvmFiveMinute; + } + + public void start() { + cpuOneMinute.start(); + cpuFiveMinute.start(); + jvmOneMinute.start(); + jvmFiveMinute.start(); + } + + public void stop() { + cpuOneMinute.stop(); + cpuFiveMinute.stop(); + jvmOneMinute.stop(); + jvmFiveMinute.stop(); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java index 5a8d7fc16ad3e..8f33c3534423a 100644 --- a/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java +++ b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java @@ -18,6 +18,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lifecycle.Lifecycle; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -176,7 +177,6 @@ public void testConfigurationValidatorWithNonDataNode() { // NodeValidator Tests public void testNodeValidatorWithHealthyResources() { when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); when(jvm.getHeapUsedPercent()).thenReturn((short) 60); ThreadPoolStats stats = new ThreadPoolStats( Arrays.asList(new ThreadPoolStats.Stats( @@ -191,9 +191,32 @@ public void testNodeValidatorWithHealthyResources() { autoForceMergeManager.close(); } + public void testNodeValidatorWithFeatureSwitch() { + when(cpu.getPercent()).thenReturn((short) 50); + when(jvm.getHeapUsedPercent()).thenReturn((short) 60); + ThreadPoolStats stats = new ThreadPoolStats( + Arrays.asList(new ThreadPoolStats.Stats( + ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0 + )) + ); + when(threadPool.stats()).thenReturn(stats); + Settings settings = getConfiguredClusterSettings(false, false, Collections.emptyMap()); + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(settings, getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); + autoForceMergeManager.start(); + assertFalse(autoForceMergeManager.getConfigurationValidator().validate().isAllowed()); + assertNotEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + assertNotEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + assertNotEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + assertNotEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + assertTrue(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + assertEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + assertEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + assertEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + assertEquals(Lifecycle.State.STARTED, ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.lifecycleState()); + autoForceMergeManager.close(); + } + public void testNodeValidatorWithHighCPU() { - when(cpu.getPercent()).thenReturn((short) 95); - when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); DiscoveryNode dataNode1 = getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)); DiscoveryNode warmNode1 = getNodeWithRoles(WARM_NODE_1, Set.of(DiscoveryNodeRole.WARM_ROLE)); ClusterState clusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) @@ -207,20 +230,24 @@ public void testNodeValidatorWithHighCPU() { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); when(clusterService.state()).thenReturn(clusterState); - AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), dataNode1); + AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode( + getConfiguredClusterSettings(true, true, Collections.emptyMap()), + dataNode1 + ); autoForceMergeManager.start(); + when(cpu.getPercent()).thenReturn((short) 95); assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); - when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn(new double[]{0.9 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); + for (int i = 0; i < 10; i++) + ResourceTrackerProvider.resourceTrackers.cpuOneMinute.recordUsage(90); assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); - when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.9 * allocatedProcessors, 0.5 * allocatedProcessors}); + for (int i = 0; i < 10; i++) + ResourceTrackerProvider.resourceTrackers.cpuFiveMinute.recordUsage(90); assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); autoForceMergeManager.close(); } public void testNodeValidatorWithHighDiskUsage() { when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); when(disk.getAvailable()).thenReturn(new ByteSizeValue(5)); AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); autoForceMergeManager.start(); @@ -230,17 +257,21 @@ public void testNodeValidatorWithHighDiskUsage() { public void testNodeValidatorWithHighJVMUsage() { when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); - when(jvm.getHeapUsedPercent()).thenReturn((short) 90); AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE))); autoForceMergeManager.start(); + when(jvm.getHeapUsedPercent()).thenReturn((short) 90); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + for(int i = 0; i < 10; i++) + ResourceTrackerProvider.resourceTrackers.jvmOneMinute.recordUsage(90); + assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); + for(int i = 0; i < 10; i++) + ResourceTrackerProvider.resourceTrackers.jvmFiveMinute.recordUsage(90); assertFalse(autoForceMergeManager.getNodeValidator().validate().isAllowed()); autoForceMergeManager.close(); } public void testNodeValidatorWithInsufficientForceMergeThreads() { when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn(new double[]{0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors}); when(jvm.getHeapUsedPercent()).thenReturn((short) 50); ThreadPoolStats stats = new ThreadPoolStats( Arrays.asList(new ThreadPoolStats.Stats( @@ -376,9 +407,6 @@ public void testForceMergeOperationOnDataNodeWithFailingMerges() throws IOExcept .build(); when(clusterService.state()).thenReturn(clusterState); when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn( - new double[] { 0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors } - ); when(jvm.getHeapUsedPercent()).thenReturn((short) 50); int forceMergeThreads = 4; @@ -430,9 +458,6 @@ public void testForceMergeOperationOnDataNodeOfWarmEnabledCluster() throws IOExc .build(); when(clusterService.state()).thenReturn(clusterState); when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn( - new double[] { 0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors } - ); when(jvm.getHeapUsedPercent()).thenReturn((short) 50); int forceMergeThreads = 4; ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads); @@ -488,9 +513,6 @@ public void testForceMergeOperationOnDataNodeWithThreadInterruption() throws Int .build(); when(clusterService.state()).thenReturn(clusterState); when(cpu.getPercent()).thenReturn((short) 50); - when(cpu.getLoadAverage()).thenReturn( - new double[] { 0.7 * allocatedProcessors, 0.6 * allocatedProcessors, 0.5 * allocatedProcessors } - ); when(jvm.getHeapUsedPercent()).thenReturn((short) 50); int forceMergeThreads = 4; diff --git a/server/src/test/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettingsTests.java b/server/src/test/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettingsTests.java index cb3c008f0de08..7aded4023670a 100644 --- a/server/src/test/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/autoforcemerge/ForceMergeManagerSettingsTests.java @@ -61,12 +61,13 @@ public void accept(TimeValue timeValue) { public void testDefaultSettings() { assertEquals(false, forceMergeManagerSettings.isAutoForceMergeFeatureEnabled()); - assertEquals(forceMergeManagerSettings.getForcemergeDelay(), TimeValue.timeValueSeconds(10)); + assertEquals(forceMergeManagerSettings.getForcemergeDelay(), TimeValue.timeValueSeconds(15)); assertEquals(forceMergeManagerSettings.getSchedulerInterval(), TimeValue.timeValueMinutes(30)); assertEquals(2, (int) forceMergeManagerSettings.getConcurrencyMultiplier()); assertEquals(1, (int) forceMergeManagerSettings.getSegmentCount()); - assertEquals(80.0, forceMergeManagerSettings.getCpuThreshold(), 0.0); + assertEquals(75.0, forceMergeManagerSettings.getCpuThreshold(), 0.0); assertEquals(75.0, forceMergeManagerSettings.getJvmThreshold(), 0.0); + assertEquals(85.0, forceMergeManagerSettings.getDiskThreshold(), 0.0); } public void testDynamicSettingsUpdate() { @@ -100,14 +101,14 @@ public void testTimeValueSettings() { Settings newSettings = Settings.builder() .put(ForceMergeManagerSettings.AUTO_FORCE_MERGE_SCHEDULER_INTERVAL.getKey(), "10m") .put(ForceMergeManagerSettings.TRANSLOG_AGE_AUTO_FORCE_MERGE.getKey(), "10m") - .put(ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.getKey(), "15s") + .put(ForceMergeManagerSettings.MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE.getKey(), "20s") .build(); clusterSettings.applySettings(newSettings); assertEquals(forceMergeManagerSettings.getSchedulerInterval(), TimeValue.timeValueMinutes(10)); assertEquals(forceMergeManagerSettings.getTranslogAge(), TimeValue.timeValueMinutes(10)); - assertEquals(forceMergeManagerSettings.getForcemergeDelay(), TimeValue.timeValueSeconds(15)); + assertEquals(forceMergeManagerSettings.getForcemergeDelay(), TimeValue.timeValueSeconds(20)); } public void testThreadSettings() {