Skip to content

Commit 6dbf752

Browse files
authored
Serialization context for converters and codecs (#446)
Fixes #438
1 parent 409e53f commit 6dbf752

33 files changed

+1405
-273
lines changed

src/Temporalio/Client/AsyncActivityHandle.cs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Threading.Tasks;
3+
using Temporalio.Converters;
34

45
namespace Temporalio.Client
56
{
@@ -10,7 +11,13 @@ namespace Temporalio.Client
1011
/// <param name="Activity">Reference to the activity for this handle.</param>
1112
public record AsyncActivityHandle(
1213
ITemporalClient Client, AsyncActivityHandle.Reference Activity)
14+
: IWithSerializationContext<AsyncActivityHandle>
1315
{
16+
/// <summary>
17+
/// Gets or inits the data converter that will be used instead of the clients if set.
18+
/// </summary>
19+
public DataConverter? DataConverterOverride { get; init; }
20+
1421
/// <summary>
1522
/// Issue a heartbeat for this activity.
1623
/// </summary>
@@ -22,7 +29,7 @@ public record AsyncActivityHandle(
2229
/// </exception>
2330
public Task HeartbeatAsync(AsyncActivityHeartbeatOptions? options = null) =>
2431
Client.OutboundInterceptor.HeartbeatAsyncActivityAsync(new(
25-
Activity: Activity, Options: options));
32+
Activity: Activity, Options: options, DataConverterOverride: DataConverterOverride));
2633

2734
/// <summary>
2835
/// Complete this activity.
@@ -33,7 +40,7 @@ public Task HeartbeatAsync(AsyncActivityHeartbeatOptions? options = null) =>
3340
public Task CompleteAsync(
3441
object? result = null, AsyncActivityCompleteOptions? options = null) =>
3542
Client.OutboundInterceptor.CompleteAsyncActivityAsync(new(
36-
Activity: Activity, Result: result, Options: options));
43+
Activity: Activity, Result: result, Options: options, DataConverterOverride: DataConverterOverride));
3744

3845
/// <summary>
3946
/// Fail this activity.
@@ -43,7 +50,7 @@ public Task CompleteAsync(
4350
/// <returns>Completion task.</returns>
4451
public Task FailAsync(Exception exception, AsyncActivityFailOptions? options = null) =>
4552
Client.OutboundInterceptor.FailAsyncActivityAsync(new(
46-
Activity: Activity, Exception: exception, Options: options));
53+
Activity: Activity, Exception: exception, Options: options, DataConverterOverride: DataConverterOverride));
4754

4855
/// <summary>
4956
/// Report this activity as cancelled.
@@ -53,7 +60,25 @@ public Task FailAsync(Exception exception, AsyncActivityFailOptions? options = n
5360
public Task ReportCancellationAsync(
5461
AsyncActivityReportCancellationOptions? options = null) =>
5562
Client.OutboundInterceptor.ReportCancellationAsyncActivityAsync(new(
56-
Activity: Activity, Options: options));
63+
Activity: Activity, Options: options, DataConverterOverride: DataConverterOverride));
64+
65+
/// <summary>
66+
/// If the data converter supports customizing based on serialization context, recreate this
67+
/// handle with a data converter override using the given context.
68+
/// </summary>
69+
/// <param name="context">Context to provide to data converter.</param>
70+
/// <returns>New handle if context supported on data converter, same handle otherwise.</returns>
71+
public AsyncActivityHandle WithSerializationContext(ISerializationContext context)
72+
{
73+
var converter = DataConverterOverride ?? Client.Options.DataConverter;
74+
var newConverter = converter.WithSerializationContext(context);
75+
// Don't do anything if same object
76+
if (ReferenceEquals(converter, newConverter))
77+
{
78+
return this;
79+
}
80+
return this with { DataConverterOverride = newConverter };
81+
}
5782

5883
/// <summary>
5984
/// Reference to an existing activity.
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Temporalio.Converters;
2+
13
namespace Temporalio.Client.Interceptors
24
{
35
/// <summary>
@@ -6,12 +8,14 @@ namespace Temporalio.Client.Interceptors
68
/// <param name="Activity">Activity to complete.</param>
79
/// <param name="Result">Result.</param>
810
/// <param name="Options">Options passed in to complete.</param>
11+
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
912
/// <remarks>
1013
/// WARNING: This constructor may have required properties added. Do not rely on the exact
1114
/// constructor, only use "with" clauses.
1215
/// </remarks>
1316
public record CompleteAsyncActivityInput(
1417
AsyncActivityHandle.Reference Activity,
1518
object? Result,
16-
AsyncActivityCompleteOptions? Options);
19+
AsyncActivityCompleteOptions? Options,
20+
DataConverter? DataConverterOverride = null);
1721
}

