Skip to content

Commit 47707fe

Browse files
committed
modify based on comments
1 parent d4ac9c7 commit 47707fe

10 files changed

+143
-103
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.wlm;
10+
11+
import org.opensearch.common.settings.ClusterSettings;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.wlm.WlmMode;
14+
import org.opensearch.wlm.WorkloadManagementSettings;
15+
16+
/**
17+
* Central manager for maintaining the current WLM mode with updates from cluster settings.
18+
*/
19+
public class WlmModeManager {
20+
21+
private volatile WlmMode wlmMode;
22+
23+
/**
24+
* Constructor to initialize and register listener for WLM mode changes.
25+
* @param settings OpenSearch settings
26+
* @param clusterSettings Cluster settings to register update listener
27+
*/
28+
public WlmModeManager(Settings settings, ClusterSettings clusterSettings) {
29+
this.wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
30+
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
31+
}
32+
33+
/**
34+
* Check if WLM mode is ENABLED or MONITOR_ONLY.
35+
* Throws an IllegalStateException if WLM mode is DISABLED.
36+
* @param operationDescription A short text describing the operation, e.g. "create workload group".
37+
*/
38+
public void verifyWlmNotDisabled(String operationDescription) {
39+
if (wlmMode == WlmMode.DISABLED) {
40+
throw new IllegalStateException(
41+
"Cannot "
42+
+ operationDescription
43+
+ " because workload management mode is DISABLED. "
44+
+ "To enable this feature, set [wlm.workload_group.mode] to ENABLED or MONITOR_ONLY in cluster settings."
45+
);
46+
}
47+
}
48+
49+
/**
50+
* Set the latest WLM mode.
51+
* @param mode The wlm mode to set
52+
*/
53+
void setWlmMode(WlmMode mode) {
54+
this.wlmMode = mode;
55+
}
56+
57+
/**
58+
* Get the latest WLM mode.
59+
*/
60+
public WlmMode getWlmMode() {
61+
return wlmMode;
62+
}
63+
}

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import org.opensearch.transport.TransportService;
6969
import org.opensearch.transport.client.Client;
7070
import org.opensearch.watcher.ResourceWatcherService;
71-
import org.opensearch.wlm.WorkloadManagementSettings;
7271

7372
import java.util.Collection;
7473
import java.util.Collections;
@@ -92,7 +91,7 @@ public class WorkloadManagementPlugin extends Plugin implements ActionPlugin, Sy
9291
private static FeatureType featureType;
9392
private static RulePersistenceService rulePersistenceService;
9493
private static RuleRoutingService ruleRoutingService;
95-
private WorkloadManagementSettings workloadManagementSettings;
94+
private WlmModeManager wlmModeManager;
9695
private AutoTaggingActionFilter autoTaggingActionFilter;
9796

