Skip to content

Fix fetchBlob in TransferManager to compute from file cache only if entry is not present #18661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,11 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio

try {
return AccessController.doPrivileged((PrivilegedExceptionAction<IndexInput>) () -> {
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
});
CachedIndexInput cacheEntry = fileCache.get(key);
if (cacheEntry == null || cacheEntry.isClosed()) {
cacheEntry = new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
fileCache.put(key, cacheEntry);
}

// Cache entry was either retrieved from the cache or newly added, either
// way the reference count has been incremented by one. We can only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.common.breaker.TestCircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
Expand All @@ -38,11 +38,8 @@
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public abstract class TransferManagerTestCase extends OpenSearchTestCase {
protected static final int EIGHT_MB = 1024 * 1024 * 8;
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
EIGHT_MB * 2,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
protected final TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker();
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1, testCircuitBreaker);
protected MMapDirectory directory;
protected TransferManager transferManager;

Expand Down Expand Up @@ -156,6 +153,29 @@ public void testDownloadFails() throws Exception {
MatcherAssert.assertThat(fileCache.usage(), equalTo(0L));
}

public void testCircuitBreakerWhileDownloading() throws IOException {
// fetch blob when circuit breaking is not tripping
try (IndexInput i = fetchBlobWithName("1")) {
assertIndexInputIsFunctional(i);
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
}
// should have entry in file cache
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L));
MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB));

// start tripping the circuit breaker
testCircuitBreaker.startBreaking();

// fetch blob which already had entry in file cache, should not encounter circuit breaking exceptions
try (IndexInput i = fetchBlobWithName("1")) {
assertIndexInputIsFunctional(i);
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
}

// fetch new blob - should encounter circuit breaking exception
expectThrows(CircuitBreakingException.class, () -> fetchBlobWithName("2"));
}

public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception {
// Mock a call for a blob that will block until the latch is released,
// then start the fetch for that blob on a separate thread
Expand Down
Loading