Skip to content

Commit f99bca3

Browse files
authored
💥 Fix how serialization context is applied in workflows [MINOR COMPAT BREAK] (#525)
Fixes #523
1 parent 8db923c commit f99bca3

File tree

8 files changed

+426
-235
lines changed

8 files changed

+426
-235
lines changed

‎src/Temporalio/Activities/ActivityExecutionContext.cs‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ public CancellationToken CancellationToken
125125
/// <summary>
126126
/// Gets the payload converter in use by this activity worker.
127127
/// </summary>
128+
/// <remarks>
129+
/// If the original converter supported serialization contexts, this is the converter with
130+
/// the activity serialization context applied.
131+
/// </remarks>
128132
public IPayloadConverter PayloadConverter { get; private init; }
129133

130134
/// <summary>

‎src/Temporalio/Worker/WorkflowCodecHelper.cs‎

Lines changed: 174 additions & 113 deletions
Large diffs are not rendered by default.

‎src/Temporalio/Worker/WorkflowInstance.cs‎

Lines changed: 60 additions & 39 deletions
Large diffs are not rendered by default.

‎src/Temporalio/Worker/WorkflowInstanceDetails.cs‎

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ namespace Temporalio.Worker
1717
/// <param name="InitialActivation">Initial activation for the workflow.</param>
1818
/// <param name="Init">Start attributes for the workflow.</param>
1919
/// <param name="Interceptors">Interceptors.</param>
20-
/// <param name="PayloadConverter">Payload converter.</param>
21-
/// <param name="FailureConverter">Failure converter.</param>
20+
/// <param name="PayloadConverterNoContext">Payload converter with no context.</param>
21+
/// <param name="PayloadConverterWorkflowContext">Payload converter with workflow context.</param>
22+
/// <param name="FailureConverterNoContext">Failure converter with no context.</param>
23+
/// <param name="FailureConverterWorkflowContext">Failure converter with workflow context.</param>
2224
/// <param name="LoggerFactory">Logger factory.</param>
2325
/// <param name="DisableTracingEvents">Whether tracing events are disabled.</param>
2426
/// <param name="WorkflowStackTrace">Option for workflow stack trace.</param>
@@ -35,8 +37,10 @@ internal record WorkflowInstanceDetails(
3537
WorkflowActivation InitialActivation,
3638
InitializeWorkflow Init,
3739
IReadOnlyCollection<Interceptors.IWorkerInterceptor> Interceptors,
38-
IPayloadConverter PayloadConverter,
39-
IFailureConverter FailureConverter,
40+
IPayloadConverter PayloadConverterNoContext,
41+
IPayloadConverter PayloadConverterWorkflowContext,
42+
IFailureConverter FailureConverterNoContext,
43+
IFailureConverter FailureConverterWorkflowContext,
4044
ILoggerFactory LoggerFactory,
4145
bool DisableTracingEvents,
4246
WorkflowStackTrace WorkflowStackTrace,

‎src/Temporalio/Worker/WorkflowWorker.cs‎

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -140,43 +140,50 @@ private async Task HandleActivationAsync(WorkflowActivation act)
140140
}
141141

142142
WorkflowActivationCompletion comp;
143-
DataConverter dataConverter = options.DataConverter;
143+
DataConverter dataConverterNoContext = options.DataConverter;
144+
DataConverter? dataConverterWorkflowContext = null;
144145
WorkflowCodecHelper.WorkflowCodecContext? codecContext = null;
145146

146147
// Catch any exception as a completion failure
147148
try
148149
{
149150
// Create data converter with context before doing any work
151+
string workflowId, workflowType;
152+
IWorkflowCodecHelperInstance? instanceForCodec;
150153
if (runningWorkflows.TryGetValue(act.RunId, out var inst))
151154
{
152-
codecContext = new(
153-
Namespace: options.Namespace,
154-
WorkflowId: inst.Info.WorkflowId,
155-
WorkflowType: inst.Info.WorkflowType,
156-
TaskQueue: options.TaskQueue,
157-
Instance: inst);
155+
workflowId = inst.Info.WorkflowId;
156+
workflowType = inst.Info.WorkflowType;
157+
instanceForCodec = inst;
158158
}
159159
else if (act.Jobs.Select(j => j.InitializeWorkflow).FirstOrDefault(s => s != null) is { } initJob)
160160
{
161-
codecContext = new(
162-
Namespace: options.Namespace,
163-
WorkflowId: initJob.WorkflowId,
164-
WorkflowType: initJob.WorkflowType,
165-
TaskQueue: options.TaskQueue,
166-
Instance: null);
161+
workflowId = initJob.WorkflowId;
162+
workflowType = initJob.WorkflowType;
163+
instanceForCodec = null;
167164
}
168165
else
169166
{
170167
throw new InvalidOperationException("Missing workflow start (unexpectedly evicted?)");
171168
}
172-
dataConverter = dataConverter.WithSerializationContext(
173-
new ISerializationContext.Workflow(
174-
Namespace: codecContext.Namespace, WorkflowId: codecContext.WorkflowId));
175-
176-
// Decode the activation if there is a codec
177-
if (dataConverter.PayloadCodec is { } decodeCodec)
169+
dataConverterWorkflowContext = dataConverterNoContext.WithSerializationContext(
170+
new ISerializationContext.Workflow(options.Namespace, WorkflowId: workflowId));
171+
// We'll only apply codec if one of the two converters has one
172+
if (dataConverterNoContext.PayloadCodec != null ||
173+
dataConverterWorkflowContext.PayloadCodec != null)
178174
{
179-
await WorkflowCodecHelper.DecodeAsync(decodeCodec, codecContext, act).ConfigureAwait(false);
175+
codecContext = new(
176+
CodecNoContext: dataConverterNoContext.PayloadCodec,
177+
CodecWorkflowContext: dataConverterWorkflowContext.PayloadCodec,
178+
Namespace: options.Namespace,
179+
WorkflowId: workflowId,
180+
WorkflowType: workflowType,
181+
TaskQueue: options.TaskQueue,
182+
Instance: instanceForCodec);
183+
}
184+
if (codecContext != null)
185+
{
186+
await WorkflowCodecHelper.DecodeAsync(codecContext, act).ConfigureAwait(false);
180187
}
181188

182189
// Log proto at trace level
@@ -191,8 +198,12 @@ private async Task HandleActivationAsync(WorkflowActivation act)
191198

192199
// If the workflow is not yet running, create it. We know that we will only get
193200
// one activation per workflow at a time, so GetOrAdd is safe for our use.
194-
var workflow = runningWorkflows.GetOrAdd(act.RunId, _ => CreateInstance(act, dataConverter));
195-
codecContext = codecContext with { Instance = workflow };
201+
var workflow = runningWorkflows.GetOrAdd(act.RunId, _ => CreateInstance(
202+
act, dataConverterNoContext, dataConverterWorkflowContext));
203+
if (codecContext != null)
204+
{
205+
codecContext = codecContext with { Instance = workflow };
206+
}
196207

197208
// Activate or timeout with deadlock timeout
198209
// TODO(cretz): Any reason for users to need to customize factory here?
@@ -230,9 +241,10 @@ private async Task HandleActivationAsync(WorkflowActivation act)
230241
comp = new() { Failed = new() };
231242
try
232243
{
233-
// Failure converter needs to be in workflow context
234-
comp.Failed.Failure_ = dataConverter.FailureConverter.ToFailure(
235-
e, dataConverter.PayloadConverter);
244+
// Failure converter needs to be in workflow context if available
245+
var dataConverterForFailure = dataConverterWorkflowContext ?? dataConverterNoContext;
246+
comp.Failed.Failure_ = dataConverterForFailure.FailureConverter.ToFailure(
247+
e, dataConverterForFailure.PayloadConverter);
236248
}
237249
catch (Exception inner)
238250
{
@@ -244,12 +256,12 @@ private async Task HandleActivationAsync(WorkflowActivation act)
244256
// Always set the run ID of the completion
245257
comp.RunId = act.RunId;
246258

247-
// Encode the completion if there is a codec
248-
if (dataConverter.PayloadCodec is { } encodeCodec && codecContext is { } encodeContext)
259+
// Encode the completion if there is a codec context
260+
if (codecContext != null)
249261
{
250262
try
251263
{
252-
await WorkflowCodecHelper.EncodeAsync(encodeCodec, encodeContext, comp).ConfigureAwait(false);
264+
await WorkflowCodecHelper.EncodeAsync(codecContext, comp).ConfigureAwait(false);
253265
}
254266
catch (Exception e)
255267
{
@@ -313,7 +325,10 @@ private async Task HandleCacheEvictionAsync(WorkflowActivation act, RemoveFromCa
313325
}
314326
}
315327

316-
private IWorkflowInstance CreateInstance(WorkflowActivation act, DataConverter dataConverter)
328+
private IWorkflowInstance CreateInstance(
329+
WorkflowActivation act,
330+
DataConverter dataConverterNoContext,
331+
DataConverter dataConverterWorkflowContext)
317332
{
318333
var init = act.Jobs.Select(j => j.InitializeWorkflow).FirstOrDefault(s => s != null) ??
319334
throw new InvalidOperationException("Missing workflow start (unexpectedly evicted?)");
@@ -336,8 +351,10 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act, DataConverter d
336351
InitialActivation: act,
337352
Init: init,
338353
Interceptors: options.Interceptors,
339-
PayloadConverter: dataConverter.PayloadConverter,
340-
FailureConverter: dataConverter.FailureConverter,
354+
PayloadConverterNoContext: dataConverterNoContext.PayloadConverter,
355+
PayloadConverterWorkflowContext: dataConverterWorkflowContext.PayloadConverter,
356+
FailureConverterNoContext: dataConverterNoContext.FailureConverter,
357+
FailureConverterWorkflowContext: dataConverterWorkflowContext.FailureConverter,
341358
LoggerFactory: options.LoggerFactory,
342359
DisableTracingEvents: options.DisableWorkflowTracingEventListener,
343360
WorkflowStackTrace: options.WorkflowStackTrace,

‎src/Temporalio/Workflows/Workflow.cs‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ public static WorkflowUpdateDefinition? DynamicUpdate
182182
/// <summary>
183183
/// Gets the payload converter for the workflow.
184184
/// </summary>
185+
/// <remarks>
186+
/// If the original converter supported serialization contexts, this is the converter with
187+
/// the workflow serialization context applied.
188+
/// </remarks>
185189
public static IPayloadConverter PayloadConverter => Context.PayloadConverter;
186190

187191
/// <summary>

‎tests/Temporalio.Tests/Worker/WorkflowCodecHelperTests.cs‎

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,6 @@ public WorkflowCodecHelperTests(ITestOutputHelper output)
1717
{
1818
}
1919

20-
internal static WorkflowCodecHelper.WorkflowCodecContext SimpleCodecContext { get; } = new(
21-
Namespace: "my-namespace",
22-
WorkflowId: "my-workflow-id",
23-
WorkflowType: "my-workflow-type",
24-
TaskQueue: "my-task-queue",
25-
Instance: null);
26-
2720
[Fact]
2821
public async Task CreateAndVisitPayload_Visiting_ReachesAllExpectedValues()
2922
{
@@ -58,7 +51,7 @@ await CreateAndVisitPayload(new(), comp, async (ctx, payload) =>
5851
Assert.DoesNotContain("encoded", payload().Metadata.Keys);
5952
foreach (var codec in codecs)
6053
{
61-
await WorkflowCodecHelper.EncodeAsync(codec, SimpleCodecContext, comp);
54+
await WorkflowCodecHelper.EncodeAsync(CreateSimpleCodecContext(codec), comp);
6255
if (!payload().Metadata.ContainsKey("encoded"))
6356
{
6457
Assert.Fail($"Payload at path {ctx.Path} not encoded with codec {codec}");
@@ -82,7 +75,7 @@ await CreateAndVisitPayload(new(), act, async (ctx, payload) =>
8275
Assert.DoesNotContain("decoded", payload().Metadata.Keys);
8376
foreach (var codec in codecs)
8477
{
85-
await WorkflowCodecHelper.DecodeAsync(codec, SimpleCodecContext, act);
78+
await WorkflowCodecHelper.DecodeAsync(CreateSimpleCodecContext(codec), act);
8679
if (!payload().Metadata.ContainsKey("decoded"))
8780
{
8881
Assert.Fail($"Payload at path {ctx.Path} not decoded with codec {codec}");
@@ -106,11 +99,20 @@ await CreateAndVisitPayload(new(), comp, async (ctx, payload) =>
10699
if (propInfo?.PropertyType == typeof(Payload))
107100
{
108101
propInfo.SetValue(msg, null);
109-
await WorkflowCodecHelper.EncodeAsync(codec, SimpleCodecContext, comp);
102+
await WorkflowCodecHelper.EncodeAsync(CreateSimpleCodecContext(codec), comp);
110103
}
111104
});
112105
}
113106

107+
private static WorkflowCodecHelper.WorkflowCodecContext CreateSimpleCodecContext(IPayloadCodec codec) => new(
108+
CodecNoContext: codec,
109+
CodecWorkflowContext: codec,
110+
Namespace: "my-namespace",
111+
WorkflowId: "my-workflow-id",
112+
WorkflowType: "my-workflow-type",
113+
TaskQueue: "my-task-queue",
114+
Instance: null);
115+
114116
// Creates payloads as needed, null context if already seen
115117
private static async Task CreateAndVisitPayload(
116118
PayloadVisitContext ctx, IMessage current, Func<PayloadVisitContext, Func<Payload>, Task> visitor)

0 commit comments

Comments
 (0)