diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index d7f484bb26a79..b07573b6f262d 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -74,17 +74,11 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { - CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { - if (cachedIndexInput == null || cachedIndexInput.isClosed()) { - logger.trace("Transfer Manager - IndexInput closed or not in cache"); - // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); - } else { - logger.trace("Transfer Manager - Already in cache"); - // already in the cache and ready to be used (open) - return cachedIndexInput; - } - }); + CachedIndexInput cacheEntry = fileCache.get(key); + if (cacheEntry == null || cacheEntry.isClosed()) { + cacheEntry = new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); + fileCache.put(key, cacheEntry); + } // Cache entry was either retrieved from the cache or newly added, either // way the reference count has been incremented by one. We can only diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 668eac51b1b81..7d6bcd4b58363 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -13,8 +13,8 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.SimpleFSLockFactory; -import org.opensearch.core.common.breaker.CircuitBreaker; -import org.opensearch.core.common.breaker.NoopCircuitBreaker; +import org.opensearch.common.breaker.TestCircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -38,11 +38,8 @@ @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public abstract class TransferManagerTestCase extends OpenSearchTestCase { protected static final int EIGHT_MB = 1024 * 1024 * 8; - protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( - EIGHT_MB * 2, - 1, - new NoopCircuitBreaker(CircuitBreaker.REQUEST) - ); + protected final TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker(); + protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1, testCircuitBreaker); protected MMapDirectory directory; protected TransferManager transferManager; @@ -156,6 +153,29 @@ public void testDownloadFails() throws Exception { MatcherAssert.assertThat(fileCache.usage(), equalTo(0L)); } + public void testCircuitBreakerWhileDownloading() throws IOException { + // fetch blob when circuit breaking is not tripping + try (IndexInput i = fetchBlobWithName("1")) { + assertIndexInputIsFunctional(i); + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB)); + } + // should have entry in file cache + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L)); + MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB)); + + // start tripping the circuit breaker + testCircuitBreaker.startBreaking(); + + // fetch blob which already had entry in file cache, should not encounter circuit breaking exceptions + try (IndexInput i = fetchBlobWithName("1")) { + assertIndexInputIsFunctional(i); + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB)); + } + + // fetch new blob - should encounter circuit breaking exception + expectThrows(CircuitBreakingException.class, () -> fetchBlobWithName("2")); + } + public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception { // Mock a call for a blob that will block until the latch is released, // then start the fetch for that blob on a separate thread