Skip to content

Commit 84b4fee

Browse files
authored
Async package delivery sample (#746)
* [WIP] Async package delivery sample Signed-off-by: Tihomir Surdilovic <[email protected]> * update Signed-off-by: Tihomir Surdilovic <[email protected]> * adding cancelation and failure Signed-off-by: Tihomir Surdilovic <[email protected]> * adding readme Signed-off-by: Tihomir Surdilovic <[email protected]> * update readme Signed-off-by: Tihomir Surdilovic <[email protected]> * update Signed-off-by: Tihomir Surdilovic <[email protected]> * update Signed-off-by: Tihomir Surdilovic <[email protected]> * update starter Signed-off-by: Tihomir Surdilovic <[email protected]> * update readme Signed-off-by: Tihomir Surdilovic <[email protected]> * update readme Signed-off-by: Tihomir Surdilovic <[email protected]> * add comment Signed-off-by: Tihomir Surdilovic <[email protected]> * update readme Signed-off-by: Tihomir Surdilovic <[email protected]> * remove duplicate comment Signed-off-by: Tihomir Surdilovic <[email protected]> --------- Signed-off-by: Tihomir Surdilovic <[email protected]>
1 parent 902591b commit 84b4fee

File tree

10 files changed

+527
-0
lines changed

10 files changed

+527
-0
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ See the README.md file in each main sample directory for cut/paste Gradle comman
108108

109109
- [**Custom Annotation**](/core/src/main/java/io/temporal/samples/customannotation): Demonstrates how to create a custom annotation using an interceptor.
110110

111+
- [**Asnyc Packet Delivery**](/core/src/main/java/io/temporal/samples/packetdelivery): Demonstrates running multiple execution paths async within single execution.
112+
113+
111114
#### API demonstrations
112115

113116
- [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
public class Packet {
4+
private int id;
5+
private String content;
6+
7+
public Packet() {}
8+
9+
public Packet(int id, String content) {
10+
this.id = id;
11+
this.content = content;
12+
}
13+
14+
public int getId() {
15+
return id;
16+
}
17+
18+
public String getContent() {
19+
return content;
20+
}
21+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.activity.ActivityOptions;
4+
import io.temporal.failure.ActivityFailure;
5+
import io.temporal.failure.CanceledFailure;
6+
import io.temporal.workflow.*;
7+
import java.time.Duration;
8+
import org.slf4j.Logger;
9+
10+
public class PacketDelivery {
11+
private Packet packet;
12+
private boolean deliveryConfirmation = false;
13+
private boolean needDeliveryConfirmation = false;
14+
private CompletablePromise delivered = Workflow.newPromise();
15+
private CancellationScope cancellationScope;
16+
17+
private Logger logger = Workflow.getLogger(this.getClass().getName());
18+
19+
private final PacketDeliveryActivities activities =
20+
Workflow.newActivityStub(
21+
PacketDeliveryActivities.class,
22+
ActivityOptions.newBuilder()
23+
.setStartToCloseTimeout(Duration.ofSeconds(5))
24+
.setHeartbeatTimeout(Duration.ofSeconds(2))
25+
.build());
26+
27+
private final PacketDeliveryActivities compensationActivities =
28+
Workflow.newActivityStub(
29+
PacketDeliveryActivities.class,
30+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build());
31+
32+
public PacketDelivery(Packet packet) {
33+
this.packet = packet;
34+
processDeliveryAsync();
35+
}
36+
37+
public Promise<Void> getDelivered() {
38+
return delivered;
39+
}
40+
41+
public void processDeliveryAsync() {
42+
delivered.completeFrom(Async.procedure(this::processDelivery));
43+
}
44+
45+
public void processDelivery() {
46+
cancellationScope =
47+
Workflow.newCancellationScope(
48+
() -> {
49+
String deliveryConfirmationResult = "";
50+
while (!deliveryConfirmationResult.equals(PacketUtils.COMPLETION_SUCCESS)) {
51+
// Step 1 perform delivery
52+
logger.info(
53+
"** Performing delivery for packet: "
54+
+ packet.getId()
55+
+ " - "
56+
+ packet.getContent());
57+
activities.performDelivery(packet);
58+
// Step 2 wait for delivery confirmation
59+
logger.info(
60+
"** Delivery for packet: "
61+
+ packet.getId()
62+
+ " - "
63+
+ packet.getContent()
64+
+ " awaiting delivery confirmation");
65+
needDeliveryConfirmation = true;
66+
Workflow.await(() -> deliveryConfirmation);
67+
logger.info(
68+
"** Delivery for packet: "
69+
+ packet.getId()
70+
+ " - "
71+
+ packet.getContent()
72+
+ " received confirmation");
73+
// Step 3 complete delivery processing
74+
logger.info(
75+
"** Completing delivery for packet: "
76+
+ packet.getId()
77+
+ " - "
78+
+ packet.getContent());
79+
deliveryConfirmationResult = activities.completeDelivery(packet);
80+
// Reset deliveryConfirmation and needDeliveryConfirmation
81+
deliveryConfirmation = false;
82+
needDeliveryConfirmation = false;
83+
}
84+
});
85+
86+
try {
87+
cancellationScope.run();
88+
} catch (Exception e) {
89+
if (e instanceof ActivityFailure) {
90+
ActivityFailure activityFailure = (ActivityFailure) e;
91+
if (activityFailure.getCause() instanceof CanceledFailure) {
92+
// Run compensation activity and complete
93+
compensationActivities.compensateDelivery(packet);
94+
}
95+
}
96+
// Just for show for example that cancel could come in while we are waiting on approval signal
97+
// too
98+
else if (e instanceof CanceledFailure) {
99+
needDeliveryConfirmation = false;
100+
// Run compensation activity and complete
101+
compensationActivities.compensateDelivery(packet);
102+
}
103+
return;
104+
}
105+
}
106+
107+
public void confirmDelivery() {
108+
this.deliveryConfirmation = true;
109+
}
110+
111+
public void cancelDelivery(String reason) {
112+
if (cancellationScope != null) {
113+
cancellationScope.cancel(reason);
114+
}
115+
}
116+
117+
public boolean isNeedDeliveryConfirmation() {
118+
return needDeliveryConfirmation;
119+
}
120+
121+
public Packet getPacket() {
122+
return packet;
123+
}
124+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.activity.ActivityInterface;
4+
import java.util.List;
5+
6+
@ActivityInterface
7+
public interface PacketDeliveryActivities {
8+
List<Packet> generatePackets();
9+
10+
void performDelivery(Packet packet);
11+
12+
String completeDelivery(Packet packet);
13+
14+
String compensateDelivery(Packet packet);
15+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.activity.Activity;
4+
import io.temporal.activity.ActivityExecutionContext;
5+
import io.temporal.client.ActivityCompletionException;
6+
import io.temporal.client.WorkflowClient;
7+
import java.util.*;
8+
9+
public class PacketDeliveryActivitiesImpl implements PacketDeliveryActivities {
10+
private List<Packet> packets =
11+
Arrays.asList(
12+
new Packet(1, "books"),
13+
new Packet(2, "jewelry"),
14+
new Packet(3, "furniture"),
15+
new Packet(4, "food"),
16+
new Packet(5, "electronics"));
17+
private WorkflowClient client;
18+
19+
public PacketDeliveryActivitiesImpl(WorkflowClient client) {
20+
this.client = client;
21+
}
22+
23+
@Override
24+
public List<Packet> generatePackets() {
25+
return packets;
26+
}
27+
28+
@Override
29+
public void performDelivery(Packet packet) {
30+
ActivityExecutionContext context = Activity.getExecutionContext();
31+
System.out.println(
32+
"** Activity - Performing delivery for packet: "
33+
+ packet.getId()
34+
+ " with content: "
35+
+ packet.getContent());
36+
for (int i = 0; i < 4; i++) {
37+
try {
38+
// Perform the heartbeat. Used to notify the workflow that activity execution is alive
39+
context.heartbeat(i);
40+
} catch (ActivityCompletionException e) {
41+
System.out.println(
42+
"** Activity - Canceling delivery activity for packet: "
43+
+ packet.getId()
44+
+ " with content: "
45+
+ packet.getContent());
46+
throw e;
47+
}
48+
}
49+
}
50+
51+
@Override
52+
public String completeDelivery(Packet packet) {
53+
ActivityExecutionContext context = Activity.getExecutionContext();
54+
System.out.println(
55+
"** Activity - Completing delivery for package: "
56+
+ packet.getId()
57+
+ " with content: "
58+
+ packet.getContent());
59+
for (int i = 0; i < 4; i++) {
60+
try {
61+
// Perform the heartbeat. Used to notify the workflow that activity execution is alive
62+
context.heartbeat(i);
63+
} catch (ActivityCompletionException e) {
64+
System.out.println(
65+
"** Activity - Canceling complete delivery activity for packet: "
66+
+ packet.getId()
67+
+ " with content: "
68+
+ packet.getContent());
69+
throw e;
70+
}
71+
}
72+
// For sample we just confirm
73+
return randomCompletionDeliveryResult(packet);
74+
}
75+
76+
@Override
77+
public String compensateDelivery(Packet packet) {
78+
System.out.println(
79+
"** Activity - Compensating delivery for package: "
80+
+ packet.getId()
81+
+ " with content: "
82+
+ packet.getContent());
83+
sleep(1);
84+
return PacketUtils.COMPENSATION_COMPLETED;
85+
}
86+
87+
/**
88+
* For this sample activity completion result can drive if 1. Delivery confirmation is completed,
89+
* in which case we complete delivery 2. Delivery confirmation is failed, in which case we run the
90+
* delivery again 3. Delivery confirmation is cancelled, in which case we want to cancel delivery
91+
* and perform "cleanup activity" Note that any delivery can cancel itself OR another delivery, so
92+
* for example Furniure delivery can cancel the Food delivery. For sample we have some specific
93+
* rules Which delivery can cancel which
94+
*/
95+
private String randomCompletionDeliveryResult(Packet packet) {
96+
Random random = new Random();
97+
double randomValue = random.nextDouble();
98+
if (randomValue < 0.10) { // 10% chance for delivery completion to be canceled
99+
int toCancelDelivery = determineCancelRules(packet);
100+
System.out.println(
101+
"** Activity - Delivery completion result for package: "
102+
+ packet.getId()
103+
+ " with content: "
104+
+ packet.getContent()
105+
+ ": "
106+
+ "Cancelling delivery: "
107+
+ toCancelDelivery);
108+
109+
// send cancellation signal for packet to be canceled
110+
PacketDeliveryWorkflow packetWorkflow =
111+
client.newWorkflowStub(
112+
PacketDeliveryWorkflow.class,
113+
Activity.getExecutionContext().getInfo().getWorkflowId());
114+
packetWorkflow.cancelDelivery(toCancelDelivery, "canceled from delivery " + packet.getId());
115+
116+
return PacketUtils.COMPLETION_CANCELLED;
117+
}
118+
if (randomValue < 0.20) { // 20% chance for delivery completion to fail
119+
System.out.println(
120+
"** Activity - Delivery completion result for package: "
121+
+ packet.getId()
122+
+ " with content: "
123+
+ packet.getContent()
124+
+ ": "
125+
+ "Failed");
126+
return PacketUtils.COMPLETION_FAILURE;
127+
}
128+
129+
System.out.println(
130+
"** Activity - Delivery completion result for package: "
131+
+ packet.getId()
132+
+ " with content: "
133+
+ packet.getContent()
134+
+ ": "
135+
+ "Successful");
136+
return PacketUtils.COMPLETION_SUCCESS;
137+
}
138+
139+
private void sleep(int seconds) {
140+
try {
141+
Thread.sleep(seconds * 1000L);
142+
} catch (Exception e) {
143+
System.out.println(e.getMessage());
144+
}
145+
}
146+
147+
/**
148+
* Sample rules for canceling different deliveries We just rotate the list 1-5 (packet ids) by
149+
* packet id and return first result
150+
*/
151+
private int determineCancelRules(Packet packet) {
152+
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
153+
Collections.rotate(list, packet.getId());
154+
System.out.println(
155+
"** Activity - Package delivery : "
156+
+ packet.getId()
157+
+ " canceling package delivery: "
158+
+ list.get(0));
159+
return list.get(0);
160+
}
161+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.workflow.QueryMethod;
4+
import io.temporal.workflow.SignalMethod;
5+
import io.temporal.workflow.WorkflowInterface;
6+
import io.temporal.workflow.WorkflowMethod;
7+
import java.util.List;
8+
9+
@WorkflowInterface
10+
public interface PacketDeliveryWorkflow {
11+
@WorkflowMethod
12+
String execute();
13+
14+
@SignalMethod
15+
void confirmDelivery(int deliveryId);
16+
17+
@SignalMethod
18+
void cancelDelivery(int deliveryId, String reason);
19+
20+
@QueryMethod
21+
List<Packet> deliveryConfirmationPackets();
22+
}

0 commit comments

Comments
 (0)