Skip to content

Commit 72f6fb2

Browse files
authored
Warn on unfinished workflow handlers (#294)
Fixes #261
1 parent 0d0334f commit 72f6fb2

File tree

9 files changed

+460
-39
lines changed

9 files changed

+460
-39
lines changed

src/Temporalio/Worker/WorkflowInstance.cs

Lines changed: 148 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Linq;
77
using System.Reflection;
88
using System.Runtime.ExceptionServices;
9+
using System.Text.Json;
910
using System.Threading;
1011
using System.Threading.Tasks;
1112
using Microsoft.Extensions.Logging;
@@ -68,6 +69,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
6869
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
6970
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes;
7071
private readonly bool disableCompletionCommandReordering;
72+
private readonly Handlers inProgressHandlers = new();
7173
private WorkflowActivationCompletion? completion;
7274
// Will be set to null after last use (i.e. when workflow actually started)
7375
private Lazy<object?[]>? startArgs;
@@ -204,6 +206,9 @@ public WorkflowInstance(WorkflowInstanceDetails details)
204206
/// </summary>
205207
public bool TracingEventsEnabled { get; private init; }
206208

209+
/// <inheritdoc />
210+
public bool AllHandlersFinished => inProgressHandlers.Count == 0;
211+
207212
/// <inheritdoc />
208213
public CancellationToken CancellationToken => cancellationTokenSource.Token;
209214

@@ -576,7 +581,13 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
576581
}
577582

578583
// Maybe apply workflow completion command reordering logic
579-
ApplyCompletionCommandReordering(act, completion);
584+
ApplyCompletionCommandReordering(act, completion, out var workflowComplete);
585+
586+
// Log warnings if we have completed
587+
if (workflowComplete && !IsReplaying)
588+
{
589+
inProgressHandlers.WarnIfAnyLeftOver(Info.WorkflowId, logger);
590+
}
580591

581592
// Unset the completion
582593
var toReturn = completion;
@@ -886,6 +897,10 @@ private void ApplyDoUpdate(DoUpdate update)
886897
// Queue it up so it can run in workflow environment
887898
_ = QueueNewTaskAsync(() =>
888899
{
900+
// Make sure we have loaded the instance which may invoke the constructor thereby
901+
// letting the constructor register update handlers at runtime
902+
var ignored = Instance;
903+
889904
// Set the current update for the life of this task
890905
CurrentUpdateInfoLocal.Value = new(Id: update.Id, Name: update.Name);
891906

@@ -998,9 +1013,12 @@ private void ApplyDoUpdate(DoUpdate update)
9981013
Definition: updateDefn,
9991014
Args: argsForUpdate,
10001015
Headers: update.Headers));
1016+
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
1017+
update.Name, update.Id, updateDefn.UnfinishedPolicy));
10011018
return task.ContinueWith(
10021019
_ =>
10031020
{
1021+
inProgressHandlers.Remove(inProgress);
10041022
// If workflow failure exception, it's an update failure. If it's some
10051023
// other exception, it's a task failure. Otherwise it's a success.
10061024
var exc = task.Exception?.InnerExceptions?.SingleOrDefault();
@@ -1080,6 +1098,10 @@ private void ApplyQueryWorkflow(QueryWorkflow query)
10801098
// Queue it up so it can run in workflow environment
10811099
_ = QueueNewTaskAsync(() =>
10821100
{
1101+
// Make sure we have loaded the instance which may invoke the constructor thereby
1102+
// letting the constructor register query handlers at runtime
1103+
var ignored = Instance;
1104+
10831105
var origCmdCount = completion?.Successful?.Commands?.Count;
10841106
try
10851107
{
@@ -1241,11 +1263,21 @@ private void ApplySignalWorkflow(SignalWorkflow signal)
12411263
return;
12421264
}
12431265

1244-
await inbound.Value.HandleSignalAsync(new(
1245-
Signal: signal.SignalName,
1246-
Definition: signalDefn,
1247-
Args: args,
1248-
Headers: signal.Headers)).ConfigureAwait(true);
1266+
// Handle signal
1267+
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
1268+
signal.SignalName, null, signalDefn.UnfinishedPolicy));
1269+
try
1270+
{
1271+
await inbound.Value.HandleSignalAsync(new(
1272+
Signal: signal.SignalName,
1273+
Definition: signalDefn,
1274+
Args: args,
1275+
Headers: signal.Headers)).ConfigureAwait(true);
1276+
}
1277+
finally
1278+
{
1279+
inProgressHandlers.Remove(inProgress);
1280+
}
12491281
}));
12501282
}
12511283

