Commit a5cc56e

Emit sorted dir-diff results as soon as preceding results are ready
If the directory diff contains lots of changes across files, it's better to emit earlier results incrementally. The idea is borrowed from the following thread: the diff producer assigns an incremental index to each result, and the consumer puts the results into a BinaryHeap to reorder them by index. https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3
1 parent d1cb9f9 commit a5cc56e
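
As a rough illustration of the pattern described in the commit message above, here is a minimal standalone sketch (not part of this commit, and not difftastic's actual code): workers send (index, value) pairs over a channel in whatever order they finish, while the consumer buffers out-of-order items in a min-heap keyed by index and emits each one as soon as every earlier index has arrived.

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::mpsc;
use std::thread;

fn main() {
    let (send, recv) = mpsc::channel();

    // Producer: results finish (and are sent) out of order.
    thread::spawn(move || {
        for (index, value) in [(2, "c"), (0, "a"), (3, "d"), (1, "b")] {
            send.send((index, value)).unwrap();
        }
    });

    // Consumer: buffer items in a min-heap (Reverse turns the max-heap into a
    // min-heap) and emit each one as soon as all preceding indexes were seen.
    let mut next_index = 0;
    let mut pending = BinaryHeap::new();
    for received in recv {
        pending.push(Reverse(received));
        while let Some(Reverse((index, _))) = pending.peek() {
            if *index != next_index {
                break;
            }
            let Reverse((index, value)) = pending.pop().unwrap();
            println!("{index}: {value}"); // prints 0: a, 1: b, 2: c, 3: d in order
            next_index = index + 1;
        }
    }
}

The diff below implements the same idea as a reusable reorder_by_index helper with a custom WorkItem ordering instead of Reverse, fed by an enumerated rayon iterator.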

2 files changed: +103 -12 lines changed


src/main.rs

Lines changed: 72 additions & 12 deletions
@@ -75,8 +75,10 @@ use crate::parse::syntax;
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
 use std::path::Path;
-use std::{env, thread};
+use std::{env, iter, thread};
 
 use humansize::{format_size, BINARY};
 use owo_colors::OwoColorize;
@@ -264,15 +266,6 @@ fn main() {
                 .iter()
                 .any(|diff_result| diff_result.has_reportable_change());
             display::json::print_directory(results, display_options.print_unchanged);
-        } else if display_options.sort_paths {
-            let result: Vec<DiffResult> = diff_iter.collect();
-            for diff_result in result {
-                print_diff_result(&display_options, &diff_result);
-
-                if diff_result.has_reportable_change() {
-                    encountered_changes = true;
-                }
-            }
         } else {
             // We want to diff files in the directory in
             // parallel, but print the results serially
@@ -283,11 +276,18 @@ fn main() {
 
             s.spawn(move || {
                 diff_iter
+                    .enumerate()
                     .try_for_each_with(send, |s, diff_result| s.send(diff_result))
                     .expect("Receiver should be connected")
             });
 
-            for diff_result in recv.into_iter() {
+            let serial_iter: Box<dyn Iterator<Item = _>> =
+                if display_options.sort_paths {
+                    Box::new(reorder_by_index(recv.into_iter(), 0))
+                } else {
+                    Box::new(recv.into_iter())
+                };
+            for (_, diff_result) in serial_iter {
                 print_diff_result(&display_options, &diff_result);
 
                 if diff_result.has_reportable_change() {
@@ -739,7 +739,7 @@ fn diff_directories<'a>(
     display_options: &DisplayOptions,
     diff_options: &DiffOptions,
     overrides: &[(LanguageOverride, Vec<glob::Pattern>)],
-) -> impl ParallelIterator<Item = DiffResult> + 'a {
+) -> impl IndexedParallelIterator<Item = DiffResult> + 'a {
     let diff_options = diff_options.clone();
     let display_options = display_options.clone();
     let overrides: Vec<_> = overrides.into();
@@ -899,6 +899,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) {
     }
 }
 
+/// Sort items in the `source` iterator by the 0th `usize` field, yield when all
+/// preceding items are received.
+///
+/// The idea is borrowed from
+/// <https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3>.
+fn reorder_by_index<T>(
+    source: impl Iterator<Item = (usize, T)>,
+    mut next_index: usize,
+) -> impl Iterator<Item = (usize, T)> {
+    struct WorkItem<T> {
+        index: usize,
+        value: T,
+    }
+
+    impl<T> Eq for WorkItem<T> {}
+
+    impl<T> Ord for WorkItem<T> {
+        fn cmp(&self, other: &Self) -> Ordering {
+            self.index.cmp(&other.index).reverse() // min heap
+        }
+    }
+
+    impl<T> PartialEq for WorkItem<T> {
+        fn eq(&self, other: &Self) -> bool {
+            self.index == other.index
+        }
+    }
+
+    impl<T> PartialOrd for WorkItem<T> {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            Some(self.cmp(other))
+        }
+    }
+
+    let mut source = source.fuse();
+    let mut queue: BinaryHeap<WorkItem<T>> = BinaryHeap::new();
+    iter::from_fn(move || {
+        // Consume the source iterator until the next_index item is found.
+        while queue.peek().map_or(true, |item| item.index > next_index) {
+            if let Some((index, value)) = source.next() {
+                queue.push(WorkItem { index, value });
+            } else {
+                break;
+            }
+        }
+        let item = queue.pop()?;
+        next_index = item.index + 1;
+        Some((item.index, item.value))
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::ffi::OsStr;
@@ -923,4 +974,13 @@ mod tests {
         assert_eq!(res.lhs_positions, vec![]);
         assert_eq!(res.rhs_positions, vec![]);
     }
+
+    #[test]
+    fn test_reorder_by_index() {
+        let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")];
+        let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect();
+        let mut sorted = source.clone();
+        sorted.sort();
+        assert_eq!(reordered, sorted);
+    }
 }

tests/cli.rs

Lines changed: 31 additions & 0 deletions
@@ -194,6 +194,37 @@ fn directory_arguments() {
     cmd.assert().stdout(predicate_fn);
 }
 
+#[test]
+fn directory_arguments_sorted() {
+    let mut cmd = get_base_command();
+
+    cmd.arg("--sort-paths")
+        .arg("--display=inline")
+        .arg("sample_files/dir_1")
+        .arg("sample_files/dir_2");
+
+    let expected_files = [
+        "clojure.clj",
+        "foo.js",
+        "has_many_hunk.py",
+        "only_in_1.c",
+        "only_in_2.rs",
+    ];
+    let predicate_fn = predicate::function(|output: &str| {
+        let mut file_linenos = Vec::new();
+        for name in expected_files {
+            if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) {
+                file_linenos.push(lineno);
+            } else {
+                return false; // All files should be emitted
+            }
+        }
+        // Output should be sorted
+        file_linenos.windows(2).all(|w| w[0] < w[1])
+    });
+    cmd.assert().stdout(predicate_fn);
+}
+
 #[test]
 fn git_style_arguments_rename() {
     let mut cmd = get_base_command();
