Skip to content

[WLM] Add WLM mode validation for workload group CRUD requests #18652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add NodeResourceUsageStats to ClusterInfo ([#18480](https://github.com/opensearch-project/OpenSearch/issues/18472))
- Introduce SecureHttpTransportParameters experimental API (to complement SecureTransportParameters counterpart) ([#18572](https://github.com/opensearch-project/OpenSearch/issues/18572))
- Create equivalents of JSM's AccessController in the java agent ([#18346](https://github.com/opensearch-project/OpenSearch/issues/18346))
- Introduced a new cluster-level API to fetch remote store metadata (segments and translogs) for each shard of an index. ([#18257](https://github.com/opensearch-project/OpenSearch/pull/18257))
- [WLM] Add WLM mode validation for workload group CRUD requests ([#18346](https://github.com/opensearch-project/OpenSearch/issues/18346))
- Introduced a new cluster-level API to fetch remote store metadata (segments and translogs) for each shard of an index. ([#18652](https://github.com/opensearch-project/OpenSearch/pull/18652))
- Add last index request timestamp columns to the `_cat/indices` API. ([10766](https://github.com/opensearch-project/OpenSearch/issues/10766))
- Introduce a new pull-based ingestion plugin for file-based indexing (for local testing) ([#18591](https://github.com/opensearch-project/OpenSearch/pull/18591))
- Add support for search pipeline in search and msearch template ([#18564](https://github.com/opensearch-project/OpenSearch/pull/18564))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.wlm.WlmMode;
import org.opensearch.wlm.WorkloadManagementSettings;

/**
* Central provider for maintaining and supplying the current values of non-plugin cluster settings.
* This class listens for updates to relevant settings and provides the latest setting values.
*/
public class NonPluginSettingValuesProvider {

private volatile WlmMode wlmMode;

/**
* Constructor for NonPluginSettingValuesProvider
* @param settings OpenSearch settings
* @param clusterSettings Cluster settings to register update listener
*/
public NonPluginSettingValuesProvider(Settings settings, ClusterSettings clusterSettings) {
this.wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
}

/**
* Check if WLM mode is ENABLED
* Throws an IllegalStateException if WLM mode is DISABLED or MONITOR ONLY.
* @param operationDescription A short text describing the operation, e.g. "create workload group".
*/
public void ensureWlmEnabled(String operationDescription) {
if (wlmMode != WlmMode.ENABLED) {
throw new IllegalStateException(
"Cannot "
+ operationDescription
+ " because workload management mode is disabled or monitor_only."
+ "To enable this feature, set [wlm.workload_group.mode] to 'enabled' in cluster settings."
);
}
}

/**
* Set the latest WLM mode.
* @param mode The wlm mode to set
*/
void setWlmMode(WlmMode mode) {
this.wlmMode = mode;
}

/**
* Get the latest WLM mode.
*/
public WlmMode getWlmMode() {
return wlmMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class WorkloadManagementPlugin extends Plugin implements ActionPlugin, Sy
private static FeatureType featureType;
private static RulePersistenceService rulePersistenceService;
private static RuleRoutingService ruleRoutingService;
private NonPluginSettingValuesProvider nonPluginSettingValuesProvider;
private AutoTaggingActionFilter autoTaggingActionFilter;

/**
Expand All @@ -112,6 +113,10 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
nonPluginSettingValuesProvider = new NonPluginSettingValuesProvider(
clusterService.getSettings(),
clusterService.getClusterSettings()
);
featureType = new WorkloadGroupFeatureType(new WorkloadGroupFeatureValueValidator(clusterService));
RuleEntityParser parser = new XContentRuleParser(featureType);
AttributeValueStoreFactory attributeValueStoreFactory = new AttributeValueStoreFactory(
Expand All @@ -132,12 +137,10 @@ public Collection<Object> createComponents(
RefreshBasedSyncMechanism refreshMechanism = new RefreshBasedSyncMechanism(
threadPool,
clusterService.getSettings(),
clusterService.getClusterSettings(),
parser,
ruleProcessingService,
featureType,
rulePersistenceService,
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService)
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService),
nonPluginSettingValuesProvider
);

autoTaggingActionFilter = new AutoTaggingActionFilter(ruleProcessingService, threadPool);
Expand Down Expand Up @@ -181,10 +184,10 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(
new RestCreateWorkloadGroupAction(),
new RestCreateWorkloadGroupAction(nonPluginSettingValuesProvider),
new RestGetWorkloadGroupAction(),
new RestDeleteWorkloadGroupAction(),
new RestUpdateWorkloadGroupAction()
new RestDeleteWorkloadGroupAction(nonPluginSettingValuesProvider),
new RestUpdateWorkloadGroupAction(nonPluginSettingValuesProvider)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.plugin.wlm.NonPluginSettingValuesProvider;
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupAction;
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupRequest;
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupResponse;
Expand All @@ -35,10 +36,15 @@
*/
public class RestCreateWorkloadGroupAction extends BaseRestHandler {

private final NonPluginSettingValuesProvider nonPluginSettingValuesProvider;

/**
* Constructor for RestCreateWorkloadGroupAction
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
*/
public RestCreateWorkloadGroupAction() {}
public RestCreateWorkloadGroupAction(NonPluginSettingValuesProvider nonPluginSettingValuesProvider) {
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

@Override
public String getName() {
Expand All @@ -55,6 +61,7 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
nonPluginSettingValuesProvider.ensureWlmEnabled("create workload group");
try (XContentParser parser = request.contentParser()) {
CreateWorkloadGroupRequest createWorkloadGroupRequest = CreateWorkloadGroupRequest.fromXContent(parser);
return channel -> client.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.wlm.rest;

import org.opensearch.plugin.wlm.NonPluginSettingValuesProvider;
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupAction;
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupRequest;
import org.opensearch.rest.BaseRestHandler;
Expand All @@ -27,10 +28,15 @@
*/
public class RestDeleteWorkloadGroupAction extends BaseRestHandler {

private final NonPluginSettingValuesProvider nonPluginSettingValuesProvider;

/**
* Constructor for RestDeleteWorkloadGroupAction
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
*/
public RestDeleteWorkloadGroupAction() {}
public RestDeleteWorkloadGroupAction(NonPluginSettingValuesProvider nonPluginSettingValuesProvider) {
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

@Override
public String getName() {
Expand All @@ -47,6 +53,7 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
nonPluginSettingValuesProvider.ensureWlmEnabled("delete workload group");
DeleteWorkloadGroupRequest deleteWorkloadGroupRequest = new DeleteWorkloadGroupRequest(request.param("name"));
deleteWorkloadGroupRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", deleteWorkloadGroupRequest.clusterManagerNodeTimeout())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.plugin.wlm.NonPluginSettingValuesProvider;
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupAction;
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupRequest;
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupResponse;
Expand All @@ -35,10 +36,15 @@
*/
public class RestUpdateWorkloadGroupAction extends BaseRestHandler {

private final NonPluginSettingValuesProvider nonPluginSettingValuesProvider;

/**
* Constructor for RestUpdateWorkloadGroupAction
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
*/
public RestUpdateWorkloadGroupAction() {}
public RestUpdateWorkloadGroupAction(NonPluginSettingValuesProvider nonPluginSettingValuesProvider) {
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

@Override
public String getName() {
Expand All @@ -55,6 +61,7 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
nonPluginSettingValuesProvider.ensureWlmEnabled("update workload group");
try (XContentParser parser = request.contentParser()) {
UpdateWorkloadGroupRequest updateWorkloadGroupRequest = UpdateWorkloadGroupRequest.fromXContent(parser, request.param("name"));
return channel -> client.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.NonPluginSettingValuesProvider;
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEvent;
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEventClassifier;
import org.opensearch.rule.InMemoryRuleProcessingService;
import org.opensearch.rule.RuleEntityParser;
import org.opensearch.rule.RulePersistenceService;
import org.opensearch.rule.action.GetRuleRequest;
import org.opensearch.rule.action.GetRuleResponse;
Expand All @@ -28,7 +26,6 @@
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.WlmMode;
import org.opensearch.wlm.WorkloadManagementSettings;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -65,12 +62,10 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
private final ThreadPool threadPool;
private long refreshInterval;
private volatile Scheduler.Cancellable scheduledFuture;
private final RuleEntityParser parser;
private final InMemoryRuleProcessingService ruleProcessingService;
private final RulePersistenceService rulePersistenceService;
private final RuleEventClassifier ruleEventClassifier;
private final FeatureType featureType;
private WlmMode wlmMode;
private final NonPluginSettingValuesProvider nonPluginSettingValuesProvider;
// This var keeps the Rules which were present during last run of this service
private Set<Rule> lastRunIndexedRules;
private static final Logger logger = LogManager.getLogger(RefreshBasedSyncMechanism.class);
Expand All @@ -80,41 +75,34 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
*
* @param threadPool
* @param settings
* @param clusterSettings
* @param parser
* @param ruleProcessingService
* @param featureType
* @param rulePersistenceService
* @param ruleEventClassifier
* @param nonPluginSettingValuesProvider
*/
public RefreshBasedSyncMechanism(
ThreadPool threadPool,
Settings settings,
ClusterSettings clusterSettings,
RuleEntityParser parser,
InMemoryRuleProcessingService ruleProcessingService,
FeatureType featureType,
RulePersistenceService rulePersistenceService,
RuleEventClassifier ruleEventClassifier
RuleEventClassifier ruleEventClassifier,
NonPluginSettingValuesProvider nonPluginSettingValuesProvider
) {
this.threadPool = threadPool;
refreshInterval = RULE_SYNC_REFRESH_INTERVAL_SETTING.get(settings);
this.parser = parser;
this.ruleProcessingService = ruleProcessingService;
this.featureType = featureType;
this.rulePersistenceService = rulePersistenceService;
this.lastRunIndexedRules = new HashSet<>();
this.ruleEventClassifier = ruleEventClassifier;
wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
}

/**
* synchronized check is needed in case two scheduled runs happen concurrently though highly improbable
* but theoretically possible
*/
synchronized void doRun() {
if (wlmMode != WlmMode.ENABLED) {
if (nonPluginSettingValuesProvider.getWlmMode() != WlmMode.ENABLED) {
return;
}

Expand Down Expand Up @@ -161,8 +149,4 @@ protected void doClose() throws IOException {
scheduledFuture.cancel();
}
}

void setWlmMode(WlmMode mode) {
this.wlmMode = mode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugin.wlm.rule.sync.RefreshBasedSyncMechanism;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.wlm.WlmMode;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.junit.Before;

import java.util.HashSet;

public class NonPluginSettingValuesProviderTests extends OpenSearchTestCase {

private NonPluginSettingValuesProvider provider;
private ClusterSettings clusterSettings;

@Before
public void setUp() throws Exception {
super.setUp();
try (WorkloadManagementPlugin plugin = new WorkloadManagementPlugin()) {
Settings settings = Settings.builder()
.put(RefreshBasedSyncMechanism.RULE_SYNC_REFRESH_INTERVAL_SETTING_NAME, 1000)
.put(WorkloadManagementSettings.WLM_MODE_SETTING_NAME, "monitor_only")
.build();
clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(plugin.getSettings()));
clusterSettings.registerSetting(WorkloadManagementSettings.WLM_MODE_SETTING);
provider = new NonPluginSettingValuesProvider(settings, clusterSettings);
}
}

public void testInitialWlmModeFromSettings() {
assertEquals(provider.getWlmMode(), WlmMode.MONITOR_ONLY);
}

public void testSetWlmModeUpdatesValue() {
provider.setWlmMode(WlmMode.ENABLED);
assertEquals(provider.getWlmMode(), WlmMode.ENABLED);
}

public void testEnsureWlmEnabledThrowsWhenDisabled() {
provider.setWlmMode(WlmMode.DISABLED);
try {
provider.ensureWlmEnabled("delete workload group");
fail("Expected exception when WLM mode is DISABLED");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("delete workload group"));
}
}

public void testEnsureWlmEnabledThrowsWhenMonitorOnly() {
try {
provider.ensureWlmEnabled("update workload group");
fail("Expected exception when WLM mode is MONITOR_ONLY");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("update workload group"));
}
}

public void testEnsureWlmEnabledSucceedWhenEnabled() {
provider.setWlmMode(WlmMode.ENABLED);
try {
provider.ensureWlmEnabled("delete workload group");
} catch (Exception e) {
fail();
}
}
}
Loading
Loading