Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ java_library(
":chunker",
":combined_cache",
":remote_action_file_system",
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/actions:virtual_action_input",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/common",
Expand All @@ -344,6 +345,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//third_party:flogger",
"//third_party:guava",
"//third_party:jsr305",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.VirtualActionInput;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
Expand All @@ -49,6 +50,7 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
Expand Down Expand Up @@ -297,6 +299,7 @@ private ListenableFuture<Void> uploadBlob(

static class UploadTask {
Digest digest;
@Nullable PathFragment actionInputExecPath;
AtomicReference<Disposable> disposable;
SingleEmitter<Boolean> continuation;
Completable completion;
Expand Down Expand Up @@ -341,6 +344,12 @@ private Maybe<UploadTask> maybeCreateUploadTask(
AsyncSubject<Void> completion = AsyncSubject.create();
UploadTask uploadTask = new UploadTask();
uploadTask.digest = digest;
uploadTask.actionInputExecPath =
merkleTree
.actionInputForDigest(digest)
.filter(Artifact.DerivedArtifact.class::isInstance)
.map(input -> input.getExecPath())
.orElse(null);
uploadTask.disposable = new AtomicReference<>();
uploadTask.completion = Completable.fromObservable(completion);
Completable upload =
Expand Down Expand Up @@ -433,7 +442,28 @@ private Flowable<TransferResult> waitForUploadTasks(List<UploadTask> uploadTasks
() -> Profiler.instance().profile("upload"),
ignored ->
Flowable.fromIterable(uploadTasks)
.flatMapSingle(uploadTask -> toTransferResult(uploadTask.completion)),
.flatMapSingle(RemoteExecutionCache::toUploadTransferResult),
SilentCloseable::close);
}

private static Single<TransferResult> toUploadTransferResult(UploadTask uploadTask) {
return toTransferResult(uploadTask.completion)
.map(result -> annotateCacheNotFoundException(result, uploadTask));
}

private static TransferResult annotateCacheNotFoundException(
TransferResult result, UploadTask uploadTask) {
if (!result.isError() || uploadTask.actionInputExecPath == null) {
return result;
}

IOException error = checkNotNull(result.getError());
if (error instanceof CacheNotFoundException cacheNotFoundException
&& cacheNotFoundException.getMissingDigest().equals(uploadTask.digest)) {
return TransferResult.error(
new CacheNotFoundException(
cacheNotFoundException.getMissingDigest(), uploadTask.actionInputExecPath));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ java_library(
name = "cache_not_found_exception",
srcs = ["CacheNotFoundException.java"],
deps = [
"//src/main/java/com/google/devtools/build/lib/actions:action_input",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//third_party:jsr305",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ public static boolean allCausedByCacheNotFoundException(Throwable e) {
}

/**
* Returns a {@link LostArtifacts} instance that is non-empty if and only if all suppressed
* exceptions are caused by cache misses.
* Returns a {@link LostArtifacts} instance containing every cache miss that resolves to an input
* of the current action.
*
* <p>Cache misses for other artifacts, such as stdout/stderr or an output that failed to download
* after a cache hit, are not actionable by action rewinding. However, they should not mask cache
* misses for action inputs that can be rewound.
*/
public LostArtifacts getLostArtifacts(Function<PathFragment, ActionInput> actionInputResolver) {
if (!allCausedByCacheNotFoundException(this)) {
Expand All @@ -101,14 +105,14 @@ public LostArtifacts getLostArtifacts(Function<PathFragment, ActionInput> action
+ " with a filename",
e);
}
return LostArtifacts.EMPTY;
continue;
}
var actionInput = actionInputResolver.apply(execPath);
if (actionInput == null) {
// This can happen if the lost artifact is not an input of the action, but an output that
// e.g. failed to be retrieved from the remote cache after a cache hit. This also can't be
// solved by the rewinding that LostArtifacts would trigger.
return LostArtifacts.EMPTY;
continue;
}
byDigestBuilder.put(DigestUtil.toString(missingDigest), actionInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ public Map<Digest, Object> blobs() {
.collect(toImmutableMap(e -> adaptToDigest(e.getKey()), Map.Entry::getValue));
}

public Optional<ActionInput> actionInputForDigest(Digest digest) {
return blobs.get(digest) instanceof ActionInput actionInput
? Optional.of(actionInput)
: Optional.empty();
}

@Override
public RootOnly root() {
return root;
Expand All @@ -179,13 +185,15 @@ public Optional<ListenableFuture<Void>> upload(
Optional.of(uploader.uploadVirtualActionInput(context, digest, virtualActionInput));
case ActionInput actionInput -> {
var spawnExecutionContext = context.getSpawnExecutionContext();
var spawnPathResolver =
spawnExecutionContext != null ? spawnExecutionContext.getPathResolver() : null;
var pathResolver =
// This can only be null when uploading a tree created by
// MerkleTreeComputer#buildForFiles, which only happens for remote repo execution and
// tests. Only the latter actually reach this code path since remote repo execution
// doesn't upload any inputs.
spawnExecutionContext != null
? spawnExecutionContext.getPathResolver()
spawnPathResolver != null
? spawnPathResolver
: MerkleTreeComputer.PATH_ACTION_INPUT_RESOLVER;
yield Optional.of(
uploader.uploadFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.collect.ImmutableClassToInstanceMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -38,12 +39,15 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.ArtifactRoot;
import com.google.devtools.build.lib.actions.ArtifactRoot.RootType;
import com.google.devtools.build.lib.actions.ResourceSet;
import com.google.devtools.build.lib.actions.SimpleSpawn;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.Order;
Expand All @@ -55,8 +59,10 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.common.RemotePathResolver;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTreeComputer;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.FakeSpawnExecutionContext;
import com.google.devtools.build.lib.remote.util.InMemoryCacheClient;
import com.google.devtools.build.lib.remote.util.RxNoGlobalErrorsRule;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
Expand Down Expand Up @@ -85,6 +91,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -378,6 +385,85 @@ public void ensureInputsPresent_missingInputs_exceptionHasLostInputs() throws Ex
ActionInputHelper.fromPath("foo"));
}

@Test
public void ensureInputsPresent_sharedMissingDigest_exceptionsHaveOwnLostInputs()
throws Exception {
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol));

CountDownLatch findMissingDigestsCalls = new CountDownLatch(2);
doAnswer(
invocationOnMock -> {
findMissingDigestsCalls.countDown();
return invocationOnMock.callRealMethod();
})
.when(cacheProtocol)
.findMissingDigests(any(), any());

SettableFuture<Boolean> missingInputAvailable = SettableFuture.create();
CountDownLatch remotePathChecked = new CountDownLatch(1);
remoteCache.setRemotePathChecker(
(context, path) -> {
PathFragment execPath = path.relativeTo(execRoot);
if (execPath.equals(PathFragment.create("outputs/foo"))
|| execPath.equals(PathFragment.create("outputs/bar"))) {
remotePathChecked.countDown();
return missingInputAvailable;
}
return immediateFuture(true);
});

Artifact foo = ActionsTestUtil.createArtifact(artifactRoot, "foo");
Artifact bar = ActionsTestUtil.createArtifact(artifactRoot, "bar");
Digest digest = fakeFileCache.createScratchInput(foo, "same");
assertThat(fakeFileCache.createScratchInput(bar, "same")).isEqualTo(digest);

Spawn fooSpawn = spawnWithInput(foo);
var fooContext = spawnExecutionContext(fooSpawn);
var fooRemoteContext = RemoteActionExecutionContext.create(fooSpawn, fooContext, metadata);
var fooTree = buildMerkleTree(fooSpawn, fooContext);

Spawn barSpawn = spawnWithInput(bar);
var barContext = spawnExecutionContext(barSpawn);
var barRemoteContext = RemoteActionExecutionContext.create(barSpawn, barContext, metadata);
var barTree = buildMerkleTree(barSpawn, barContext);

var fooFailure = new AtomicReference<Throwable>();
Thread fooThread =
new Thread(
() ->
ensureInputsPresentAndCapture(remoteCache, fooRemoteContext, fooTree, fooFailure));
fooThread.start();
assertThat(remotePathChecked.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue();

var barFailure = new AtomicReference<Throwable>();
Thread barThread =
new Thread(
() ->
ensureInputsPresentAndCapture(remoteCache, barRemoteContext, barTree, barFailure));
barThread.start();
assertThat(findMissingDigestsCalls.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.isTrue();

missingInputAvailable.set(false);
fooThread.join();
barThread.join();

assertThat(fooFailure.get()).isInstanceOf(BulkTransferException.class);
assertThat(
((BulkTransferException) fooFailure.get())
.getLostArtifacts(execPath -> execPath.equals(foo.getExecPath()) ? foo : null)
.byDigest())
.containsExactly(DigestUtil.toString(digest), foo);

assertThat(barFailure.get()).isInstanceOf(BulkTransferException.class);
assertThat(
((BulkTransferException) barFailure.get())
.getLostArtifacts(execPath -> execPath.equals(bar.getExecPath()) ? bar : null)
.byDigest())
.containsExactly(DigestUtil.toString(digest), bar);
}

@Test
public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUploadTasks()
throws Exception {
Expand Down Expand Up @@ -739,4 +825,54 @@ private RemoteExecutionCache newRemoteExecutionCache(RemoteCacheClient remoteCac
digestUtil,
/* chunkingEnabled= */ false);
}

private Spawn spawnWithInput(ActionInput input) {
return new SimpleSpawn(
new FakeOwner("Mnemonic", "Progress Message", "//dummy:label"),
ImmutableList.of(),
ImmutableMap.of(),
ImmutableMap.of(),
NestedSetBuilder.create(Order.STABLE_ORDER, input),
ImmutableSet.of(),
ResourceSet.ZERO);
}

private FakeSpawnExecutionContext spawnExecutionContext(Spawn spawn) {
return new FakeSpawnExecutionContext(
spawn,
fakeFileCache,
execRoot,
new FileOutErr(execRoot.getRelative("stdout"), execRoot.getRelative("stderr")),
ImmutableClassToInstanceMap.of(),
/* actionFileSystem= */ null);
}

private MerkleTree.Uploadable buildMerkleTree(
Spawn spawn, FakeSpawnExecutionContext spawnExecutionContext) throws Exception {
return (MerkleTree.Uploadable)
merkleTreeComputer.buildForSpawn(
spawn,
ImmutableSet.of(),
/* scrubber= */ null,
spawnExecutionContext,
RemotePathResolver.createDefault(execRoot),
MerkleTreeComputer.BlobPolicy.KEEP_AND_REUPLOAD);
}

private void ensureInputsPresentAndCapture(
RemoteExecutionCache remoteCache,
RemoteActionExecutionContext context,
MerkleTree.Uploadable merkleTree,
AtomicReference<Throwable> failure) {
try {
remoteCache.ensureInputsPresent(
context,
merkleTree,
ImmutableMap.of(),
/* force= */ false,
RemotePathResolver.createDefault(execRoot));
} catch (Throwable t) {
failure.set(t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ java_test(
srcs = glob(["*.java"]),
test_class = "com.google.devtools.build.lib.AllTests",
deps = [
"//src/main/java/com/google/devtools/build/lib/actions:action_input_helper",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/test/java/com/google/devtools/build/lib:test_runner",
"//src/test/java/com/google/devtools/build/lib/testutil",
"//third_party:error_prone_annotations",
"//third_party:junit4",
"//third_party:mockito",
"//third_party:truth",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
Loading
Loading