Skip to content

Commit 59b896a

Browse files
committed
Add changes to block non-paginated calls in cat shards, indices and segments
Signed-off-by: Sumit Bansal <[email protected]>
1 parent b2a7136 commit 59b896a

File tree

8 files changed

+235
-35
lines changed

8 files changed

+235
-35
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@
425425
import org.opensearch.rest.action.admin.indices.RestValidateQueryAction;
426426
import org.opensearch.rest.action.admin.indices.RestViewAction;
427427
import org.opensearch.rest.action.cat.AbstractCatAction;
428+
import org.opensearch.rest.action.cat.RequestLimitSettings;
428429
import org.opensearch.rest.action.cat.RestAliasAction;
429430
import org.opensearch.rest.action.cat.RestAllocationAction;
430431
import org.opensearch.rest.action.cat.RestCatAction;
@@ -528,6 +529,7 @@ public class ActionModule extends AbstractModule {
528529
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
529530
private final ThreadPool threadPool;
530531
private final ExtensionsManager extensionsManager;
532+
private final RequestLimitSettings requestLimitSettings;
531533

532534
public ActionModule(
533535
Settings settings,
@@ -580,6 +582,7 @@ public ActionModule(
580582
);
581583

582584
restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, identityService);
585+
requestLimitSettings = new RequestLimitSettings(clusterSettings, settings);
583586
}
584587

585588
public Map<String, ActionHandler<?, ?>> getActions() {
@@ -960,7 +963,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
960963
registerHandler.accept(new RestClusterManagerAction());
961964
registerHandler.accept(new RestNodesAction());
962965
registerHandler.accept(new RestTasksAction(nodesInCluster));
963-
registerHandler.accept(new RestIndicesAction());
966+
registerHandler.accept(new RestIndicesAction(requestLimitSettings));
964967
registerHandler.accept(new RestSegmentsAction());
965968
// Fully qualified to prevent interference with rest.action.count.RestCountAction
966969
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
@@ -1048,6 +1051,8 @@ protected void configure() {
10481051

10491052
// register dynamic ActionType -> transportAction Map used by NodeClient
10501053
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);
1054+
1055+
bind(RequestLimitSettings.class).toInstance(requestLimitSettings);
10511056
}
10521057

