Commit bd2e3d0

Emit sorted dir-diff result as soon as preceding results get ready
If the directory diff contains many changes across files, it is better to emit the earlier results incrementally. The idea is borrowed from the thread below: the diff producer assigns an incremental index to each result, and the consumer puts the results into a BinaryHeap to reorder them by index. https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3
1 parent 8f292fb commit bd2e3d0
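
As an illustration of the idea described in the commit message, here is a minimal, self-contained sketch using only the standard library (this is not the commit's code; the worker payloads and the std::sync::mpsc channel are stand-ins): producers tag each result with the index of its input, and the consumer buffers out-of-order results in a min-heap, emitting each one as soon as every earlier index has already been emitted.

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::mpsc;
use std::thread;

fn main() {
    let (send, recv) = mpsc::channel();

    // Producers: each result is tagged with the index of its input.
    for (index, input) in ["a", "b", "c", "d"].into_iter().enumerate() {
        let send = send.clone();
        thread::spawn(move || {
            // Work may finish in any order.
            send.send((index, format!("diffed {input}")))
                .expect("receiver should be connected");
        });
    }
    drop(send);

    // Consumer: buffer results in a min-heap and emit each one as soon as
    // every preceding index has already been emitted.
    let mut queue = BinaryHeap::new();
    let mut next_index = 0;
    for (index, result) in recv {
        queue.push(Reverse((index, result)));
        while queue
            .peek()
            .map_or(false, |Reverse((i, _))| *i == next_index)
        {
            let Reverse((_, result)) = queue.pop().expect("peeked above");
            println!("{result}");
            next_index += 1;
        }
    }
}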

2 files changed: +103 -12 lines changed

src/main.rs

Lines changed: 72 additions & 12 deletions
@@ -75,8 +75,10 @@ use crate::parse::syntax;
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
 use std::path::Path;
-use std::{env, thread};
+use std::{env, iter, thread};
 
 use humansize::{format_size, BINARY};
 use owo_colors::OwoColorize;
@@ -264,15 +266,6 @@ fn main() {
                 .iter()
                 .any(|diff_result| diff_result.has_reportable_change());
             display::json::print_directory(results, display_options.print_unchanged);
-        } else if display_options.sort_paths {
-            let result: Vec<DiffResult> = diff_iter.collect();
-            for diff_result in result {
-                print_diff_result(&display_options, &diff_result);
-
-                if diff_result.has_reportable_change() {
-                    encountered_changes = true;
-                }
-            }
         } else {
             // We want to diff files in the directory in
             // parallel, but print the results serially
@@ -283,11 +276,18 @@ fn main() {
 
             s.spawn(move || {
                 diff_iter
+                    .enumerate()
                     .try_for_each_with(send, |s, diff_result| s.send(diff_result))
                     .expect("Receiver should be connected")
            });
 
-            for diff_result in recv.into_iter() {
+            let serial_iter: Box<dyn Iterator<Item = _>> =
+                if display_options.sort_paths {
+                    Box::new(reorder_by_index(recv.into_iter(), 0))
+                } else {
+                    Box::new(recv.into_iter())
+                };
+            for (_, diff_result) in serial_iter {
                 print_diff_result(&display_options, &diff_result);
 
                 if diff_result.has_reportable_change() {
@@ -727,7 +727,7 @@ fn diff_directories<'a>(
     display_options: &DisplayOptions,
     diff_options: &DiffOptions,
     overrides: &[(LanguageOverride, Vec<glob::Pattern>)],
-) -> impl ParallelIterator<Item = DiffResult> + 'a {
+) -> impl IndexedParallelIterator<Item = DiffResult> + 'a {
     let diff_options = diff_options.clone();
     let display_options = display_options.clone();
     let overrides: Vec<_> = overrides.into();
@@ -887,6 +887,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) {
     }
 }
 
+/// Sort items in the `source` iterator by the 0th `usize` field, yield when all
+/// preceding items are received.
+///
+/// The idea is borrowed from
+/// <https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3>.
+fn reorder_by_index<T>(
+    source: impl Iterator<Item = (usize, T)>,
+    mut next_index: usize,
+) -> impl Iterator<Item = (usize, T)> {
+    struct WorkItem<T> {
+        index: usize,
+        value: T,
+    }
+
+    impl<T> Eq for WorkItem<T> {}
+
+    impl<T> Ord for WorkItem<T> {
+        fn cmp(&self, other: &Self) -> Ordering {
+            self.index.cmp(&other.index).reverse() // min heap
+        }
+    }
+
+    impl<T> PartialEq for WorkItem<T> {
+        fn eq(&self, other: &Self) -> bool {
+            self.index == other.index
+        }
+    }
+
+    impl<T> PartialOrd for WorkItem<T> {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            Some(self.cmp(other))
+        }
+    }
+
+    let mut source = source.fuse();
+    let mut queue: BinaryHeap<WorkItem<T>> = BinaryHeap::new();
+    iter::from_fn(move || {
+        // Consume the source iterator until the next_index item is found.
+        while queue.peek().map_or(true, |item| item.index > next_index) {
+            if let Some((index, value)) = source.next() {
+                queue.push(WorkItem { index, value });
+            } else {
+                break;
+            }
+        }
+        let item = queue.pop()?;
+        next_index = item.index + 1;
+        Some((item.index, item.value))
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::ffi::OsStr;
@@ -911,4 +962,13 @@ mod tests {
         assert_eq!(res.lhs_positions, vec![]);
         assert_eq!(res.rhs_positions, vec![]);
     }
+
+    #[test]
+    fn test_reorder_by_index() {
+        let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")];
+        let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect();
+        let mut sorted = source.clone();
+        sorted.sort();
+        assert_eq!(reordered, sorted);
+    }
 }
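
For context on the .enumerate() and IndexedParallelIterator changes above: rayon's enumerate assigns indices following the original (already sorted) iteration order even though items are processed in parallel and reach the channel out of order, which is what lets reorder_by_index restore the sorted order on the consumer side. A rough standalone sketch of that shape (assuming rayon as a dependency; squaring numbers stands in for diffing file pairs, and this is not the commit's code):

use rayon::prelude::*;

fn main() {
    let (send, recv) = std::sync::mpsc::channel();

    std::thread::scope(|s| {
        s.spawn(move || {
            (0..8)
                .into_par_iter()
                .map(|n| n * n) // stand-in for diffing one file pair
                .enumerate() // indices follow the input order, not completion order
                .try_for_each_with(send, |s, pair| s.send(pair))
                .expect("Receiver should be connected")
        });

        // Pairs arrive here in arbitrary order; feeding them through
        // reorder_by_index(recv.into_iter(), 0) would restore the input order.
        for (index, square) in recv {
            println!("{index}: {square}");
        }
    });
}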

tests/cli.rs

Lines changed: 31 additions & 0 deletions
@@ -184,6 +184,37 @@ fn directory_arguments() {
     cmd.assert().stdout(predicate_fn);
 }
 
+#[test]
+fn directory_arguments_sorted() {
+    let mut cmd = get_base_command();
+
+    cmd.arg("--sort-paths")
+        .arg("--display=inline")
+        .arg("sample_files/dir_1")
+        .arg("sample_files/dir_2");
+
+    let expected_files = [
+        "clojure.clj",
+        "foo.js",
+        "has_many_hunk.py",
+        "only_in_1.c",
+        "only_in_2.rs",
+    ];
+    let predicate_fn = predicate::function(|output: &str| {
+        let mut file_linenos = Vec::new();
+        for name in expected_files {
+            if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) {
+                file_linenos.push(lineno);
+            } else {
+                return false; // All files should be emitted
+            }
+        }
+        // Output should be sorted
+        file_linenos.windows(2).all(|w| w[0] < w[1])
+    });
+    cmd.assert().stdout(predicate_fn);
+}
+
 #[test]
 fn git_style_arguments_rename() {
     let mut cmd = get_base_command();
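
The predicate in the new test only checks relative ordering: it records the first output line on which each expected file name appears, then verifies those positions are strictly increasing. The same check in isolation (hypothetical file names, not real difftastic output):

fn main() {
    let output = "a.txt changed\nb.txt changed\nc.txt changed\n";
    let expected = ["a.txt", "b.txt", "c.txt"];

    // First line index at which each expected name appears, if any.
    let linenos: Vec<_> = expected
        .into_iter()
        .filter_map(|name| output.lines().position(|line| line.starts_with(name)))
        .collect();

    // All names were found, and they appear in sorted order.
    assert_eq!(linenos.len(), expected.len());
    assert!(linenos.windows(2).all(|w| w[0] < w[1]));
}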
