Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 62 additions & 38 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,49 +1185,73 @@ private Task ApplyDoUpdateAsync(DoUpdate update)
return task.ContinueWith(
_ =>
{
inProgressHandlers.Remove(inProgress);
Copy link
Member Author

@cretz cretz Apr 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR makes this a bit ugly, I recommend "hide whitespace" in the this diff view (i.e. https://github.com/temporalio/sdk-dotnet/pull/468/files?diff=unified&w=1)

// If workflow failure exception, it's an update failure. If it's some
// other exception, it's a task failure. Otherwise it's a success.
var exc = task.Exception?.InnerExceptions?.SingleOrDefault();
// There are .NET cases where cancellation occurs but is not considered
// an exception. We are going to make it an exception. Unfortunately
// there is no easy way to make it include the outer stack trace at this
// time.
if (exc == null && task.IsCanceled)
try
{
exc = new TaskCanceledException();
}
if (exc != null && IsWorkflowFailureException(exc))
{
AddCommand(new()
inProgressHandlers.Remove(inProgress);
// If workflow failure exception, it's an update failure. If it's some
// other exception, it's a task failure. Otherwise it's a success.
var exc = task.Exception?.InnerExceptions?.SingleOrDefault();
// There are .NET cases where cancellation occurs but is not considered
// an exception. We are going to make it an exception. Unfortunately
// there is no easy way to make it include the outer stack trace at this
// time.
if (exc == null && task.IsCanceled)
{
exc = new TaskCanceledException();
}
if (exc != null && IsWorkflowFailureException(exc))
{
UpdateResponse = new()
AddCommand(new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(exc, PayloadConverter),
},
});
}
else if (task.Exception is { } taskExc)
{
// Fails the task
currentActivationException =
taskExc.InnerExceptions.SingleOrDefault() ?? taskExc;
}
else
{
// Success, have to use reflection to extract value if it's a Task<>
var taskType = task.GetType();
var result = taskType.IsGenericType ?
taskType.GetProperty("Result")!.GetValue(task) : ValueTuple.Create();
AddCommand(new()
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(exc, PayloadConverter),
},
});
}
else if (task.Exception is { } taskExc)
{
UpdateResponse = new()
// Fails the task
currentActivationException =
taskExc.InnerExceptions.SingleOrDefault() ?? taskExc;
}
else
{
// Success, have to use reflection to extract value if it's a Task<>
try
{
ProtocolInstanceId = update.ProtocolInstanceId,
Completed = PayloadConverter.ToPayload(result),
},
});
var taskType = task.GetType();
var result = taskType.IsGenericType ?
taskType.GetProperty("Result")!.GetValue(task) : ValueTuple.Create();
AddCommand(new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Completed = PayloadConverter.ToPayload(result),
},
});
}
catch (Exception e) when (IsWorkflowFailureException(e))
{
// Payload conversion can fail with a fail-update exception
// instead of a fail-task exception. Any failure here does
// bubble to outer catch as task failure.
AddCommand(new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(e, PayloadConverter),
},
});
}
}
}
catch (Exception e)
{
currentActivationException = e;
}
return Task.CompletedTask;
},
Expand Down
81 changes: 77 additions & 4 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3081,7 +3081,7 @@ public class DynamicHandlersWorkflow
[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => false);

[WorkflowSignal]
[WorkflowUpdate]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix a mostly-unrelated flake where slow GH runners were not guaranteeing signal order as the test expected

public async Task SetHandlersAsync()
{
Workflow.DynamicSignal = WorkflowSignalDefinition.CreateWithoutAttribute(
Expand All @@ -3106,7 +3106,7 @@ public async Task SetHandlersAsync()
});
}