10531058
public ActionFilters getActionFilters() {

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919
import org.opensearch.common.inject.Inject;
2020
import org.opensearch.core.action.ActionListener;
2121
import org.opensearch.core.action.NotifyOnceListener;
22+
import org.opensearch.core.common.breaker.CircuitBreaker;
23+
import org.opensearch.core.common.breaker.CircuitBreakingException;
24+
import org.opensearch.rest.action.cat.RequestLimitSettings;
2225
import org.opensearch.tasks.CancellableTask;
2326
import org.opensearch.tasks.Task;
2427
import org.opensearch.transport.TransportService;
2528

29+
import static org.opensearch.rest.action.cat.RequestLimitSettings.BlockAction.CAT_SHARDS;
30+
2631
/**
2732
* Perform cat shards action
2833
*
@@ -31,11 +36,18 @@
3136
public class TransportCatShardsAction extends HandledTransportAction<CatShardsRequest, CatShardsResponse> {
3237

3338
private final NodeClient client;
39+
private final RequestLimitSettings requestLimitSettings;
3440

3541
@Inject
36-
public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) {
42+
public TransportCatShardsAction(
43+
NodeClient client,
44+
TransportService transportService,
45+
ActionFilters actionFilters,
46+
RequestLimitSettings requestLimitSettings
47+
) {
3748
super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new);
3849
this.client = client;
50+
this.requestLimitSettings = requestLimitSettings;
3951
}
4052

4153
@Override
@@ -73,6 +85,9 @@ protected void innerOnFailure(Exception e) {
7385
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
7486
@Override
7587
public void onResponse(ClusterStateResponse clusterStateResponse) {
88+
if (requestLimitSettings.isCircuitBreakerLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) {
89+
listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT));
90+
}
7691
catShardsResponse.setClusterStateResponse(clusterStateResponse);
7792
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
7893
indicesStatsRequest.setShouldCancelOnTimeout(true);

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
import org.opensearch.repositories.blobstore.BlobStoreRepository;
156156
import org.opensearch.repositories.fs.FsRepository;
157157
import org.opensearch.rest.BaseRestHandler;
158+
import org.opensearch.rest.action.cat.RequestLimitSettings;
158159
import org.opensearch.script.ScriptService;
159160
import org.opensearch.search.SearchService;
160161
import org.opensearch.search.aggregations.MultiBucketConsumerService;
@@ -793,7 +794,11 @@ public void apply(Settings value, Settings current, Settings previous) {
793794
WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
794795
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
795796
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
796-
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
797+
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
798+
799+
RequestLimitSettings.CAT_INDICES_LIMIT_SETTING,
800+
RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING,
801+
RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING
797802
)
798803
)
799804
);
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.rest.action.cat;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.routing.IndexRoutingTable;
13+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
14+
import org.opensearch.cluster.routing.RoutingTable;
15+
import org.opensearch.common.settings.ClusterSettings;
16+
import org.opensearch.common.settings.Setting;
17+
import org.opensearch.common.settings.Settings;
18+
19+
import java.util.Map;
20+
import java.util.Objects;
21+
import java.util.function.Supplier;
22+
23+
/**
24+
* Class to define dynamic settings for putting circuit breakers on the actions and functions to evaluate if block is required.
25+
*/
26+
public class RequestLimitSettings {
27+
28+
public enum BlockAction {
29+
CAT_INDICES,
30+
CAT_SHARDS,
31+
CAT_SEGMENTS
32+
}
33+
34+
private volatile int catIndicesLimit;
35+
private volatile int catShardsLimit;
36+
private volatile int catSegmentsLimit;
37+
38+
public static final Setting<Integer> CAT_INDICES_LIMIT_SETTING = Setting.intSetting(
39+
"cat.indices.limit",
40+
-1,
41+
Setting.Property.NodeScope,
42+
Setting.Property.Dynamic
43+
);
44+
45+
public static final Setting<Integer> CAT_SHARDS_LIMIT_SETTING = Setting.intSetting(
46+
"cat.shards.limit",
47+
-1,
48+
Setting.Property.NodeScope,
49+
Setting.Property.Dynamic
50+
);
51+
52+
public static final Setting<Integer> CAT_SEGMENTS_LIMIT_SETTING = Setting.intSetting(
53+
"cat.segments.limit",
54+
-1,
55+
Setting.Property.NodeScope,
56+
Setting.Property.Dynamic
57+
);
58+
59+
public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) {
60+
setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings));
61+
setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings));
62+
setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings));
63+
64+
clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting);
65+
clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting);
66+
clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting);
67+
}
68+
69+
/**
70+
* Method to check if the circuit breaker limit has reached for an action.
71+
* The limits are controlled via dynamic settings.
72+
*
73+
* @param clusterState {@link ClusterState}
74+
* @param actionToCheck {@link BlockAction}
75+
* @return True/False
76+
*/
77+
public boolean isCircuitBreakerLimitBreached(final ClusterState clusterState, BlockAction actionToCheck) {
78+
if (Objects.isNull(clusterState)) return false;
79+
switch (actionToCheck) {
80+
case CAT_INDICES:
81+
if (catIndicesLimit <= 0) return false;
82+
int indicesCount = getTotalIndices(clusterState);
83+
if (indicesCount > catIndicesLimit) return true;
84+
break;
85+
case CAT_SHARDS:
86+
if (catShardsLimit <= 0) return false;
87+
int totalShards = getTotalShards(clusterState);
88+
if (totalShards > catShardsLimit) return true;
89+
break;
90+
case CAT_SEGMENTS:
91+
if (catSegmentsLimit <= 0) return false;
92+
if (getTotalIndices(clusterState) > catSegmentsLimit) return true;
93+
break;
94+
}
95+
return false;
96+
}
97+
98+
private void setCatShardsLimitSetting(final int catShardsLimit) {
99+
this.catShardsLimit = catShardsLimit;
100+
}
101+
102+
private void setCatIndicesLimitSetting(final int catIndicesLimit) {
103+
this.catIndicesLimit = catIndicesLimit;
104+
}
105+
106+
private void setCatSegmentsLimitSetting(final int catSegmentsLimit) {
107+
this.catSegmentsLimit = catSegmentsLimit;
108+
}
109+
110+
private static int getTotalIndices(final ClusterState clusterState) {
111+
return chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0);
112+
}
113+
114+
private static int getTotalShards(final ClusterState clusterState) {
115+
final RoutingTable routingTable = clusterState.getRoutingTable();
116+
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting();
117+
int totalShards = 0;
118+
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) {
119+
for (final Map.Entry<Integer, IndexShardRoutingTable> indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) {
120+
totalShards += indexShardRoutingTableEntry.getValue().getShards().size();
121+
}
122+
}
123+
return totalShards;
124+
}
125+
126+
// TODO: Evaluate if we can move this to common util.
127+
private static <T> T chainWalk(Supplier<T> supplier, T defaultValue) {
128+
try {
129+
return supplier.get();
130+
} catch (NullPointerException e) {
131+
return defaultValue;
132+
}
133+
}
134+
}

server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.opensearch.core.action.ActionListener;
5858
import org.opensearch.core.action.ActionResponse;
5959
import org.opensearch.core.common.Strings;
60+
import org.opensearch.core.common.breaker.CircuitBreaker;
61+
import org.opensearch.core.common.breaker.CircuitBreakingException;
6062
import org.opensearch.index.IndexSettings;
6163
import org.opensearch.rest.RestRequest;
6264
import org.opensearch.rest.RestResponse;
@@ -81,6 +83,7 @@
8183
import static java.util.Collections.unmodifiableList;
8284
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
8385
import static org.opensearch.rest.RestRequest.Method.GET;
86+
import static org.opensearch.rest.action.cat.RequestLimitSettings.BlockAction.CAT_INDICES;
8487

