Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/Temporalio.Extensions.Hosting/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,48 @@ start instead of expecting a `ITemporalClient` to be present on the service coll
Some users may prefer to manually create the `TemporalWorker` without using host support, but still make their
activities created via the service provider. `CreateTemporalActivityDefinitions` extension methods are present on
`IServiceProvider` that will return a collection of `ActivityDefinition` instances for each activity on the type. These
can be added to the `TemporalWorkerOptions` directly.
can be added to the `TemporalWorkerOptions` directly.

## Worker Client Refresh

Some users may need to update the worker's connection to Temporal. It's desirable to do this without stopping the worker entirely, as that will evict the sticky workflow cache.

This can be done by using the `IWorkerClientUpdater`.

```csharp
using Temporalio.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Register a worker client updater.
builder.Services.AddSingleton<IWorkerClientUpdater, WorkerClientUpdater>();

// Add a hosted Temporal worker which returns a builder to add activities and workflows, along with the worker client updater.
builder.Services.
AddHostedTemporalWorker(
"my-temporal-host:7233",
"my-namespace",
"my-task-queue").
AddScopedActivities<MyActivityClass>().
AddWorkflow<MyWorkflow>().
ConfigureOptions().
Configure<IWorkerClientUpdater>((options, workerClientUpdater) => options.WorkerClientUpdater = workerClientUpdater);

var host = builder.Build();

// You can have a BackgroundService periodically refresh the worker client like this.
IWorkerClientUpdater workerClientUpdater = host.Services.GetRequiredService<IWorkerClientUpdater>();

// Can update the TLS options if you need.
TemporalClientConnectOptions clientConnectOptions = new("my-other-temporal-host:7233")
{
Namespace = "default"
};

ITemporalClient updatedClient = await TemporalClient.ConnectAsync(clientConnectOptions).ConfigureAwait(false);

workerClientUpdater.UpdateWorkerClient(updatedClient);

// Make sure you use RunAsync and not Run, see https://github.com/temporalio/sdk-dotnet/issues/220
await host.RunAsync();
```
21 changes: 21 additions & 0 deletions src/Temporalio/Worker/IWorkerClientUpdater.cs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think any of this is needed in the Temporalio project. This is a generic host problem only. Those making workers directly can just use the Client setter. I also don't think an interface is needed unless you really think it's important for testing (even then not sure the event handler needs to be part of the interface). Also, even though it's a mouthful, I'd call this TemporalWorkerClientUpdater w/ the Temporal prefix to match the worker class name for discoverability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, done.

I don't think a lack of an interface hampers testing, since the implementation has no dependencies

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;

namespace Temporalio.Worker
{
/// <summary>
/// Represents a notification hub that can be used to push worker client updates to subscribing workers.
/// </summary>
public interface IWorkerClientUpdater
{
/// <summary>
/// The event for when a worker client update is broadcast.
/// </summary>
event EventHandler<WorkerClientUpdatedEventArgs> OnWorkerClientUpdated;

/// <summary>
/// Dispatches a notification to all subscribers that a new worker client should be used.
/// </summary>
/// <param name="client">The new <see cref="IWorkerClient"/> that should be pushed out to all subscribing workers.</param>
void UpdateWorkerClient(IWorkerClient client);
}
}
23 changes: 22 additions & 1 deletion src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
WorkflowTracingEventListener.Instance.Register();
}

if (Options.WorkerClientUpdater != null)
{
Options.WorkerClientUpdater.OnWorkerClientUpdated += OnWorkerClientUpdated;
}

// Create workers
if (options.Activities.Count > 0)
{
Expand Down Expand Up @@ -108,7 +113,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
/// </summary>
/// <remarks>
/// When this property is set, it actually replaces the underlying client that is being used
/// by the worker. This means the next calls by the worker to Temporal (e.g. responding
/// by the worker. This means subsequent calls by the worker to Temporal (e.g. responding
/// task completion, activity heartbeat, etc) will be on this new client, but outstanding
/// calls will not be immediately interrupted.
/// </remarks>
Expand Down Expand Up @@ -245,6 +250,16 @@ public void Dispose()
GC.SuppressFinalize(this);
}

/// <summary>
/// Callback invoked when a worker client updated is pushed through the <see cref="IWorkerClientUpdater"/>.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="eventArgs">The <see cref="WorkerClientUpdatedEventArgs"/> of the event.</param>
internal void OnWorkerClientUpdated(object? sender, WorkerClientUpdatedEventArgs eventArgs)
{
Client = eventArgs.WorkerClient;
}

/// <summary>
/// Dispose the worker.
/// </summary>
Expand All @@ -253,8 +268,14 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (Options.WorkerClientUpdater != null)
{
Options.WorkerClientUpdater.OnWorkerClientUpdated -= OnWorkerClientUpdated;
}

activityWorker?.Dispose();
BridgeWorker.Dispose();

// Remove task tracing if not disabled and there are workflows present
if (workflowTracingEventListenerEnabled)
{
Expand Down
5 changes: 5 additions & 0 deletions src/Temporalio/Worker/TemporalWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public TemporalWorkerOptions()
/// </remarks>
public WorkflowStackTrace WorkflowStackTrace { get; set; } = WorkflowStackTrace.None;

/// <summary>
/// Gets or sets the <see cref="IWorkerClientUpdater"/> that can be used to push updates to the worker client.
/// </summary>
public IWorkerClientUpdater? WorkerClientUpdater { get; set; }

/// <summary>
/// Gets the TEMPORAL_DEBUG environment variable.
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions src/Temporalio/Worker/WorkerClientUpdatedEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using Temporalio.Client;

namespace Temporalio.Worker
{
/// <summary>
/// Event raised when a worker client is updated.
/// </summary>
public class WorkerClientUpdatedEventArgs : EventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkerClientUpdatedEventArgs"/> class.
/// </summary>
/// <param name="workerClient">The client to update workers with.</param>
public WorkerClientUpdatedEventArgs(IWorkerClient workerClient) => WorkerClient = workerClient;

/// <summary>
/// Gets the <see cref="ITemporalClient"/> that will be propagated to all event listeners.
/// </summary>
public IWorkerClient WorkerClient { get; }
}
}
44 changes: 44 additions & 0 deletions src/Temporalio/Worker/WorkerClientUpdater.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;