[WorkflowSignal]
[WorkflowUpdate]
public async Task UnsetHandlersAsync()
{
Workflow.DynamicSignal = null;
Expand Down Expand Up @@ -3138,7 +3138,7 @@ await ExecuteWorkerAsync<DynamicHandlersWorkflow>(

// Add handlers, and confirm all signals are drained to it and it handles
// queries/updates
await handle.SignalAsync(wf => wf.SetHandlersAsync());
await handle.ExecuteUpdateAsync(wf => wf.SetHandlersAsync());
await handle.SignalAsync("SomeSignal2", new[] { "signal arg 2" });
Assert.Equal(
"done", await handle.QueryAsync<string>("SomeQuery2", new[] { "query arg 2" }));
Expand All @@ -3156,7 +3156,7 @@ await ExecuteWorkerAsync<DynamicHandlersWorkflow>(
(await handle.QueryAsync(wf => wf.Events)).OrderBy(v => v).ToList());

// Remove handlers and confirm things go back to unhandled
await handle.SignalAsync(wf => wf.UnsetHandlersAsync());
await handle.ExecuteUpdateAsync(wf => wf.UnsetHandlersAsync());
await handle.SignalAsync("SomeSignal3", new[] { "signal arg 3" });
queryExc = await Assert.ThrowsAsync<WorkflowQueryFailedException>(
() => handle.QueryAsync<string>("SomeQuery3", new[] { "query arg 3" }));
Expand Down Expand Up @@ -4168,6 +4168,79 @@ await ExecuteWorkerAsync<ImmediatelyCompleteUpdateAndWorkflow>(
new(taskQueue) { MaxCachedWorkflows = 0 });
}

[Workflow]
public class UpdateBadResultSerializationWorkflow
{
public record MyUpdateResult(string Value);

[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => false);

[WorkflowUpdate]
public async Task<MyUpdateResult> DoUpdateAsync(string value) => new(value);
}

public class UpdateBadResultSerializationPayloadConverter : IPayloadConverter
{
public bool FailTask { get; set; }

public Payload ToPayload(object? value)
{
if (value is UpdateBadResultSerializationWorkflow.MyUpdateResult)
{
if (FailTask)
{
throw new InvalidOperationException("Intentionally failing task");
}
throw new ApplicationFailureException("Intentionally failing update");
}
return DataConverter.Default.PayloadConverter.ToPayload(value);
}

public object? ToValue(Payload payload, Type type) =>
DataConverter.Default.PayloadConverter.ToValue(payload, type);
}

[Fact]
public async Task ExecuteWorkflowAsync_Updates_BadResultSerialization()
{
// Client with failure payload converter
var converter = new UpdateBadResultSerializationPayloadConverter();
var newOptions = (TemporalClientOptions)Client.Options.Clone();
newOptions.DataConverter = DataConverter.Default with { PayloadConverter = converter };
var client = new TemporalClient(Client.Connection, newOptions);

// Update failure
await ExecuteWorkerAsync<UpdateBadResultSerializationWorkflow>(
async worker =>
{
var handle = await client.StartWorkflowAsync(
(UpdateBadResultSerializationWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
var exc = await Assert.ThrowsAsync<WorkflowUpdateFailedException>(() =>
handle.ExecuteUpdateAsync(wf => wf.DoUpdateAsync("some value")));
Assert.Equal(
"Intentionally failing update",
Assert.IsType<ApplicationFailureException>(exc.InnerException).Message);
},
client: client);

// Task failure
converter.FailTask = true;
await ExecuteWorkerAsync<UpdateBadResultSerializationWorkflow>(
async worker =>
{
var handle = await client.StartWorkflowAsync(
(UpdateBadResultSerializationWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
_ = Task.Run(() => handle.ExecuteUpdateAsync(wf => wf.DoUpdateAsync("some value")));
await AssertMore.HasEventEventuallyAsync(
handle,
e => e.WorkflowTaskFailedEventAttributes?.Failure?.Message == "Intentionally failing task");
},
client: client);
}

[Workflow]
public class CurrentBuildIdWorkflow
{
Expand Down
Loading