Skip to content

Commit f93f41b

Browse files
authored
Merge branch '2.x' into backport/backport-14847-to-2.x
Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>
2 parents 7bcd90b + 7d1f383 commit f93f41b

File tree

6 files changed

+99
-45
lines changed

6 files changed

+99
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
2929
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
3030
- Add rest, transport layer changes for hot to warm tiering - dedicated setup ([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
31+
- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832))
3132
- Add persian_stem filter ([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847))
3233

3334
### Dependencies
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.action.search;

import org.opensearch.tasks.TaskResourceTrackingService;

/**
 * A {@link SearchRequestOperationsListener} that refreshes the resource stats of the
 * task backing a search request when the request ends, so the task-level resource
 * usage captured for the request is up to date at completion time.
 */
public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener {
    // Service that tracks per-task resource usage; used to trigger a refresh on request end.
    private final TaskResourceTrackingService taskResourceTrackingService;

    /**
     * @param taskResourceTrackingService service used to refresh the resource stats
     *                                    of the search task on request completion
     */
    public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) {
        this.taskResourceTrackingService = taskResourceTrackingService;
    }

    /**
     * Refreshes the resource stats of the task associated with the completed search
     * request, taken from {@code context.getTask()}.
     */
    @Override
    public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
        taskResourceTrackingService.refreshResourceStats(context.getTask());
    }
}

server/src/main/java/org/opensearch/cluster/ClusterInfo.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.cluster;
3434

3535
import org.opensearch.Version;
36+
import org.opensearch.cluster.routing.RoutingNode;
3637
import org.opensearch.cluster.routing.ShardRouting;
3738
import org.opensearch.common.annotation.PublicApi;
3839
import org.opensearch.core.common.io.stream.StreamInput;
@@ -69,6 +70,8 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
6970
final Map<ShardRouting, String> routingToDataPath;
7071
final Map<NodeAndPath, ReservedSpace> reservedSpace;
7172
final Map<String, FileCacheStats> nodeFileCacheStats;
73+
private long avgTotalBytes;
74+
private long avgFreeByte;
7275

