diff --git a/git-features/src/parallel/in_parallel.rs b/git-features/src/parallel/in_parallel.rs index 2d3b69290dc..d3efa413bee 100644 --- a/git-features/src/parallel/in_parallel.rs +++ b/git-features/src/parallel/in_parallel.rs @@ -1,4 +1,5 @@ use crate::parallel::{num_threads, Reduce}; +use std::sync::atomic::{AtomicBool, Ordering}; /// Runs `left` and `right` in parallel, returning their output when both are done. pub fn join(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { @@ -34,8 +35,8 @@ where pub fn in_parallel( input: impl Iterator + Send, thread_limit: Option, - new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + new_thread_state: impl FnMut(usize) -> S + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<::Output, ::Error> where @@ -52,8 +53,8 @@ where s.spawn({ let send_result = send_result.clone(); let receive_input = receive_input.clone(); - let new_thread_state = new_thread_state.clone(); - let consume = consume.clone(); + let mut new_thread_state = new_thread_state.clone(); + let mut consume = consume.clone(); move |_| { let mut state = new_thread_state(thread_id); for item in receive_input { @@ -81,3 +82,77 @@ where }) .unwrap() } + +#[allow(missing_docs)] // TODO: docs +pub fn in_parallel_with_mut_slice( + input: &mut [I], + thread_limit: Option, + new_thread_state: impl FnMut(usize) -> S + Send + Clone, + consume: impl FnMut(&mut I, &mut S) -> Result + Send + Clone, + mut periodic: impl FnMut() -> std::time::Duration + Send, + mut finalize: impl FnMut(&[I], Vec) -> Result, +) -> Result +where + I: Send, + O: Send, + E: Send, +{ + let num_threads = num_threads(thread_limit); + let input = &*input; // downgrade to immutable + let results: Vec>>> = (0..input.len()).map(|_| Default::default()).collect(); + let stop_periodic = &AtomicBool::default(); + + crossbeam_utils::thread::scope({ + let results = &results; + move |s| { + s.spawn({ + move |_| loop { + if stop_periodic.load(Ordering::Relaxed) { + break; + } + + std::thread::sleep(periodic()); + } + }); + + let threads: Vec<_> = (0..num_threads) + .map(|n| { + s.spawn({ + let mut new_thread_state = new_thread_state.clone(); + let mut _consume = consume.clone(); + move |_| { + let _state = new_thread_state(n); + let mut item = 0; + while let Some(res) = &results.get(num_threads * item + n) { + item += 1; + if let Some(mut guard) = res.try_lock() { + match guard.as_mut() { + Some(_) => { + // somebody stole our work, assume all future work is done, too + return; + } + None => { + todo!("make input mutable and consume/process") + } + } + } + } + // TODO: work-stealing logic once we are out of bounds. Pose as prior thread and walk backwards. + } + }) + }) + .collect(); + for thread in threads { + thread.join().expect("must not panic"); + } + + stop_periodic.store(true, Ordering::Relaxed); + } + }) + .unwrap(); + let mut unwrapped_results = Vec::with_capacity(results.len()); + for res in results { + unwrapped_results.push(res.into_inner().expect("result obtained, item processed")?); + } + finalize(input, unwrapped_results) +} diff --git a/git-features/src/parallel/mod.rs b/git-features/src/parallel/mod.rs index ebdedd3f308..cde597b355b 100644 --- a/git-features/src/parallel/mod.rs +++ b/git-features/src/parallel/mod.rs @@ -35,7 +35,7 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{in_parallel, join, threads}; +pub use in_parallel::{in_parallel, in_parallel_with_mut_slice, join, threads}; mod serial; #[cfg(not(feature = "parallel"))] diff --git a/git-features/src/parallel/serial.rs b/git-features/src/parallel/serial.rs index eb92aa0a724..b3afc36943e 100644 --- a/git-features/src/parallel/serial.rs +++ b/git-features/src/parallel/serial.rs @@ -72,8 +72,8 @@ pub use not_parallel::{join, threads, Scope, ScopedJoinHandle}; pub fn in_parallel( input: impl Iterator, _thread_limit: Option, - new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + mut new_thread_state: impl FnMut(usize) -> S, + mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<::Output, ::Error> where