@@ -1394,7 +1426,9 @@ private string GetStackTrace()
13941426
}
13951427

13961428
private void ApplyCompletionCommandReordering(
1397-
WorkflowActivation act, WorkflowActivationCompletion completion)
1429+
WorkflowActivation act,
1430+
WorkflowActivationCompletion completion,
1431+
out bool workflowComplete)
13981432
{
13991433
// In earlier versions of the SDK we allowed commands to be sent after workflow
14001434
// completion. These ended up being removed effectively making the result of the
@@ -1404,40 +1438,42 @@ private void ApplyCompletionCommandReordering(
14041438
//
14051439
// Note this only applies for successful activations that don't have completion
14061440
// reordering disabled and that are either not replaying or have the flag set.
1407-
if (completion.Successful == null || disableCompletionCommandReordering)
1408-
{
1409-
return;
1410-
}
1411-
if (IsReplaying && !act.AvailableInternalFlags.Contains((uint)WorkflowLogicFlag.ReorderWorkflowCompletion))
1412-
{
1413-
return;
1414-
}
14151441

1416-
// We know we're on a newer SDK and can move completion to the end if we need to. First,
1417-
// find the completion command.
1442+
// Find the index of the completion command
14181443
var completionCommandIndex = -1;
1419-
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--)
1444+
if (completion.Successful != null)
14201445
{
1421-
var cmd = completion.Successful.Commands[i];
1422-
// Set completion index if the command is a completion
1423-
if (cmd.CancelWorkflowExecution != null ||
1424-
cmd.CompleteWorkflowExecution != null ||
1425-
cmd.ContinueAsNewWorkflowExecution != null ||
1426-
cmd.FailWorkflowExecution != null)
1446+
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--)
14271447
{
1428-
completionCommandIndex = i;
1429-
break;
1448+
var cmd = completion.Successful.Commands[i];
1449+
// Set completion index if the command is a completion
1450+
if (cmd.CancelWorkflowExecution != null ||
1451+
cmd.CompleteWorkflowExecution != null ||
1452+
cmd.ContinueAsNewWorkflowExecution != null ||
1453+
cmd.FailWorkflowExecution != null)
1454+
{
1455+
completionCommandIndex = i;
1456+
break;
1457+
}
14301458
}
14311459
}
1432-
1433-
// If there is no completion command or it's already at the end, nothing to do
1434-
if (completionCommandIndex == -1 ||
1435-
completionCommandIndex == completion.Successful.Commands.Count - 1)
1460+
workflowComplete = completionCommandIndex >= 0;
1461+
1462+
// This only applies for successful activations that have a completion not at the end,
1463+
// don't have completion reordering disabled, and that are either not replaying or have
1464+
// the flag set.
1465+
if (completion.Successful == null ||
1466+
completionCommandIndex == -1 ||
1467+
completionCommandIndex == completion.Successful.Commands.Count - 1 ||
1468+
disableCompletionCommandReordering ||
1469+
(IsReplaying && !act.AvailableInternalFlags.Contains(
1470+
(uint)WorkflowLogicFlag.ReorderWorkflowCompletion)))
14361471
{
14371472
return;
14381473
}
14391474

