Skip to content

Commit 5fc8b13

Browse files
committed
[wip] address michaeljo feedback and improve performance and memory overhead
1 parent 89e8b3b commit 5fc8b13

File tree

1 file changed

+54
-21
lines changed

1 file changed

+54
-21
lines changed

src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.query2.query.output;
1515

16+
import com.google.common.collect.Iterables;
1617
import com.google.devtools.build.lib.packages.LabelPrinter;
1718
import com.google.devtools.build.lib.packages.Target;
1819
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
1920
import com.google.devtools.build.lib.query2.proto.proto2api.Build;
2021
import com.google.protobuf.CodedOutputStream;
22+
2123
import java.io.IOException;
2224
import java.io.OutputStream;
23-
import java.util.stream.StreamSupport;
25+
import java.util.List;
26+
import java.util.concurrent.*;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2428

2529
/**
2630
* An output formatter that outputs a protocol buffer representation of a query result and outputs
@@ -38,42 +42,71 @@ public String getName() {
3842
public OutputFormatterCallback<Target> createPostFactoStreamCallback(
3943
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
4044
return new OutputFormatterCallback<Target>() {
45+
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
46+
private static final int TARGETS_PER_CHUNK = 500;
47+
4148
private final LabelPrinter ourLabelPrinter = labelPrinter;
4249

4350
@Override
4451
public void processOutput(Iterable<Target> partialResult)
4552
throws IOException, InterruptedException {
53+
ForkJoinTask<?> writeAllTargetsFuture;
54+
try (ForkJoinPool executor =
55+
new ForkJoinPool(
56+
Runtime.getRuntime().availableProcessors(),
57+
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
58+
null,
59+
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
60+
// throughput
61+
true)) {
62+
var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
63+
var stillAddingTargetsToQueue = new AtomicBoolean(true);
64+
writeAllTargetsFuture =
65+
executor.submit(
66+
() -> {
67+
try {
68+
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
69+
Future<List<byte[]>> targets = targetQueue.take();
70+
for (byte[] target : targets.get()) {
71+
out.write(target);
72+
}
73+
}
74+
} catch (InterruptedException e) {
75+
throw new WrappedInterruptedException(e);
76+
} catch (IOException e) {
77+
throw new WrappedIOException(e);
78+
} catch (ExecutionException e) {
79+
// TODO: figure out what might be in here and propagate
80+
throw new RuntimeException(e);
81+
}
82+
});
83+
try {
84+
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
85+
targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
86+
}
87+
} finally {
88+
stillAddingTargetsToQueue.set(false);
89+
}
90+
}
4691
try {
47-
StreamSupport.stream(partialResult.spliterator(), /* parallel= */ true)
48-
.map(this::toProto)
49-
.map(StreamedProtoOutputFormatter::writeDelimited)
50-
// I imagine forEachOrdered hurts performance somewhat in some cases. While we may
51-
// not need to actually produce output in order, this code does not know whether
52-
// ordering was requested. So we just always write it in order, and hope performance
53-
// is OK.
54-
.forEachOrdered(this::writeToOutputStreamThreadSafe);
55-
} catch (WrappedIOException e) {
56-
throw e.getCause();
57-
} catch (WrappedInterruptedException e) {
58-
throw e.getCause();
92+
writeAllTargetsFuture.get();
93+
} catch (ExecutionException e) {
94+
// TODO: propagate
95+
throw new RuntimeException(e);
5996
}
6097
}
6198

99+
private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) {
100+
return targets.stream().map(target -> writeDelimited(toProto(target))).toList();
101+
}
102+
62103
private Build.Target toProto(Target target) {
63104
try {
64105
return toTargetProtoBuffer(target, ourLabelPrinter);
65106
} catch (InterruptedException e) {
66107
throw new WrappedInterruptedException(e);
67108
}
68109
}
69-
70-
private synchronized void writeToOutputStreamThreadSafe(byte[] data) {
71-
try {
72-
out.write(data);
73-
} catch (IOException e) {
74-
throw new WrappedIOException(e);
75-
}
76-
}
77110
};
78111
}
79112

0 commit comments

Comments
 (0)