Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions src/Temporalio/Client/AsyncActivityHandle.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Temporalio.Converters;

namespace Temporalio.Client
{
Expand All @@ -10,7 +11,13 @@ namespace Temporalio.Client
/// <param name="Activity">Reference to the activity for this handle.</param>
public record AsyncActivityHandle(
ITemporalClient Client, AsyncActivityHandle.Reference Activity)
: IWithSerializationContext<AsyncActivityHandle>
{
/// <summary>
/// Gets or inits the data converter that will be used instead of the clients if set.
/// </summary>
public DataConverter? DataConverterOverride { get; init; }

/// <summary>
/// Issue a heartbeat for this activity.
/// </summary>
Expand All @@ -22,7 +29,7 @@ public record AsyncActivityHandle(
/// </exception>
public Task HeartbeatAsync(AsyncActivityHeartbeatOptions? options = null) =>
Client.OutboundInterceptor.HeartbeatAsyncActivityAsync(new(
Activity: Activity, Options: options));
Activity: Activity, Options: options, DataConverterOverride: DataConverterOverride));

/// <summary>
/// Complete this activity.
Expand All @@ -33,7 +40,7 @@ public Task HeartbeatAsync(AsyncActivityHeartbeatOptions? options = null) =>
public Task CompleteAsync(
object? result = null, AsyncActivityCompleteOptions? options = null) =>
Client.OutboundInterceptor.CompleteAsyncActivityAsync(new(
Activity: Activity, Result: result, Options: options));
Activity: Activity, Result: result, Options: options, DataConverterOverride: DataConverterOverride));

/// <summary>
/// Fail this activity.
Expand All @@ -43,7 +50,7 @@ public Task CompleteAsync(
/// <returns>Completion task.</returns>
public Task FailAsync(Exception exception, AsyncActivityFailOptions? options = null) =>
Client.OutboundInterceptor.FailAsyncActivityAsync(new(
Activity: Activity, Exception: exception, Options: options));
Activity: Activity, Exception: exception, Options: options, DataConverterOverride: DataConverterOverride));

/// <summary>
/// Report this activity as cancelled.
Expand All @@ -53,7 +60,25 @@ public Task FailAsync(Exception exception, AsyncActivityFailOptions? options = n
public Task ReportCancellationAsync(
AsyncActivityReportCancellationOptions? options = null) =>
Client.OutboundInterceptor.ReportCancellationAsyncActivityAsync(new(
Activity: Activity, Options: options));
Activity: Activity, Options: options, DataConverterOverride: DataConverterOverride));

/// <summary>
/// If the data converter supports customizing based on serialization context, recreate this
/// handle with a data converter override using the given context.
/// </summary>
/// <param name="context">Context to provide to data converter.</param>
/// <returns>New handle if context supported on data converter, same handle otherwise.</returns>
public AsyncActivityHandle WithSerializationContext(ISerializationContext context)
{
var converter = DataConverterOverride ?? Client.Options.DataConverter;
var newConverter = converter.WithSerializationContext(context);
// Don't do anything if same object
if (ReferenceEquals(converter, newConverter))
{
return this;
}
return this with { DataConverterOverride = newConverter };
}

/// <summary>
/// Reference to an existing activity.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Temporalio.Converters;

namespace Temporalio.Client.Interceptors
{
/// <summary>
Expand All @@ -6,12 +8,14 @@ namespace Temporalio.Client.Interceptors
/// <param name="Activity">Activity to complete.</param>
/// <param name="Result">Result.</param>
/// <param name="Options">Options passed in to complete.</param>
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
/// <remarks>
/// WARNING: This constructor may have required properties added. Do not rely on the exact
/// constructor, only use "with" clauses.
/// </remarks>
public record CompleteAsyncActivityInput(
AsyncActivityHandle.Reference Activity,
object? Result,
AsyncActivityCompleteOptions? Options);
AsyncActivityCompleteOptions? Options,
DataConverter? DataConverterOverride = null);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Temporalio.Converters;

