Skip to content

Commit 973f3be

Browse files
committed
Add changes to block non-paginated calls in cat shards, indices and segments
Signed-off-by: Sumit Bansal <[email protected]>
1 parent b2a7136 commit 973f3be

File tree

12 files changed

+472
-37
lines changed

12 files changed

+472
-37
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@
325325
import org.opensearch.plugins.ActionPlugin;
326326
import org.opensearch.plugins.ActionPlugin.ActionHandler;
327327
import org.opensearch.rest.NamedRoute;
328+
import org.opensearch.rest.RequestLimitSettings;
328329
import org.opensearch.rest.RestController;
329330
import org.opensearch.rest.RestHandler;
330331
import org.opensearch.rest.RestHeaderDefinition;
@@ -528,6 +529,7 @@ public class ActionModule extends AbstractModule {
528529
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
529530
private final ThreadPool threadPool;
530531
private final ExtensionsManager extensionsManager;
532+
private final RequestLimitSettings requestLimitSettings;
531533

532534
public ActionModule(
533535
Settings settings,
@@ -580,6 +582,7 @@ public ActionModule(
580582
);
581583

582584
restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, identityService);
585+
requestLimitSettings = new RequestLimitSettings(clusterSettings, settings);
583586
}
584587

585588
public Map<String, ActionHandler<?, ?>> getActions() {
@@ -960,8 +963,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
960963
registerHandler.accept(new RestClusterManagerAction());
961964
registerHandler.accept(new RestNodesAction());
962965
registerHandler.accept(new RestTasksAction(nodesInCluster));
963-
registerHandler.accept(new RestIndicesAction());
964-
registerHandler.accept(new RestSegmentsAction());
966+
registerHandler.accept(new RestIndicesAction(requestLimitSettings));
967+
registerHandler.accept(new RestSegmentsAction(requestLimitSettings));
965968
// Fully qualified to prevent interference with rest.action.count.RestCountAction
966969
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
967970
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
@@ -1048,6 +1051,8 @@ protected void configure() {
10481051

10491052
// register dynamic ActionType -> transportAction Map used by NodeClient
10501053
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);
1054+
1055+
bind(RequestLimitSettings.class).toInstance(requestLimitSettings);
10511056
}
10521057

10531058
public ActionFilters getActionFilters() {

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq
2727

2828
private String[] indices;
2929
private TimeValue cancelAfterTimeInterval;
30+
private boolean requestLimitCheckSupported;
3031

3132
public CatShardsRequest() {}
3233

3334
public CatShardsRequest(StreamInput in) throws IOException {
3435
super(in);
36+
this.requestLimitCheckSupported = false;
3537
}
3638

3739
@Override
@@ -55,6 +57,14 @@ public TimeValue getCancelAfterTimeInterval() {
5557
return this.cancelAfterTimeInterval;
5658
}
5759

60+
public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
61+
this.requestLimitCheckSupported = requestLimitCheckSupported;
62+
}
63+
64+
public boolean isRequestLimitCheckSupported() {
65+
return this.requestLimitCheckSupported;
66+
}
67+
5868
@Override
5969
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
6070
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919
import org.opensearch.common.inject.Inject;
2020
import org.opensearch.core.action.ActionListener;
2121
import org.opensearch.core.action.NotifyOnceListener;
22+
import org.opensearch.core.common.breaker.CircuitBreaker;
23+
import org.opensearch.core.common.breaker.CircuitBreakingException;
24+
import org.opensearch.rest.RequestLimitSettings;
2225
import org.opensearch.tasks.CancellableTask;
2326
import org.opensearch.tasks.Task;
2427
import org.opensearch.transport.TransportService;
2528

