Skip to content

[WIP] Experiment with atomic builtins on Node #15740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions system/lib/libc/musl/src/thread/__timedwait.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <math.h>
#include <emscripten/threading.h>
#include <emscripten/emscripten.h>
#include "pthread_impl.h"
#else
#include "futex.h"
#endif
Expand Down Expand Up @@ -33,18 +32,20 @@ static int __futex4_cp(volatile void *addr, int op, int val, const struct timesp
if (r != -ENOSYS) return r;
return __syscall_cp(SYS_futex, addr, op & ~FUTEX_PRIVATE, val, to);
}
#endif

static volatile int dummy = 0;
weak_alias(dummy, __eintr_valid_flag);
#endif

int __timedwait_cp(volatile int *addr, int val,
clockid_t clk, const struct timespec *at, int priv)
{
int r;
struct timespec to, *top=0;

#ifndef __EMSCRIPTEN__
if (priv) priv = FUTEX_PRIVATE;
#endif

if (at) {
if (at->tv_nsec >= 1000000000UL) return EINVAL;
Expand All @@ -57,49 +58,62 @@ int __timedwait_cp(volatile int *addr, int val,
if (to.tv_sec < 0) return ETIMEDOUT;
top = &to;
}

#ifdef __EMSCRIPTEN__
double msecsToSleep = top ? (top->tv_sec * 1000 + top->tv_nsec / 1000000.0) : INFINITY;
int is_runtime_thread = emscripten_is_main_runtime_thread();
pthread_t self = __pthread_self();
double msecsToSleep = top ? (top->tv_sec * 1000.0 + top->tv_nsec / 1000000.0) : INFINITY;
const int is_runtime_thread = emscripten_is_main_runtime_thread();

// Main runtime thread may need to run proxied calls, so sleep in very small slices to be responsive.
const double maxMsecsToSleep = is_runtime_thread ? 1 : 100;

// cp suffix in the function name means "cancellation point", so this wait can be cancelled
// by the users unless current threads cancelability is set to PTHREAD_CANCEL_DISABLE
// by the users unless current threads cancellability is set to PTHREAD_CANCEL_DISABLE
// which may be either done by the user of __timedwait() function.
if (is_runtime_thread ||
pthread_self()->canceldisable != PTHREAD_CANCEL_DISABLE ||
pthread_self()->cancelasync == PTHREAD_CANCEL_ASYNCHRONOUS) {
if (is_runtime_thread || self->canceldisable != PTHREAD_CANCEL_DISABLE || self->cancelasync) {
double sleepUntilTime = emscripten_get_now() + msecsToSleep;
do {
if (pthread_self()->cancel) {
if (self->cancel) {
// Emscripten-specific return value: The wait was canceled by user calling
// pthread_cancel() for this thread, and the caller needs to cooperatively
// cancel execution.
return ECANCELED;
}
// Must wait in slices in case this thread is cancelled in between.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
r = __builtin_wasm_memory_atomic_wait32((int*)addr, val,
(msecsToSleep > maxMsecsToSleep ? maxMsecsToSleep : msecsToSleep) * /*NSEC_PER_MSEC*/1000000);
if (r == 2) r = ETIMEDOUT;
if (r == 1) r = EWOULDBLOCK;
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
r = -emscripten_futex_wait((void*)addr, val,
msecsToSleep > maxMsecsToSleep ? maxMsecsToSleep : msecsToSleep);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
// Assist other threads by executing proxied operations that are effectively singlethreaded.
if (is_runtime_thread) emscripten_main_thread_process_queued_calls();
// Must wait in slices in case this thread is cancelled in between.
double waitMsecs = sleepUntilTime - emscripten_get_now();
if (waitMsecs <= 0) {
r = ETIMEDOUT;
break;
}
if (waitMsecs > 100) waitMsecs = 100; // non-main threads can sleep in longer slices.
if (is_runtime_thread && waitMsecs > 1) waitMsecs = 1; // the runtime thread may need to run proxied calls, so sleep in very small slices to be responsive.
r = -emscripten_futex_wait((void*)addr, val, waitMsecs);
} while(r == ETIMEDOUT);

msecsToSleep = sleepUntilTime - emscripten_get_now();
} while (r == ETIMEDOUT && msecsToSleep > 0);
} else {
// Can wait in one go.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
r = __builtin_wasm_memory_atomic_wait32((int*)addr, val, top ? (msecsToSleep * /*NSEC_PER_MSEC*/1000000) : -1);
if (r == 2) r = ETIMEDOUT;
if (r == 1) r = EWOULDBLOCK;
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
r = -emscripten_futex_wait((void*)addr, val, msecsToSleep);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
}
#else
r = -__futex4_cp(addr, FUTEX_WAIT|priv, val, top);
#endif
if (r != EINTR && r != ETIMEDOUT && r != ECANCELED) r = 0;
#ifndef __EMSCRIPTEN__ // XXX Emscripten revert musl commit a63c0104e496f7ba78b64be3cd299b41e8cd427f
/* Mitigate bug in old kernels wrongly reporting EINTR for non-
* interrupting (SA_RESTART) signal handlers. This is only practical
* when NO interrupting signal handlers have been installed, and
* works by sigaction tracking whether that's the case. */
if (r == EINTR && !__eintr_valid_flag) r = 0;
#endif

return r;
}
Expand Down
34 changes: 26 additions & 8 deletions system/lib/libc/musl/src/thread/__wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,50 @@
void __wait(volatile int *addr, volatile int *waiters, int val, int priv)
{
int spins=100;
#ifndef __EMSCRIPTEN__
if (priv) priv = FUTEX_PRIVATE;
#endif
while (spins-- && (!waiters || !*waiters)) {
if (*addr==val) a_spin();
else return;
}
if (waiters) a_inc(waiters);
#ifdef __EMSCRIPTEN__
int is_runtime_thread = emscripten_is_main_runtime_thread();
pthread_t self = __pthread_self();
const int is_runtime_thread = emscripten_is_main_runtime_thread();

// Main runtime thread may need to run proxied calls, so sleep in very small slices to be responsive.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
const int64_t maxNsecsToSleep = is_runtime_thread ? 1000000 : 100000000;
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
const double maxMsecsToSleep = is_runtime_thread ? 1 : 100;
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)

while (*addr==val) {
if (is_runtime_thread || pthread_self()->cancelasync == PTHREAD_CANCEL_ASYNCHRONOUS) {
// Must wait in slices in case this thread is cancelled in between.
if (is_runtime_thread || self->cancelasync) {
int e;
do {
if (pthread_self()->cancel) {
if (self->cancel) {
if (waiters) a_dec(waiters);
return;
}
// Must wait in slices in case this thread is cancelled in between.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
e = __builtin_wasm_memory_atomic_wait32((int*)addr, val, maxNsecsToSleep);
if (e == 2) e = -ETIMEDOUT;
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
e = emscripten_futex_wait((void*)addr, val, maxMsecsToSleep);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
// Assist other threads by executing proxied operations that are effectively singlethreaded.
if (is_runtime_thread) emscripten_main_thread_process_queued_calls();
// Main thread waits in _very_ small slices so that it stays responsive to assist proxied
// pthread calls.
e = emscripten_futex_wait((void*)addr, val, is_runtime_thread ? 1 : 100);
} while(e == -ETIMEDOUT);
} while (e == -ETIMEDOUT);
} else {
// Can wait in one go.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_wait32((int*)addr, val, -1);
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wait((void*)addr, val, INFINITY);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But doesn't emscripten_futex_wait already used __builtin_wasm_memory_atomic_wait32 in places where this is legal (i.e. off the main web thread?)

I think the decision on whether we can use the atomic wait needs to be dynamic doesn't it ? We don't actually want to produce two different binaries do we?

Copy link
Collaborator Author

@kleisauke kleisauke Dec 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emscripten_futex_wait is implemented in JS and calls directly Atomics.wait for non-web environments. So, this PR is NFC in that respect.

This does indeed produce a different binary if you only target Node, so that would be a breaking change. Perhaps it would be better to implement emscripten_futex_wait dynamically in native code. But as far as I know, that is currently not possible since a JS equivalent of ENVIRONMENT_IS_WEB is not available (PR #15659 is also relevant here).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can/should implement something like emscripten_thread_can_block in native code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to have a go addingemscripten_thread_can_block along with native version of emscripten_futex_wait: #15742

Copy link
Collaborator Author

@kleisauke kleisauke Dec 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Do you plan to move emscripten_futex_wake to native code as well (which could use the __builtin_wasm_memory_atomic_notify builtin)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I hope we can move all the wait/wait to native code but I'm doing it one step at a time,

#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
}
}
#else
Expand Down
18 changes: 13 additions & 5 deletions system/lib/libc/musl/src/thread/pthread_barrier_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,28 @@ int pthread_barrier_wait(pthread_barrier_t *b)
a_spin();
a_inc(&inst->finished);
#ifdef __EMSCRIPTEN__
int is_runtime_thread = emscripten_is_main_runtime_thread();
const int is_runtime_thread = emscripten_is_main_runtime_thread();
while (inst->finished == 1) {
if (is_runtime_thread) {
int e;
do {
// Main thread waits in _very_ small slices so that it stays responsive to assist proxied
// pthread calls.
e = emscripten_futex_wait(&inst->finished, 1, 1);
// Assist other threads by executing proxied operations that are effectively singlethreaded.
emscripten_main_thread_process_queued_calls();
} while(e == -ETIMEDOUT);
// Main runtime thread may need to run proxied calls, so sleep in very small slices to be responsive.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
e = __builtin_wasm_memory_atomic_wait32((int*)&inst->finished, 1, 1);
if (e == 2) e = -ETIMEDOUT;
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
e = emscripten_futex_wait(&inst->finished, 1, 1);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
} while (e == -ETIMEDOUT);
} else {
// Can wait in one go.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_wait32((int*)&inst->finished, 1, -1);
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wait(&inst->finished, 1, INFINITY);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
}
}
#else
Expand Down
4 changes: 4 additions & 0 deletions system/lib/libc/musl/src/thread/pthread_cond_timedwait.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ static inline void unlock_requeue(volatile int *l, volatile int *r, int w)
// primitive is strictly not needed, since it is more like an optimization to avoid spuriously waking
// all waiters, just to make them wait on another location immediately afterwards. Here we do exactly
// that: wake every waiter.
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_notify((int*)l, -1);
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wake(l, 0x7FFFFFFF);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
#else
if (w) __wake(l, 1, 1);
else __syscall(SYS_futex, l, FUTEX_REQUEUE|FUTEX_PRIVATE, 0, 1, r) != -ENOSYS
Expand Down
88 changes: 54 additions & 34 deletions system/lib/pthread/library_pthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,34 @@ void emscripten_thread_sleep(double msecs) {
double now = emscripten_get_now();
double target = now + msecs;

__pthread_testcancel(); // pthreads spec: sleep is a cancellation point, so must test if this
// thread is cancelled during the sleep.
emscripten_current_thread_process_queued_calls();

// If we have less than this many msecs left to wait, busy spin that instead.
const double minimumTimeSliceToSleep = 0.1;
const double minTimeSliceToSleep = 0.1;

// runtime thread may need to run proxied calls, so sleep in very small slices to be responsive.
// Runtime thread may need to run proxied calls, so sleep in very small slices to be responsive.
const double maxMsecsSliceToSleep = emscripten_is_main_runtime_thread() ? 1 : 100;

emscripten_conditional_set_current_thread_status(
EM_THREAD_STATUS_RUNNING, EM_THREAD_STATUS_SLEEPING);
now = emscripten_get_now();
while (now < target) {

double msecsToSleep;
do {
// Keep processing the main loop of the calling thread.
__pthread_testcancel(); // pthreads spec: sleep is a cancellation point, so must test if this
// thread is cancelled during the sleep.
emscripten_current_thread_process_queued_calls();

now = emscripten_get_now();
double msecsToSleep = target - now;
if (msecsToSleep > maxMsecsSliceToSleep)
msecsToSleep = maxMsecsSliceToSleep;
if (msecsToSleep >= minimumTimeSliceToSleep)
emscripten_futex_wait(&dummyZeroAddress, 0, msecsToSleep);
now = emscripten_get_now();
};
msecsToSleep = target - emscripten_get_now();
if (msecsToSleep < minTimeSliceToSleep)
continue;

#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_wait32((int*)&dummyZeroAddress, 0,
(msecsToSleep > maxMsecsSliceToSleep ? maxMsecsSliceToSleep : msecsToSleep) * /*NSEC_PER_MSEC*/1000000);
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wait(&dummyZeroAddress, 0,
msecsToSleep > maxMsecsSliceToSleep ? maxMsecsSliceToSleep : msecsToSleep);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
} while (msecsToSleep > 0);

emscripten_conditional_set_current_thread_status(
EM_THREAD_STATUS_SLEEPING, EM_THREAD_STATUS_RUNNING);
Expand Down Expand Up @@ -327,7 +328,11 @@ static void _do_call(void* arg) {
} else {
// The caller owns this call object, it is listening to it and will free it up.
q->operationDone = 1;
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_notify((int*)&q->operationDone, -1);
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wake(&q->operationDone, INT_MAX);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
}
}

Expand Down Expand Up @@ -386,24 +391,31 @@ static CallQueue* GetOrAllocateQueue(void* target) {
}

EMSCRIPTEN_RESULT emscripten_wait_for_call_v(em_queued_call* call, double timeoutMSecs) {
int r;

int done = atomic_load(&call->operationDone);
if (!done) {
double now = emscripten_get_now();
double waitEndTime = now + timeoutMSecs;
emscripten_set_current_thread_status(EM_THREAD_STATUS_WAITPROXY);
while (!done && now < waitEndTime) {
r = emscripten_futex_wait(&call->operationDone, 0, waitEndTime - now);
done = atomic_load(&call->operationDone);
now = emscripten_get_now();
}
emscripten_set_current_thread_status(EM_THREAD_STATUS_RUNNING);
}
if (done)
return EMSCRIPTEN_RESULT_SUCCESS;
else
return EMSCRIPTEN_RESULT_TIMED_OUT;
if (done) return EMSCRIPTEN_RESULT_SUCCESS;

#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
int waitIndefinitely = isinf(timeoutMSecs);
#endif

emscripten_set_current_thread_status(EM_THREAD_STATUS_WAITPROXY);

double timeoutUntilTime = emscripten_get_now() + timeoutMSecs;
do {
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_wait32((int*)&call->operationDone, 0,
waitIndefinitely ? -1 : (timeoutMSecs * /*NSEC_PER_MSEC*/1000000));
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wait(&call->operationDone, 0, timeoutMSecs);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
done = atomic_load(&call->operationDone);

timeoutMSecs = timeoutUntilTime - emscripten_get_now();
} while (!done && timeoutMSecs > 0);

emscripten_set_current_thread_status(EM_THREAD_STATUS_RUNNING);

return done ? EMSCRIPTEN_RESULT_SUCCESS : EMSCRIPTEN_RESULT_TIMED_OUT;
}

EMSCRIPTEN_RESULT emscripten_wait_for_call_i(
Expand Down Expand Up @@ -460,7 +472,11 @@ int _emscripten_do_dispatch_to_thread(pthread_t target_thread, em_queued_call* c
// If queue of the main browser thread is full, then we wait. (never drop messages for the main
// browser thread)
if (target_thread == emscripten_main_browser_thread_id()) {
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_wait32((int*)&q->call_queue_head, head, -1);
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wait((void*)&q->call_queue_head, head, INFINITY);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
pthread_mutex_lock(&call_queue_lock);
head = q->call_queue_head;
tail = q->call_queue_tail;
Expand Down Expand Up @@ -640,7 +656,11 @@ void emscripten_current_thread_process_queued_calls() {
pthread_mutex_unlock(&call_queue_lock);

// If the queue was full and we had waiters pending to get to put data to queue, wake them up.
emscripten_futex_wake((void*)&q->call_queue_head, 0x7FFFFFFF);
#ifdef __EMSCRIPTEN_ATOMIC_BUILTINS__
__builtin_wasm_memory_atomic_notify((int*)&q->call_queue_head, -1);
#else // !defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)
emscripten_futex_wake((void*)&q->call_queue_head, INT_MAX);
#endif // defined(__EMSCRIPTEN_ATOMIC_BUILTINS__)

thread_is_processing_queued_calls = false;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_posixtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def f(self):
if browser:
self.btest_exit(testfile, args=args)
else:
self.do_runf(testfile, emcc_args=args, output_basename=name)
self.do_runf(testfile, emcc_args=args + ['-sENVIRONMENT=node,worker'], output_basename=name)

if name in expect_fail:
f = unittest.expectedFailure(f)
Expand Down
11 changes: 9 additions & 2 deletions tools/system_libs.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,27 +526,34 @@ def get_usable_variations(cls):
class MTLibrary(Library):
def __init__(self, **kwargs):
self.is_mt = kwargs.pop('is_mt')
self.not_web = kwargs.pop('not_web')
super().__init__(**kwargs)

def get_cflags(self):
cflags = super().get_cflags()
if self.is_mt:
cflags += ['-s', 'USE_PTHREADS']
if self.not_web:
cflags += ['-D__EMSCRIPTEN_ATOMIC_BUILTINS__']
return cflags

def get_base_name(self):
name = super().get_base_name()
if self.is_mt:
name += '-mt'
if self.not_web:
name += '-not-web'
return name

@classmethod
def vary_on(cls):
return super().vary_on() + ['is_mt']
return super().vary_on() + ['is_mt', 'not_web']

@classmethod
def get_default_variation(cls, **kwargs):
return super().get_default_variation(is_mt=settings.USE_PTHREADS, **kwargs)
return super().get_default_variation(is_mt=settings.USE_PTHREADS,
not_web=not settings.ENVIRONMENT_MAY_BE_WEB,
**kwargs)


class DebugLibrary(Library):
Expand Down