Skip to content

Commit 2400644

Browse files
authored
Support DisableEagerActivityExecution option (#366)
Fixes #365
1 parent c30a2db commit 2400644

File tree

8 files changed

+43
-5
lines changed

8 files changed

+43
-5
lines changed

src/Temporalio/Worker/TemporalWorker.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
8989
OnTaskStarting: options.OnTaskStarting,
9090
OnTaskCompleted: options.OnTaskCompleted,
9191
RuntimeMetricMeter: MetricMeter,
92-
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes));
92+
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
93+
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
9394
}
9495
}
9596

src/Temporalio/Worker/TemporalWorkerOptions.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,21 @@ public TemporalWorkerOptions()
283283
/// </remarks>
284284
public WorkerTuner? Tuner { get; set; }
285285

286+
/// <summary>
287+
/// Gets or sets a value indicating whether eager activity executions will be disabled from
288+
/// a workflow.
289+
/// </summary>
290+
/// <remarks>
291+
/// Eager activity execution is an optimization on some servers that sends activities back
292+
/// to the same worker as the calling workflow if they can run there.
293+
/// </remarks>
294+
/// <remarks>
295+
/// This should be set to <c>true</c> for <see cref="MaxTaskQueueActivitiesPerSecond" /> to
296+
/// work and in a future version of this API may be implied as such (i.e. this setting will
297+
/// be ignored if that setting is set).
298+
/// </remarks>
299+
public bool DisableEagerActivityExecution { get; set; }
300+
286301
/// <summary>
287302
/// Gets the TEMPORAL_DEBUG environment variable.
288303
/// </summary>

src/Temporalio/Worker/WorkflowInstance.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
6969
private readonly Action<WorkflowInstance> onTaskStarting;
7070
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
7171
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes;
72+
private readonly bool disableEagerActivityExecution;
7273
private readonly Handlers inProgressHandlers = new();
7374
private WorkflowActivationCompletion? completion;
7475
// Will be set to null after last use (i.e. when workflow actually started)
@@ -190,6 +191,7 @@ public WorkflowInstance(WorkflowInstanceDetails details)
190191
Random = new(details.Start.RandomnessSeed);
191192
TracingEventsEnabled = !details.DisableTracingEvents;
192193
workerLevelFailureExceptionTypes = details.WorkerLevelFailureExceptionTypes;
194+
disableEagerActivityExecution = details.DisableEagerActivityExecution;
193195
}
194196

195197
/// <summary>
@@ -1756,6 +1758,7 @@ public override Task<TResult> ScheduleActivityAsync<TResult>(
17561758
Arguments = { instance.PayloadConverter.ToPayloads(input.Args) },
17571759
RetryPolicy = input.Options.RetryPolicy?.ToProto(),
17581760
CancellationType = (Bridge.Api.WorkflowCommands.ActivityCancellationType)input.Options.CancellationType,
1761+
DoNotEagerlyExecute = instance.disableEagerActivityExecution || input.Options.DisableEagerActivityExecution,
17591762
};
17601763
if (input.Headers is IDictionary<string, Payload> headers)
17611764
{

src/Temporalio/Worker/WorkflowInstanceDetails.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ namespace Temporalio.Worker
2626
/// <param name="OnTaskCompleted">Callback for every instance task complete.</param>
2727
/// <param name="RuntimeMetricMeter">Lazy runtime-level metric meter.</param>
2828
/// <param name="WorkerLevelFailureExceptionTypes">Failure exception types at worker level.</param>
29+
/// <param name="DisableEagerActivityExecution">Whether to disable eager at the worker level.</param>
2930
internal record WorkflowInstanceDetails(
3031
string Namespace,
3132
string TaskQueue,
@@ -41,5 +42,6 @@ internal record WorkflowInstanceDetails(
4142
Action<WorkflowInstance> OnTaskStarting,
4243
Action<WorkflowInstance, Exception?> OnTaskCompleted,
4344
Lazy<MetricMeter> RuntimeMetricMeter,
44-
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
45+
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
46+
bool DisableEagerActivityExecution);
4547
}

src/Temporalio/Worker/WorkflowReplayer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ public WorkflowHistoryRunner(WorkflowReplayerOptions options, bool throwOnReplay
173173
OnTaskStarting: options.OnTaskStarting,
174174
OnTaskCompleted: options.OnTaskCompleted,
175175
RuntimeMetricMeter: new(() => runtime.MetricMeter),
176-
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes),
176+
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
177+
DisableEagerActivityExecution: false),
177178
(runId, removeFromCache) => SetResult(removeFromCache));
178179
}
179180
catch

src/Temporalio/Worker/WorkflowWorker.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,8 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
283283
OnTaskStarting: options.OnTaskStarting,
284284
OnTaskCompleted: options.OnTaskCompleted,
285285
RuntimeMetricMeter: options.RuntimeMetricMeter,
286-
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes));
286+
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes,
287+
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
287288
}
288289
}
289290
}

src/Temporalio/Worker/WorkflowWorkerOptions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ internal record WorkflowWorkerOptions(
2121
Action<WorkflowInstance> OnTaskStarting,
2222
Action<WorkflowInstance, Exception?> OnTaskCompleted,
2323
Lazy<MetricMeter> RuntimeMetricMeter,
24-
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
24+
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
25+
bool DisableEagerActivityExecution);
2526
}

src/Temporalio/Workflows/ActivityOptions.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,20 @@ public class ActivityOptions : ICloneable
8585
/// </summary>
8686
public VersioningIntent VersioningIntent { get; set; } = VersioningIntent.Unspecified;
8787

88+
/// <summary>
89+
/// Gets or sets a value indicating whether eager activity execution will be disabled for
90+
/// this activity.
91+
/// </summary>
92+
/// <remarks>
93+
/// Eager activity execution is an optimization on some servers that sends activities back
94+
/// to the same worker as the calling workflow if they can run there.
95+
/// </remarks>
96+
/// <remarks>
97+
/// If <c>false</c> (the default), eager execution may still be disabled at the worker level
98+
/// or may not be requested due to lack of available slots.
99+
/// </remarks>
100+
public bool DisableEagerActivityExecution { get; set; }
101+
88102
/// <summary>
89103
/// Create a shallow copy of these options.
90104
/// </summary>

0 commit comments

Comments
 (0)