Skip to content

Commit 5ed423f

Browse files
authored
Update core/dependencies and call worker validate (#267)
Fixes #253
1 parent 7b61747 commit 5ed423f

File tree

8 files changed

+527
-420
lines changed

8 files changed

+527
-420
lines changed

src/Temporalio/Bridge/Cargo.lock

Lines changed: 436 additions & 409 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Temporalio/Bridge/Interop/Interop.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -580,10 +580,10 @@ internal partial struct WorkerOptions
580580
}
581581

582582
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
583-
internal unsafe delegate void WorkerPollCallback(void* user_data, [NativeTypeName("const struct ByteArray *")] ByteArray* success, [NativeTypeName("const struct ByteArray *")] ByteArray* fail);
583+
internal unsafe delegate void WorkerCallback(void* user_data, [NativeTypeName("const struct ByteArray *")] ByteArray* fail);
584584

585585
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
586-
internal unsafe delegate void WorkerCallback(void* user_data, [NativeTypeName("const struct ByteArray *")] ByteArray* fail);
586+
internal unsafe delegate void WorkerPollCallback(void* user_data, [NativeTypeName("const struct ByteArray *")] ByteArray* success, [NativeTypeName("const struct ByteArray *")] ByteArray* fail);
587587

588588
internal unsafe partial struct WorkerReplayerOrFail
589589
{
@@ -726,6 +726,9 @@ internal static unsafe partial class Methods
726726
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
727727
public static extern void worker_free([NativeTypeName("struct Worker *")] Worker* worker);
728728

729+
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
730+
public static extern void worker_validate([NativeTypeName("struct Worker *")] Worker* worker, void* user_data, [NativeTypeName("WorkerCallback")] IntPtr callback);
731+
729732
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
730733
public static extern void worker_replace_client([NativeTypeName("struct Worker *")] Worker* worker, [NativeTypeName("struct Client *")] Client* new_client);
731734

src/Temporalio/Bridge/Worker.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,38 @@ internal Worker(Worker other)
8888
/// </summary>
8989
internal unsafe Interop.Worker* Ptr { get; private set; }
9090

91+
/// <summary>
92+
/// Validate the worker.
93+
/// </summary>
94+
/// <returns>Validation task.</returns>
95+
public async Task ValidateAsync()
96+
{
97+
using (var scope = new Scope())
98+
{
99+
var completion = new TaskCompletionSource<bool>();
100+
unsafe
101+
{
102+
Interop.Methods.worker_validate(
103+
Ptr,
104+
null,
105+
scope.FunctionPointer<Interop.WorkerCallback>(
106+
(userData, fail) =>
107+
{
108+
if (fail != null)
109+
{
110+
completion.TrySetException(new InvalidOperationException(
111+
new ByteArray(Runtime, fail).ToUTF8()));
112+
}
113+
else
114+
{
115+
completion.TrySetResult(true);
116+
}
117+
}));
118+
}
119+
await completion.Task.ConfigureAwait(false);
120+
}
121+
}
122+
91123
/// <summary>
92124
/// Replace the client.
93125
/// </summary>

src/Temporalio/Bridge/include/temporal-sdk-bridge.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,11 @@ typedef struct WorkerOptions {
382382
struct ByteArrayRefArray nondeterminism_as_workflow_fail_for_types;
383383
} WorkerOptions;
384384

385+
/**
386+
* If fail is present, it must be freed.
387+
*/
388+
typedef void (*WorkerCallback)(void *user_data, const struct ByteArray *fail);
389+
385390
/**
386391
* If success or fail are present, they must be freed. They will both be null
387392
* if this is a result of a poll shutdown.
@@ -390,11 +395,6 @@ typedef void (*WorkerPollCallback)(void *user_data,
390395
const struct ByteArray *success,
391396
const struct ByteArray *fail);
392397

393-
/**
394-
* If fail is present, it must be freed.
395-
*/
396-
typedef void (*WorkerCallback)(void *user_data, const struct ByteArray *fail);
397-
398398
typedef struct WorkerReplayerOrFail {
399399
struct Worker *worker;
400400
struct WorkerReplayPusher *worker_replay_pusher;
@@ -521,6 +521,8 @@ struct WorkerOrFail worker_new(struct Client *client, const struct WorkerOptions
521521

522522
void worker_free(struct Worker *worker);
523523

524+
void worker_validate(struct Worker *worker, void *user_data, WorkerCallback callback);
525+
524526
void worker_replace_client(struct Worker *worker, struct Client *new_client);
525527

526528
void worker_poll_workflow_activation(struct Worker *worker,

src/Temporalio/Bridge/src/worker.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,34 @@ pub extern "C" fn worker_free(worker: *mut Worker) {
134134
}
135135
}
136136

137+
/// If fail is present, it must be freed.
138+
type WorkerCallback = unsafe extern "C" fn(user_data: *mut libc::c_void, fail: *const ByteArray);
139+
140+
#[no_mangle]
141+
pub extern "C" fn worker_validate(
142+
worker: *mut Worker,
143+
user_data: *mut libc::c_void,
144+
callback: WorkerCallback,
145+
) {
146+
let worker = unsafe { &*worker };
147+
let user_data = UserDataHandle(user_data);
148+
let core_worker = worker.worker.as_ref().unwrap().clone();
149+
worker.runtime.core.tokio_handle().spawn(async move {
150+
let fail = match core_worker.validate().await {
151+
Ok(_) => std::ptr::null(),
152+
Err(err) => worker
153+
.runtime
154+
.clone()
155+
.alloc_utf8(&format!("Worker validation failed: {}", err))
156+
.into_raw()
157+
.cast_const(),
158+
};
159+
unsafe {
160+
callback(user_data.into(), fail);
161+
}
162+
});
163+
}
164+
137165
#[no_mangle]
138166
pub extern "C" fn worker_replace_client(worker: *mut Worker, new_client: *mut Client) {
139167
let worker = unsafe { &*worker };
@@ -218,9 +246,6 @@ pub extern "C" fn worker_poll_activity_task(
218246
});
219247
}
220248

221-
/// If fail is present, it must be freed.
222-
type WorkerCallback = unsafe extern "C" fn(user_data: *mut libc::c_void, fail: *const ByteArray);
223-
224249
#[no_mangle]
225250
pub extern "C" fn worker_complete_workflow_activation(
226251
worker: *mut Worker,

src/Temporalio/Worker/TemporalWorker.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,9 @@ private async Task ExecuteInternalAsync(
272272
throw new InvalidOperationException("Already started");
273273
}
274274

275+
// Check that the worker is valid
276+
await BridgeWorker.ValidateAsync().ConfigureAwait(false);
277+
275278
var tasks = new List<Task>()
276279
{
277280
// Create a task that will complete on cancellation

tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4587,6 +4587,21 @@ await AssertMore.HasEventEventuallyAsync(
45874587
});
45884588
}
45894589

4590+
[Fact]
4591+
public async Task ExecuteAsync_InvalidWorker_Fails()
4592+
{
4593+
// Try to run a worker on an invalid namespace
4594+
var options = (TemporalClientOptions)Client.Options.Clone();
4595+
options.Namespace = "does-not-exist";
4596+
var client = new TemporalClient(Client.Connection, options);
4597+
using var worker = new TemporalWorker(
4598+
client,
4599+
new TemporalWorkerOptions("some-task-queue").AddWorkflow<SimpleWorkflow>());
4600+
var err = await Assert.ThrowsAsync<InvalidOperationException>(
4601+
() => worker.ExecuteAsync(() => Task.Delay(2000)));
4602+
Assert.Contains("Worker validation failed", err.Message);
4603+
}
4604+
45904605
internal static Task AssertTaskFailureContainsEventuallyAsync(
45914606
WorkflowHandle handle, string messageContains)
45924607
{

0 commit comments

Comments
 (0)