Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/Temporalio.Extensions.Hosting/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,48 @@ start instead of expecting a `ITemporalClient` to be present on the service coll
Some users may prefer to manually create the `TemporalWorker` without using host support, but still make their
activities created via the service provider. `CreateTemporalActivityDefinitions` extension methods are present on
`IServiceProvider` that will return a collection of `ActivityDefinition` instances for each activity on the type. These
can be added to the `TemporalWorkerOptions` directly.
can be added to the `TemporalWorkerOptions` directly.

## Worker Client Refresh

Some users may need to update the worker's connection to Temporal. It's desirable to do this without stopping the worker entirely, as that will evict the sticky workflow cache.

This can be done by using the `IWorkerClientUpdater`.

```csharp
using Temporalio.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Register a worker client updater.
builder.Services.AddSingleton<TemporalWorkerClientUpdater>();

// Add a hosted Temporal worker which returns a builder to add activities and workflows, along with the worker client updater.
builder.Services.
AddHostedTemporalWorker(
"my-temporal-host:7233",
"my-namespace",
"my-task-queue").
AddScopedActivities<MyActivityClass>().
AddWorkflow<MyWorkflow>().
ConfigureOptions().
Configure<TemporalWorkerClientUpdater>((options, workerClientUpdater) => options.WorkerClientUpdater = workerClientUpdater);

var host = builder.Build();

// You can have a BackgroundService periodically refresh the worker client like this.
TemporalWorkerClientUpdater workerClientUpdater = host.Services.GetRequiredService<TemporalWorkerClientUpdater>();

// Can update the TLS options if you need.
TemporalClientConnectOptions clientConnectOptions = new("my-other-temporal-host:7233")
{
Namespace = "default"
};

ITemporalClient updatedClient = await TemporalClient.ConnectAsync(clientConnectOptions).ConfigureAwait(false);

workerClientUpdater.UpdateTemporalWorkerClient(updatedClient);

// Make sure you use RunAsync and not Run, see https://github.com/temporalio/sdk-dotnet/issues/220
await host.RunAsync();
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using Temporalio.Worker;

namespace Temporalio.Extensions.Hosting
{
/// <summary>
/// Subscribes to the <see cref="TemporalWorkerClientUpdater"/>, and propagates the worker client changes to the Temporal worker.
/// </summary>
public class TemporalWorkerClientUpdateSubscriber : IDisposable
{
private readonly TemporalWorkerClientUpdater temporalWorkerClientUpdater;
private readonly TemporalWorker worker;
private bool disposedValue;

/// <summary>
/// Initializes a new instance of the <see cref="TemporalWorkerClientUpdateSubscriber"/> class.
/// </summary>
/// <param name="temporalWorkerClientUpdater">The optional <see cref="TemporalWorkerClientUpdater"/> used to subscribe to updates.</param>
/// <param name="worker">The <see cref="TemporalWorker"/> that will be updated when the worker client updates.</param>
public TemporalWorkerClientUpdateSubscriber(
TemporalWorkerClientUpdater temporalWorkerClientUpdater,
TemporalWorker worker)
{
this.temporalWorkerClientUpdater = temporalWorkerClientUpdater;
this.worker = worker;
this.temporalWorkerClientUpdater.TemporalWorkerClientUpdated += OnTemporalWorkerClientUpdated;
}

/// <summary>
/// Finalizes an instance of the <see cref="TemporalWorkerClientUpdateSubscriber"/> class.
/// </summary>
~TemporalWorkerClientUpdateSubscriber() => Dispose(false);

/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Unsubscribes from the worker client updater if one exists.
/// </summary>
/// <param name="disposing">If set to <see langword="true"/>, the worker will unsubscribe from the worker client updater.</param>
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
temporalWorkerClientUpdater.TemporalWorkerClientUpdated -= OnTemporalWorkerClientUpdated;
}

disposedValue = true;
}
}

/// <summary>
/// Callback invoked when a worker client updated is pushed through the <see cref="TemporalWorkerClientUpdater"/>.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="eventArgs">The <see cref="TemporalWorkerClientUpdatedEventArgs"/> of the event.</param>
private void OnTemporalWorkerClientUpdated(object? sender, TemporalWorkerClientUpdatedEventArgs eventArgs)
{
worker.Client = eventArgs.WorkerClient;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using Temporalio.Worker;

namespace Temporalio.Extensions.Hosting
{
/// <summary>
/// Event raised when a worker client is updated.
/// </summary>
public class TemporalWorkerClientUpdatedEventArgs : EventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="TemporalWorkerClientUpdatedEventArgs"/> class.
/// </summary>
/// <param name="workerClient">The client to update workers with.</param>
public TemporalWorkerClientUpdatedEventArgs(IWorkerClient workerClient) => WorkerClient = workerClient;

/// <summary>
/// Gets the <see cref="IWorkerClient"/> that will be propagated to all event listeners.
/// </summary>
public IWorkerClient WorkerClient { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using Temporalio.Worker;

namespace Temporalio.Extensions.Hosting
{
/// <summary>
/// Notification hub that can be used to push Temporal worker client updates to subscribing Temporal workers.
/// </summary>
public class TemporalWorkerClientUpdater
{
private readonly object clientLock = new();

/// <summary>
/// The <see cref="EventHandler"/> used to dispatch notifications to subscribers that the Temporal worker client was updated.
/// </summary>
public event EventHandler<TemporalWorkerClientUpdatedEventArgs> TemporalWorkerClientUpdated
{
add
{
lock (clientLock)
{
TemporalWorkerClientUpdatedEvent += value;
}
}

remove
{
lock (clientLock)
{
TemporalWorkerClientUpdatedEvent -= value;
}
}
}

private event EventHandler<TemporalWorkerClientUpdatedEventArgs>? TemporalWorkerClientUpdatedEvent;

/// <summary>
/// Dispatches a notification to all subscribers that a new worker client should be used.
/// </summary>
/// <param name="client">The new <see cref="IWorkerClient"/> that should be pushed out to all subscribing workers.</param>
public void UpdateTemporalWorkerClient(IWorkerClient client)
{
TemporalWorkerClientUpdatedEventArgs eventArgs = new TemporalWorkerClientUpdatedEventArgs(client);

TemporalWorkerClientUpdatedEvent?.Invoke(this, eventArgs);
}
}
}
19 changes: 18 additions & 1 deletion src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class TemporalWorkerService : BackgroundService
private readonly TemporalClientConnectOptions? newClientOptions;
private readonly ITemporalClient? existingClient;
private readonly TemporalWorkerOptions workerOptions;
private readonly TemporalWorkerClientUpdater? workerClientUpdater;
Comment on lines 19 to +22
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private readonly TemporalClientConnectOptions? newClientOptions;
private readonly ITemporalClient? existingClient;
private readonly TemporalWorkerOptions workerOptions;
private readonly TemporalWorkerClientUpdater? workerClientUpdater;
private readonly ITemporalClient? existingClient;
private readonly TemporalWorkerServiceOptions workerOptions;

Now that we are basically storing the three separate pieces anyways, let's just store the full service options as one field

Copy link
Contributor Author

@robcao robcao Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not really a good way I've found to handle the constructors that take in TemporalWorkerOptions directly (from some testing, cloning is not contravariant).

I don't know of a better way to convert an instance of TemporalWorkerOptions into TemporalWorkerServiceOptions other than writing a direct translation method, which imo is not desirable for maintenance. Do you have a suggestion?

I think it would be easier if TemporalWorkerServiceOptions was composed of TemporalWorkerOptions instead of directly inheriting from it, but that is a breaking change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see now that we have multiple constructors for this TemporalWorkerService that accept partial forms. Ok, can ignore my comment and leave these as separate fields.


/// <summary>
/// Initializes a new instance of the <see cref="TemporalWorkerService"/> class using
Expand Down Expand Up @@ -157,6 +158,11 @@ public TemporalWorkerService(
{
newClientOptions.LoggerFactory = workerOptions.LoggerFactory;
}

if (options.WorkerClientUpdater != null)
{
this.workerClientUpdater = options.WorkerClientUpdater;
}
}

/// <inheritdoc />
Expand All @@ -166,7 +172,18 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
// Call connect just in case it was a lazy client (no-op if already connected)
await client.Connection.ConnectAsync().ConfigureAwait(false);
using var worker = new TemporalWorker(client, workerOptions);
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);

if (workerClientUpdater != null)
{
using (new TemporalWorkerClientUpdateSubscriber(workerClientUpdater, worker))
{
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
}
}
else
{
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public TemporalWorkerServiceOptions(string taskQueue)
/// </summary>
public TemporalClientConnectOptions? ClientOptions { get; set; }

/// <summary>
/// Gets or sets the <see cref="TemporalWorkerClientUpdater"/> that can be used to push Temporal worker client updates to the underlying <see cref="TemporalWorker"/>.
/// If not set, the worker service will not be updateable with a new Temporal worker client.
/// </summary>
public TemporalWorkerClientUpdater? WorkerClientUpdater { get; set; }

/// <inheritdoc />
public override object Clone()
{
Expand All @@ -39,6 +45,12 @@ public override object Clone()
{
options.ClientOptions = (TemporalClientConnectOptions)ClientOptions.Clone();
}

if (WorkerClientUpdater != null)
{
options.WorkerClientUpdater = WorkerClientUpdater;
}

return options;
}

Expand Down
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
/// </summary>
/// <remarks>
/// When this property is set, it actually replaces the underlying client that is being used
/// by the worker. This means the next calls by the worker to Temporal (e.g. responding
/// by the worker. This means subsequent calls by the worker to Temporal (e.g. responding
/// task completion, activity heartbeat, etc) will be on this new client, but outstanding
/// calls will not be immediately interrupted.
/// </remarks>
Expand Down Expand Up @@ -255,6 +255,7 @@ protected virtual void Dispose(bool disposing)
{
activityWorker?.Dispose();
BridgeWorker.Dispose();

// Remove task tracing if not disabled and there are workflows present
if (workflowTracingEventListenerEnabled)
{
Expand Down
56 changes: 56 additions & 0 deletions tests/Temporalio.Tests/AssertMore.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,67 @@
using System.Text;
using System.Text.Json;
using Temporalio.Api.History.V1;
using Temporalio.Client;
using Xunit;

namespace Temporalio.Tests
{
public static class AssertMore
{
public static Task TaskFailureEventuallyAsync(WorkflowHandle handle, Action<WorkflowTaskFailedEventAttributes> assert)
{
return AssertMore.EventuallyAsync(async () =>
{
WorkflowTaskFailedEventAttributes? attrs = null;
await foreach (var evt in handle.FetchHistoryEventsAsync())
{
if (evt.WorkflowTaskFailedEventAttributes != null)
{
attrs = evt.WorkflowTaskFailedEventAttributes;
}
}
Assert.NotNull(attrs);
assert(attrs!);
});
}

public static Task StartedEventuallyAsync(WorkflowHandle handle)
{
return HasEventEventuallyAsync(handle, e => e.WorkflowExecutionStartedEventAttributes != null);
}

public static async Task ChildStartedEventuallyAsync(WorkflowHandle handle)
{
// Wait for started
string? childId = null;
await HasEventEventuallyAsync(
handle,
e =>
{
childId = e.ChildWorkflowExecutionStartedEventAttributes?.WorkflowExecution?.WorkflowId;
return childId != null;
});
// Check that a workflow task has completed proving child has really started
await HasEventEventuallyAsync(
handle.Client.GetWorkflowHandle(childId!),
e => e.WorkflowTaskCompletedEventAttributes != null);
}

public static Task HasEventEventuallyAsync(WorkflowHandle handle, Func<HistoryEvent, bool> predicate)
{
return AssertMore.EventuallyAsync(async () =>
{
await foreach (var evt in handle.FetchHistoryEventsAsync())
{
if (predicate(evt))
{
return;
}
}
Assert.Fail("Event not found");
});
}

public static Task EventuallyAsync(
Func<Task> func, TimeSpan? interval = null, int iterations = 15) =>
EventuallyAsync(
Expand Down
Loading