namespace Temporalio.Client.Interceptors
{
Expand All @@ -8,12 +9,14 @@ namespace Temporalio.Client.Interceptors
/// <param name="Activity">Activity to fail.</param>
/// <param name="Exception">Exception.</param>
/// <param name="Options">Options passed in to fail.</param>
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
/// <remarks>
/// WARNING: This constructor may have required properties added. Do not rely on the exact
/// constructor, only use "with" clauses.
/// </remarks>
public record FailAsyncActivityInput(
AsyncActivityHandle.Reference Activity,
Exception Exception,
AsyncActivityFailOptions? Options);
AsyncActivityFailOptions? Options,
DataConverter? DataConverterOverride = null);
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
using Temporalio.Converters;

namespace Temporalio.Client.Interceptors
{
/// <summary>
/// Input for <see cref="ClientOutboundInterceptor.HeartbeatAsyncActivityAsync" />.
/// </summary>
/// <param name="Activity">Activity to heartbeat.</param>
/// <param name="Options">Options passed in to heartbeat.</param>
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
/// <remarks>
/// WARNING: This constructor may have required properties added. Do not rely on the exact
/// constructor, only use "with" clauses.
/// </remarks>
public record HeartbeatAsyncActivityInput(
AsyncActivityHandle.Reference Activity,
AsyncActivityHeartbeatOptions? Options);
AsyncActivityHeartbeatOptions? Options,
DataConverter? DataConverterOverride = null);
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
using Temporalio.Converters;