9897
/**
@@ -114,7 +113,7 @@ public Collection<Object> createComponents(
114113
IndexNameExpressionResolver indexNameExpressionResolver,
115114
Supplier<RepositoriesService> repositoriesServiceSupplier
116115
) {
117-
workloadManagementSettings = new WorkloadManagementSettings(environment.settings(), clusterService.getClusterSettings());
116+
wlmModeManager = new WlmModeManager(clusterService.getSettings(), clusterService.getClusterSettings());
118117
featureType = new WorkloadGroupFeatureType(new WorkloadGroupFeatureValueValidator(clusterService));
119118
RuleEntityParser parser = new XContentRuleParser(featureType);
120119
AttributeValueStoreFactory attributeValueStoreFactory = new AttributeValueStoreFactory(
@@ -135,16 +134,14 @@ public Collection<Object> createComponents(
135134
RefreshBasedSyncMechanism refreshMechanism = new RefreshBasedSyncMechanism(
136135
threadPool,
137136
clusterService.getSettings(),
138-
clusterService.getClusterSettings(),
139-
parser,
140-
ruleProcessingService,
141137
featureType,
142138
rulePersistenceService,
143-
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService)
139+
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService),
140+
wlmModeManager
144141
);
145142

146143
autoTaggingActionFilter = new AutoTaggingActionFilter(ruleProcessingService, threadPool);
147-
return List.of(refreshMechanism, workloadManagementSettings);
144+
return List.of(refreshMechanism);
148145
}
149146

150147
@Override
@@ -184,10 +181,10 @@ public List<RestHandler> getRestHandlers(
184181
Supplier<DiscoveryNodes> nodesInCluster
185182
) {
186183
return List.of(
187-
new RestCreateWorkloadGroupAction(workloadManagementSettings),
184+
new RestCreateWorkloadGroupAction(wlmModeManager),
188185
new RestGetWorkloadGroupAction(),
189-
new RestDeleteWorkloadGroupAction(workloadManagementSettings),
190-
new RestUpdateWorkloadGroupAction(workloadManagementSettings)
186+
new RestDeleteWorkloadGroupAction(wlmModeManager),
187+
new RestUpdateWorkloadGroupAction(wlmModeManager)
191188
);
192189
}
193190

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateWorkloadGroupAction.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.core.rest.RestStatus;
1212
import org.opensearch.core.xcontent.ToXContent;
1313
import org.opensearch.core.xcontent.XContentParser;
14+
import org.opensearch.plugin.wlm.WlmModeManager;
1415
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupAction;
1516
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupRequest;
1617
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupResponse;
@@ -21,8 +22,6 @@
2122
import org.opensearch.rest.RestResponse;
2223
import org.opensearch.rest.action.RestResponseListener;
2324
import org.opensearch.transport.client.node.NodeClient;
24-
import org.opensearch.wlm.WlmMode;
25-
import org.opensearch.wlm.WorkloadManagementSettings;
2625

2726
import java.io.IOException;
2827
import java.util.List;
@@ -37,14 +36,14 @@
3736
*/
3837
public class RestCreateWorkloadGroupAction extends BaseRestHandler {
3938

40-
private final WorkloadManagementSettings workloadManagementSettings;
39+
private final WlmModeManager wlmModeManager;
4140

4241
/**
4342
* Constructor for RestCreateWorkloadGroupAction
44-
* @param workloadManagementSettings the WorkloadManagementSettings instance to access the current WLM mode
43+
* @param wlmModeManager the WlmModeManager to access the current WLM mode
4544
*/
46-
public RestCreateWorkloadGroupAction(WorkloadManagementSettings workloadManagementSettings) {
47-
this.workloadManagementSettings = workloadManagementSettings;
45+
public RestCreateWorkloadGroupAction(WlmModeManager wlmModeManager) {
46+
this.wlmModeManager = wlmModeManager;
4847
}
4948

5049
@Override
@@ -62,9 +61,7 @@ public List<Route> routes() {
6261

6362
@Override
6463
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
65-
if (workloadManagementSettings.getWlmMode() == WlmMode.DISABLED) {
66-
throw new IllegalStateException("Workload management mode is DISABLED. Cannot create workload group.");
67-
}
64+
wlmModeManager.verifyWlmNotDisabled("create workload group");
6865
try (XContentParser parser = request.contentParser()) {
6966
CreateWorkloadGroupRequest createWorkloadGroupRequest = CreateWorkloadGroupRequest.fromXContent(parser);
7067
return channel -> client.execute(

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestDeleteWorkloadGroupAction.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88

99
package org.opensearch.plugin.wlm.rest;
1010

11+
import org.opensearch.plugin.wlm.WlmModeManager;
1112
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupAction;
1213
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupRequest;
1314
import org.opensearch.rest.BaseRestHandler;
1415
import org.opensearch.rest.RestRequest;
1516
import org.opensearch.rest.action.RestToXContentListener;
1617
import org.opensearch.transport.client.node.NodeClient;
17-
import org.opensearch.wlm.WlmMode;
18-
import org.opensearch.wlm.WorkloadManagementSettings;
1918

2019
import java.io.IOException;
2120
import java.util.List;
@@ -29,14 +28,14 @@
2928
*/
3029
public class RestDeleteWorkloadGroupAction extends BaseRestHandler {
3130

32-
private final WorkloadManagementSettings workloadManagementSettings;
31+
private final WlmModeManager wlmModeManager;
3332

3433
/**
3534
* Constructor for RestDeleteWorkloadGroupAction
36-
* @param workloadManagementSettings the WorkloadManagementSettings instance to access the current WLM mode
35+
* @param wlmModeManager the WlmModeManager to access the current WLM mode
3736
*/
38-
public RestDeleteWorkloadGroupAction(WorkloadManagementSettings workloadManagementSettings) {
39-
this.workloadManagementSettings = workloadManagementSettings;
37+
public RestDeleteWorkloadGroupAction(WlmModeManager wlmModeManager) {
38+
this.wlmModeManager = wlmModeManager;
4039
}
4140

4241
@Override
@@ -54,10 +53,7 @@ public List<Route> routes() {
5453

5554
@Override
5655
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
57-
if (workloadManagementSettings.getWlmMode() == WlmMode.DISABLED) {
58-
throw new IllegalStateException("Workload management mode is DISABLED. Cannot delete workload group.");
59-
}
60-
56+
wlmModeManager.verifyWlmNotDisabled("delete workload group");
6157
DeleteWorkloadGroupRequest deleteWorkloadGroupRequest = new DeleteWorkloadGroupRequest(request.param("name"));
6258
deleteWorkloadGroupRequest.clusterManagerNodeTimeout(
6359
request.paramAsTime("cluster_manager_timeout", deleteWorkloadGroupRequest.clusterManagerNodeTimeout())

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateWorkloadGroupAction.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88

99
package org.opensearch.plugin.wlm.rest;
1010

11-
import org.opensearch.common.inject.Inject;
1211
import org.opensearch.core.rest.RestStatus;
1312
import org.opensearch.core.xcontent.ToXContent;
1413
import org.opensearch.core.xcontent.XContentParser;
14+
import org.opensearch.plugin.wlm.WlmModeManager;
1515
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupAction;
1616
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupRequest;
1717
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupResponse;
@@ -22,8 +22,6 @@
2222
import org.opensearch.rest.RestResponse;
2323
import org.opensearch.rest.action.RestResponseListener;
2424
import org.opensearch.transport.client.node.NodeClient;
25-
import org.opensearch.wlm.WlmMode;
26-
import org.opensearch.wlm.WorkloadManagementSettings;
2725

2826
import java.io.IOException;
2927
import java.util.List;
@@ -38,15 +36,14 @@
3836
*/
3937
public class RestUpdateWorkloadGroupAction extends BaseRestHandler {
4038

41-
private final WorkloadManagementSettings workloadManagementSettings;
39+
private final WlmModeManager wlmModeManager;
4240

4341
/**
4442
* Constructor for RestUpdateWorkloadGroupAction
45-
* @param workloadManagementSettings the WorkloadManagementSettings instance to access the current WLM mode
43+
* @param wlmModeManager the WlmModeManager to access the current WLM mode
4644
*/
47-
@Inject
48-
public RestUpdateWorkloadGroupAction(WorkloadManagementSettings workloadManagementSettings) {
49-
this.workloadManagementSettings = workloadManagementSettings;
45+
public RestUpdateWorkloadGroupAction(WlmModeManager wlmModeManager) {
46+
this.wlmModeManager = wlmModeManager;
5047
}
5148

5249
@Override
@@ -64,9 +61,7 @@ public List<Route> routes() {
6461

6562
@Override
6663
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
67-
if (workloadManagementSettings.getWlmMode() == WlmMode.DISABLED) {
68-
throw new IllegalStateException("Workload management mode is DISABLED. Cannot update workload group.");
69-
}
64+
wlmModeManager.verifyWlmNotDisabled("update workload group");
7065
try (XContentParser parser = request.contentParser()) {
7166
UpdateWorkloadGroupRequest updateWorkloadGroupRequest = UpdateWorkloadGroupRequest.fromXContent(parser, request.param("name"));
7267
return channel -> client.execute(

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rule/sync/RefreshBasedSyncMechanism.java

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
14-
import org.opensearch.common.settings.ClusterSettings;
1514
import org.opensearch.common.settings.Setting;
1615
import org.opensearch.common.settings.Settings;
1716
import org.opensearch.common.unit.TimeValue;
1817
import org.opensearch.core.action.ActionListener;
18+
import org.opensearch.plugin.wlm.WlmModeManager;
1919
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEvent;
2020
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEventClassifier;
21-
import org.opensearch.rule.InMemoryRuleProcessingService;
22-
import org.opensearch.rule.RuleEntityParser;
2321
import org.opensearch.rule.RulePersistenceService;
2422
import org.opensearch.rule.action.GetRuleRequest;
2523
import org.opensearch.rule.action.GetRuleResponse;
@@ -28,7 +26,6 @@
2826
import org.opensearch.threadpool.Scheduler;
2927
import org.opensearch.threadpool.ThreadPool;
3028
import org.opensearch.wlm.WlmMode;
31-
import org.opensearch.wlm.WorkloadManagementSettings;
3229

3330
import java.io.IOException;
3431
import java.util.Collections;
@@ -65,12 +62,10 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
6562
private final ThreadPool threadPool;
6663
private long refreshInterval;
6764
private volatile Scheduler.Cancellable scheduledFuture;
68-
private final RuleEntityParser parser;
69-
private final InMemoryRuleProcessingService ruleProcessingService;
7065
private final RulePersistenceService rulePersistenceService;
7166
private final RuleEventClassifier ruleEventClassifier;
7267
private final FeatureType featureType;
73-
private WlmMode wlmMode;
68+
private final WlmModeManager wlmModeManager;
7469
// This var keeps the Rules which were present during last run of this service
7570
private Set<Rule> lastRunIndexedRules;
7671
private static final Logger logger = LogManager.getLogger(RefreshBasedSyncMechanism.class);
@@ -80,41 +75,34 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
8075
*
8176
* @param threadPool
8277
* @param settings
83-
* @param clusterSettings
84-
* @param parser
85-
* @param ruleProcessingService
8678
* @param featureType
8779
* @param rulePersistenceService
8880
* @param ruleEventClassifier
81+
* @param wlmModeManager
8982
*/
9083
public RefreshBasedSyncMechanism(
9184
ThreadPool threadPool,
9285
Settings settings,
93-
ClusterSettings clusterSettings,
94-
RuleEntityParser parser,
95-
InMemoryRuleProcessingService ruleProcessingService,
9686
FeatureType featureType,
9787
RulePersistenceService rulePersistenceService,
98-
RuleEventClassifier ruleEventClassifier
88+
RuleEventClassifier ruleEventClassifier,
89+
WlmModeManager wlmModeManager
9990
) {
10091
this.threadPool = threadPool;
10192
refreshInterval = RULE_SYNC_REFRESH_INTERVAL_SETTING.get(settings);
102-
this.parser = parser;
103-
this.ruleProcessingService = ruleProcessingService;
10493
this.featureType = featureType;
10594
this.rulePersistenceService = rulePersistenceService;
10695
this.lastRunIndexedRules = new HashSet<>();
10796
this.ruleEventClassifier = ruleEventClassifier;
108-
wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
109-
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
97+
this.wlmModeManager = wlmModeManager;
11098
}
11199

112100
/**
113101
* synchronized check is needed in case two scheduled runs happen concurrently though highly improbable
114102
* but theoretically possible
115103
*/
116104
synchronized void doRun() {
117-
if (wlmMode != WlmMode.ENABLED) {
105+
if (wlmModeManager.getWlmMode() != WlmMode.ENABLED) {
118106
return;
119107
}
120108

@@ -161,8 +149,4 @@ protected void doClose() throws IOException {
161149
scheduledFuture.cancel();
162150
}
163151
}
164-
165-
void setWlmMode(WlmMode mode) {
166-
this.wlmMode = mode;
167-
}
168152
}

0 commit comments

Comments
 (0)