
Commit 4d2f683

Emit sorted dir-diff results as soon as preceding results are ready
If the directory diff contains many changes across files, it is better to emit earlier results incrementally rather than collecting everything before printing. The idea is borrowed from the thread below: the diff producer assigns an incremental index to each result, and the consumer puts the results in a BinaryHeap to reorder them by index. https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3
1 parent f4d9b39 commit 4d2f683
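
A rough, self-contained sketch of that pattern (the emit_in_order helper below is hypothetical and only illustrates the idea; the commit's actual implementation is the reorder_by_index function in the diff): the producer tags each result with its index, the consumer parks out-of-order results in a min-heap, and a result is emitted as soon as every earlier index has already been emitted.

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    // Sketch only: values must be Ord here so they can sit in the heap tuple;
    // the commit instead wraps values in a WorkItem struct ordered by index alone.
    fn emit_in_order<T: Ord>(tagged: impl IntoIterator<Item = (usize, T)>, mut emit: impl FnMut(T)) {
        let mut pending = BinaryHeap::new(); // max-heap; Reverse turns it into a min-heap
        let mut next = 0;
        for item in tagged {
            pending.push(Reverse(item));
            // Flush the contiguous prefix that is now complete.
            while pending.peek().map_or(false, |Reverse((i, _))| *i == next) {
                let Reverse((_, value)) = pending.pop().unwrap();
                emit(value);
                next += 1;
            }
        }
    }

    fn main() {
        // Indices arrive out of order (as from parallel workers), but values
        // are emitted strictly in index order: a, b, c, d, e.
        let arrived = vec![(2, "c"), (0, "a"), (4, "e"), (1, "b"), (3, "d")];
        emit_in_order(arrived, |s| println!("{s}"));
    }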


2 files changed: +103 -12 lines changed


src/main.rs

Lines changed: 72 additions & 12 deletions
@@ -70,9 +70,11 @@ use crate::parse::syntax;
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
 use std::fs::Permissions;
 use std::path::Path;
-use std::{env, thread};
+use std::{env, iter, thread};
 
 use humansize::{format_size, BINARY};
 use owo_colors::OwoColorize;
@@ -258,15 +260,6 @@ fn main() {
                 .iter()
                 .any(|diff_result| diff_result.has_reportable_change());
             display::json::print_directory(results);
-        } else if display_options.sort_paths {
-            let result: Vec<DiffResult> = diff_iter.collect();
-            for diff_result in result {
-                print_diff_result(&display_options, &diff_result);
-
-                if diff_result.has_reportable_change() {
-                    encountered_changes = true;
-                }
-            }
         } else {
             // We want to diff files in the directory in
             // parallel, but print the results serially
@@ -277,11 +270,18 @@ fn main() {
 
             s.spawn(move || {
                 diff_iter
+                    .enumerate()
                     .try_for_each_with(send, |s, diff_result| s.send(diff_result))
                     .expect("Receiver should be connected")
             });
 
-            for diff_result in recv.into_iter() {
+            let serial_iter: Box<dyn Iterator<Item = _>> =
+                if display_options.sort_paths {
+                    Box::new(reorder_by_index(recv.into_iter(), 0))
+                } else {
+                    Box::new(recv.into_iter())
+                };
+            for (_, diff_result) in serial_iter {
                 print_diff_result(&display_options, &diff_result);
 
                 if diff_result.has_reportable_change() {
@@ -766,7 +766,7 @@ fn diff_directories<'a>(
     display_options: &DisplayOptions,
     diff_options: &DiffOptions,
     overrides: &[(LanguageOverride, Vec<glob::Pattern>)],
-) -> impl ParallelIterator<Item = DiffResult> + 'a {
+) -> impl IndexedParallelIterator<Item = DiffResult> + 'a {
     let diff_options = diff_options.clone();
     let display_options = display_options.clone();
     let overrides: Vec<_> = overrides.into();
@@ -924,6 +924,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) {
     }
 }
 
+/// Sort items in the `source` iterator by the 0th `usize` field, yield when all
+/// preceding items are received.
+///
+/// The idea is borrowed from
+/// <https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3>.
+fn reorder_by_index<T>(
+    source: impl Iterator<Item = (usize, T)>,
+    mut next_index: usize,
+) -> impl Iterator<Item = (usize, T)> {
+    struct WorkItem<T> {
+        index: usize,
+        value: T,
+    }
+
+    impl<T> Eq for WorkItem<T> {}
+
+    impl<T> Ord for WorkItem<T> {
+        fn cmp(&self, other: &Self) -> Ordering {
+            self.index.cmp(&other.index).reverse() // min heap
+        }
+    }
+
+    impl<T> PartialEq for WorkItem<T> {
+        fn eq(&self, other: &Self) -> bool {
+            self.index == other.index
+        }
+    }
+
+    impl<T> PartialOrd for WorkItem<T> {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            Some(self.cmp(other))
+        }
+    }
+
+    let mut source = source.fuse();
+    let mut queue: BinaryHeap<WorkItem<T>> = BinaryHeap::new();
+    iter::from_fn(move || {
+        // Consume the source iterator until the next_index item is found.
+        while queue.peek().map_or(true, |item| item.index > next_index) {
+            if let Some((index, value)) = source.next() {
+                queue.push(WorkItem { index, value });
+            } else {
+                break;
+            }
+        }
+        let item = queue.pop()?;
+        next_index = item.index + 1;
+        Some((item.index, item.value))
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use std::ffi::OsStr;
@@ -948,4 +999,13 @@ mod tests {
         assert_eq!(res.lhs_positions, vec![]);
        assert_eq!(res.rhs_positions, vec![]);
     }
+
+    #[test]
+    fn test_reorder_by_index() {
+        let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")];
+        let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect();
+        let mut sorted = source.clone();
+        sorted.sort();
+        assert_eq!(reordered, sorted);
+    }
 }
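
A note on the heap ordering in the diff above: std's BinaryHeap is a max-heap, so WorkItem::cmp reverses the index comparison to make pop() return the smallest pending index first. A tiny standalone illustration of the same effect using the standard Reverse wrapper (not the commit's code):

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    fn main() {
        // Wrapping keys in Reverse (or reversing cmp, as WorkItem does) turns
        // the max-heap into a min-heap: the smallest index pops first.
        let mut heap = BinaryHeap::from([Reverse(3), Reverse(1), Reverse(2)]);
        assert_eq!(heap.pop(), Some(Reverse(1)));
        assert_eq!(heap.pop(), Some(Reverse(2)));
    }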

tests/cli.rs

Lines changed: 31 additions & 0 deletions
@@ -185,6 +185,37 @@ fn directory_arguments() {
     cmd.assert().stdout(predicate_fn);
 }
 
+#[test]
+fn directory_arguments_sorted() {
+    let mut cmd = get_base_command();
+
+    cmd.arg("--sort-paths")
+        .arg("--display=inline")
+        .arg("sample_files/dir_before")
+        .arg("sample_files/dir_after");
+
+    let expected_files = [
+        "clojure.clj",
+        "foo.js",
+        "has_many_hunk.py",
+        "only_in_after_dir.rs",
+        "only_in_before.c",
+    ];
+    let predicate_fn = predicate::function(|output: &str| {
+        let mut file_linenos = Vec::new();
+        for name in expected_files {
+            if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) {
+                file_linenos.push(lineno);
+            } else {
+                return false; // All files should be emitted
+            }
+        }
+        // Output should be sorted
+        file_linenos.windows(2).all(|w| w[0] < w[1])
+    });
+    cmd.assert().stdout(predicate_fn);
+}
+
 #[test]
 fn git_style_arguments_rename() {
     let mut cmd = get_base_command();
