Skip to content

Replaced CPU load average logic with AverageTracker classes. Default thresholds modified #18666

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix the visit of sub queries for HasParentQuery and HasChildQuery ([#18621](https://github.com/opensearch-project/OpenSearch/pull/18621))
- Fix the backward compatibility regression with COMPLEMENT for Regexp queries introduced in OpenSearch 3.0 ([#18640](https://github.com/opensearch-project/OpenSearch/pull/18640))
- Fix Replication lag computation ([#18602](https://github.com/opensearch-project/OpenSearch/pull/18602))
- Fix staggered merge: replace load average with AverageTrackers and modify some default thresholds ([#18666](https://github.com/opensearch-project/OpenSearch/pull/18666))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ public class AutoForceMergeManagerIT extends RemoteStoreBaseIntegTestCase {
private static final String MERGE_DELAY = "1s";
private static final Integer SEGMENT_COUNT = 1;

@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
Expand Down Expand Up @@ -158,8 +153,8 @@ public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception {
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
waitUntil(() -> shard.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
// assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount());
// assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount());
assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount());
assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount());
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
}

Expand Down Expand Up @@ -221,11 +216,11 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false);
SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false);
SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false);
AtomicLong totalSegments = new AtomicLong(
AtomicLong totalSegmentsBefore = new AtomicLong(
segmentsStatsForShard1Before.getCount() + segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount()
+ segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount()
);
assertTrue(totalSegments.get() > 5);
assertTrue(totalSegmentsBefore.get() > 5);
waitUntil(() -> shard1.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
waitUntil(() -> shard2.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
waitUntil(() -> shard3.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
Expand All @@ -236,11 +231,11 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false);
SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false);
SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false);
totalSegments.set(
AtomicLong totalSegmentsAfter = new AtomicLong(
segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount()
+ segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount()
);
// assertEquals(5, totalSegments.get());
assertTrue(totalSegmentsBefore.get() > totalSegmentsAfter.get());
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_2).get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@
private ConfigurationValidator configurationValidator;
private NodeValidator nodeValidator;
private ShardValidator shardValidator;
private Integer allocatedProcessors;
private ResourceTrackerProvider.ResourceTrackers resourceTrackers;
private final ForceMergeManagerSettings forceMergeManagerSettings;
private final CommonStatsFlags flags = new CommonStatsFlags(CommonStatsFlags.Flag.Segments, CommonStatsFlags.Flag.Translog);
private final Set<Integer> mergingShards;
private Integer allocatedProcessors;

private static final Logger logger = LogManager.getLogger(AutoForceMergeManager.class);

Expand All @@ -96,6 +97,7 @@
this.nodeValidator = new NodeValidator();
this.shardValidator = new ShardValidator();
this.allocatedProcessors = OpenSearchExecutors.allocatedProcessors(clusterService.getSettings());
this.resourceTrackers = ResourceTrackerProvider.create(threadPool);
}

@Override
Expand All @@ -117,43 +119,65 @@
}

private void triggerForceMerge() {
if (isValidForForceMerge() == false) {
return;
}
executeForceMergeOnShards();
}

private boolean isValidForForceMerge() {
if (configurationValidator.hasWarmNodes() == false) {
resourceTrackers.stop();
logger.debug("No warm nodes found. Skipping Auto Force merge.");
return;
return false;
}
if (nodeValidator.validate().isAllowed() == false) {
logger.debug("Node capacity constraints are not allowing to trigger auto ForceMerge");
return;
return false;

Check warning on line 136 in server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java#L136

Added line #L136 was not covered by tests
}
int iteration = nodeValidator.getMaxConcurrentForceMerges();
return true;
}

private void executeForceMergeOnShards() {
int remainingIterations = nodeValidator.getMaxConcurrentForceMerges();
for (IndexShard shard : getShardsBasedOnSorting(indicesService)) {
if (iteration == 0) {
if (remainingIterations == 0 || !nodeValidator.validate().isAllowed()) {
if (remainingIterations > 0) {
logger.debug("Node conditions no longer suitable for force merge.");

Check warning on line 146 in server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java#L146

Added line #L146 was not covered by tests
}
break;
}
if (nodeValidator.validate().isAllowed() == false) {
logger.debug("Node conditions no longer suitable for force merge.");
remainingIterations--;
executeForceMergeForShard(shard);
if (!waitBetweenShards()) {
break;
}
iteration--;
CompletableFuture.runAsync(() -> {
try {
mergingShards.add(shard.shardId().getId());
shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount()));
logger.debug("Merging is completed successfully for the shard {}", shard.shardId());
} catch (Exception e) {
logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e);
} finally {
mergingShards.remove(shard.shardId().getId());
}
}, threadPool.executor(ThreadPool.Names.FORCE_MERGE));
logger.info("Successfully triggered force merge for shard {}", shard.shardId());
}
}

private void executeForceMergeForShard(IndexShard shard) {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(forceMergeManagerSettings.getForcemergeDelay().getMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Timer was interrupted while waiting between shards", e);
break;
mergingShards.add(shard.shardId().getId());
shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount()));
logger.debug("Merging is completed successfully for the shard {}", shard.shardId());
} catch (Exception e) {
logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e);
} finally {
mergingShards.remove(shard.shardId().getId());
}
}, threadPool.executor(ThreadPool.Names.FORCE_MERGE));
logger.info("Successfully triggered force merge for shard {}", shard.shardId());
}

