Skip to content

Commit 97e7a38

Browse files
committed
code-refactor
1 parent cd5b7c9 commit 97e7a38

File tree

5 files changed

+126
-67
lines changed

5 files changed

+126
-67
lines changed

core/src/main/java/io/temporal/samples/taskinteraction/Task.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package io.temporal.samples.taskinteraction;
2121

22+
import java.util.Objects;
23+
2224
public class Task {
2325

2426
private String token;
@@ -39,6 +41,19 @@ public TaskTitle getTitle() {
3941
return title;
4042
}
4143

44+
@Override
45+
public boolean equals(final Object o) {
46+
if (this == o) return true;
47+
if (o == null || getClass() != o.getClass()) return false;
48+
final Task task = (Task) o;
49+
return Objects.equals(token, task.token) && Objects.equals(title, task.title);
50+
}
51+
52+
@Override
53+
public int hashCode() {
54+
return Objects.hash(token, title);
55+
}
56+
4257
@Override
4358
public String toString() {
4459
return "Task{" + "token='" + token + '\'' + ", title=" + title + '}';
@@ -62,6 +77,19 @@ public void setValue(final String value) {
6277
this.value = value;
6378
}
6479

80+
@Override
81+
public boolean equals(final Object o) {
82+
if (this == o) return true;
83+
if (o == null || getClass() != o.getClass()) return false;
84+
final TaskTitle taskTitle = (TaskTitle) o;
85+
return Objects.equals(value, taskTitle.value);
86+
}
87+
88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(value);
91+
}
92+
6593
@Override
6694
public String toString() {
6795
return "TaskTitle{" + "value='" + value + '\'' + '}';

core/src/main/java/io/temporal/samples/taskinteraction/TaskService.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,34 +39,53 @@ public class TaskService<R> {
3939
ActivityTask.class,
4040
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build());
4141

42-
private final Map<String, CompletablePromise<R>> pendingPromises = new HashMap<>();
43-
private final Logger logger = Workflow.getLogger(TaskService.class);
44-
private final TaskClient listener =
45-
taskToken -> {
46-
logger.info("Completing task with token: " + taskToken);
42+
private TaskManager tasksManager = new TaskManager();
4743

48-
final CompletablePromise<R> completablePromise = pendingPromises.get(taskToken);
49-
completablePromise.complete(null);
50-
};
44+
private final Logger logger = Workflow.getLogger(TaskService.class);
5145

5246
public TaskService() {
53-
Workflow.registerListener(listener);
47+
48+
// This listener exposes a signal method that clients use to notify the task has been completed
49+
Workflow.registerListener(
50+
new TaskClient() {
51+
@Override
52+
public void completeTaskByToken(final String taskToken) {
53+
logger.info("Completing task with token: " + taskToken);
54+
tasksManager.completeTask(taskToken);
55+
}
56+
});
5457
}
5558

5659
public void executeTask(Task task) {
5760

5861
logger.info("Before creating task : " + task);
59-
final String token = task.getToken();
62+
63+
// Activity implementation is responsible for registering the task to the external service
64+
// (which is responsible for managing the task life-cycle)
6065
activity.createTask(task);
6166

6267
logger.info("Task created: " + task);
6368

64-
final CompletablePromise<R> promise = Workflow.newPromise();
65-
pendingPromises.put(token, promise);
66-
67-
// Wait promise to complete or fail
68-
promise.get();
69+
tasksManager.waitForTaskCompletion(task);
6970

7071
logger.info("Task completed: " + task);
7172
}
73+
74+
private class TaskManager {
75+
76+
private final Map<String, CompletablePromise<R>> tasks = new HashMap<>();
77+
78+
public void waitForTaskCompletion(final Task task) {
79+
final CompletablePromise<R> promise = Workflow.newPromise();
80+
tasks.put(task.getToken(), promise);
81+
// Wait promise to complete
82+
promise.get();
83+
}
84+
85+
public void completeTask(final String taskToken) {
86+
87+
final CompletablePromise<R> completablePromise = tasks.get(taskToken);
88+
completablePromise.complete(null);
89+
}
90+
}
7291
}

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public interface WorkflowTaskManager {
2828
String WORKFLOW_ID = WorkflowTaskManager.class.getSimpleName();
2929

3030
@WorkflowMethod
31-
void execute(List<Task> inputPendingTask, List<String> inputTaskToComplete);
31+
void execute(final WorkflowTaskManagerImpl.PendingTasks pendingTasks);
3232

3333
@UpdateMethod
3434
void createTask(Task task);

core/src/main/java/io/temporal/samples/taskinteraction/WorkflowTaskManagerImpl.java

Lines changed: 58 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,85 +20,95 @@
2020
package io.temporal.samples.taskinteraction;
2121

2222
import io.temporal.workflow.Workflow;
23-
import java.util.*;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Optional;
26+
import java.util.StringTokenizer;
2427

2528
public class WorkflowTaskManagerImpl implements WorkflowTaskManager {
2629

27-
private List<Task> pendingTask;
2830

29-
private List<String> tasksToComplete;
31+
private PendingTasks pendingTasks = new PendingTasks();
3032

3133
@Override
32-
public void execute(List<Task> inputPendingTask, List<String> inputTaskToComplete) {
33-
initPendingTasks(inputPendingTask);
34-
initTaskToComplete(inputTaskToComplete);
34+
public void execute(final PendingTasks inputPendingTasks) {
3535

36-
while (true) {
36+
initTaskList(inputPendingTasks);
3737

38-
Workflow.await(
39-
() ->
40-
// Wait until there are pending task to complete
41-
!tasksToComplete.isEmpty());
38+
Workflow.await(() -> Workflow.getInfo().isContinueAsNewSuggested());
4239

43-
final String taskToken = tasksToComplete.remove(0);
44-
45-
// Find the workflow id of the workflow we have to signal back
46-
final String externalWorkflowId = new StringTokenizer(taskToken, "_").nextToken();
47-
48-
Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
49-
.completeTaskByToken(taskToken);
50-
51-
final Task task = getPendingTaskWithToken(taskToken).get();
52-
pendingTask.remove(task);
53-
54-
if (Workflow.getInfo().isContinueAsNewSuggested()) {
55-
Workflow.newContinueAsNewStub(WorkflowTaskManager.class)
56-
.execute(pendingTask, tasksToComplete);
57-
}
58-
}
40+
Workflow.newContinueAsNewStub(WorkflowTaskManager.class).execute(this.pendingTasks);
5941
}
6042

6143
@Override
6244
public void createTask(Task task) {
63-
initPendingTasks(new ArrayList<>());
64-
pendingTask.add(task);
45+
initTaskList(new PendingTasks());
46+
pendingTasks.addTask(task);
6547
}
6648

6749
@Override
6850
public void completeTaskByToken(String taskToken) {
6951

70-
tasksToComplete.add(taskToken);
52+
Task task = this.pendingTasks.filterTaskByToken(taskToken).get();
53+
54+
final String externalWorkflowId = extractWorkflowIdFromTaskToken(taskToken);
7155

72-
Workflow.await(
73-
() -> {
74-
final boolean taskCompleted =
75-
getPendingTask().stream().noneMatch((t) -> Objects.equals(t.getToken(), taskToken));
56+
//Signal back to the workflow that started this task to notify that the task was completed
57+
Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
58+
.completeTaskByToken(taskToken);
7659

77-
return taskCompleted;
78-
});
60+
this.pendingTasks.markTaskAsCompleted(task);
7961
}
8062

8163
@Override
8264
public List<Task> getPendingTask() {
83-
return pendingTask;
65+
return pendingTasks.getTasks();
8466
}
8567

86-
private Optional<Task> getPendingTaskWithToken(final String taskToken) {
87-
return pendingTask.stream().filter((t) -> t.getToken().equals(taskToken)).findFirst();
88-
}
68+
private void initTaskList(final PendingTasks pendingTasks) {
69+
this.pendingTasks = this.pendingTasks == null ? new PendingTasks() : this.pendingTasks;
8970

90-
private void initTaskToComplete(final List<String> tasks) {
91-
if (tasksToComplete == null) {
92-
tasksToComplete = new ArrayList<>();
71+
// Update method addTask can be invoked before the main workflow method.
72+
if (pendingTasks != null) {
73+
this.pendingTasks.addAll(pendingTasks.getTasks());
9374
}
94-
tasksToComplete.addAll(tasks);
9575
}
9676

97-
private void initPendingTasks(final List<Task> tasks) {
77+
private String extractWorkflowIdFromTaskToken(final String taskToken) {
78+
return new StringTokenizer(taskToken, "_").nextToken();
79+
}
80+
81+
public static class PendingTasks {
82+
private final List<Task> tasks;
83+
84+
public PendingTasks() {
85+
this(new ArrayList<>());
86+
}
87+
88+
public PendingTasks(final List<Task> tasks) {
89+
this.tasks = tasks;
90+
}
91+
92+
public void addTask(final Task task) {
93+
this.tasks.add(task);
94+
}
95+
96+
public void addAll(final List<Task> tasks) {
97+
this.tasks.addAll(tasks);
98+
}
99+
100+
public void markTaskAsCompleted(final Task task) {
101+
// For the sake of simplicity, we delete the task if it is marked as completed.
102+
// Nothing stops us from having a field to track the tasks' state
103+
tasks.remove(task);
104+
}
105+
106+
private Optional<Task> filterTaskByToken(final String taskToken) {
107+
return tasks.stream().filter((t) -> t.getToken().equals(taskToken)).findFirst();
108+
}
98109

99-
if (pendingTask == null) {
100-
pendingTask = new ArrayList<>();
110+
private List<Task> getTasks() {
111+
return tasks;
101112
}
102-
pendingTask.addAll(tasks);
103113
}
104114
}

core/src/main/java/io/temporal/samples/taskinteraction/activity/ActivityTaskImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.temporal.client.WorkflowOptions;
2626
import io.temporal.samples.taskinteraction.Task;
2727
import io.temporal.samples.taskinteraction.WorkflowTaskManager;
28-
import java.util.ArrayList;
28+
import io.temporal.samples.taskinteraction.WorkflowTaskManagerImpl;
2929

3030
public class ActivityTaskImpl implements ActivityTask {
3131

@@ -35,11 +35,13 @@ public ActivityTaskImpl(WorkflowClient workflowClient) {
3535
this.workflowClient = workflowClient;
3636
}
3737

38+
// This activity is responsible for registering the task to the external service
3839
@Override
3940
public void createTask(Task task) {
4041

4142
final String taskQueue = Activity.getExecutionContext().getInfo().getActivityTaskQueue();
4243

44+
// In this case the service that manages the task life-cycle is another workflow.
4345
final WorkflowOptions workflowOptions =
4446
WorkflowOptions.newBuilder()
4547
.setWorkflowId(WorkflowTaskManager.WORKFLOW_ID)
@@ -49,13 +51,13 @@ public void createTask(Task task) {
4951
final WorkflowTaskManager taskManager =
5052
workflowClient.newWorkflowStub(WorkflowTaskManager.class, workflowOptions);
5153
try {
52-
WorkflowClient.start(taskManager::execute, new ArrayList<>(), new ArrayList<>());
54+
WorkflowClient.start(taskManager::execute, new WorkflowTaskManagerImpl.PendingTasks());
5355
} catch (WorkflowExecutionAlreadyStarted e) {
5456
// expected exception if workflow was started by a previous activity execution.
5557
// This will be handled differently once updateWithStart is implemented
5658
}
5759

58-
// register the "task" to the external workflow that manages task lifecycle
60+
// Register the "task" to the external workflow and return
5961
taskManager.createTask(task);
6062
}
6163
}

0 commit comments

Comments
 (0)