Skip to content

core: Add priv::chan_from_global_ptr #2136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/libcore/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ enum rust_port {}

#[abi = "cdecl"]
native mod rustrt {
fn rust_port_id_send<T: send>(t: *sys::type_desc,
target_port: port_id,
fn rust_port_id_send<T: send>(target_port: port_id,
data: T) -> libc::uintptr_t;

fn new_port(unit_sz: libc::size_t) -> *rust_port;
Expand Down Expand Up @@ -114,7 +113,7 @@ whereupon the caller loses access to it.
"]
fn send<T: send>(ch: chan<T>, -data: T) {
let chan_t(p) = ch;
let res = rustrt::rust_port_id_send(sys::get_type_desc::<T>(), p, data);
let res = rustrt::rust_port_id_send(p, data);
if res != 0u unsafe {
// Data sent successfully
unsafe::forget(data);
Expand Down
84 changes: 84 additions & 0 deletions src/libcore/priv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ export chan_from_global_ptr;

import compare_and_swap = rustrt::rust_compare_and_swap_ptr;

type rust_port_id = uint;

native mod rustrt {
fn rust_compare_and_swap_ptr(address: *libc::uintptr_t,
oldval: libc::uintptr_t,
newval: libc::uintptr_t) -> bool;
fn rust_task_weaken(ch: rust_port_id);
fn rust_task_unweaken(ch: rust_port_id);
}

type global_ptr<T: send> = *libc::uintptr_t;
Expand Down Expand Up @@ -143,3 +147,83 @@ fn test_from_global_chan2() unsafe {
assert winners == 1u;
}
}

#[doc = "
Convert the current task to a 'weak' task temporarily

As a weak task it will not be counted towards the runtime's set
of live tasks. When there are no more outstanding live (non-weak) tasks
the runtime will send an exit message on the provided channel.

This function is super-unsafe. Do not use.

# Safety notes

* Weak tasks must either die on their own or exit upon receipt of
the exit message. Failure to do so will cause the runtime to never
exit
* Tasks must not call `weaken_task` multiple times. This will
break the kernel's accounting of live tasks.
* Weak tasks must not be supervised. A supervised task keeps
a reference to its parent, so the parent will not die.
"]
unsafe fn weaken_task(f: fn(comm::port<()>)) unsafe {
let po = comm::port();
let ch = comm::chan(po);
rustrt::rust_task_weaken(unsafe::reinterpret_cast(ch));
let _unweaken = unweaken(ch);
f(po);

resource unweaken(ch: comm::chan<()>) unsafe {
rustrt::rust_task_unweaken(unsafe::reinterpret_cast(ch));
}
}

#[test]
fn test_weaken_task_then_unweaken() unsafe {
task::try {||
weaken_task {|_po|
}
};
}

#[test]
fn test_weaken_task_wait() unsafe {
let builder = task::builder();
task::unsupervise(builder);
task::run(builder) {||
weaken_task {|po|
comm::recv(po);
}
}
}

#[test]
fn test_weaken_task_stress() unsafe {
// Create a bunch of weak tasks
iter::repeat(100u) {||
task::spawn {||
weaken_task {|_po|
}
}
let builder = task::builder();
task::unsupervise(builder);
task::run(builder) {||
weaken_task {|po|
// Wait for it to tell us to die
comm::recv(po);
}
}
}
}

#[test]
#[ignore(cfg(target_os = "win32"))]
fn test_weaken_task_fail() unsafe {
let res = task::try {||
weaken_task {|_po|
fail;
}
};
assert result::is_failure(res);
}
29 changes: 14 additions & 15 deletions src/rt/rust_builtin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,22 +669,9 @@ get_port_id(rust_port *port) {
}

extern "C" CDECL uintptr_t
rust_port_id_send(type_desc *t, rust_port_id target_port_id, void *sptr) {
bool sent = false;
rust_port_id_send(rust_port_id target_port_id, void *sptr) {
rust_task *task = rust_get_current_task();

LOG(task, comm, "rust_port_id*_send port: 0x%" PRIxPTR,
(uintptr_t) target_port_id);

rust_port *port = task->kernel->get_port_by_id(target_port_id);
if(port) {
port->send(sptr);
port->deref();
sent = true;
} else {
LOG(task, comm, "didn't get the port");
}
return (uintptr_t)sent;
return (uintptr_t)task->kernel->send_to_port(target_port_id, sptr);
}

// This is called by an intrinsic on the Rust stack and must run
Expand Down Expand Up @@ -782,6 +769,18 @@ rust_compare_and_swap_ptr(intptr_t *address,
return sync::compare_and_swap(address, oldval, newval);
}

extern "C" CDECL void
rust_task_weaken(rust_port_id chan) {
rust_task *task = rust_get_current_task();
task->kernel->weaken_task(chan);
}

extern "C" CDECL void
rust_task_unweaken(rust_port_id chan) {
rust_task *task = rust_get_current_task();
task->kernel->unweaken_task(chan);
}

//
// Local Variables:
// mode: C++
Expand Down
79 changes: 78 additions & 1 deletion src/rt/rust_kernel.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@



#include "rust_kernel.h"
#include "rust_port.h"
#include "rust_util.h"
#include "rust_scheduler.h"
#include "rust_sched_launcher.h"
#include <algorithm>

#define KLOG_(...) \
KLOG(this, kern, __VA_ARGS__)
Expand All @@ -21,6 +21,7 @@ rust_kernel::rust_kernel(rust_env *env) :
max_sched_id(0),
sched_reaper(this),
osmain_driver(NULL),
non_weak_tasks(0),
env(env)
{
// Create the single threaded scheduler that will run on the platform's
Expand Down Expand Up @@ -286,6 +287,82 @@ rust_kernel::set_exit_status(int code) {
}
}

void
rust_kernel::register_task() {
scoped_lock with(weak_task_lock);
KLOG_("Registering task");
uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
}

void
rust_kernel::unregister_task() {
scoped_lock with(weak_task_lock);
KLOG_("Unregistering task");
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
if (new_non_weak_tasks == 0) {
end_weak_tasks();
}
}

void
rust_kernel::weaken_task(rust_port_id chan) {
scoped_lock with(weak_task_lock);
KLOG_("Weakening task with channel %" PRIdPTR, chan);
weak_task_chans.push_back(chan);
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
if (new_non_weak_tasks == 0) {
end_weak_tasks();
}
}

void
rust_kernel::unweaken_task(rust_port_id chan) {
scoped_lock with(weak_task_lock);
KLOG_("Unweakening task with channel %" PRIdPTR, chan);
std::vector<rust_port_id>::iterator iter =
std::find(weak_task_chans.begin(), weak_task_chans.end(), chan);
if (iter != weak_task_chans.end()) {
weak_task_chans.erase(iter);
}
uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
}

void
rust_kernel::end_weak_tasks() {
std::vector<rust_port_id> chancopies;
{
//scoped_lock with(weak_task_lock);
chancopies = weak_task_chans;
weak_task_chans.clear();
}
while (!chancopies.empty()) {
rust_port_id chan = chancopies.back();
chancopies.pop_back();
KLOG_("Notifying weak task " PRIdPTR, chan);
uintptr_t token = 0;
send_to_port(chan, &token);
}
}

bool
rust_kernel::send_to_port(rust_port_id chan, void *sptr) {
KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan);

rust_port *port = get_port_by_id(chan);
if(port) {
port->send(sptr);
port->deref();
return true;
} else {
KLOG_("didn't get the port");
return false;
}
}

//
// Local Variables:
// mode: C++
Expand Down
15 changes: 15 additions & 0 deletions src/rt/rust_kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@ class rust_kernel {
// on the main thread
rust_sched_driver *osmain_driver;

// An atomically updated count of the live, 'non-weak' tasks
uintptr_t non_weak_tasks;
// Protects weak_task_chans
lock_and_signal weak_task_lock;
// A list of weak tasks that need to be told when to exit
std::vector<rust_port_id> weak_task_chans;

rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id);
void end_weak_tasks();

public:
struct rust_env *env;
Expand Down Expand Up @@ -102,6 +110,13 @@ class rust_kernel {
void set_exit_status(int code);

rust_sched_id osmain_sched_id() { return osmain_scheduler; }

void register_task();
void unregister_task();
void weaken_task(rust_port_id chan);
void unweaken_task(rust_port_id chan);

bool send_to_port(rust_port_id chan, void *sptr);
};

template <typename T> struct kernel_owned {
Expand Down
2 changes: 2 additions & 0 deletions src/rt/rust_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
if (cur_thread >= num_threads)
cur_thread = 0;
}
kernel->register_task();
rust_sched_launcher *thread = threads[thread_no];
return thread->get_loop()->create_task(spawner, name);
}
Expand All @@ -106,6 +107,7 @@ rust_scheduler::release_task() {
need_exit = true;
}
}
kernel->unregister_task();
if (need_exit) {
exit();
}
Expand Down
2 changes: 2 additions & 0 deletions src/rt/rustrt.def.in
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ rust_task_yield
rust_task_is_unwinding
rust_get_task
rust_task_config_notify
rust_task_weaken
rust_task_unweaken
sched_threads
shape_log_str
start_task
Expand Down