8588
/**
8689
* _cat API action to list indices
@@ -96,6 +99,12 @@ public class RestIndicesAction extends AbstractCatAction {
9699
private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE =
97100
"Please only use one of the request parameters [master_timeout, cluster_manager_timeout].";
98101

102+
private final RequestLimitSettings requestLimitSettings;
103+
104+
public RestIndicesAction(RequestLimitSettings requestLimitSettings) {
105+
this.requestLimitSettings = requestLimitSettings;
106+
}
107+
99108
@Override
100109
public List<Route> routes() {
101110
return unmodifiableList(asList(new Route(GET, "/_cat/indices"), new Route(GET, "/_cat/indices/{index}")));
@@ -151,48 +160,66 @@ public RestResponse buildResponse(final Table table) throws Exception {
151160
new ActionListener<GetSettingsResponse>() {
152161
@Override
153162
public void onResponse(final GetSettingsResponse getSettingsResponse) {
154-
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, listener);
155-
groupedListener.onResponse(getSettingsResponse);
156-
157163
// The list of indices that will be returned is determined by the indices returned from the Get Settings call.
158164
// All the other requests just provide additional detail, and wildcards may be resolved differently depending on the
159165
// type of request in the presence of security plugins (looking at you, ClusterHealthRequest), so
160166
// force the IndicesOptions for all the sub-requests to be as inclusive as possible.
161167
final IndicesOptions subRequestIndicesOptions = IndicesOptions.lenientExpandHidden();
162168

163-
// Indices that were successfully resolved during the get settings request might be deleted when the subsequent
164-
// cluster
165-
// state, cluster health and indices stats requests execute. We have to distinguish two cases:
166-
// 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we want the
167-
// subsequent requests to fail.
168-
// 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent requests
169-
// not to
170-
// fail on the deleted index (as we want to ignore wildcards that cannot be resolved).
171-
// This behavior can be ensured by letting the cluster state, cluster health and indices stats requests re-resolve
172-
// the
173-
// index names with the same indices options that we used for the initial cluster state request (strictExpand).
174-
sendIndicesStatsRequest(
175-
indices,
176-
subRequestIndicesOptions,
177-
includeUnloadedSegments,
178-
client,
179-
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
180-
);
169+
// Indices that were successfully resolved during the get settings request might be deleted when the
170+
// subsequent cluster state, cluster health and indices stats requests execute. We have to distinguish two cases:
171+
// 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we
172+
// want the subsequent requests to fail.
173+
// 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent
174+
// requests not to fail on the deleted index (as we want to ignore wildcards that cannot be resolved).
175+
// This behavior can be ensured by letting the cluster state, cluster health and indices stats requests
176+
// re-resolve the index names with the same indices options that we used for the initial cluster state
177+
// request (strictExpand).
181178
sendClusterStateRequest(
182179
indices,
183180
subRequestIndicesOptions,
184181
local,
185182
clusterManagerNodeTimeout,
186183
client,
187-
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
188-
);
189-
sendClusterHealthRequest(
190-
indices,
191-
subRequestIndicesOptions,
192-
local,
193-
clusterManagerNodeTimeout,
194-
client,
195-
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
184+
new ActionListener<ClusterStateResponse>() {
185+
@Override
186+
public void onResponse(ClusterStateResponse clusterStateResponse) {
187+
if (requestLimitSettings.isCircuitBreakerLimitBreached(clusterStateResponse.getState(), CAT_INDICES)) {
188+
listener.onFailure(
189+
new CircuitBreakingException("Too many indices requested.", CircuitBreaker.Durability.TRANSIENT)
190+
);
191+
}
192+
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(
193+
request,
194+
4,
195+
listener
196+
);
197+
groupedListener.onResponse(getSettingsResponse);
198+
groupedListener.onResponse(clusterStateResponse);
199+
200+
sendIndicesStatsRequest(
201+
indices,
202+
subRequestIndicesOptions,
203+
includeUnloadedSegments,
204+
client,
205+
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
206+
);
207+
208+
sendClusterHealthRequest(
209+
indices,
210+
subRequestIndicesOptions,
211+
local,
212+
clusterManagerNodeTimeout,
213+
client,
214+
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
215+
);
216+
}
217+
218+
@Override
219+
public void onFailure(Exception e) {
220+
listener.onFailure(e);
221+
}
222+
}
196223
);
197224
}
198225

server/src/main/java/org/opensearch/rest/action/cat/RestSegmentsAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
9696
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
9797
@Override
9898
public void processResponse(final ClusterStateResponse clusterStateResponse) {
99+
/*
100+
if (requestLimitSettings.isRequestLimitBreached(clusterStateResponse, CAT_SEGMENTS)) {
101+
listener.onFailure(new CircuitBreakingException("Segments from too many indices requested.", CircuitBreaker.Durability.TRANSIENT));
102+
}
103+
*/
99104
final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest();
100105
indicesSegmentsRequest.indices(indices);
101106
client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) {

0 commit comments

Comments
 (0)