1440-
// Now we know the completion is in the wrong spot, so set the SDK flag and move it
1475+
// Now we know the completion is in the wrong spot and we're on a newer SDK, so set the
1476+
// SDK flag and move it
14411477
completion.Successful.UsedInternalFlags.Add((uint)WorkflowLogicFlag.ReorderWorkflowCompletion);
14421478
var compCmd = completion.Successful.Commands[completionCommandIndex];
14431479
completion.Successful.Commands.RemoveAt(completionCommandIndex);
@@ -2230,5 +2266,86 @@ public override Task SignalAsync(
22302266
public override Task CancelAsync() =>
22312267
instance.outbound.Value.CancelExternalWorkflowAsync(new(Id: Id, RunId: RunId));
22322268
}
2269+
2270+
/// <summary>
/// Ordered collection of update and signal handler invocations that have been started but
/// have not yet finished.
/// </summary>
/// <remarks>
/// Entries are appended with <c>AddLast</c> when an update or signal handler begins and
/// removed via the returned node when it ends. Signals are recorded with a null
/// <see cref="Handler.UpdateId" />; updates carry their update ID.
/// </remarks>
private class Handlers : LinkedList<Handlers.Handler>
{
#pragma warning disable SA1118 // We're ok w/ string literals spanning lines
    private static readonly Action<ILogger, string, WarnableSignals, Exception?> SignalWarning =
        LoggerMessage.Define<string, WarnableSignals>(
            LogLevel.Warning,
            0,
            "Workflow {Id} finished while signal handlers are still running. This may " +
            "have interrupted work that the signal handler was doing. You can wait for " +
            "all update and signal handlers to complete by using `await " +
            "Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " +
            "Alternatively, if both you and the clients sending the signal are okay with " +
            "interrupting running handlers when the workflow finishes, and causing " +
            "clients to receive errors, then you can disable this warning via the signal " +
            "handler attribute: " +
            "`[WorkflowSignal(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " +
            "following signals were unfinished (and warnings were not disabled for their " +
            "handler): {Handlers}");

    private static readonly Action<ILogger, string, WarnableUpdates, Exception?> UpdateWarning =
        LoggerMessage.Define<string, WarnableUpdates>(
            LogLevel.Warning,
            0,
            "Workflow {Id} finished while update handlers are still running. This may " +
            "have interrupted work that the update handler was doing, and the client " +
            "that sent the update will receive a 'workflow execution already completed' " +
            "RpcException instead of the update result. You can wait for all update and " +
            "signal handlers to complete by using `await " +
            "Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " +
            "Alternatively, if both you and the clients sending the update are okay with " +
            "interrupting running handlers when the workflow finishes, and causing " +
            "clients to receive errors, then you can disable this warning via the update " +
            "handler attribute: " +
            "`[WorkflowUpdate(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " +
            "following updates were unfinished (and warnings were not disabled for their " +
            "handler): {Handlers}");
#pragma warning restore SA1118

    /// <summary>
    /// Log a warning for every still-running handler whose policy is
    /// <see cref="HandlerUnfinishedPolicy.WarnAndAbandon" />. At most one warning is logged
    /// for signals and one for updates.
    /// </summary>
    /// <param name="id">Workflow ID placed in the log message.</param>
    /// <param name="logger">Logger the warnings are written to.</param>
    public void WarnIfAnyLeftOver(string id, ILogger logger)
    {
        // Common case: nothing is in progress, so skip the LINQ pipelines and array
        // allocations below entirely
        if (Count == 0)
        {
            return;
        }

        // Signals have no update ID. They are reported as name + number of running
        // invocations since individual signal invocations have no identifier.
        var signals = this.
            Where(h => h.UpdateId == null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon).
            GroupBy(h => h.Name).
            Select(h => (h.Key, h.Count())).
            ToArray();
        if (signals.Length > 0)
        {
            SignalWarning(logger, id, new WarnableSignals { NamesAndCounts = signals }, null);
        }

        // Updates always carry an ID, so each unfinished one is reported individually
        var updates = this.
            Where(h => h.UpdateId != null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon).
            Select(h => (h.Name, h.UpdateId!)).
            ToArray();
        if (updates.Length > 0)
        {
            UpdateWarning(logger, id, new WarnableUpdates { NamesAndIds = updates }, null);
        }
    }

