Skip to content

Commit 864e654

Browse files
authored
Activity pause support (#482)
Fixes #441
1 parent 8918a31 commit 864e654

File tree

8 files changed

+272
-31
lines changed

8 files changed

+272
-31
lines changed

src/Temporalio/Activities/ActivityCancelReason.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,10 @@ public enum ActivityCancelReason
3636
/// to payloads.
3737
/// </summary>
3838
HeartbeatRecordFailure,
39+
40+
/// <summary>
41+
/// Activity was explicitly paused.
42+
/// </summary>
43+
Paused,
3944
}
4045
}
Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
namespace Temporalio.Activities
22
{
33
/// <summary>
4-
/// Boxed reference wrapper for <see cref="ActivityCancelReason" />.
4+
/// Boxed reference wrapper for <see cref="ActivityCancelReason" /> and
5+
/// <see cref="ActivityCancellationDetails"/>.
56
/// </summary>
6-
internal class ActivityCancelReasonRef
7+
internal class ActivityCancelRef
78
{
89
/// <summary>
910
/// Gets or sets the cancel reason. Default is <see cref="ActivityCancelReason.None" />.
1011
/// </summary>
1112
public ActivityCancelReason CancelReason { get; set; } = ActivityCancelReason.None;
13+
14+
/// <summary>
15+
/// Gets or sets the cancel details.
16+
/// </summary>
17+
public ActivityCancellationDetails? CancellationDetails { get; set; }
1218
}
1319
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
namespace Temporalio.Activities
2+
{
3+
/// <summary>
4+
/// Details that are set when an activity is cancelled. This is only valid at the time the
5+
/// cancel was received, the state may change on the server after it is received.
6+
/// </summary>
7+
public class ActivityCancellationDetails
8+
{
9+
/// <summary>
10+
/// Gets a value indicating whether the activity no longer exists on the server (may already
11+
/// be completed or its workflow may be completed).
12+
/// </summary>
13+
public bool IsGoneFromServer { get; init; }
14+
15+
/// <summary>
16+
/// Gets a value indicating whether the activity was explicitly cancelled.
17+
/// </summary>
18+
public bool IsCancelRequested { get; init; }
19+
20+
/// <summary>
21+
/// Gets a value indicating whether the activity timeout caused activity to be marked
22+
/// cancelled.
23+
/// </summary>
24+
public bool IsTimedOut { get; init; }
25+
26+
/// <summary>
27+
/// Gets a value indicating whether the worker the activity is running on is shutting down.
28+
/// </summary>
29+
public bool IsWorkerShutdown { get; init; }
30+
31+
/// <summary>
32+
/// Gets a value indicating whether the activity was explicitly paused.
33+
/// </summary>
34+
public bool IsPaused { get; init; }
35+
36+
/// <summary>
37+
/// Gets a value indicating whether the activity failed to record heartbeat. This usually
38+
/// only happens if the details cannot be converted to payloads.
39+
/// </summary>
40+
public bool IsHeartbeatRecordFailure { get; init; }
41+
}
42+
}

src/Temporalio/Activities/ActivityExecutionContext.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,36 @@ internal ActivityExecutionContext(
8383
public ILogger Logger { get; private init; }
8484

8585
/// <summary>
86-
/// Gets why the activity was cancelled. This value is inaccurate until
87-
/// <see cref="CancellationToken" /> is cancelled.
86+
/// Gets why the activity was cancelled.
8887
/// </summary>
89-
public ActivityCancelReason CancelReason => CancelReasonRef.CancelReason;
88+
/// <remarks>
89+
/// This value may be inaccurate until <see cref="CancellationToken" /> is cancelled.
90+
/// </remarks>
91+
/// <remarks>
92+
/// In some cases there may be multiple reasons an activity is cancelled. For this, use
93+
/// <see cref="CancellationDetails"/>.
94+
/// </remarks>
95+
public ActivityCancelReason CancelReason => CancelRef.CancelReason;
96+
97+
/// <summary>
98+
/// Gets the details of why a cancellation was performed.
99+
/// </summary>
100+
/// <remarks>
101+
/// This value may be inaccurate until <see cref="CancellationToken" /> is cancelled.
102+
/// </remarks>
103+
/// <remarks>
104+
/// These details only represent when the cancel was first performed. Once set, this object
105+
/// is never mutated. Therefore, the situation on the server may have changed
106+
/// (e.g. unpause), but this still represents the case when cancellation first occurred for
107+
/// this attempt.
108+
/// </remarks>
109+
public ActivityCancellationDetails? CancellationDetails => CancelRef.CancellationDetails;
90110

91111
/// <summary>
92112
/// Gets the cancellation token that is cancelled when the activity is cancelled.
93113
/// </summary>
94-
public CancellationToken CancellationToken { get; private init; }
114+
public CancellationToken CancellationToken
115+
{ get; private init; }
95116

96117
/// <summary>
97118
/// Gets the cancellation token that is cancelled when the worker is shutdown. This can be
@@ -137,9 +158,9 @@ internal ActivityExecutionContext(
137158
internal Action<object?[]>? Heartbeater { get; set; }
138159

139160
/// <summary>
140-
/// Gets a reference to the reason enum.
161+
/// Gets a reference to update cancel values.
141162
/// </summary>
142-
internal ActivityCancelReasonRef CancelReasonRef { get; init; } = new();
163+
internal ActivityCancelRef CancelRef { get; init; } = new();
143164

144165
/// <summary>
145166
/// Gets the raw proto task token for this activity.

src/Temporalio/Testing/ActivityEnvironment.cs

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,23 @@ public record ActivityEnvironment
6969
public ITemporalClient? TemporalClient { get; init; }
7070

7171
/// <summary>
72-
/// Gets or sets the cancel reason. Callers may prefer <see cref="Cancel" /> instead.
72+
/// Gets or sets the cancel reason. Callers should use one of the overloads of
73+
/// <see cref="Cancel()" /> instead.
7374
/// </summary>
7475
public ActivityCancelReason CancelReason
7576
{
76-
get => CancelReasonRef.CancelReason;
77-
set => CancelReasonRef.CancelReason = value;
77+
get => CancelRef.CancelReason;
78+
set => CancelRef.CancelReason = value;
79+
}
80+
81+
/// <summary>
82+
/// Gets or sets the cancellation details. Callers should use one of the overloads of
83+
/// <see cref="Cancel()" /> instead.
84+
/// </summary>
85+
public ActivityCancellationDetails? CancellationDetails
86+
{
87+
get => CancelRef.CancellationDetails;
88+
set => CancelRef.CancellationDetails = value;
7889
}
7990

8091
/// <summary>
@@ -93,9 +104,9 @@ public ActivityCancelReason CancelReason
93104
public Action<object?[]> Heartbeater { get; init; } = details => { };
94105

95106
/// <summary>
96-
/// Gets the cancel reason reference.
107+
/// Gets the cancel reference.
97108
/// </summary>
98-
internal ActivityCancelReasonRef CancelReasonRef { get; } = new();
109+
internal ActivityCancelRef CancelRef { get; } = new();
99110

100111
/// <summary>
101112
/// Run the given activity with a context.
@@ -146,7 +157,7 @@ public async Task<T> RunAsync<T>(Func<Task<T>> activity)
146157
temporalClient: TemporalClient)
147158
{
148159
Heartbeater = Heartbeater,
149-
CancelReasonRef = CancelReasonRef,
160+
CancelRef = CancelRef,
150161
};
151162
return await activity().ConfigureAwait(false);
152163
}
@@ -157,18 +168,35 @@ public async Task<T> RunAsync<T>(Func<Task<T>> activity)
157168
}
158169

