Skip to content

Commit 38a95a2

Browse files
committed
refactor: move executor to AbstractWorkflowExecutor, add submit method (#1957)
1 parent 30c08cf commit 38a95a2

File tree

3 files changed

+18
-21
lines changed

3 files changed

+18
-21
lines changed

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.Optional;
66
import java.util.Set;
77
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.concurrent.ExecutorService;
89
import java.util.concurrent.Future;
910
import java.util.stream.Collectors;
1011

@@ -30,12 +31,14 @@ public abstract class AbstractWorkflowExecutor<P extends HasMetadata> {
3031
private final Map<DependentResourceNode, Future<?>> actualExecutions = new ConcurrentHashMap<>();
3132
private final Map<DependentResourceNode, Exception> exceptionsDuringExecution =
3233
new ConcurrentHashMap<>();
34+
private final ExecutorService executorService;
3335

3436
public AbstractWorkflowExecutor(Workflow<P> workflow, P primary, Context<P> context) {
3537
this.workflow = workflow;
3638
this.primary = primary;
3739
this.context = context;
3840
this.primaryID = ResourceID.fromResource(primary);
41+
executorService = context.getWorkflowExecutorService();
3942
}
4043

4144
protected abstract Logger logger();
@@ -107,10 +110,16 @@ protected synchronized void handleNodeExecutionFinish(
107110
}
108111
}
109112

110-
@SuppressWarnings("unchecked")
111113
protected <R> boolean isConditionMet(Optional<Condition<R, P>> condition,
112114
DependentResource<R, P> dependentResource) {
113-
return condition.map(c -> c.isMet(dependentResource, primary, context))
114-
.orElse(true);
115+
return condition.map(c -> c.isMet(dependentResource, primary, context)).orElse(true);
116+
}
117+
118+
protected <R> void submit(DependentResourceNode<R, P> dependentResourceNode,
119+
NodeExecutor<R, P> nodeExecutor, String operation) {
120+
final Future<?> future = executorService.submit(nodeExecutor);
121+
markAsExecuting(dependentResourceNode, future);
122+
logger().debug("Submitted to {}: {} primaryID: {}", operation, dependentResourceNode,
123+
primaryID);
115124
}
116125
}

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import java.util.List;
44
import java.util.Set;
55
import java.util.concurrent.ConcurrentHashMap;
6-
import java.util.concurrent.ExecutorService;
7-
import java.util.concurrent.Future;
86
import java.util.stream.Collectors;
97

108
import org.slf4j.Logger;
@@ -19,15 +17,14 @@
1917
public class WorkflowCleanupExecutor<P extends HasMetadata> extends AbstractWorkflowExecutor<P> {
2018

2119
private static final Logger log = LoggerFactory.getLogger(WorkflowCleanupExecutor.class);
20+
private static final String CLEANUP = "cleanup";
2221

2322
private final Set<DependentResourceNode> postDeleteConditionNotMet =
2423
ConcurrentHashMap.newKeySet();
2524
private final Set<DependentResourceNode> deleteCalled = ConcurrentHashMap.newKeySet();
26-
private final ExecutorService executorService;
2725

2826
public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> context) {
2927
super(workflow, primary, context);
30-
this.executorService = context.getWorkflowExecutorService();
3128
}
3229

3330
public synchronized WorkflowCleanupResult cleanup() {
@@ -55,9 +52,7 @@ private synchronized void handleCleanup(DependentResourceNode dependentResourceN
5552
return;
5653
}
5754

58-
Future<?> nodeFuture = executorService.submit(new CleanupExecutor<>(dependentResourceNode));
59-
markAsExecuting(dependentResourceNode, nodeFuture);
60-
log.debug("Submitted for cleanup: {}", dependentResourceNode);
55+
submit(dependentResourceNode, new CleanupExecutor<>(dependentResourceNode), CLEANUP);
6156
}
6257

6358

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import java.util.Map;
55
import java.util.Set;
66
import java.util.concurrent.ConcurrentHashMap;
7-
import java.util.concurrent.ExecutorService;
8-
import java.util.concurrent.Future;
97
import java.util.stream.Collectors;
108

119
import org.slf4j.Logger;
@@ -21,6 +19,8 @@
2119
public class WorkflowReconcileExecutor<P extends HasMetadata> extends AbstractWorkflowExecutor<P> {
2220

2321
private static final Logger log = LoggerFactory.getLogger(WorkflowReconcileExecutor.class);
22+
private static final String RECONCILE = "reconcile";
23+
private static final String DELETE = "delete";
2424

2525

2626
private final Set<DependentResourceNode> notReady = ConcurrentHashMap.newKeySet();
@@ -32,11 +32,9 @@ public class WorkflowReconcileExecutor<P extends HasMetadata> extends AbstractWo
3232
private final Set<DependentResourceNode> reconciled = ConcurrentHashMap.newKeySet();
3333
private final Map<DependentResource, ReconcileResult> reconcileResults =
3434
new ConcurrentHashMap<>();
35-
private final ExecutorService executorService;
3635

3736
public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> context) {
3837
super(workflow, primary, context);
39-
this.executorService = context.getWorkflowExecutorService();
4038
}
4139

4240
public synchronized WorkflowReconcileResult reconcile() {
@@ -69,9 +67,7 @@ private synchronized <R> void handleReconcile(DependentResourceNode<R, P> depend
6967
if (!reconcileConditionMet) {
7068
handleReconcileConditionNotMet(dependentResourceNode);
7169
} else {
72-
var nodeFuture = executorService.submit(new NodeReconcileExecutor(dependentResourceNode));
73-
markAsExecuting(dependentResourceNode, nodeFuture);
74-
log.debug("Submitted to reconcile: {} primaryID: {}", dependentResourceNode, primaryID);
70+
submit(dependentResourceNode, new NodeReconcileExecutor<>(dependentResourceNode), RECONCILE);
7571
}
7672
}
7773

@@ -87,10 +83,7 @@ private synchronized void handleDelete(DependentResourceNode dependentResourceNo
8783
return;
8884
}
8985

90-
Future<?> nodeFuture = executorService
91-
.submit(new NodeDeleteExecutor(dependentResourceNode));
92-
markAsExecuting(dependentResourceNode, nodeFuture);
93-
log.debug("Submitted to delete: {}", dependentResourceNode);
86+
submit(dependentResourceNode, new NodeDeleteExecutor<>(dependentResourceNode), DELETE);
9487
}
9588

9689
private boolean allDependentsDeletedAlready(DependentResourceNode<?, P> dependentResourceNode) {

0 commit comments

Comments
 (0)