Skip to content

Activity pause support #482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Temporalio/Activities/ActivityCancelReason.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@ public enum ActivityCancelReason
/// to payloads.
/// </summary>
HeartbeatRecordFailure,

/// <summary>
/// Activity was explicitly paused.
/// </summary>
Paused,
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
namespace Temporalio.Activities
{
/// <summary>
/// Boxed reference wrapper for <see cref="ActivityCancelReason" />.
/// Boxed reference wrapper for <see cref="ActivityCancelReason" /> and
/// <see cref="ActivityCancellationDetails"/>.
/// </summary>
internal class ActivityCancelReasonRef
internal class ActivityCancelRef
{
/// <summary>
/// Gets or sets the cancel reason. Default is <see cref="ActivityCancelReason.None" />.
/// </summary>
public ActivityCancelReason CancelReason { get; set; } = ActivityCancelReason.None;

/// <summary>
/// Gets or sets the cancel details.
/// </summary>
public ActivityCancellationDetails? CancellationDetails { get; set; }
}
}
42 changes: 42 additions & 0 deletions src/Temporalio/Activities/ActivityCancellationDetails.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace Temporalio.Activities
{
/// <summary>
/// Details that are set when an activity is cancelled. This is only valid at the time the
/// cancel was received, the state may change on the server after it is received.
/// </summary>
public class ActivityCancellationDetails
{
/// <summary>
/// Gets a value indicating whether the activity no longer exists on the server (may already
/// be completed or its workflow may be completed).
/// </summary>
public bool IsGoneFromServer { get; init; }

/// <summary>
/// Gets a value indicating whether the activity was explicitly cancelled.
/// </summary>
public bool IsCancelRequested { get; init; }

/// <summary>
/// Gets a value indicating whether the activity timeout caused activity to be marked
/// cancelled.
/// </summary>
public bool IsTimedOut { get; init; }

/// <summary>
/// Gets a value indicating whether the worker the activity is running on is shutting down.
/// </summary>
public bool IsWorkerShutdown { get; init; }

/// <summary>
/// Gets a value indicating whether the activity was explicitly paused.
/// </summary>
public bool IsPaused { get; init; }

/// <summary>
/// Gets a value indicating whether the activity failed to record heartbeat. This usually
/// only happens if the details cannot be converted to payloads.
/// </summary>
public bool IsHeartbeatRecordFailure { get; init; }
}
}
33 changes: 27 additions & 6 deletions src/Temporalio/Activities/ActivityExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,36 @@ internal ActivityExecutionContext(
public ILogger Logger { get; private init; }

/// <summary>
/// Gets why the activity was cancelled. This value is inaccurate until
/// <see cref="CancellationToken" /> is cancelled.
/// Gets why the activity was cancelled.
/// </summary>
public ActivityCancelReason CancelReason => CancelReasonRef.CancelReason;
/// <remarks>
/// This value may be inaccurate until <see cref="CancellationToken" /> is cancelled.
/// </remarks>
/// <remarks>
/// In some cases there may be multiple reasons an activity is cancelled. For this, use
/// <see cref="CancellationDetails"/>.
/// </remarks>
public ActivityCancelReason CancelReason => CancelRef.CancelReason;

/// <summary>
/// Gets the details of why a cancellation was performed.
/// </summary>
/// <remarks>
/// This value may be inaccurate until <see cref="CancellationToken" /> is cancelled.
/// </remarks>
/// <remarks>
/// These details only represent when the cancel was first performed. Once set, this object
/// is never mutated. Therefore, the situation on the server may have changed
/// (e.g. unpause), but this still represents the case when cancellation first occurred for
/// this attempt.
/// </remarks>
public ActivityCancellationDetails? CancellationDetails => CancelRef.CancellationDetails;

/// <summary>
/// Gets the cancellation token that is cancelled when the activity is cancelled.
/// </summary>
public CancellationToken CancellationToken { get; private init; }
public CancellationToken CancellationToken
{ get; private init; }

/// <summary>
/// Gets the cancellation token that is cancelled when the worker is shutdown. This can be
Expand Down Expand Up @@ -137,9 +158,9 @@ internal ActivityExecutionContext(
internal Action<object?[]>? Heartbeater { get; set; }

/// <summary>
/// Gets a reference to the reason enum.
/// Gets a reference to update cancel values.
/// </summary>
internal ActivityCancelReasonRef CancelReasonRef { get; init; } = new();
internal ActivityCancelRef CancelRef { get; init; } = new();

/// <summary>
/// Gets the raw proto task token for this activity.
Expand Down
46 changes: 37 additions & 9 deletions src/Temporalio/Testing/ActivityEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,23 @@ public record ActivityEnvironment
public ITemporalClient? TemporalClient { get; init; }

/// <summary>
/// Gets or sets the cancel reason. Callers may prefer <see cref="Cancel" /> instead.
/// Gets or sets the cancel reason. Callers should use one of the overloads of
/// <see cref="Cancel()" /> instead.
/// </summary>
public ActivityCancelReason CancelReason
{
get => CancelReasonRef.CancelReason;
set => CancelReasonRef.CancelReason = value;
get => CancelRef.CancelReason;
set => CancelRef.CancelReason = value;
}

/// <summary>
/// Gets or sets the cancellation details. Callers should use one of the overloads of
/// <see cref="Cancel()" /> instead.
/// </summary>
public ActivityCancellationDetails? CancellationDetails
{
get => CancelRef.CancellationDetails;
set => CancelRef.CancellationDetails = value;
}

/// <summary>
Expand All @@ -93,9 +104,9 @@ public ActivityCancelReason CancelReason
public Action<object?[]> Heartbeater { get; init; } = details => { };

/// <summary>
/// Gets the cancel reason reference.
/// Gets the cancel reference.
/// </summary>
internal ActivityCancelReasonRef CancelReasonRef { get; } = new();
internal ActivityCancelRef CancelRef { get; } = new();

/// <summary>
/// Run the given activity with a context.
Expand Down Expand Up @@ -146,7 +157,7 @@ public async Task<T> RunAsync<T>(Func<Task<T>> activity)
temporalClient: TemporalClient)
{
Heartbeater = Heartbeater,
CancelReasonRef = CancelReasonRef,
CancelRef = CancelRef,
};
return await activity().ConfigureAwait(false);
}
Expand All @@ -157,18 +168,35 @@ public async Task<T> RunAsync<T>(Func<Task<T>> activity)
}

