Commit 1753f39

Emit sorted dir-diff results as soon as the preceding results are ready
If the directory diff contains lots of changes across many files, it is better to emit the earlier results incrementally rather than collecting everything before printing. The idea is borrowed from the thread below: the diff producer assigns an incremental index to each result, and the consumer puts the results into a BinaryHeap to reorder them by that index. https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3
1 parent 958f78d commit 1753f39
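To make the commit message concrete, here is a minimal, self-contained sketch of the same pattern. It is my illustration, not difftastic's code: it uses only the standard library (std::thread and an mpsc channel instead of the rayon scope used below), and the file names and sleep timings are invented purely to force out-of-order completion.

// Sketch of the idea: workers finish in arbitrary order, every result carries
// the index it was assigned up front, and the consumer uses a min-heap keyed
// on that index to print results in the original order as soon as the next
// expected one has arrived.
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (send, recv) = mpsc::channel();

    // Producer side: tag each work item with its index before processing it
    // in parallel, so completion order stops mattering.
    for (index, path) in ["a.rs", "b.rs", "c.rs", "d.rs"].into_iter().enumerate() {
        let send = send.clone();
        thread::spawn(move || {
            // Sleep longer for earlier items to force out-of-order completion.
            thread::sleep(Duration::from_millis((4 - index as u64) * 10));
            send.send((index, format!("diff for {path}"))).unwrap();
        });
    }
    drop(send); // close the channel once every worker holds its own sender

    // Consumer side: buffer out-of-order results in a min-heap and emit each
    // one as soon as all of its predecessors have been printed.
    let mut next_index = 0;
    let mut pending = BinaryHeap::new();
    for (index, result) in recv {
        pending.push(Reverse((index, result)));
        while pending.peek().map_or(false, |Reverse((i, _))| *i == next_index) {
            let Reverse((_, result)) = pending.pop().unwrap();
            println!("{result}");
            next_index += 1;
        }
    }
}

The change below does the same thing inside difftastic's existing rayon scope, with the heap logic factored out into reorder_by_index.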

2 files changed: 103 additions & 12 deletions

src/main.rs

Lines changed: 72 additions & 12 deletions
@@ -78,8 +78,10 @@ use crate::parse::syntax;
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
 use std::path::Path;
-use std::{env, thread};
+use std::{env, iter, thread};
 
 use humansize::{format_size, BINARY};
 use owo_colors::OwoColorize;
@@ -290,15 +292,6 @@ fn main() {
             .iter()
             .any(|diff_result| diff_result.has_reportable_change());
         display::json::print_directory(results, display_options.print_unchanged);
-    } else if display_options.sort_paths {
-        let result: Vec<DiffResult> = diff_iter.collect();
-        for diff_result in result {
-            print_diff_result(&display_options, &diff_result);
-
-            if diff_result.has_reportable_change() {
-                encountered_changes = true;
-            }
-        }
     } else {
         // We want to diff files in the directory in
         // parallel, but print the results serially
@@ -309,11 +302,18 @@ fn main() {
 
             s.spawn(move || {
                 diff_iter
+                    .enumerate()
                     .try_for_each_with(send, |s, diff_result| s.send(diff_result))
                     .expect("Receiver should be connected")
             });
 
-            for diff_result in recv.into_iter() {
+            let serial_iter: Box<dyn Iterator<Item = _>> =
+                if display_options.sort_paths {
+                    Box::new(reorder_by_index(recv.into_iter(), 0))
+                } else {
+                    Box::new(recv.into_iter())
+                };
+            for (_, diff_result) in serial_iter {
                 print_diff_result(&display_options, &diff_result);
 
                 if diff_result.has_reportable_change() {
@@ -769,7 +769,7 @@ fn diff_directories<'a>(
     display_options: &DisplayOptions,
     diff_options: &DiffOptions,
     overrides: &[(LanguageOverride, Vec<glob::Pattern>)],
-) -> impl ParallelIterator<Item = DiffResult> + 'a {
+) -> impl IndexedParallelIterator<Item = DiffResult> + 'a {
     let diff_options = diff_options.clone();
     let display_options = display_options.clone();
     let overrides: Vec<_> = overrides.into();
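A note on the signature change just above (my reading; the commit message does not spell it out): rayon's enumerate is defined on IndexedParallelIterator rather than on the base ParallelIterator trait, so the .enumerate() call added in main() needs diff_directories to return the stronger type. A tiny sketch of the rayon behaviour being relied on, assuming the rayon crate:

use rayon::prelude::*;

fn main() {
    // `enumerate` attaches indices that follow the original item order, even
    // though the items themselves are processed in parallel.
    let indexed: Vec<(usize, &str)> = vec!["x", "y", "z"].into_par_iter().enumerate().collect();
    assert_eq!(indexed, vec![(0, "x"), (1, "y"), (2, "z")]);
}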
@@ -929,6 +929,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) {
     }
 }
 
+/// Sort items in the `source` iterator by the 0th `usize` field, yield when all
+/// preceding items are received.
+///
+/// The idea is borrowed from
+/// <https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3>.
+fn reorder_by_index<T>(
+    source: impl Iterator<Item = (usize, T)>,
+    mut next_index: usize,
+) -> impl Iterator<Item = (usize, T)> {
+    struct WorkItem<T> {
+        index: usize,
+        value: T,
+    }
+
+    impl<T> Eq for WorkItem<T> {}
+
+    impl<T> Ord for WorkItem<T> {
+        fn cmp(&self, other: &Self) -> Ordering {
+            self.index.cmp(&other.index).reverse() // min heap
+        }
+    }
+
+    impl<T> PartialEq for WorkItem<T> {
+        fn eq(&self, other: &Self) -> bool {
+            self.index == other.index
+        }
+    }
+
+    impl<T> PartialOrd for WorkItem<T> {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            Some(self.cmp(other))
+        }
+    }
+
+    let mut source = source.fuse();
+    let mut queue: BinaryHeap<WorkItem<T>> = BinaryHeap::new();
+    iter::from_fn(move || {
+        // Consume the source iterator until the next_index item is found.
+        while queue.peek().map_or(true, |item| item.index > next_index) {
+            if let Some((index, value)) = source.next() {
+                queue.push(WorkItem { index, value });
+            } else {
+                break;
+            }
+        }
+        let item = queue.pop()?;
+        next_index = item.index + 1;
+        Some((item.index, item.value))
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::ffi::OsStr;
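One remark on the WorkItem ordering above: BinaryHeap is a max-heap, so the cmp(...).reverse() turns it into a min-heap on index, and implementing Ord by hand (instead of, say, storing std::cmp::Reverse<(usize, T)>) presumably also avoids requiring T: Ord on the values. A small standalone sketch of the same min-heap behaviour using the standard Reverse wrapper, purely for illustration:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    let mut heap = BinaryHeap::new();
    for index in [4usize, 2, 0, 3, 1] {
        heap.push(Reverse(index));
    }
    // Reverse flips the comparison, so pop() yields the smallest index first.
    let popped: Vec<usize> = std::iter::from_fn(|| heap.pop().map(|Reverse(i)| i)).collect();
    assert_eq!(popped, vec![0, 1, 2, 3, 4]);
}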
@@ -953,4 +1004,13 @@ mod tests {
         assert_eq!(res.lhs_positions, vec![]);
         assert_eq!(res.rhs_positions, vec![]);
     }
+
+    #[test]
+    fn test_reorder_by_index() {
+        let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")];
+        let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect();
+        let mut sorted = source.clone();
+        sorted.sort();
+        assert_eq!(reordered, sorted);
+    }
 }

tests/cli.rs

Lines changed: 31 additions & 0 deletions
@@ -194,6 +194,37 @@ fn directory_arguments() {
     cmd.assert().stdout(predicate_fn);
 }
 
+#[test]
+fn directory_arguments_sorted() {
+    let mut cmd = get_base_command();
+
+    cmd.arg("--sort-paths")
+        .arg("--display=inline")
+        .arg("sample_files/dir_1")
+        .arg("sample_files/dir_2");
+
+    let expected_files = [
+        "clojure.clj",
+        "foo.js",
+        "has_many_hunk.py",
+        "only_in_1.c",
+        "only_in_2.rs",
+    ];
+    let predicate_fn = predicate::function(|output: &str| {
+        let mut file_linenos = Vec::new();
+        for name in expected_files {
+            if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) {
+                file_linenos.push(lineno);
+            } else {
+                return false; // All files should be emitted
+            }
+        }
+        // Output should be sorted
+        file_linenos.windows(2).all(|w| w[0] < w[1])
+    });
+    cmd.assert().stdout(predicate_fn);
+}
+
 #[test]
 fn git_style_arguments_rename() {
     let mut cmd = get_base_command();