7376
protected ClusterInfo() {
7477
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
@@ -98,6 +101,7 @@ public ClusterInfo(
98101
this.routingToDataPath = routingToDataPath;
99102
this.reservedSpace = reservedSpace;
100103
this.nodeFileCacheStats = nodeFileCacheStats;
104+
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
101105
}
102106

103107
public ClusterInfo(StreamInput in) throws IOException {
@@ -122,6 +126,39 @@ public ClusterInfo(StreamInput in) throws IOException {
122126
} else {
123127
this.nodeFileCacheStats = Map.of();
124128
}
129+
130+
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
131+
}
132+
133+
/**
134+
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
135+
* average usage of other nodes in the disk usage map.
136+
* @param usages Map of nodeId to DiskUsage for all known nodes.
137+
*/
138+
private void calculateAvgFreeAndTotalBytes(final Map<String, DiskUsage> usages) {
139+
if (usages == null || usages.isEmpty()) {
140+
this.avgTotalBytes = 0;
141+
this.avgFreeByte = 0;
142+
return;
143+
}
144+
145+
long totalBytes = 0;
146+
long freeBytes = 0;
147+
for (DiskUsage du : usages.values()) {
148+
totalBytes += du.getTotalBytes();
149+
freeBytes += du.getFreeBytes();
150+
}
151+
152+
this.avgTotalBytes = totalBytes / usages.size();
153+
this.avgFreeByte = freeBytes / usages.size();
154+
}
155+
156+
/**
 * Returns the average free bytes across all nodes with known disk usage,
 * or 0 if no usages were available when this ClusterInfo was built.
 */
public long getAvgFreeByte() {
    return avgFreeByte;
}
159+
160+
/**
 * Returns the average total bytes across all nodes with known disk usage,
 * or 0 if no usages were available when this ClusterInfo was built.
 */
public long getAvgTotalBytes() {
    return avgTotalBytes;
}
126163

127164
@Override

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,8 @@ public static long sizeOfRelocatingShards(
140140

141141
// Where reserved space is unavailable (e.g. stats are out-of-sync) compute a conservative estimate for initialising shards
142142
final List<ShardRouting> initializingShards = node.shardsWithState(ShardRoutingState.INITIALIZING);
143-
initializingShards.removeIf(shardRouting -> reservedSpace.containsShardId(shardRouting.shardId()));
144143
for (ShardRouting routing : initializingShards) {
145-
if (routing.relocatingNodeId() == null) {
144+
if (routing.relocatingNodeId() == null || reservedSpace.containsShardId(routing.shardId())) {
146145
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
147146
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
148147
// any additional space and can be ignored here
@@ -230,7 +229,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
230229

231230
// subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
232231
// and take the size into account
233-
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
232+
final DiskUsageWithRelocations usage = getDiskUsage(
233+
node,
234+
allocation,
235+
usages,
236+
clusterInfo.getAvgFreeByte(),
237+
clusterInfo.getAvgTotalBytes(),
238+
false
239+
);
234240
// First, check that the node currently over the low watermark
235241
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
236242
// Cache the used disk percentage for displaying disk percentages consistent with documentation
@@ -492,7 +498,14 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
492498

493499
// subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
494500
// since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
495-
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
501+
final DiskUsageWithRelocations usage = getDiskUsage(
502+
node,
503+
allocation,
504+
usages,
505+
clusterInfo.getAvgFreeByte(),
506+
clusterInfo.getAvgTotalBytes(),
507+
true
508+
);
496509
final String dataPath = clusterInfo.getDataPath(shardRouting);
497510
// If this node is already above the high threshold, the shard cannot remain (get it off!)
498511
final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
@@ -581,13 +594,15 @@ private DiskUsageWithRelocations getDiskUsage(
581594
RoutingNode node,
582595
RoutingAllocation allocation,
583596
final Map<String, DiskUsage> usages,
597+
final long avgFreeBytes,
598+
final long avgTotalBytes,
584599
boolean subtractLeavingShards
585600
) {
586601
DiskUsage usage = usages.get(node.nodeId());
587602
if (usage == null) {
588603
// If there is no usage, and we have other nodes in the cluster,
589604
// use the average usage for all nodes as the usage for this node
590-
usage = averageUsage(node, usages);
605+
usage = new DiskUsage(node.nodeId(), node.node().getName(), "_na_", avgTotalBytes, avgFreeBytes);
591606
if (logger.isDebugEnabled()) {
592607
logger.debug(
593608
"unable to determine disk usage for {}, defaulting to average across nodes [{} total] [{} free] [{}% free]",
@@ -619,26 +634,6 @@ private DiskUsageWithRelocations getDiskUsage(
619634
return diskUsageWithRelocations;
620635
}
621636

622-
/**
623-
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
624-
* average usage of other nodes in the disk usage map.
625-
* @param node Node to return an averaged DiskUsage object for
626-
* @param usages Map of nodeId to DiskUsage for all known nodes
627-
* @return DiskUsage representing given node using the average disk usage
628-
*/
629-
DiskUsage averageUsage(RoutingNode node, final Map<String, DiskUsage> usages) {
630-
if (usages.size() == 0) {
631-
return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", 0, 0);
632-
}
633-
long totalBytes = 0;
634-
long freeBytes = 0;
635-
for (DiskUsage du : usages.values()) {
636-
totalBytes += du.getTotalBytes();
637-
freeBytes += du.getFreeBytes();
638-
}
639-
return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", totalBytes / usages.size(), freeBytes / usages.size());
640-
}
641-
642637
/**
643638
* Given the DiskUsage for a node and the size of the shard, return the
644639
* percentage of free disk if the shard were to be allocated to the node.

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.action.search.SearchRequestOperationsListener;
5252
import org.opensearch.action.search.SearchRequestSlowLog;
5353
import org.opensearch.action.search.SearchRequestStats;
54+
import org.opensearch.action.search.SearchTaskRequestOperationsListener;
5455
import org.opensearch.action.search.SearchTransportService;
5556
import org.opensearch.action.support.TransportAction;
5657
import org.opensearch.action.update.UpdateHelper;
@@ -852,8 +853,17 @@ protected Node(
852853
threadPool
853854
);
854855

856+
final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
857+
settings,
858+
clusterService.getClusterSettings(),
859+
threadPool
860+
);
861+
855862
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings());
856863
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);
864+
final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(
865+
taskResourceTrackingService
866+
);
857867

858868
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
859869
CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings);
@@ -982,7 +992,7 @@ protected Node(
982992
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
983993
new SearchRequestOperationsCompositeListenerFactory(
984994
Stream.concat(
985-
Stream.of(searchRequestStats, searchRequestSlowLog),
995+
Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener),
986996
pluginComponents.stream()
987997
.filter(p -> p instanceof SearchRequestOperationsListener)
988998
.map(p -> (SearchRequestOperationsListener) p)
@@ -1110,12 +1120,6 @@ protected Node(
11101120
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
11111121
clusterService.setIndexingPressureService(indexingPressureService);
11121122

1113-
final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
1114-
settings,
1115-
clusterService.getClusterSettings(),
1116-
threadPool
1117-
);
1118-
11191123
final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
11201124
settings,
11211125
clusterService.getClusterSettings()

server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -863,19 +863,6 @@ public void testUnknownDiskUsage() {
863863
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
864864
}
865865

866-
public void testAverageUsage() {
867-
RoutingNode rn = new RoutingNode("node1", newNode("node1"));
868-
DiskThresholdDecider decider = makeDecider(Settings.EMPTY);
869-
870-
final Map<String, DiskUsage> usages = new HashMap<>();
871-
usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
872-
usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used
873-
874-
DiskUsage node1Usage = decider.averageUsage(rn, usages);
875-
assertThat(node1Usage.getTotalBytes(), equalTo(100L));
876-
assertThat(node1Usage.getFreeBytes(), equalTo(25L));
877-
}
878-
879866
public void testFreeDiskPercentageAfterShardAssigned() {
880867
DiskThresholdDecider decider = makeDecider(Settings.EMPTY);
881868

0 commit comments

Comments
 (0)