/// <summary>
/// If cancellation not already requested, set the cancel reason and cancel the token
/// source.
/// If cancellation not already requested, set the cancel reason/details and cancel the
/// token source.
/// </summary>
public void Cancel() =>
Cancel(ActivityCancelReason.CancelRequested);

/// <summary>
/// If cancellation not already requested, set the cancel reason/details and cancel the
/// token source.
/// </summary>
/// <param name="reason">Cancel reason.</param>
public void Cancel(ActivityCancelReason reason) =>
Cancel(reason, new() { IsCancelRequested = true });

/// <summary>
/// If cancellation not already requested, set the cancel reason/details and cancel the
/// token source.
/// </summary>
/// <param name="reason">Cancel reason.</param>
public void Cancel(ActivityCancelReason reason = ActivityCancelReason.CancelRequested)
/// <param name="details">Cancellation details.</param>
public void Cancel(ActivityCancelReason reason, ActivityCancellationDetails details)
{
// This is intentionally not an atomic operation same as it's not in the real worker.
// It is documented for callers not to expect reason to be valid until cancellation
// token is set.
if (!CancellationTokenSource.IsCancellationRequested)
{
CancelReason = reason;
CancellationDetails = details;
CancellationTokenSource.Cancel();
}
}
Expand Down
56 changes: 42 additions & 14 deletions src/Temporalio/Worker/ActivityWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public async Task ExecuteAsync()
}
else
{
act.Cancel(task.Cancel.Reason);
act.Cancel(task.Cancel);
}
break;
case null:
Expand Down Expand Up @@ -422,10 +422,23 @@ await Task.WhenAll(tsk.Start.Input.Select(p =>
act.Context.Info.ActivityType);
completion.Result.WillCompleteAsync = new();
}
catch (OperationCanceledException e) when (
act.ServerRequestedCancel && act.Context.CancellationDetails?.IsPaused == true)
{
act.Context.Logger.LogDebug(
"Completing activity {ActivityType} as failed due cancel exception caused by pause",
act.Context.Info.ActivityType);
completion.Result.Failed = new()
{
Failure_ = await dataConverter.ToFailureAsync(
new ApplicationFailureException(
"Activity paused", e, "ActivityPause")).ConfigureAwait(false),
};
}
catch (OperationCanceledException) when (act.ServerRequestedCancel)
{
act.Context.Logger.LogDebug(
"Completing activity {ActivityType} as cancelled",
"Completing activity {ActivityType} as cancelled, reason: ",
act.Context.Info.ActivityType);
completion.Result.Cancelled = new()
{
Expand Down Expand Up @@ -557,7 +570,8 @@ public void MarkDone()
/// Cancel this activity for the given reason if not already cancelled.
/// </summary>
/// <param name="reason">Cancel reason.</param>
public void Cancel(ActivityCancelReason reason)
/// <param name="details">Cancellation details.</param>
public void Cancel(ActivityCancelReason reason, ActivityCancellationDetails details)
{
// Ignore if already cancelled
if (cancelTokenSource.IsCancellationRequested)
Expand All @@ -570,37 +584,49 @@ public void Cancel(ActivityCancelReason reason)
"Cancelling activity {TaskToken}, reason {Reason}",
Context.TaskToken,
reason);
Context.CancelReasonRef.CancelReason = reason;
Context.CancelRef.CancelReason = reason;
Context.CancelRef.CancellationDetails = details;
cancelTokenSource.Cancel();
}
}