29+
import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_SHARDS;
30+
2631
/**
2732
* Perform cat shards action
2833
*
@@ -31,11 +36,18 @@
3136
public class TransportCatShardsAction extends HandledTransportAction<CatShardsRequest, CatShardsResponse> {
3237

3338
private final NodeClient client;
39+
private final RequestLimitSettings requestLimitSettings;
3440

3541
@Inject
36-
public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) {
42+
public TransportCatShardsAction(
43+
NodeClient client,
44+
TransportService transportService,
45+
ActionFilters actionFilters,
46+
RequestLimitSettings requestLimitSettings
47+
) {
3748
super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new);
3849
this.client = client;
50+
this.requestLimitSettings = requestLimitSettings;
3951
}
4052

4153
@Override
@@ -73,6 +85,10 @@ protected void innerOnFailure(Exception e) {
7385
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
7486
@Override
7587
public void onResponse(ClusterStateResponse clusterStateResponse) {
88+
if (shardsRequest.isRequestLimitCheckSupported()
89+
&& requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) {
90+
listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT));
91+
}
7692
catShardsResponse.setClusterStateResponse(clusterStateResponse);
7793
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
7894
indicesStatsRequest.setShouldCancelOnTimeout(true);

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
import org.opensearch.repositories.blobstore.BlobStoreRepository;
156156
import org.opensearch.repositories.fs.FsRepository;
157157
import org.opensearch.rest.BaseRestHandler;
158+
import org.opensearch.rest.RequestLimitSettings;
158159
import org.opensearch.script.ScriptService;
159160
import org.opensearch.search.SearchService;
160161
import org.opensearch.search.aggregations.MultiBucketConsumerService;
@@ -793,7 +794,11 @@ public void apply(Settings value, Settings current, Settings previous) {
793794
WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
794795
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
795796
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
796-
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
797+
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
798+
799+
RequestLimitSettings.CAT_INDICES_LIMIT_SETTING,
800+
RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING,
801+
RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING
797802
)
798803
)
799804
);
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.rest;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.routing.IndexRoutingTable;
13+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
14+
import org.opensearch.cluster.routing.RoutingTable;
15+
import org.opensearch.common.settings.ClusterSettings;
16+
import org.opensearch.common.settings.Setting;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.rest.action.cat.RestIndicesAction;
19+
import org.opensearch.rest.action.cat.RestSegmentsAction;
20+
import org.opensearch.rest.action.cat.RestShardsAction;
21+
22+
import java.util.Map;
23+
import java.util.Objects;
24+
import java.util.function.Supplier;
25+
26+
/**
27+
* Class to define dynamic settings for putting circuit breakers on the actions and functions to evaluate if block is required.
28+
*/
29+
public class RequestLimitSettings {
30+
31+
/**
32+
* Enum to represent action names against whom we need to perform limit checks.
33+
*/
34+
public enum BlockAction {
35+
CAT_INDICES,
36+
CAT_SHARDS,
37+
CAT_SEGMENTS
38+
}
39+
40+
private volatile int catIndicesLimit;
41+
private volatile int catShardsLimit;
42+
private volatile int catSegmentsLimit;
43+
44+
/**
45+
* Setting to enable circuit breaker on {@link RestIndicesAction}. The limit will be applied on number of indices.
46+
*/
47+
public static final Setting<Integer> CAT_INDICES_LIMIT_SETTING = Setting.intSetting(
48+
"cat.indices.limit",
49+
-1,
50+
Setting.Property.NodeScope,
51+
Setting.Property.Dynamic
52+
);
53+
54+
/**
55+
* Setting to enable circuit breaker on {@link RestShardsAction}. The limit will be applied on number of shards.
56+
*/
57+
public static final Setting<Integer> CAT_SHARDS_LIMIT_SETTING = Setting.intSetting(
58+
"cat.shards.limit",
59+
-1,
60+
Setting.Property.NodeScope,
61+
Setting.Property.Dynamic
62+
);
63+
64+
/**
65+
* Setting to enable circuit breaker on {@link RestSegmentsAction}. The limit will be applied on number of indices.
66+
*/
67+
public static final Setting<Integer> CAT_SEGMENTS_LIMIT_SETTING = Setting.intSetting(
68+
"cat.segments.limit",
69+
-1,
70+
Setting.Property.NodeScope,
71+
Setting.Property.Dynamic
72+
);
73+
74+
public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) {
75+
setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings));
76+
setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings));
77+
setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings));
78+
79+
clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting);
80+
clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting);
81+
clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting);
82+
}
83+
84+
/**
85+
* Method to check if the circuit breaker limit has reached for an action.
86+
* The limits are controlled via dynamic settings.
87+
*
88+
* @param clusterState {@link ClusterState}
89+
* @param actionToCheck {@link BlockAction}
90+
* @return True/False
91+
*/
92+
public boolean isCircuitLimitBreached(final ClusterState clusterState, final BlockAction actionToCheck) {
93+
if (Objects.isNull(clusterState)) return false;
94+
switch (actionToCheck) {
95+
case CAT_INDICES:
96+
if (catIndicesLimit <= 0) return false;
97+
int indicesCount = getTotalIndices(clusterState);
98+
if (indicesCount > catIndicesLimit) return true;
99+
break;
100+
case CAT_SHARDS:
101+
if (catShardsLimit <= 0) return false;
102+
int totalShards = getTotalShards(clusterState);
103+
if (totalShards > catShardsLimit) return true;
104+
break;
105+
case CAT_SEGMENTS:
106+
if (catSegmentsLimit <= 0) return false;
107+
if (getTotalIndices(clusterState) > catSegmentsLimit) return true;
108+
break;
109+
}
110+
return false;
111+
}
112+
113+
private void setCatShardsLimitSetting(final int catShardsLimit) {
114+
this.catShardsLimit = catShardsLimit;
115+
}
116+
117+
private void setCatIndicesLimitSetting(final int catIndicesLimit) {
118+
this.catIndicesLimit = catIndicesLimit;
119+
}
120+
121+
private void setCatSegmentsLimitSetting(final int catSegmentsLimit) {
122+
this.catSegmentsLimit = catSegmentsLimit;
123+
}
124+
125+
private static int getTotalIndices(final ClusterState clusterState) {
126+
return chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0);
127+
}
128+
129+
private static int getTotalShards(final ClusterState clusterState) {
130+
final RoutingTable routingTable = clusterState.getRoutingTable();
131+
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting();
132+
int totalShards = 0;
133+
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) {
134+
for (final Map.Entry<Integer, IndexShardRoutingTable> indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) {
135+
totalShards += indexShardRoutingTableEntry.getValue().getShards().size();
136+
}
137+
}
138+
return totalShards;
139+
}
140+
141+
// TODO: Evaluate if we can move this to common util.
142+
private static <T> T chainWalk(Supplier<T> supplier, T defaultValue) {
143+
try {
144+
return supplier.get();
145+
} catch (NullPointerException e) {
146+
return defaultValue;
147+
}
148+
}
149+
}

server/src/main/java/org/opensearch/rest/action/cat/AbstractCatAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.core.rest.RestStatus;
4040
import org.opensearch.rest.BaseRestHandler;
4141
import org.opensearch.rest.BytesRestResponse;
42+
import org.opensearch.rest.RequestLimitSettings;
4243
import org.opensearch.rest.RestRequest;
4344

4445
import java.io.IOException;
@@ -98,4 +99,12 @@ protected Set<String> responseParams() {
9899
return RESPONSE_PARAMS;
99100
}
100101

102+
/**
103+
* Method to check if limits defined in {@link RequestLimitSettings} are applicable to an action.
104+
*
105+
* @return True / False status
106+
*/
107+
protected boolean isRequestLimitCheckSupported() {
108+
return false;
109+
}
101110
}

0 commit comments

Comments
 (0)