159170
/// <summary>
160-
/// If cancellation not already requested, set the cancel reason and cancel the token
161-
/// source.
171+
/// If cancellation not already requested, set the cancel reason/details and cancel the
172+
/// token source.
173+
/// </summary>
174+
public void Cancel() =>
175+
Cancel(ActivityCancelReason.CancelRequested);
176+
177+
/// <summary>
178+
/// If cancellation not already requested, set the cancel reason/details and cancel the
179+
/// token source.
180+
/// </summary>
181+
/// <param name="reason">Cancel reason.</param>
182+
public void Cancel(ActivityCancelReason reason) =>
183+
Cancel(reason, new() { IsCancelRequested = true });
184+
185+
/// <summary>
186+
/// If cancellation not already requested, set the cancel reason/details and cancel the
187+
/// token source.
162188
/// </summary>
163189
/// <param name="reason">Cancel reason.</param>
164-
public void Cancel(ActivityCancelReason reason = ActivityCancelReason.CancelRequested)
190+
/// <param name="details">Cancellation details.</param>
191+
public void Cancel(ActivityCancelReason reason, ActivityCancellationDetails details)
165192
{
166193
// This is intentionally not an atomic operation same as it's not in the real worker.
167194
// It is documented for callers not to expect reason to be valid until cancellation
168195
// token is set.
169196
if (!CancellationTokenSource.IsCancellationRequested)
170197
{
171198
CancelReason = reason;
199+
CancellationDetails = details;
172200
CancellationTokenSource.Cancel();
173201
}
174202
}