namespace Temporalio.Client.Interceptors
{
/// <summary>
/// Input for <see cref="ClientOutboundInterceptor.ReportCancellationAsyncActivityAsync" />.
/// </summary>
/// <param name="Activity">Activity to report cancellation.</param>
/// <param name="Options">Options passed in to report cancellation.</param>
/// <param name="DataConverterOverride">Data converter to use instead of client one.</param>
/// <remarks>
/// WARNING: This constructor may have required properties added. Do not rely on the exact
/// constructor, only use "with" clauses.
/// </remarks>
public record ReportCancellationAsyncActivityInput(
AsyncActivityHandle.Reference Activity,
AsyncActivityReportCancellationOptions? Options);
AsyncActivityReportCancellationOptions? Options,
DataConverter? DataConverterOverride = null);
}
21 changes: 12 additions & 9 deletions src/Temporalio/Client/Schedules/Schedule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ public record Schedule(
/// Convert from proto.
/// </summary>
/// <param name="proto">Proto.</param>
/// <param name="clientNamespace">Client namespace.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Converted value.</returns>
internal static async Task<Schedule> FromProtoAsync(
Api.Schedule.V1.Schedule proto, DataConverter dataConverter) =>
Api.Schedule.V1.Schedule proto, string clientNamespace, DataConverter dataConverter) =>
new(
Action: await ScheduleAction.FromProtoAsync(proto.Action, dataConverter).ConfigureAwait(false),
Action: await ScheduleAction.FromProtoAsync(proto.Action, clientNamespace, dataConverter).ConfigureAwait(false),
Spec: ScheduleSpec.FromProto(proto.Spec))
{
Policy = SchedulePolicy.FromProto(proto.Policies),
Expand All @@ -43,14 +44,16 @@ internal static async Task<Schedule> FromProtoAsync(
/// <summary>
/// Convert to proto.
/// </summary>
/// <param name="clientNamespace">Client namespace.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Proto.</returns>
internal async Task<Api.Schedule.V1.Schedule> ToProtoAsync(DataConverter dataConverter) => new()
{
Spec = Spec.ToProto(),
Action = await Action.ToProtoAsync(dataConverter).ConfigureAwait(false),
Policies = Policy.ToProto(),
State = State.ToProto(),
};
internal async Task<Api.Schedule.V1.Schedule> ToProtoAsync(
string clientNamespace, DataConverter dataConverter) => new()
{
Spec = Spec.ToProto(),
Action = await Action.ToProtoAsync(clientNamespace, dataConverter).ConfigureAwait(false),
Policies = Policy.ToProto(),
State = State.ToProto(),
};
}
}
9 changes: 6 additions & 3 deletions src/Temporalio/Client/Schedules/ScheduleAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ public abstract record ScheduleAction
/// Convert from proto.
/// </summary>
/// <param name="proto">Proto.</param>
/// <param name="clientNamespace">Client namespace.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Converted value.</returns>
internal static async Task<ScheduleAction> FromProtoAsync(
Api.Schedule.V1.ScheduleAction proto, DataConverter dataConverter)
Api.Schedule.V1.ScheduleAction proto, string clientNamespace, DataConverter dataConverter)
{
if (proto.StartWorkflow != null)
{
return await ScheduleActionStartWorkflow.FromProtoAsync(
proto.StartWorkflow, dataConverter).ConfigureAwait(false);
proto.StartWorkflow, clientNamespace, dataConverter).ConfigureAwait(false);
}
else
{
Expand All @@ -33,8 +34,10 @@ internal static async Task<ScheduleAction> FromProtoAsync(
/// <summary>
/// Convert to proto.
/// </summary>
/// <param name="clientNamespace">Client namespace.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Proto.</returns>
internal abstract Task<Api.Schedule.V1.ScheduleAction> ToProtoAsync(DataConverter dataConverter);
internal abstract Task<Api.Schedule.V1.ScheduleAction> ToProtoAsync(
string clientNamespace, DataConverter dataConverter);
}
}
19 changes: 15 additions & 4 deletions src/Temporalio/Client/Schedules/ScheduleActionStartWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,18 @@ public static ScheduleActionStartWorkflow Create(
/// Convert from proto.
/// </summary>
/// <param name="proto">Proto.</param>
/// <param name="clientNamespace">Client namespace.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Converted value.</returns>
internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(
Api.Workflow.V1.NewWorkflowExecutionInfo proto, DataConverter dataConverter)
Api.Workflow.V1.NewWorkflowExecutionInfo proto, string clientNamespace, DataConverter dataConverter)
{
// Workflow-specific data converter
dataConverter = dataConverter.WithSerializationContext(
new ISerializationContext.Workflow(
Namespace: clientNamespace,
WorkflowId: proto.WorkflowId));

IReadOnlyCollection<object?> args = proto.Input == null ?
Array.Empty<object?>() :
proto.Input.Payloads_.Select(p => new EncodedRawValue(dataConverter, p)).ToList();
Expand Down Expand Up @@ -119,7 +126,7 @@ internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(

/// <inheritdoc />
internal override async Task<Api.Schedule.V1.ScheduleAction> ToProtoAsync(
DataConverter dataConverter)
string clientNamespace, DataConverter dataConverter)
{
// Disallow some options
if (Options.IdReusePolicy != Api.Enums.V1.WorkflowIdReusePolicy.AllowDuplicate)
Expand All @@ -142,6 +149,11 @@ internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(
{
throw new ArgumentException("RPC options cannot be set on scheduled workflow");
}
// Workflow-specific data converter
dataConverter = dataConverter.WithSerializationContext(
new ISerializationContext.Workflow(
Namespace: clientNamespace,
WorkflowId: Options.Id ?? throw new ArgumentException("ID required on workflow action")));

// Build input. We have to go one payload at a time here because half could be encoded
// and half not (e.g. they just changed the second parameter).
Expand All @@ -156,8 +168,7 @@ internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(

var workflow = new Api.Workflow.V1.NewWorkflowExecutionInfo()
{
WorkflowId = Options.Id ??
throw new ArgumentException("ID required on workflow action"),
WorkflowId = Options.Id,
WorkflowType = new() { Name = Workflow },
TaskQueue = new()
{
Expand Down
8 changes: 6 additions & 2 deletions src/Temporalio/Client/Schedules/ScheduleDescription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,17 @@ private ScheduleDescription(
/// </summary>
/// <param name="id">ID.</param>
/// <param name="rawDescription">Proto.</param>
/// <param name="clientNamespace">Client namespace.</param>
/// <param name="dataConverter">Converter.</param>
/// <returns>Converted value.</returns>
internal static async Task<ScheduleDescription> FromProtoAsync(
string id, DescribeScheduleResponse rawDescription, DataConverter dataConverter) =>
string id,
DescribeScheduleResponse rawDescription,
string clientNamespace,
DataConverter dataConverter) =>
new(
id,
await Schedule.FromProtoAsync(rawDescription.Schedule, dataConverter).ConfigureAwait(false),
await Schedule.FromProtoAsync(rawDescription.Schedule, clientNamespace, dataConverter).ConfigureAwait(false),
rawDescription,
dataConverter);
}
Expand Down
18 changes: 9 additions & 9 deletions src/Temporalio/Client/TemporalClient.AsyncActivity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ internal partial class Impl
/// <inheritdoc />
public override async Task HeartbeatAsyncActivityAsync(HeartbeatAsyncActivityInput input)
{
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
Payloads? details = null;
if (input.Options?.Details != null && input.Options.Details.Count > 0)
{
details = new()
{
Payloads_ =
{
await Client.Options.DataConverter.ToPayloadsAsync(
input.Options.Details).ConfigureAwait(false),
await converter.ToPayloadsAsync(input.Options.Details).ConfigureAwait(false),
},
};
}
Expand Down Expand Up @@ -80,8 +80,8 @@ await Client.Options.DataConverter.ToPayloadsAsync(
/// <inheritdoc />
public override async Task CompleteAsyncActivityAsync(CompleteAsyncActivityInput input)
{
var result = await Client.Options.DataConverter.ToPayloadAsync(
input.Result).ConfigureAwait(false);
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
var result = await converter.ToPayloadAsync(input.Result).ConfigureAwait(false);
if (input.Activity is AsyncActivityHandle.IdReference idRef)
{
await Client.Connection.WorkflowService.RespondActivityTaskCompletedByIdAsync(
Expand Down Expand Up @@ -117,8 +117,8 @@ await Client.Connection.WorkflowService.RespondActivityTaskCompletedAsync(
/// <inheritdoc />
public override async Task FailAsyncActivityAsync(FailAsyncActivityInput input)
{
var failure = await Client.Options.DataConverter.ToFailureAsync(
input.Exception).ConfigureAwait(false);
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
var failure = await converter.ToFailureAsync(input.Exception).ConfigureAwait(false);
Payloads? lastHeartbeatDetails = null;
if (input.Options?.LastHeartbeatDetails != null &&
input.Options.LastHeartbeatDetails.Count > 0)
Expand All @@ -127,7 +127,7 @@ public override async Task FailAsyncActivityAsync(FailAsyncActivityInput input)
{
Payloads_ =
{
await Client.Options.DataConverter.ToPayloadsAsync(
await converter.ToPayloadsAsync(
input.Options.LastHeartbeatDetails).ConfigureAwait(false),
},
};
Expand Down Expand Up @@ -170,15 +170,15 @@ await Client.Connection.WorkflowService.RespondActivityTaskFailedAsync(
public override async Task ReportCancellationAsyncActivityAsync(
ReportCancellationAsyncActivityInput input)
{
var converter = input.DataConverterOverride ?? Client.Options.DataConverter;
Payloads? details = null;
if (input.Options?.Details != null && input.Options.Details.Count > 0)
{
details = new()
{
Payloads_ =
{
await Client.Options.DataConverter.ToPayloadsAsync(
input.Options.Details).ConfigureAwait(false),
await converter.ToPayloadsAsync(input.Options.Details).ConfigureAwait(false),
},
};
}
Expand Down
Loading
Loading