src/Temporalio/Client/Interceptors/FailAsyncActivityInput.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using Temporalio.Converters;
23

34
namespace Temporalio.Client.Interceptors
45
{
@@ -8,12 +9,14 @@ namespace Temporalio.Client.Interceptors
89
/// <param name="Activity">Activity to fail.</param>
910
/// <param name="Exception">Exception.</param>
1011
/// <param name="Options">Options passed in to fail.</param>
12+
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
1113
/// <remarks>
1214
/// WARNING: This constructor may have required properties added. Do not rely on the exact
1315
/// constructor, only use "with" clauses.
1416
/// </remarks>
1517
public record FailAsyncActivityInput(
1618
AsyncActivityHandle.Reference Activity,
1719
Exception Exception,
18-
AsyncActivityFailOptions? Options);
20+
AsyncActivityFailOptions? Options,
21+
DataConverter? DataConverterOverride = null);
1922
}
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
1+
using Temporalio.Converters;
2+
13
namespace Temporalio.Client.Interceptors
24
{
35
/// <summary>
46
/// Input for <see cref="ClientOutboundInterceptor.HeartbeatAsyncActivityAsync" />.
57
/// </summary>
68
/// <param name="Activity">Activity to heartbeat.</param>
79
/// <param name="Options">Options passed in to heartbeat.</param>
10+
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
811
/// <remarks>
912
/// WARNING: This constructor may have required properties added. Do not rely on the exact
1013
/// constructor, only use "with" clauses.
1114
/// </remarks>
1215
public record HeartbeatAsyncActivityInput(
1316
AsyncActivityHandle.Reference Activity,
14-
AsyncActivityHeartbeatOptions? Options);
17+
AsyncActivityHeartbeatOptions? Options,
18+
DataConverter? DataConverterOverride = null);
1519
}
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
1+
using Temporalio.Converters;
2+
13
namespace Temporalio.Client.Interceptors
24
{
35
/// <summary>
46
/// Input for <see cref="ClientOutboundInterceptor.ReportCancellationAsyncActivityAsync" />.
57
/// </summary>
68
/// <param name="Activity">Activity to report cancellation.</param>
79
/// <param name="Options">Options passed in to report cancellation.</param>
10+
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
811
/// <remarks>
912
/// WARNING: This constructor may have required properties added. Do not rely on the exact
1013
/// constructor, only use "with" clauses.
1114
/// </remarks>
1215
public record ReportCancellationAsyncActivityInput(
1316
AsyncActivityHandle.Reference Activity,
14-
AsyncActivityReportCancellationOptions? Options);
17+
AsyncActivityReportCancellationOptions? Options,
18+
DataConverter? DataConverterOverride = null);
1519
}

src/Temporalio/Client/Schedules/Schedule.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ public record Schedule(
2828
/// Convert from proto.
2929
/// </summary>
3030
/// <param name="proto">Proto.</param>
31+
/// <param name="clientNamespace">Client namespace.</param>
3132
/// <param name="dataConverter">Data converter.</param>
3233
/// <returns>Converted value.</returns>
3334
internal static async Task<Schedule> FromProtoAsync(
34-
Api.Schedule.V1.Schedule proto, DataConverter dataConverter) =>
35+
Api.Schedule.V1.Schedule proto, string clientNamespace, DataConverter dataConverter) =>
3536
new(
36-
Action: await ScheduleAction.FromProtoAsync(proto.Action, dataConverter).ConfigureAwait(false),
37+
Action: await ScheduleAction.FromProtoAsync(proto.Action, clientNamespace, dataConverter).ConfigureAwait(false),
3738
Spec: ScheduleSpec.FromProto(proto.Spec))
3839
{
3940
Policy = SchedulePolicy.FromProto(proto.Policies),
@@ -43,14 +44,16 @@ internal static async Task<Schedule> FromProtoAsync(
4344
/// <summary>
4445
/// Convert to proto.
4546
/// </summary>
47+
/// <param name="clientNamespace">Client namespace.</param>
4648
/// <param name="dataConverter">Data converter.</param>
4749
/// <returns>Proto.</returns>
48-
internal async Task<Api.Schedule.V1.Schedule> ToProtoAsync(DataConverter dataConverter) => new()
49-
{
50-
Spec = Spec.ToProto(),
51-
Action = await Action.ToProtoAsync(dataConverter).ConfigureAwait(false),
52-
Policies = Policy.ToProto(),
53-
State = State.ToProto(),
54-
};
50+
internal async Task<Api.Schedule.V1.Schedule> ToProtoAsync(
51+
string clientNamespace, DataConverter dataConverter) => new()
52+
{
53+
Spec = Spec.ToProto(),
54+
Action = await Action.ToProtoAsync(clientNamespace, dataConverter).ConfigureAwait(false),
55+
Policies = Policy.ToProto(),
56+
State = State.ToProto(),
57+
};
5558
}
5659
}

