-
Notifications
You must be signed in to change notification settings - Fork 44
feat: add workflow client updater for updating workflow client #258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| using System; | ||
| using Temporalio.Worker; | ||
|
|
||
| namespace Temporalio.Extensions.Hosting | ||
| { | ||
| /// <summary> | ||
| /// Notification hub that can be used to push Temporal worker client updates to subscribing Temporal workers. | ||
| /// </summary> | ||
| public class TemporalWorkerClientUpdater | ||
| { | ||
| private readonly object clientLock = new(); | ||
|
|
||
| private event EventHandler<IWorkerClient>? OnClientUpdatedEvent; | ||
|
|
||
| /// <summary> | ||
| /// Dispatches a notification to all subscribers that a new worker client should be used. | ||
| /// </summary> | ||
| /// <param name="client">The new <see cref="IWorkerClient"/> that should be pushed out to all subscribing workers.</param> | ||
| public void UpdateClient(IWorkerClient client) | ||
| { | ||
| OnClientUpdatedEvent?.Invoke(this, client); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Adds a new subscriber that will be notified when a new worker client should be used. | ||
| /// </summary> | ||
| /// <param name="eventHandler">The event handler to add to the event listeners.</param> | ||
| internal void Subscribe(EventHandler<IWorkerClient> eventHandler) | ||
| { | ||
| lock (clientLock) | ||
| { | ||
| OnClientUpdatedEvent += eventHandler; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Removes an existing subscriber from receiving notifications when a new worker client should be used. | ||
| /// </summary> | ||
| /// <param name="eventHandler">The event handler to remove from the event listeners.</param> | ||
| internal void Unsubscribe(EventHandler<IWorkerClient> eventHandler) | ||
| { | ||
| lock (clientLock) | ||
| { | ||
| OnClientUpdatedEvent -= eventHandler; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,10 +15,11 @@ namespace Temporalio.Extensions.Hosting | |||||||||||||
| /// </summary> | ||||||||||||||
| public class TemporalWorkerService : BackgroundService | ||||||||||||||
| { | ||||||||||||||
| // These two are mutually exclusive | ||||||||||||||
| // These two (newClientOptions and existingClient) are mutually exclusive | ||||||||||||||
| private readonly TemporalClientConnectOptions? newClientOptions; | ||||||||||||||
| private readonly ITemporalClient? existingClient; | ||||||||||||||
| private readonly TemporalWorkerOptions workerOptions; | ||||||||||||||
| private readonly TemporalWorkerClientUpdater? workerClientUpdater; | ||||||||||||||
|
Comment on lines
19
to
+22
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Now that we are basically storing the three separate pieces anyways, let's just store the full service options as one field
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's not really a good way I've found to handle the constructors that take in I don't know of a better way to convert an instance of I think it would be easier if
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I see now that we have multiple constructors for this |
||||||||||||||
|
|
||||||||||||||
| /// <summary> | ||||||||||||||
| /// Initializes a new instance of the <see cref="TemporalWorkerService"/> class using | ||||||||||||||
|
|
@@ -30,8 +31,11 @@ public class TemporalWorkerService : BackgroundService | |||||||||||||
| /// <param name="options">Options to use to create the worker service.</param> | ||||||||||||||
| public TemporalWorkerService(TemporalWorkerServiceOptions options) | ||||||||||||||
| { | ||||||||||||||
| newClientOptions = options.ClientOptions ?? throw new ArgumentException( | ||||||||||||||
| "Client options is required", nameof(options)); | ||||||||||||||
| if (options.ClientOptions == null) | ||||||||||||||
| { | ||||||||||||||
| throw new ArgumentException("Client options is required", nameof(options)); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| workerOptions = options; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -156,6 +160,11 @@ public TemporalWorkerService( | |||||||||||||
| if (newClientOptions != null && workerOptions.LoggerFactory != null) | ||||||||||||||
| { | ||||||||||||||
| newClientOptions.LoggerFactory = workerOptions.LoggerFactory; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| if (options.WorkerClientUpdater != null) | ||||||||||||||
| { | ||||||||||||||
| this.workerClientUpdater = options.WorkerClientUpdater; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -166,7 +175,28 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |||||||||||||
| // Call connect just in case it was a lazy client (no-op if already connected) | ||||||||||||||
| await client.Connection.ConnectAsync().ConfigureAwait(false); | ||||||||||||||
| using var worker = new TemporalWorker(client, workerOptions); | ||||||||||||||
| await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false); | ||||||||||||||
|
|
||||||||||||||
| if (workerClientUpdater != null) | ||||||||||||||
| { | ||||||||||||||
| void SubscribeToClientUpdates(object? sender, IWorkerClient updatedClient) | ||||||||||||||
| { | ||||||||||||||
| worker!.Client = updatedClient; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| try | ||||||||||||||
| { | ||||||||||||||
| workerClientUpdater.Subscribe(SubscribeToClientUpdates); | ||||||||||||||
| await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false); | ||||||||||||||
| } | ||||||||||||||
| finally | ||||||||||||||
| { | ||||||||||||||
| workerClientUpdater.Unsubscribe(SubscribeToClientUpdates); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| else | ||||||||||||||
| { | ||||||||||||||
| await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like locking technically not required per https://stackoverflow.com/questions/22825117/is-it-thread-safe-to-register-for-a-c-sharp-event, but this is fine