    /// <summary>
    /// Log argument wrapping unfinished signal names and how many invocations of each are
    /// running. Rendered as a JSON array of <c>name</c>/<c>count</c> objects.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably a struct with a <see cref="ToString" /> override is used so the
    /// JSON is only built if the message is actually formatted - confirm against
    /// LoggerMessage behavior.
    /// </remarks>
    public readonly struct WarnableSignals
    {
        /// <summary>
        /// Gets the signal name and running-invocation count pairs.
        /// </summary>
        public (string, int)[] NamesAndCounts { get; init; }

        /// <inheritdoc />
        public override string ToString() => JsonSerializer.Serialize(
            // A default struct instance has a null array; render that as an empty list
            // instead of throwing from inside logging
            (NamesAndCounts ?? Array.Empty<(string, int)>()).
                Select(v => new { name = v.Item1, count = v.Item2 }).
                ToArray());
    }

    /// <summary>
    /// Log argument wrapping unfinished update names and IDs. Rendered as a JSON array of
    /// <c>name</c>/<c>id</c> objects.
    /// </summary>
    public readonly struct WarnableUpdates
    {
        /// <summary>
        /// Gets the update name and update ID pairs.
        /// </summary>
        public (string, string)[] NamesAndIds { get; init; }

        /// <inheritdoc />
        public override string ToString() => JsonSerializer.Serialize(
            // A default struct instance has a null array; render that as an empty list
            // instead of throwing from inside logging
            (NamesAndIds ?? Array.Empty<(string, string)>()).
                Select(v => new { name = v.Item1, id = v.Item2 }).
                ToArray());
    }

    /// <summary>
    /// A single in-progress handler invocation.
    /// </summary>
    /// <param name="Name">Signal or update name.</param>
    /// <param name="UpdateId">Update ID, or null when the handler is a signal handler.</param>
    /// <param name="UnfinishedPolicy">Policy deciding whether a warning is logged if the
    /// workflow completes while this handler is still running.</param>
    public record Handler(
        string Name,
        string? UpdateId,
        HandlerUnfinishedPolicy UnfinishedPolicy);
}
22332350
}
22342351
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
namespace Temporalio.Workflows
2+
{
3+
/// <summary>
4+
/// Actions taken if a workflow terminates with running handlers.
5+
/// </summary>
6+
/// <remarks>
7+
/// Policy defining actions taken when a workflow exits while update or signal handlers are
8+
/// running. The workflow exit may be due to successful return, failure, cancellation, or
9+
/// continue-as-new.
10+
/// </remarks>
11+
public enum HandlerUnfinishedPolicy
12+
{
13+
/// <summary>
14+
/// Issue a warning in addition to abandoning.
15+
/// </summary>
/// <remarks>
/// As the first member this is the enum's zero value, and it is also the value
/// <c>WorkflowSignalAttribute.UnfinishedPolicy</c> is initialized with. Only handlers with
/// this policy are included in the warnings logged when the workflow completes.
/// </remarks>
16+
WarnAndAbandon,
17+
18+
/// <summary>
19+
/// Abandon the handler.
20+
/// </summary>
21+
/// <remarks>
22+
/// In the case of an update handler this means that the client will receive an error rather
23+
/// than the update result.
/// Handlers with this policy are left out of the unfinished-handler warnings.
24+
/// </remarks>
25+
Abandon,
26+
}
27+
}