src/Temporalio/Client/Schedules/ScheduleAction.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,16 @@ public abstract record ScheduleAction
1414
/// Convert from proto.
1515
/// </summary>
1616
/// <param name="proto">Proto.</param>
17+
/// <param name="clientNamespace">Client namespace.</param>
1718
/// <param name="dataConverter">Data converter.</param>
1819
/// <returns>Converted value.</returns>
1920
internal static async Task<ScheduleAction> FromProtoAsync(
20-
Api.Schedule.V1.ScheduleAction proto, DataConverter dataConverter)
21+
Api.Schedule.V1.ScheduleAction proto, string clientNamespace, DataConverter dataConverter)
2122
{
2223
if (proto.StartWorkflow != null)
2324
{
2425
return await ScheduleActionStartWorkflow.FromProtoAsync(
25-
proto.StartWorkflow, dataConverter).ConfigureAwait(false);
26+
proto.StartWorkflow, clientNamespace, dataConverter).ConfigureAwait(false);
2627
}
2728
else
2829
{
@@ -33,8 +34,10 @@ internal static async Task<ScheduleAction> FromProtoAsync(
3334
/// <summary>
3435
/// Convert to proto.
3536
/// </summary>
37+
/// <param name="clientNamespace">Client namespace.</param>
3638
/// <param name="dataConverter">Data converter.</param>
3739
/// <returns>Proto.</returns>
38-
internal abstract Task<Api.Schedule.V1.ScheduleAction> ToProtoAsync(DataConverter dataConverter);
40+
internal abstract Task<Api.Schedule.V1.ScheduleAction> ToProtoAsync(
41+
string clientNamespace, DataConverter dataConverter);
3942
}
4043
}