/// <summary>
/// Cancel this activity for the given upstream reason.
/// Cancel this activity with the given upstream info.
/// </summary>
/// <param name="reason">Cancel reason.</param>
public void Cancel(Bridge.Api.ActivityTask.ActivityCancelReason reason)
/// <param name="cancel">Cancel.</param>
public void Cancel(Bridge.Api.ActivityTask.Cancel cancel)
{
lock (mutex)
{
serverRequestedCancel = true;
}
switch (reason)
var details = new ActivityCancellationDetails()
{
IsGoneFromServer = cancel.Details?.IsNotFound ?? false,
IsCancelRequested = cancel.Details?.IsCancelled ?? false,
IsTimedOut = cancel.Details?.IsTimedOut ?? false,
IsWorkerShutdown = cancel.Details?.IsWorkerShutdown ?? false,
IsPaused = cancel.Details?.IsPaused ?? false,
};
switch (cancel.Reason)
{
case Bridge.Api.ActivityTask.ActivityCancelReason.NotFound:
Cancel(ActivityCancelReason.GoneFromServer);
Cancel(ActivityCancelReason.GoneFromServer, details);
break;
case Bridge.Api.ActivityTask.ActivityCancelReason.Cancelled:
Cancel(ActivityCancelReason.CancelRequested);
Cancel(ActivityCancelReason.CancelRequested, details);
break;
case Bridge.Api.ActivityTask.ActivityCancelReason.TimedOut:
Cancel(ActivityCancelReason.Timeout);
Cancel(ActivityCancelReason.Timeout, details);
break;
case Bridge.Api.ActivityTask.ActivityCancelReason.WorkerShutdown:
Cancel(ActivityCancelReason.WorkerShutdown);
Cancel(ActivityCancelReason.WorkerShutdown, details);
break;
case Bridge.Api.ActivityTask.ActivityCancelReason.Paused:
Cancel(ActivityCancelReason.Paused, details);
break;
default:
Cancel(ActivityCancelReason.None);
Cancel(ActivityCancelReason.None, details);
break;
}
}
Expand Down Expand Up @@ -702,7 +728,9 @@ private async Task HeartbeatAsync(TemporalWorker worker)
Context.Logger.LogWarning(
e, "Cancelling activity because failed recording heartbeat");
}
Cancel(ActivityCancelReason.HeartbeatRecordFailure);
Cancel(
ActivityCancelReason.HeartbeatRecordFailure,
new() { IsHeartbeatRecordFailure = true });
}
}
}
Expand Down
Loading
Loading