src/Temporalio/Worker/ActivityWorker.cs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public async Task ExecuteAsync()
100100
}
101101
else
102102
{
103-
act.Cancel(task.Cancel.Reason);
103+
act.Cancel(task.Cancel);
104104
}
105105
break;
106106
case null:
@@ -422,10 +422,23 @@ await Task.WhenAll(tsk.Start.Input.Select(p =>
422422
act.Context.Info.ActivityType);
423423
completion.Result.WillCompleteAsync = new();
424424
}
425+
catch (OperationCanceledException e) when (
426+
act.ServerRequestedCancel && act.Context.CancellationDetails?.IsPaused == true)
427+
{
428+
act.Context.Logger.LogDebug(
429+
"Completing activity {ActivityType} as failed due cancel exception caused by pause",
430+
act.Context.Info.ActivityType);
431+
completion.Result.Failed = new()
432+
{
433+
Failure_ = await dataConverter.ToFailureAsync(
434+
new ApplicationFailureException(
435+
"Activity paused", e, "ActivityPause")).ConfigureAwait(false),
436+
};
437+
}
425438
catch (OperationCanceledException) when (act.ServerRequestedCancel)
426439
{
427440
act.Context.Logger.LogDebug(
428-
"Completing activity {ActivityType} as cancelled",
441+
"Completing activity {ActivityType} as cancelled, reason: ",
429442
act.Context.Info.ActivityType);
430443
completion.Result.Cancelled = new()
431444
{
@@ -557,7 +570,8 @@ public void MarkDone()
557570
/// Cancel this activity for the given reason if not already cancelled.
558571
/// </summary>
559572
/// <param name="reason">Cancel reason.</param>
560-
public void Cancel(ActivityCancelReason reason)
573+
/// <param name="details">Cancellation details.</param>
574+
public void Cancel(ActivityCancelReason reason, ActivityCancellationDetails details)
561575
{
562576
// Ignore if already cancelled
563577
if (cancelTokenSource.IsCancellationRequested)
@@ -570,37 +584,49 @@ public void Cancel(ActivityCancelReason reason)
570584
"Cancelling activity {TaskToken}, reason {Reason}",
571585
Context.TaskToken,
572586
reason);
573-
Context.CancelReasonRef.CancelReason = reason;
587+
Context.CancelRef.CancelReason = reason;
588+
Context.CancelRef.CancellationDetails = details;
574589
cancelTokenSource.Cancel();
575590
}
576591
}
577592

578593
/// <summary>
579-
/// Cancel this activity for the given upstream reason.
594+
/// Cancel this activity with the given upstream info.
580595
/// </summary>
581-
/// <param name="reason">Cancel reason.</param>
582-
public void Cancel(Bridge.Api.ActivityTask.ActivityCancelReason reason)
596+
/// <param name="cancel">Cancel.</param>
597+
public void Cancel(Bridge.Api.ActivityTask.Cancel cancel)
583598
{
584599
lock (mutex)
585600
{
586601
serverRequestedCancel = true;
587602
}
588-
switch (reason)
603+
var details = new ActivityCancellationDetails()
604+
{
605+
IsGoneFromServer = cancel.Details?.IsNotFound ?? false,
606+
IsCancelRequested = cancel.Details?.IsCancelled ?? false,
607+
IsTimedOut = cancel.Details?.IsTimedOut ?? false,
608+
IsWorkerShutdown = cancel.Details?.IsWorkerShutdown ?? false,
609+
IsPaused = cancel.Details?.IsPaused ?? false,
610+
};
611+
switch (cancel.Reason)
589612
{
590613
case Bridge.Api.ActivityTask.ActivityCancelReason.NotFound:
591-
Cancel(ActivityCancelReason.GoneFromServer);
614+
Cancel(ActivityCancelReason.GoneFromServer, details);
592615
break;
593616
case Bridge.Api.ActivityTask.ActivityCancelReason.Cancelled:
594-
Cancel(ActivityCancelReason.CancelRequested);
617+
Cancel(ActivityCancelReason.CancelRequested, details);
595618
break;
596619
case Bridge.Api.ActivityTask.ActivityCancelReason.TimedOut:
597-
Cancel(ActivityCancelReason.Timeout);
620+
Cancel(ActivityCancelReason.Timeout, details);
598621
break;
599622
case Bridge.Api.ActivityTask.ActivityCancelReason.WorkerShutdown:
600-
Cancel(ActivityCancelReason.WorkerShutdown);
623+
Cancel(ActivityCancelReason.WorkerShutdown, details);
624+
break;
625+
case Bridge.Api.ActivityTask.ActivityCancelReason.Paused:
626+
Cancel(ActivityCancelReason.Paused, details);
601627
break;
602628
default:
603-
Cancel(ActivityCancelReason.None);
629+
Cancel(ActivityCancelReason.None, details);
604630
break;
605631
}
606632
}
@@ -702,7 +728,9 @@ private async Task HeartbeatAsync(TemporalWorker worker)
702728
Context.Logger.LogWarning(
703729
e, "Cancelling activity because failed recording heartbeat");
704730
}
705-
Cancel(ActivityCancelReason.HeartbeatRecordFailure);
731+
Cancel(
732+
ActivityCancelReason.HeartbeatRecordFailure,
733+
new() { IsHeartbeatRecordFailure = true });
706734
}
707735
}
708736
}

0 commit comments

Comments
 (0)