src/Temporalio/Client/Schedules/ScheduleActionStartWorkflow.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,18 @@ public static ScheduleActionStartWorkflow Create(
8282
/// Convert from proto.
8383
/// </summary>
8484
/// <param name="proto">Proto.</param>
85+
/// <param name="clientNamespace">Client namespace.</param>
8586
/// <param name="dataConverter">Data converter.</param>
8687
/// <returns>Converted value.</returns>
8788
internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(
88-
Api.Workflow.V1.NewWorkflowExecutionInfo proto, DataConverter dataConverter)
89+
Api.Workflow.V1.NewWorkflowExecutionInfo proto, string clientNamespace, DataConverter dataConverter)
8990
{
91+
// Workflow-specific data converter
92+
dataConverter = dataConverter.WithSerializationContext(
93+
new ISerializationContext.Workflow(
94+
Namespace: clientNamespace,
95+
WorkflowId: proto.WorkflowId));
96+
9097
IReadOnlyCollection<object?> args = proto.Input == null ?
9198
Array.Empty<object?>() :
9299
proto.Input.Payloads_.Select(p => new EncodedRawValue(dataConverter, p)).ToList();
@@ -119,7 +126,7 @@ internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(
119126

120127
/// <inheritdoc />
121128
internal override async Task<Api.Schedule.V1.ScheduleAction> ToProtoAsync(
122-
DataConverter dataConverter)
129+
string clientNamespace, DataConverter dataConverter)
123130
{
124131
// Disallow some options
125132
if (Options.IdReusePolicy != Api.Enums.V1.WorkflowIdReusePolicy.AllowDuplicate)
@@ -142,6 +149,11 @@ internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(
142149
{
143150
throw new ArgumentException("RPC options cannot be set on scheduled workflow");
144151
}
152+
// Workflow-specific data converter
153+
dataConverter = dataConverter.WithSerializationContext(
154+
new ISerializationContext.Workflow(
155+
Namespace: clientNamespace,
156+
WorkflowId: Options.Id ?? throw new ArgumentException("ID required on workflow action")));
145157

146158
// Build input. We have to go one payload at a time here because half could be encoded
147159
// and half not (e.g. they just changed the second parameter).
@@ -156,8 +168,7 @@ internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(
156168

157169
var workflow = new Api.Workflow.V1.NewWorkflowExecutionInfo()
158170
{
159-
WorkflowId = Options.Id ??
160-
throw new ArgumentException("ID required on workflow action"),
171+
WorkflowId = Options.Id,
161172
WorkflowType = new() { Name = Workflow },
162173
TaskQueue = new()
163174
{

src/Temporalio/Client/Schedules/ScheduleDescription.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,17 @@ protected internal ScheduleDescription(
8686
/// </summary>
8787
/// <param name="id">ID.</param>
8888
/// <param name="rawDescription">Proto.</param>
89+
/// <param name="clientNamespace">Client namespace.</param>
8990
/// <param name="dataConverter">Converter.</param>
9091
/// <returns>Converted value.</returns>
9192
internal static async Task<ScheduleDescription> FromProtoAsync(
92-
string id, DescribeScheduleResponse rawDescription, DataConverter dataConverter) =>
93+
string id,
94+
DescribeScheduleResponse rawDescription,
95+
string clientNamespace,
96+
DataConverter dataConverter) =>
9397
new(
9498
id,
95-
await Schedule.FromProtoAsync(rawDescription.Schedule, dataConverter).ConfigureAwait(false),
99+
await Schedule.FromProtoAsync(rawDescription.Schedule, clientNamespace, dataConverter).ConfigureAwait(false),
96100
rawDescription,
97101
dataConverter);
98102
}

src/Temporalio/Client/TemporalClient.AsyncActivity.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ internal partial class Impl
2525
/// <inheritdoc />
2626
public override async Task HeartbeatAsyncActivityAsync(HeartbeatAsyncActivityInput input)
2727
{
28+
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
2829
Payloads? details = null;
2930
if (input.Options?.Details != null && input.Options.Details.Count > 0)
3031
{
3132
details = new()
3233
{
3334
Payloads_ =
3435
{
35-
await Client.Options.DataConverter.ToPayloadsAsync(
36-
input.Options.Details).ConfigureAwait(false),
36+
await converter.ToPayloadsAsync(input.Options.Details).ConfigureAwait(false),
3737
},
3838
};
3939
}
@@ -80,8 +80,8 @@ await Client.Options.DataConverter.ToPayloadsAsync(
8080
/// <inheritdoc />
8181
public override async Task CompleteAsyncActivityAsync(CompleteAsyncActivityInput input)
8282
{
83-
var result = await Client.Options.DataConverter.ToPayloadAsync(
84-
input.Result).ConfigureAwait(false);
83+
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
84+
var result = await converter.ToPayloadAsync(input.Result).ConfigureAwait(false);
8585
if (input.Activity is AsyncActivityHandle.IdReference idRef)
8686
{
8787
await Client.Connection.WorkflowService.RespondActivityTaskCompletedByIdAsync(
@@ -117,8 +117,8 @@ await Client.Connection.WorkflowService.RespondActivityTaskCompletedAsync(
117117
/// <inheritdoc />
118118
public override async Task FailAsyncActivityAsync(FailAsyncActivityInput input)
119119
{
120-
var failure = await Client.Options.DataConverter.ToFailureAsync(
121-
input.Exception).ConfigureAwait(false);
120+
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
121+
var failure = await converter.ToFailureAsync(input.Exception).ConfigureAwait(false);
122122
Payloads? lastHeartbeatDetails = null;
123123
if (input.Options?.LastHeartbeatDetails != null &&
124124
input.Options.LastHeartbeatDetails.Count > 0)
@@ -127,7 +127,7 @@ public override async Task FailAsyncActivityAsync(FailAsyncActivityInput input)
127127
{
128128
Payloads_ =
129129
{
130-
await Client.Options.DataConverter.ToPayloadsAsync(
130+
await converter.ToPayloadsAsync(
131131
input.Options.LastHeartbeatDetails).ConfigureAwait(false),
132132
},
133133
};
@@ -170,15 +170,15 @@ await Client.Connection.WorkflowService.RespondActivityTaskFailedAsync(
170170
public override async Task ReportCancellationAsyncActivityAsync(
171171
ReportCancellationAsyncActivityInput input)
172172
{
173+
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
173174
Payloads? details = null;
174175
if (input.Options?.Details != null && input.Options.Details.Count > 0)
175176
{
176177
details = new()
177178
{
178179
Payloads_ =
179180
{
180-
await Client.Options.DataConverter.ToPayloadsAsync(
181-
input.Options.Details).ConfigureAwait(false),
181+
await converter.ToPayloadsAsync(input.Options.Details).ConfigureAwait(false),
182182
},
183183
};
184184
}

0 commit comments

Comments
 (0)