
Commit d97bdf0

Emit sorted dir-diff results as soon as preceding results are ready
If the directory diff contains lots of changes across files, it's better to emit the earlier results incrementally rather than collecting everything first. The idea is borrowed from the following thread: the diff producer assigns an incremental index to each result, and the consumer puts the results into a BinaryHeap to reorder them by index. https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3
1 parent d99c686 commit d97bdf0
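
A minimal sketch of the consumer-side idea (names and values are illustrative, not taken from the commit, which wraps this logic in its own `reorder_by_index` helper with a custom `WorkItem` ordering): results arrive out of order, each tagged with the index the producer assigned, and a heap keyed by that index buffers them until every smaller index has been seen. `std::collections::BinaryHeap` is a max-heap, so `Reverse` flips it into a min-heap.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Stand-in for parallel diff output: results arrive out of order, each tagged
// with the index the producer assigned (e.g. via rayon's `enumerate`).
fn main() {
    let arriving = [(2, "c.rs"), (0, "a.rs"), (1, "b.rs"), (4, "e.rs"), (3, "d.rs")];

    // Min-heap: `Reverse` flips std's max-heap; tuples order by index first
    // (indices are unique here).
    let mut pending = BinaryHeap::new();
    let mut next_index = 0;

    for (index, path) in arriving {
        pending.push(Reverse((index, path)));
        // Emit every buffered result whose predecessors have all been seen.
        while pending.peek().map_or(false, |Reverse((i, _))| *i == next_index) {
            let Reverse((i, path)) = pending.pop().unwrap();
            println!("emit #{i}: {path}");
            next_index = i + 1;
        }
    }
}

Running this prints a.rs through e.rs in order even though c.rs arrives first, which is the incremental emission the commit message describes.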

File tree: 2 files changed (+103, -12 lines)


src/main.rs

Lines changed: 72 additions & 12 deletions
@@ -78,8 +78,10 @@ use crate::parse::syntax;
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
 use std::path::Path;
-use std::{env, thread};
+use std::{env, iter, thread};
 
 use humansize::{format_size, BINARY};
 use owo_colors::OwoColorize;
@@ -267,15 +269,6 @@ fn main() {
                 .iter()
                 .any(|diff_result| diff_result.has_reportable_change());
             display::json::print_directory(results, display_options.print_unchanged);
-        } else if display_options.sort_paths {
-            let result: Vec<DiffResult> = diff_iter.collect();
-            for diff_result in result {
-                print_diff_result(&display_options, &diff_result);
-
-                if diff_result.has_reportable_change() {
-                    encountered_changes = true;
-                }
-            }
         } else {
             // We want to diff files in the directory in
             // parallel, but print the results serially
@@ -286,11 +279,18 @@
 
             s.spawn(move || {
                 diff_iter
+                    .enumerate()
                    .try_for_each_with(send, |s, diff_result| s.send(diff_result))
                    .expect("Receiver should be connected")
             });
 
-            for diff_result in recv.into_iter() {
+            let serial_iter: Box<dyn Iterator<Item = _>> =
+                if display_options.sort_paths {
+                    Box::new(reorder_by_index(recv.into_iter(), 0))
+                } else {
+                    Box::new(recv.into_iter())
+                };
+            for (_, diff_result) in serial_iter {
                 print_diff_result(&display_options, &diff_result);
 
                 if diff_result.has_reportable_change() {
@@ -747,7 +747,7 @@ fn diff_directories<'a>(
     display_options: &DisplayOptions,
     diff_options: &DiffOptions,
     overrides: &[(LanguageOverride, Vec<glob::Pattern>)],
-) -> impl ParallelIterator<Item = DiffResult> + 'a {
+) -> impl IndexedParallelIterator<Item = DiffResult> + 'a {
     let diff_options = diff_options.clone();
     let display_options = display_options.clone();
     let overrides: Vec<_> = overrides.into();
@@ -907,6 +907,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) {
     }
 }
 
+/// Sort items in the `source` iterator by the 0th `usize` field, yield when all
+/// preceding items are received.
+///
+/// The idea is borrowed from
+/// <https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3>.
+fn reorder_by_index<T>(
+    source: impl Iterator<Item = (usize, T)>,
+    mut next_index: usize,
+) -> impl Iterator<Item = (usize, T)> {
+    struct WorkItem<T> {
+        index: usize,
+        value: T,
+    }
+
+    impl<T> Eq for WorkItem<T> {}
+
+    impl<T> Ord for WorkItem<T> {
+        fn cmp(&self, other: &Self) -> Ordering {
+            self.index.cmp(&other.index).reverse() // min heap
+        }
+    }
+
+    impl<T> PartialEq for WorkItem<T> {
+        fn eq(&self, other: &Self) -> bool {
+            self.index == other.index
+        }
+    }
+
+    impl<T> PartialOrd for WorkItem<T> {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            Some(self.cmp(other))
+        }
+    }
+
+    let mut source = source.fuse();
+    let mut queue: BinaryHeap<WorkItem<T>> = BinaryHeap::new();
+    iter::from_fn(move || {
+        // Consume the source iterator until the next_index item is found.
+        while queue.peek().map_or(true, |item| item.index > next_index) {
+            if let Some((index, value)) = source.next() {
+                queue.push(WorkItem { index, value });
+            } else {
+                break;
+            }
+        }
+        let item = queue.pop()?;
+        next_index = item.index + 1;
+        Some((item.index, item.value))
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::ffi::OsStr;
@@ -931,4 +982,13 @@ mod tests {
         assert_eq!(res.lhs_positions, vec![]);
         assert_eq!(res.rhs_positions, vec![]);
     }
+
+    #[test]
+    fn test_reorder_by_index() {
+        let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")];
+        let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect();
+        let mut sorted = source.clone();
+        sorted.sort();
+        assert_eq!(reordered, sorted);
+    }
 }
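
As a companion to the consumer-side sketch earlier, here is a producer-side sketch. The file list and per-file work are hypothetical, it assumes the rayon crate, and the mpsc channel and thread::scope setup are assumptions of this sketch (the hunk above only shows `s.spawn` and a `send`/`recv` pair). It illustrates why `diff_directories` now advertises `IndexedParallelIterator`: rayon's `.enumerate()` is only available on indexed parallel iterators, and the indices it attaches follow the original path order even though results reach the channel in completion order.

use rayon::prelude::*;
use std::sync::mpsc;
use std::thread;

fn main() {
    // Hypothetical stand-in for the sorted directory listing.
    let paths = vec!["a.rs", "b.rs", "c.rs", "d.rs"];
    let (send, recv) = mpsc::channel();

    thread::scope(|s| {
        s.spawn(move || {
            paths
                .par_iter()
                .map(|path| format!("diff of {path}")) // stand-in for the per-file diff
                .enumerate() // requires IndexedParallelIterator; index = position in `paths`
                .try_for_each_with(send, |s, indexed| s.send(indexed))
                .expect("Receiver should be connected");
        });

        // Results arrive in completion order, but each index still names the
        // position of its path in the original listing.
        for (index, summary) in recv.into_iter() {
            println!("#{index}: {summary}");
        }
    });
}

Feeding `recv` through a reordering step such as the commit's `reorder_by_index` turns this completion-order stream back into path order without collecting the whole directory diff first.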

tests/cli.rs

Lines changed: 31 additions & 0 deletions
@@ -194,6 +194,37 @@ fn directory_arguments() {
     cmd.assert().stdout(predicate_fn);
 }
 
+#[test]
+fn directory_arguments_sorted() {
+    let mut cmd = get_base_command();
+
+    cmd.arg("--sort-paths")
+        .arg("--display=inline")
+        .arg("sample_files/dir_1")
+        .arg("sample_files/dir_2");
+
+    let expected_files = [
+        "clojure.clj",
+        "foo.js",
+        "has_many_hunk.py",
+        "only_in_1.c",
+        "only_in_2.rs",
+    ];
+    let predicate_fn = predicate::function(|output: &str| {
+        let mut file_linenos = Vec::new();
+        for name in expected_files {
+            if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) {
+                file_linenos.push(lineno);
+            } else {
+                return false; // All files should be emitted
+            }
+        }
+        // Output should be sorted
+        file_linenos.windows(2).all(|w| w[0] < w[1])
+    });
+    cmd.assert().stdout(predicate_fn);
+}
+
 #[test]
 fn git_style_arguments_rename() {
     let mut cmd = get_base_command();