/**
 * Blocks the calling thread for the configured force-merge delay before the next shard is processed.
 * <p>
 * If the sleep is interrupted, the thread's interrupt flag is restored and the interruption is logged
 * at error level.
 *
 * @return {@code true} when the full delay elapsed, {@code false} when the wait was interrupted
 */
private boolean waitBetweenShards() {
    boolean delayElapsed;
    try {
        final long pauseMillis = forceMergeManagerSettings.getForcemergeDelay().getMillis();
        Thread.sleep(pauseMillis);
        delayElapsed = true;
    } catch (InterruptedException interrupted) {
        // Re-assert the interrupt so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        logger.error("Timer was interrupted while waiting between shards", interrupted);
        delayElapsed = false;
    }
    return delayElapsed;
}

Expand Down Expand Up @@ -264,15 +288,14 @@

@Override
public ValidationResult validate() {
resourceTrackers.start();
if (isCpuUsageOverThreshold()) {
return new ValidationResult(false);
}
if (isDiskUsageOverThreshold()) {
return new ValidationResult(false);
}
double jvmUsedPercent = jvmService.stats().getMem().getHeapUsedPercent();
if (jvmUsedPercent >= forceMergeManagerSettings.getJvmThreshold()) {
logger.debug("JVM memory: {}% breached the threshold: {}", jvmUsedPercent, forceMergeManagerSettings.getJvmThreshold());
if (isJvmUsageOverThreshold()) {
return new ValidationResult(false);
}
if (areForceMergeThreadsAvailable() == false) {
Expand All @@ -291,24 +314,34 @@
return false;
}

/**
 * Reports whether JVM usage is at or above the configured JVM threshold.
 * <p>
 * Three readings are compared against the threshold, in this order: the five-minute tracker average,
 * the one-minute tracker average, and the heap-used percentage reported by {@code jvmService}.
 * The first reading that reaches the threshold is logged at debug level and ends the check; later
 * readings are not taken.
 *
 * @return {@code true} if any reading is greater than or equal to the threshold, {@code false} otherwise
 */
private boolean isJvmUsageOverThreshold() {
    final double fiveMinuteAverage = resourceTrackers.jvmFiveMinute.getAverage();
    if (fiveMinuteAverage >= forceMergeManagerSettings.getJvmThreshold()) {
        logger.debug("JVM Average: 5m({}%) breached the threshold: {}", fiveMinuteAverage, forceMergeManagerSettings.getJvmThreshold());
        return true;
    }

    final double oneMinuteAverage = resourceTrackers.jvmOneMinute.getAverage();
    if (oneMinuteAverage >= forceMergeManagerSettings.getJvmThreshold()) {
        logger.debug("JVM Average: 1m({}%) breached the threshold: {}", oneMinuteAverage, forceMergeManagerSettings.getJvmThreshold());
        return true;
    }

    // Finally compare the current heap reading, independent of the averaged trackers.
    final double currentHeapPercent = jvmService.stats().getMem().getHeapUsedPercent();
    final boolean heapBreached = currentHeapPercent >= forceMergeManagerSettings.getJvmThreshold();
    if (heapBreached) {
        logger.debug("JVM memory: {}% breached the threshold: {}", currentHeapPercent, forceMergeManagerSettings.getJvmThreshold());
    }
    return heapBreached;
}

private boolean isCpuUsageOverThreshold() {
double[] loadAverage = osService.stats().getCpu().getLoadAverage();
double loadAverage5m = (loadAverage[1] / (double) allocatedProcessors) * 100;
if (loadAverage5m >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug(
"Load Average: 5m({}%) breached the threshold: {}",
loadAverage5m,
forceMergeManagerSettings.getCpuThreshold()
);
double cpuAverage = resourceTrackers.cpuFiveMinute.getAverage();
if (cpuAverage >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug("CPU Average: 5m({}%) breached the threshold: {}", cpuAverage, forceMergeManagerSettings.getCpuThreshold());
return true;
}
double loadAverage1m = (loadAverage[0] / (double) allocatedProcessors) * 100;
if (loadAverage1m >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug(
"Load Average: 1m({}%) breached the threshold: {}",
loadAverage1m,
forceMergeManagerSettings.getCpuThreshold()
);
cpuAverage = resourceTrackers.cpuOneMinute.getAverage();
if (cpuAverage >= forceMergeManagerSettings.getCpuThreshold()) {
logger.debug("CPU Average: 1m({}%) breached the threshold: {}", cpuAverage, forceMergeManagerSettings.getCpuThreshold());
return true;
}
double cpuPercent = osService.stats().getCpu().getPercent();
Expand Down Expand Up @@ -445,6 +478,7 @@
@Override
protected void runInternal() {
if (configurationValidator.validate().isAllowed() == false) {
resourceTrackers.stop();
return;
}
triggerForceMerge();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ public class ForceMergeManagerSettings {
);

/**
* Setting for wait time between force merge operations (default: 10s).
* Setting for wait time between force merge operations (default: 15s).
*/
public static final Setting<TimeValue> MERGE_DELAY_BETWEEN_SHARDS_FOR_AUTO_FORCE_MERGE = Setting.timeSetting(
"node.auto_force_merge.merge_delay",
TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(15),
TimeValue.timeValueSeconds(1),
TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic,
Expand Down Expand Up @@ -92,23 +92,23 @@ public class ForceMergeManagerSettings {
);

/**
* Setting for cpu threshold. (default: 80)
* Setting for cpu threshold. (default: 75)
*/
public static final Setting<Double> CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting(
"node.auto_force_merge.cpu.threshold",
80.0,
75.0,
10,
100,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Setting for memory threshold. (default: 90)
* Setting for disk threshold. (default: 85)
*/
public static final Setting<Double> DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE = Setting.doubleSetting(
"node.auto_force_merge.disk.threshold",
90.0,
85.0,
10,
100,
Setting.Property.Dynamic,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.autoforcemerge;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.node.resource.tracker.AverageCpuUsageTracker;
import org.opensearch.node.resource.tracker.AverageMemoryUsageTracker;
import org.opensearch.threadpool.ThreadPool;

/**
* Provider for creating resource usage trackers used in auto force merge operations.
*
* @opensearch.internal
*/
public class ResourceTrackerProvider {

Check warning on line 21 in server/src/main/java/org/opensearch/index/autoforcemerge/ResourceTrackerProvider.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/autoforcemerge/ResourceTrackerProvider.java#L21

Added line #L21 was not covered by tests

/** Poll interval (6 seconds) passed by {@code create} to the trackers built with {@link #SHORT_AVERAGE_WINDOW}. */
public static final TimeValue SHORT_POLL_INTERVAL = TimeValue.timeValueSeconds(6);
/** Poll interval (30 seconds) passed by {@code create} to the trackers built with {@link #LONG_AVERAGE_WINDOW}. */
public static final TimeValue LONG_POLL_INTERVAL = TimeValue.timeValueSeconds(30);
/** Averaging window (1 minute) used for the "one minute" CPU and memory trackers. */
public static final TimeValue SHORT_AVERAGE_WINDOW = TimeValue.timeValueMinutes(1);
/** Averaging window (5 minutes) used for the "five minute" CPU and memory trackers. */
public static final TimeValue LONG_AVERAGE_WINDOW = TimeValue.timeValueMinutes(5);

/**
 * The tracker bundle produced by the most recent call to {@code create}; each call overwrites it.
 * NOTE(review): this field is public, static and non-final, so it is shared mutable state —
 * confirm whether anything other than tests needs to read it.
 */
public static ResourceTrackers resourceTrackers;

/**
 * Builds a fresh set of CPU and memory usage trackers, stores it in the static
 * {@code resourceTrackers} field, and returns it.
 * <p>
 * The "one minute" trackers use {@code SHORT_POLL_INTERVAL} with {@code SHORT_AVERAGE_WINDOW}; the
 * "five minute" trackers use {@code LONG_POLL_INTERVAL} with {@code LONG_AVERAGE_WINDOW}.
 * {@code start()} is not invoked here.
 *
 * @param threadPool thread pool handed to every tracker constructor
 * @return the newly created tracker bundle
 */
public static ResourceTrackers create(ThreadPool threadPool) {
    final AverageCpuUsageTracker cpuShortWindow = new AverageCpuUsageTracker(threadPool, SHORT_POLL_INTERVAL, SHORT_AVERAGE_WINDOW);
    final AverageCpuUsageTracker cpuLongWindow = new AverageCpuUsageTracker(threadPool, LONG_POLL_INTERVAL, LONG_AVERAGE_WINDOW);
    final AverageMemoryUsageTracker jvmShortWindow = new AverageMemoryUsageTracker(
        threadPool,
        SHORT_POLL_INTERVAL,
        SHORT_AVERAGE_WINDOW
    );
    final AverageMemoryUsageTracker jvmLongWindow = new AverageMemoryUsageTracker(threadPool, LONG_POLL_INTERVAL, LONG_AVERAGE_WINDOW);

    final ResourceTrackers created = new ResourceTrackers(cpuShortWindow, cpuLongWindow, jvmShortWindow, jvmLongWindow);
    // Publish to the static field as well as returning, exactly one instance per call.
    resourceTrackers = created;
    return created;
}

/**
* Container for resource usage trackers used in auto force merge operations.
* Provides access to CPU and JVM memory usage trackers with different time windows.
* <p>
* The four trackers are exposed as public final fields and are started and stopped together
* through {@link #start()} and {@link #stop()}.
*
* @opensearch.internal
*/
public static class ResourceTrackers {
/** CPU usage tracker for the 1-minute window. */
public final AverageCpuUsageTracker cpuOneMinute;
/** CPU usage tracker for the 5-minute window. */
public final AverageCpuUsageTracker cpuFiveMinute;
/** Memory usage tracker for the 1-minute window. */
public final AverageMemoryUsageTracker jvmOneMinute;
/** Memory usage tracker for the 5-minute window. */
public final AverageMemoryUsageTracker jvmFiveMinute;

/**
* Creates a new ResourceTrackers instance.
* The constructor only stores the given references; it does not start any tracker.
*
* @param cpuOneMinute CPU tracker with 1-minute window
* @param cpuFiveMinute CPU tracker with 5-minute window
* @param jvmOneMinute JVM memory tracker with 1-minute window
* @param jvmFiveMinute JVM memory tracker with 5-minute window
*/
ResourceTrackers(
AverageCpuUsageTracker cpuOneMinute,
AverageCpuUsageTracker cpuFiveMinute,
AverageMemoryUsageTracker jvmOneMinute,
AverageMemoryUsageTracker jvmFiveMinute
) {
this.cpuOneMinute = cpuOneMinute;
this.cpuFiveMinute = cpuFiveMinute;
this.jvmOneMinute = jvmOneMinute;
this.jvmFiveMinute = jvmFiveMinute;
}

/**
* Calls {@code start()} on all four trackers: CPU 1-minute, CPU 5-minute, memory 1-minute, memory 5-minute.
* NOTE(review): behaviour when a tracker is already started depends on the tracker
* implementation, which is not visible here — confirm repeated calls are safe.
*/
public void start() {
cpuOneMinute.start();
cpuFiveMinute.start();
jvmOneMinute.start();
jvmFiveMinute.start();
}

/**
* Calls {@code stop()} on all four trackers, in the same order as {@link #start()}.
*/
public void stop() {
cpuOneMinute.stop();
cpuFiveMinute.stop();
jvmOneMinute.stop();
jvmFiveMinute.stop();
}
}
}
Loading
Loading