diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java index 9383c53d6d830..deb8f437bfd63 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java @@ -13,23 +13,23 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IndexInput; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import java.io.IOException; +import java.lang.ref.Cleaner; import java.nio.file.Path; -import java.util.HashSet; -import java.util.Set; /** * Extension of {@link FileCachedIndexInput} for full files for handling clones and slices - * We maintain a clone map so that we can close them when the parent IndexInput is closed so that ref count is properly maintained in file cache - * Closing of clones explicitly is needed as Lucene does not guarantee that it will close the clones + * Since Lucene does not guarantee that it will close the clones/slices, we have created a Cleaner which handles closing of the clones/slices when they become phantom reachable * https://github.com/apache/lucene/blob/8340b01c3cc229f33584ce2178b07b8984daa6a9/lucene/core/src/java/org/apache/lucene/store/IndexInput.java#L32-L33 * @opensearch.experimental */ @ExperimentalApi public class FullFileCachedIndexInput extends FileCachedIndexInput { private static final Logger logger = LogManager.getLogger(FullFileCachedIndexInput.class); - private final Set clones; + private final IndexInputHolder indexInputHolder; + private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory("index-input-cleaner")); public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) { this(cache, filePath, underlyingIndexInput, false); @@ -37,7 +37,8 @@ public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput under public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) { super(cache, filePath, underlyingIndexInput, isClone); - clones = new HashSet<>(); + indexInputHolder = new IndexInputHolder(underlyingIndexInput, isClone, cache, filePath); + CLEANER.register(this, indexInputHolder); } /** @@ -48,7 +49,6 @@ public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput under public FullFileCachedIndexInput clone() { FullFileCachedIndexInput clonedIndexInput = new FullFileCachedIndexInput(cache, filePath, luceneIndexInput.clone(), true); cache.incRef(filePath); - clones.add(clonedIndexInput); return clonedIndexInput; } @@ -74,7 +74,6 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw } IndexInput slicedLuceneIndexInput = luceneIndexInput.slice(sliceDescription, offset, length); FullFileCachedIndexInput slicedIndexInput = new FullFileCachedIndexInput(cache, filePath, slicedLuceneIndexInput, true); - clones.add(slicedIndexInput); cache.incRef(filePath); return slicedIndexInput; } @@ -88,21 +87,37 @@ public void close() throws IOException { if (isClone) { cache.decRef(filePath); } - clones.forEach(indexInput -> { - try { - indexInput.close(); - } catch (Exception e) { - logger.trace("Exception while closing clone - {}", e.getMessage()); - } - }); try { luceneIndexInput.close(); } catch (AlreadyClosedException e) { logger.trace("FullFileCachedIndexInput already closed"); } luceneIndexInput = null; - clones.clear(); closed = true; } } + + private static class IndexInputHolder implements Runnable { + private final IndexInput indexInput; + private final FileCache cache; + private final boolean isClone; + private final Path path; + + IndexInputHolder(IndexInput indexInput, boolean isClone, FileCache cache, Path path) { + this.indexInput = indexInput; + this.isClone = isClone; + this.cache = cache; + this.path = path; + } + + @Override + public void run() { + try { + indexInput.close(); + if (isClone) cache.decRef(path); + } catch (IOException e) { + logger.error("Failed to close IndexInput while clearing phantom reachable object"); + } + } + } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index 643caa85b5862..3ab193c5325c6 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.filecache; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -18,6 +20,7 @@ import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.utils.FileTypeUtils; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -28,6 +31,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.List; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class FileCacheTests extends OpenSearchTestCase { // need concurrency level to be static to make these tests more deterministic because capacity per segment is dependent on // (total capacity) / (concurrency level) so having high concurrency level might trigger early evictions which is tolerable in real life diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java index ce0a4d7bf3c02..41e76d0b762ea 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java @@ -8,18 +8,22 @@ package org.opensearch.index.store.remote.filecache; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.breaker.NoopCircuitBreaker; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; import java.io.IOException; import java.nio.file.Path; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class FileCachedIndexInputTests extends OpenSearchTestCase { protected FileCache fileCache; diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java index 7fb7a03584e20..bc646cc8d50db 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java @@ -8,17 +8,22 @@ package org.opensearch.index.store.remote.filecache; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.IndexInput; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import java.io.IOException; +import java.util.concurrent.TimeUnit; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class FullFileCachedIndexInputTests extends FileCachedIndexInputTests { private FullFileCachedIndexInput fullFileCachedIndexInput; @Override protected void setupIndexInputAndAddToFileCache() { fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, filePath, underlyingIndexInput); + // Putting in the file cache would increase refCount to 1 fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fullFileCachedIndexInput)); } @@ -37,15 +42,11 @@ public void testClone() throws IOException { fileCache.decRef(filePath); assertFalse(isActiveAndTotalUsageSame()); - // After cloning the refCount will increase again and activeUsage and totalUsage will be same again - FileCachedIndexInput clonedFileCachedIndexInput1 = fullFileCachedIndexInput.clone(); - FileCachedIndexInput clonedFileCachedIndexInput2 = clonedFileCachedIndexInput1.clone(); - FileCachedIndexInput clonedFileCachedIndexInput3 = clonedFileCachedIndexInput2.clone(); - assertTrue(isActiveAndTotalUsageSame()); + // Since no clones have been done, refCount should be zero + assertEquals((int) fileCache.getRef(filePath), 0); - // closing the first level clone will close all subsequent level clones and reduce ref count to 0 - clonedFileCachedIndexInput1.close(); - assertFalse(isActiveAndTotalUsageSame()); + createUnclosedClonesSlices(false); + triggerGarbageCollectionAndAssertClonesClosed(); fileCache.prune(); @@ -68,12 +69,38 @@ public void testSlice() throws IOException { fileCache.decRef(filePath); assertFalse(isActiveAndTotalUsageSame()); - // Creating a slice will increase the refCount - IndexInput slicedFileCachedIndexInput = fullFileCachedIndexInput.slice(SLICE_DESC, 1, 2); - assertTrue(isActiveAndTotalUsageSame()); + // Since no clones have been done, refCount should be zero + assertEquals((int) fileCache.getRef(filePath), 0); + + createUnclosedClonesSlices(true); + triggerGarbageCollectionAndAssertClonesClosed(); - // Closing the parent will close all the slices as well decreasing the refCount to 0 - fullFileCachedIndexInput.close(); assertFalse(isActiveAndTotalUsageSame()); } + + private void triggerGarbageCollectionAndAssertClonesClosed() { + try { + // Clones/Slices will be phantom reachable now, triggering gc should call close on them + assertBusy(() -> { + System.gc(); // Do not rely on GC to be deterministic, hence the polling + assertEquals( + "Expected refCount to drop to zero as all clones/slices should have closed", + (int) fileCache.getRef(filePath), + 0 + ); + }, 5, TimeUnit.SECONDS); + } catch (Exception e) { + logger.error("Exception thrown while triggering gc", e); + fail(); + } + } + + private void createUnclosedClonesSlices(boolean createSlice) throws IOException { + int NUM_OF_CLONES = 3; + for (int i = 0; i < NUM_OF_CLONES; i++) { + if (createSlice) fullFileCachedIndexInput.slice("slice", 1, 2); + else fullFileCachedIndexInput.clone(); + } + assertEquals((int) fileCache.getRef(filePath), NUM_OF_CLONES); + } }