src/Temporalio/Workflows/IWorkflowContext.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ namespace Temporalio.Workflows
1313
/// </summary>
1414
internal interface IWorkflowContext
1515
{
16+
/// <summary>
17+
/// Gets a value indicating whether <see cref="Workflow.AllHandlersFinished" /> is true.
18+
/// </summary>
19+
bool AllHandlersFinished { get; }
20+
1621
/// <summary>
1722
/// Gets value for <see cref="Workflow.CancellationToken" />.
1823
/// </summary>

src/Temporalio/Workflows/Workflow.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ namespace Temporalio.Workflows
1818
/// </summary>
1919
public static class Workflow
2020
{
21+
/// <summary>
22+
/// Gets a value indicating whether all update and signal handlers have finished executing.
23+
/// </summary>
24+
/// <remarks>
25+
/// Consider waiting on this condition before workflow return or continue-as-new, to prevent
26+
/// interruption of in-progress handlers by workflow return:
27+
/// <c>await Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)</c>.
28+
/// </remarks>
29+
public static bool AllHandlersFinished => Context.AllHandlersFinished;
30+
2131
/// <summary>
2232
/// Gets the cancellation token for the workflow.
2333
/// </summary>

src/Temporalio/Workflows/WorkflowSignalAttribute.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,11 @@ public WorkflowSignalAttribute(string name)
4343
/// an array of <see cref="Converters.IRawValue" />.
4444
/// </summary>
4545
public bool Dynamic { get; set; }
46+
47+
/// <summary>
48+
/// Gets or sets the actions taken if a workflow exits with a running instance of this
49+
/// handler. Default is <see cref="HandlerUnfinishedPolicy.WarnAndAbandon" />.
50+
/// </summary>
51+
public HandlerUnfinishedPolicy UnfinishedPolicy { get; set; } = HandlerUnfinishedPolicy.WarnAndAbandon;
4652
}
4753
}

src/Temporalio/Workflows/WorkflowSignalDefinition.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,16 @@ public class WorkflowSignalDefinition
1212
{
1313
private static readonly ConcurrentDictionary<MethodInfo, WorkflowSignalDefinition> Definitions = new();
1414

15-
private WorkflowSignalDefinition(string? name, MethodInfo? method, Delegate? del)
15+
private WorkflowSignalDefinition(
16+
string? name,
17+
MethodInfo? method,
18+
Delegate? del,
19+
HandlerUnfinishedPolicy unfinishedPolicy)
1620
{
1721
Name = name;
1822
Method = method;
1923
Delegate = del;
24+
UnfinishedPolicy = unfinishedPolicy;
2025
}
2126

2227
/// <summary>
@@ -39,6 +44,11 @@ private WorkflowSignalDefinition(string? name, MethodInfo? method, Delegate? del
3944
/// </summary>
4045
internal Delegate? Delegate { get; private init; }
4146

47+
/// <summary>
48+
/// Gets the unfinished policy.
49+
/// </summary>
50+
internal HandlerUnfinishedPolicy UnfinishedPolicy { get; private init; }
51+
4252
/// <summary>
4353
/// Get a signal definition from a method or fail. The result is cached.
4454
/// </summary>
@@ -63,12 +73,16 @@ public static WorkflowSignalDefinition FromMethod(MethodInfo method)
6373
/// </summary>
6474
/// <param name="name">Signal name. Null for dynamic signal.</param>
6575
/// <param name="del">Signal delegate.</param>
76+
/// <param name="unfinishedPolicy">Actions taken if a workflow exits with a running instance
77+
/// of this handler.</param>
6678
/// <returns>Signal definition.</returns>
6779
public static WorkflowSignalDefinition CreateWithoutAttribute(
68-
string? name, Delegate del)
80+
string? name,
81+
Delegate del,
82+
HandlerUnfinishedPolicy unfinishedPolicy = HandlerUnfinishedPolicy.WarnAndAbandon)
6983
{
7084
AssertValid(del.Method, dynamic: name == null);
71-
return new(name, null, del);
85+
return new(name, null, del, unfinishedPolicy);
7286
}
7387

7488
/// <summary>
@@ -103,7 +117,7 @@ private static WorkflowSignalDefinition CreateFromMethod(MethodInfo method)
103117
name = name.Substring(0, name.Length - 5);
104118
}
105119
}
106-
return new(name, method, null);
120+
return new(name, method, null, attr.UnfinishedPolicy);
107121
}
108122

109123
private static void AssertValid(MethodInfo method, bool dynamic)

0 commit comments

Comments
 (0)