namespace Temporalio.Worker
{
/// <summary>
/// Implementation of the <see cref="IWorkerClientUpdater"/>.
/// </summary>
public class WorkerClientUpdater : IWorkerClientUpdater
{
private readonly object clientLock = new();

/// <summary>
/// The <see cref="EventHandler"/> used to dispatch notifications to subscribers that the client was updated.
/// </summary>
public event EventHandler<WorkerClientUpdatedEventArgs> OnWorkerClientUpdated
{
add
{
lock (clientLock)
{
WorkerClientUpdatedEvent += value;
}
}

remove
{
lock (clientLock)
{
WorkerClientUpdatedEvent -= value;
}
}
}

private event EventHandler<WorkerClientUpdatedEventArgs>? WorkerClientUpdatedEvent;

/// <inheritdoc />
public void UpdateWorkerClient(IWorkerClient client)
{
WorkerClientUpdatedEventArgs eventArgs = new WorkerClientUpdatedEventArgs(client);

WorkerClientUpdatedEvent?.Invoke(this, eventArgs);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using Temporalio.Tests.Worker;
using Temporalio.Worker;

#pragma warning disable SA1201, SA1204 // We want to have classes near their tests
namespace Temporalio.Tests.Extensions.Hosting;

Expand Down Expand Up @@ -79,11 +82,11 @@ public async Task TemporalWorkerService_ExecuteAsync_SimpleWorker()
// Add the rest of the services
services.
AddSingleton<ILoggerFactory>(loggerFactory).
AddScoped<DatabaseClient>().
// We are also adding the DB client as a keyed service to demonstrate keyed service
// support for our DI logic. This used to break because newer DI library versions
// disallowed accessing certain properties on keyed services which we access
// internally for dupe checks.
AddScoped<DatabaseClient>().
// We are also adding the DB client as a keyed service to demonstrate keyed service
// support for our DI logic. This used to break because newer DI library versions
// disallowed accessing certain properties on keyed services which we access
// internally for dupe checks.
AddKeyedScoped<DatabaseClient>("client-keyed").
AddHostedTemporalWorker(taskQueue).
AddScopedActivities<DatabaseActivities>().
Expand Down Expand Up @@ -206,6 +209,70 @@ public async Task TemporalWorkerService_ExecuteAsync_MultipleWorkers()
["scoped-other2"] = $"tq: {taskQueue2}, counter: 6",
},
result);
}

[Fact]
public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient()
{
// We are going to start a second ephemeral server and then replace the client. So we will
// start a no-cache ticking workflow with the current client and confirm it has accomplished
// at least one task. Then we will start another on the other client, and confirm it gets
// started too. Then we will terminate both. We have to use a ticking workflow with only one
// poller to force a quick re-poll to recognize our client change quickly (as opposed to
// just waiting the minute for poll timeout).
await using var otherEnv = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync();

// Start both workflows on different servers
var taskQueue = $"tq-{Guid.NewGuid()}";
var handle1 = await Client.StartWorkflowAsync(
(WorkflowWorkerTests.TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));
var handle2 = await otherEnv.Client.StartWorkflowAsync(
(WorkflowWorkerTests.TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));

var bld = Host.CreateApplicationBuilder();

WorkerClientUpdater workerClientUpdater = new WorkerClientUpdater();

// Register the worker client updater.
bld.Services.AddSingleton<IWorkerClientUpdater>(workerClientUpdater);

// Add the first worker with the workflow and client already DI'd, and add the worker client updater.
bld.Services.
AddSingleton(Client).
AddHostedTemporalWorker(taskQueue).
AddWorkflow<WorkflowWorkerTests.TickingWorkflow>()
.ConfigureOptions()
.Configure<IWorkerClientUpdater>((options, updater) =>
{
options.WorkerClientUpdater = updater;
options.MaxCachedWorkflows = 0;
options.MaxConcurrentWorkflowTaskPolls = 1;
});

// Start the host
using var tokenSource = new CancellationTokenSource();
using var host = bld.Build();
var hostTask = Task.Run(() => host.RunAsync(tokenSource.Token));

// Confirm the first ticking workflow has completed a task but not the second workflow
await handle1.AssertHasEventEventuallyAsync(e => e.WorkflowTaskCompletedEventAttributes != null);
await foreach (var evt in handle2.FetchHistoryEventsAsync())
{
Assert.Null(evt.WorkflowTaskCompletedEventAttributes);
}

// Now replace the client, which should be used fairly quickly because we should have
// timer-done poll completions every 100ms
workerClientUpdater.UpdateWorkerClient(otherEnv.Client);

// Now confirm the other workflow has started
await handle1.AssertHasEventEventuallyAsync(e => e.WorkflowTaskCompletedEventAttributes != null);

// Terminate both
await handle1.TerminateAsync();
await handle2.TerminateAsync();
}

[Workflow("Workflow")]
Expand Down
Loading