Skip to content

a way to parallelize with less contention #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 79 additions & 4 deletions git-features/src/parallel/in_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::parallel::{num_threads, Reduce};
use std::sync::atomic::{AtomicBool, Ordering};

/// Runs `left` and `right` in parallel, returning their output when both are done.
pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) {
Expand Down Expand Up @@ -34,8 +35,8 @@ where
pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S + Send + Clone,
consume: impl Fn(I, &mut S) -> O + Send + Clone,
new_thread_state: impl FnMut(usize) -> S + Send + Clone,
consume: impl FnMut(I, &mut S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
Expand All @@ -52,8 +53,8 @@ where
s.spawn({
let send_result = send_result.clone();
let receive_input = receive_input.clone();
let new_thread_state = new_thread_state.clone();
let consume = consume.clone();
let mut new_thread_state = new_thread_state.clone();
let mut consume = consume.clone();
move |_| {
let mut state = new_thread_state(thread_id);
for item in receive_input {
Expand Down Expand Up @@ -81,3 +82,77 @@ where
})
.unwrap()
}

#[allow(missing_docs)] // TODO: docs
pub fn in_parallel_with_mut_slice<I, S, O, E, FinalResult>(
input: &mut [I],
thread_limit: Option<usize>,
new_thread_state: impl FnMut(usize) -> S + Send + Clone,
consume: impl FnMut(&mut I, &mut S) -> Result<O, E> + Send + Clone,
mut periodic: impl FnMut() -> std::time::Duration + Send,
mut finalize: impl FnMut(&[I], Vec<O>) -> Result<FinalResult, E>,
) -> Result<FinalResult, E>
where
I: Send,
O: Send,
E: Send,
{
let num_threads = num_threads(thread_limit);
let input = &*input; // downgrade to immutable
let results: Vec<parking_lot::Mutex<Option<Result<O, E>>>> = (0..input.len()).map(|_| Default::default()).collect();
let stop_periodic = &AtomicBool::default();

crossbeam_utils::thread::scope({
let results = &results;
move |s| {
s.spawn({
move |_| loop {
if stop_periodic.load(Ordering::Relaxed) {
break;
}

std::thread::sleep(periodic());
}
});

let threads: Vec<_> = (0..num_threads)
.map(|n| {
s.spawn({
let mut new_thread_state = new_thread_state.clone();
let mut _consume = consume.clone();
move |_| {
let _state = new_thread_state(n);
let mut item = 0;
while let Some(res) = &results.get(num_threads * item + n) {
item += 1;
if let Some(mut guard) = res.try_lock() {
match guard.as_mut() {
Some(_) => {
// somebody stole our work, assume all future work is done, too
return;
}
None => {
todo!("make input mutable and consume/process")
}
}
}
}
// TODO: work-stealing logic once we are out of bounds. Pose as prior thread and walk backwards.
}
})
})
.collect();
for thread in threads {
thread.join().expect("must not panic");
}

stop_periodic.store(true, Ordering::Relaxed);
}
})
.unwrap();
let mut unwrapped_results = Vec::with_capacity(results.len());
for res in results {
unwrapped_results.push(res.into_inner().expect("result obtained, item processed")?);
}
finalize(input, unwrapped_results)
}
2 changes: 1 addition & 1 deletion git-features/src/parallel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#[cfg(feature = "parallel")]
mod in_parallel;
#[cfg(feature = "parallel")]
pub use in_parallel::{in_parallel, join, threads};
pub use in_parallel::{in_parallel, in_parallel_with_mut_slice, join, threads};

mod serial;
#[cfg(not(feature = "parallel"))]
Expand Down
4 changes: 2 additions & 2 deletions git-features/src/parallel/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ pub use not_parallel::{join, threads, Scope, ScopedJoinHandle};
pub fn in_parallel<I, S, O, R>(
input: impl Iterator<Item = I>,
_thread_limit: Option<usize>,
new_thread_state: impl Fn(usize) -> S,
consume: impl Fn(I, &mut S) -> O,
mut new_thread_state: impl FnMut(usize) -> S,
mut consume: impl FnMut(I, &mut S) -> O,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
Expand Down