Commit 2ee8cc0
Emit sorted dir-diff result as soon as preceding results get ready
If the directory diff contains lots of changes across files, it's better to emit earlier results incrementally instead of waiting for the whole set. The diff producer assigns an incremental index to each result, and the consumer puts the results into a BinaryHeap to reorder them by index, yielding each result as soon as all preceding ones have arrived. The idea is borrowed from the following thread: https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3
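As an illustration of that pattern only (not the code added in this commit, which defines its own WorkItem wrapper in src/main.rs below), here is a minimal standalone sketch. It uses std::cmp::Reverse over an (index, value) tuple to turn BinaryHeap into a min-heap, and assumes each index 0, 1, 2, ... appears exactly once; the emit_in_order name and the String payload are purely illustrative.

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    /// Yield tagged values in index order. Assumes each index 0, 1, 2, ...
    /// appears exactly once in `tagged`, possibly out of order (e.g. results
    /// arriving from parallel workers over a channel).
    fn emit_in_order(tagged: impl IntoIterator<Item = (usize, String)>, mut emit: impl FnMut(String)) {
        // `Reverse` flips BinaryHeap's max-heap order, giving a min-heap keyed by index.
        let mut pending: BinaryHeap<Reverse<(usize, String)>> = BinaryHeap::new();
        let mut next_index = 0;
        for (index, value) in tagged {
            pending.push(Reverse((index, value)));
            // Flush every buffered item whose turn has come.
            while pending
                .peek()
                .map_or(false, |Reverse((index, _))| *index == next_index)
            {
                let Reverse((_, value)) = pending.pop().expect("peek succeeded");
                emit(value);
                next_index += 1;
            }
        }
    }

    fn main() {
        // Results arrive out of order, as they would from a thread pool.
        let tagged = vec![(2, "c".to_string()), (0, "a".to_string()), (1, "b".to_string())];
        emit_in_order(tagged, |v| println!("{v}")); // prints a, b, c
    }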
1 parent 917ca08 · commit 2ee8cc0

2 files changed: +103 -12 lines changed

src/main.rs

Lines changed: 72 additions & 12 deletions
@@ -71,8 +71,10 @@ use crate::parse::syntax;
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
 use std::path::Path;
-use std::{env, thread};
+use std::{env, iter, thread};
 
 use humansize::{format_size, BINARY};
 use owo_colors::OwoColorize;
@@ -260,15 +262,6 @@ fn main() {
                 .iter()
                 .any(|diff_result| diff_result.has_reportable_change());
             display::json::print_directory(results, display_options.print_unchanged);
-        } else if display_options.sort_paths {
-            let result: Vec<DiffResult> = diff_iter.collect();
-            for diff_result in result {
-                print_diff_result(&display_options, &diff_result);
-
-                if diff_result.has_reportable_change() {
-                    encountered_changes = true;
-                }
-            }
         } else {
             // We want to diff files in the directory in
             // parallel, but print the results serially
@@ -279,11 +272,18 @@ fn main() {
 
                s.spawn(move || {
                    diff_iter
+                        .enumerate()
                        .try_for_each_with(send, |s, diff_result| s.send(diff_result))
                        .expect("Receiver should be connected")
                });
 
-                for diff_result in recv.into_iter() {
+                let serial_iter: Box<dyn Iterator<Item = _>> =
+                    if display_options.sort_paths {
+                        Box::new(reorder_by_index(recv.into_iter(), 0))
+                    } else {
+                        Box::new(recv.into_iter())
+                    };
+                for (_, diff_result) in serial_iter {
                    print_diff_result(&display_options, &diff_result);
 
                    if diff_result.has_reportable_change() {
@@ -723,7 +723,7 @@ fn diff_directories<'a>(
     display_options: &DisplayOptions,
     diff_options: &DiffOptions,
     overrides: &[(LanguageOverride, Vec<glob::Pattern>)],
-) -> impl ParallelIterator<Item = DiffResult> + 'a {
+) -> impl IndexedParallelIterator<Item = DiffResult> + 'a {
     let diff_options = diff_options.clone();
     let display_options = display_options.clone();
     let overrides: Vec<_> = overrides.into();
@@ -883,6 +883,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) {
     }
 }
 
+/// Sort items in the `source` iterator by the 0th `usize` field, yield when all
+/// preceding items are received.
+///
+/// The idea is borrowed from
+/// <https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3>.
+fn reorder_by_index<T>(
+    source: impl Iterator<Item = (usize, T)>,
+    mut next_index: usize,
+) -> impl Iterator<Item = (usize, T)> {
+    struct WorkItem<T> {
+        index: usize,
+        value: T,
+    }
+
+    impl<T> Eq for WorkItem<T> {}
+
+    impl<T> Ord for WorkItem<T> {
+        fn cmp(&self, other: &Self) -> Ordering {
+            self.index.cmp(&other.index).reverse() // min heap
+        }
+    }
+
+    impl<T> PartialEq for WorkItem<T> {
+        fn eq(&self, other: &Self) -> bool {
+            self.index == other.index
+        }
+    }
+
+    impl<T> PartialOrd for WorkItem<T> {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            Some(self.cmp(other))
+        }
+    }
+
+    let mut source = source.fuse();
+    let mut queue: BinaryHeap<WorkItem<T>> = BinaryHeap::new();
+    iter::from_fn(move || {
+        // Consume the source iterator until the next_index item is found.
+        while queue.peek().map_or(true, |item| item.index > next_index) {
+            if let Some((index, value)) = source.next() {
+                queue.push(WorkItem { index, value });
+            } else {
+                break;
+            }
+        }
+        let item = queue.pop()?;
+        next_index = item.index + 1;
+        Some((item.index, item.value))
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::ffi::OsStr;
@@ -907,4 +958,13 @@ mod tests {
         assert_eq!(res.lhs_positions, vec![]);
         assert_eq!(res.rhs_positions, vec![]);
     }
+
+    #[test]
+    fn test_reorder_by_index() {
+        let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")];
+        let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect();
+        let mut sorted = source.clone();
+        sorted.sort();
+        assert_eq!(reordered, sorted);
+    }
 }

tests/cli.rs

Lines changed: 31 additions & 0 deletions
@@ -185,6 +185,37 @@ fn directory_arguments() {
     cmd.assert().stdout(predicate_fn);
 }
 
+#[test]
+fn directory_arguments_sorted() {
+    let mut cmd = get_base_command();
+
+    cmd.arg("--sort-paths")
+        .arg("--display=inline")
+        .arg("sample_files/dir_before")
+        .arg("sample_files/dir_after");
+
+    let expected_files = [
+        "clojure.clj",
+        "foo.js",
+        "has_many_hunk.py",
+        "only_in_after_dir.rs",
+        "only_in_before.c",
+    ];
+    let predicate_fn = predicate::function(|output: &str| {
+        let mut file_linenos = Vec::new();
+        for name in expected_files {
+            if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) {
+                file_linenos.push(lineno);
+            } else {
+                return false; // All files should be emitted
+            }
+        }
+        // Output should be sorted
+        file_linenos.windows(2).all(|w| w[0] < w[1])
+    });
+    cmd.assert().stdout(predicate_fn);
+}
+
 #[test]
 fn git_style_arguments_rename() {
     let mut cmd = get_base_command();
