From 9cfa395128a2e88abe605bb8e7c7ad1d7672f57f Mon Sep 17 00:00:00 2001 From: Kartik Date: Thu, 10 Mar 2022 10:12:17 -0800 Subject: [PATCH] Remove the IndexCommitRef class (#2421) This inner class is no longer required because its functionality has been moved to the generic GatedCloseable class. Signed-off-by: Kartik Ganesh --- .../indices/forcemerge/ForceMergeIT.java | 6 +- .../indices/recovery/IndexRecoveryIT.java | 23 ++++--- .../org/opensearch/index/engine/Engine.java | 11 +--- .../index/engine/InternalEngine.java | 11 ++-- .../index/engine/ReadOnlyEngine.java | 9 +-- .../opensearch/index/shard/IndexShard.java | 21 ++++--- .../index/shard/LocalShardSnapshot.java | 12 ++-- .../recovery/RecoverySourceHandler.java | 26 ++++---- .../snapshots/SnapshotShardsService.java | 13 ++-- .../index/engine/InternalEngineTests.java | 61 ++++++++++--------- .../index/engine/NoOpEngineTests.java | 6 +- .../index/shard/IndexShardTests.java | 24 ++++---- .../recovery/RecoverySourceHandlerTests.java | 9 +-- .../index/engine/EngineTestCase.java | 19 +++--- .../index/shard/IndexShardTestCase.java | 8 ++- 15 files changed, 135 insertions(+), 124 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java index 5c5bb6c6224..195817bf04c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java @@ -32,11 +32,13 @@ package org.opensearch.action.admin.indices.forcemerge; +import org.apache.lucene.index.IndexCommit; import org.opensearch.action.admin.indices.flush.FlushResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; import org.opensearch.index.Index; import org.opensearch.index.engine.Engine; @@ -99,8 +101,8 @@ public class ForceMergeIT extends OpenSearchIntegTestCase { } private static String getForceMergeUUID(IndexShard indexShard) throws IOException { - try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) { - return indexCommitRef.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY); + try (GatedCloseable wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) { + return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY); } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 17e457bba64..a7dc77e024d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -33,8 +33,8 @@ package org.opensearch.indices.recovery; import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.util.SetOnce; - import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; @@ -75,6 +75,7 @@ import org.opensearch.common.Priority; import org.opensearch.common.Strings; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.breaker.CircuitBreakingException; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; @@ -88,7 +89,6 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.MockEngineFactoryPlugin; import org.opensearch.index.analysis.AbstractTokenFilterFactory; import org.opensearch.index.analysis.TokenFilterFactory; -import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.MapperParsingException; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -114,11 +114,11 @@ import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotState; import org.opensearch.tasks.Task; import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; -import org.opensearch.test.InternalSettingsPlugin; -import org.opensearch.test.InternalTestCluster; import org.opensearch.test.engine.MockEngineSupport; import org.opensearch.test.store.MockFSIndexStore; import org.opensearch.test.transport.MockTransportService; @@ -151,12 +151,6 @@ import java.util.stream.StreamSupport; import static java.util.Collections.singletonMap; import static java.util.stream.Collectors.toList; -import static org.opensearch.action.DocWriteResponse.Result.CREATED; -import static org.opensearch.action.DocWriteResponse.Result.UPDATED; -import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; - import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -167,6 +161,11 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.opensearch.action.DocWriteResponse.Result.CREATED; +import static org.opensearch.action.DocWriteResponse.Result.UPDATED; +import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class IndexRecoveryIT extends OpenSearchIntegTestCase { @@ -1599,9 +1598,9 @@ public class IndexRecoveryIT extends OpenSearchIntegTestCase { .getShardOrNull(new ShardId(resolveIndex(indexName), 0)); final long lastSyncedGlobalCheckpoint = shard.getLastSyncedGlobalCheckpoint(); final long localCheckpointOfSafeCommit; - try (Engine.IndexCommitRef safeCommitRef = shard.acquireSafeIndexCommit()) { + try (GatedCloseable wrappedSafeCommit = shard.acquireSafeIndexCommit()) { localCheckpointOfSafeCommit = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( - safeCommitRef.get().getUserData().entrySet() + wrappedSafeCommit.get().getUserData().entrySet() ).localCheckpoint; } final long maxSeqNo = shard.seqNoStats().getMaxSeqNo(); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index cbaf43b14c7..b821b687c5f 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -55,7 +55,6 @@ import org.apache.lucene.util.Accountables; import org.apache.lucene.util.SetOnce; import org.opensearch.ExceptionsHelper; import org.opensearch.action.index.IndexRequest; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.ImmutableOpenMap; @@ -1109,12 +1108,12 @@ public abstract class Engine implements Closeable { * * @param flushFirst indicates whether the engine should flush before returning the snapshot */ - public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException; + public abstract GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException; /** * Snapshots the most recent safe index commit from the engine. */ - public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException; + public abstract GatedCloseable acquireSafeIndexCommit() throws EngineException; /** * @return a summary of the contents of the current safe commit @@ -1829,12 +1828,6 @@ public abstract class Engine implements Closeable { } } - public static class IndexCommitRef extends GatedCloseable { - public IndexCommitRef(IndexCommit indexCommit, CheckedRunnable onClose) { - super(indexCommit, onClose); - } - } - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 2c54b726348..a264c8e0a55 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -72,6 +72,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.LoggerInfoStream; import org.opensearch.common.lucene.Lucene; @@ -103,10 +104,10 @@ import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.OpenSearchMergePolicy; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogCorruptedException; -import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogStats; import org.opensearch.search.suggest.completion.CompletionStats; @@ -2193,7 +2194,7 @@ public class InternalEngine extends Engine { } @Override - public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException { + public GatedCloseable acquireLastIndexCommit(final boolean flushFirst) throws EngineException { // we have to flush outside of the readlock otherwise we might have a problem upgrading // the to a write lock when we fail the engine in this operation if (flushFirst) { @@ -2202,13 +2203,13 @@ public class InternalEngine extends Engine { logger.trace("finish flush for snapshot"); } final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); - return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit)); + return new GatedCloseable<>(lastCommit, () -> releaseIndexCommit(lastCommit)); } @Override - public IndexCommitRef acquireSafeIndexCommit() throws EngineException { + public GatedCloseable acquireSafeIndexCommit() throws EngineException { final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); - return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit)); + return new GatedCloseable<>(safeCommit, () -> releaseIndexCommit(safeCommit)); } private void releaseIndexCommit(IndexCommit snapshot) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index d9cf8e2cd65..9bbffb7cc19 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -41,6 +41,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; import org.opensearch.LegacyESVersion; import org.opensearch.Version; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.util.concurrent.ReleasableLock; @@ -49,9 +50,9 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.Store; +import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; -import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogStats; import org.opensearch.search.suggest.completion.CompletionStats; @@ -413,13 +414,13 @@ public class ReadOnlyEngine extends Engine { ) {} @Override - public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { + public GatedCloseable acquireLastIndexCommit(boolean flushFirst) { store.incRef(); - return new IndexCommitRef(indexCommit, store::decRef); + return new GatedCloseable<>(indexCommit, store::decRef); } @Override - public IndexCommitRef acquireSafeIndexCommit() { + public GatedCloseable acquireSafeIndexCommit() { return acquireLastIndexCommit(false); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 863c2684142..cbf5d35327f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -51,9 +51,9 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.ThreadInterruptedException; import org.opensearch.Assertions; +import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -73,6 +73,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; @@ -1409,7 +1410,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * * @param flushFirst true if the index should first be flushed to disk / a low level lucene commit should be executed */ - public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException { + public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException { final IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) { @@ -1423,7 +1424,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Snapshots the most recent safe index commit from the currently running engine. * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. */ - public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { + public GatedCloseable acquireSafeIndexCommit() throws EngineException { final IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) { @@ -1448,7 +1449,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex"; - Engine.IndexCommitRef indexCommit = null; + GatedCloseable wrappedIndexCommit = null; store.incRef(); try { synchronized (engineMutex) { @@ -1456,16 +1457,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. final Engine engine = getEngineOrNull(); if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); + wrappedIndexCommit = engine.acquireLastIndexCommit(false); } - if (indexCommit == null) { + if (wrappedIndexCommit == null) { return store.getMetadata(null, true); } } - return store.getMetadata(indexCommit.get()); + return store.getMetadata(wrappedIndexCommit.get()); } finally { store.decRef(); - IOUtils.close(indexCommit); + IOUtils.close(wrappedIndexCommit); } } @@ -3913,7 +3914,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl true ) { @Override - public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { + public GatedCloseable acquireLastIndexCommit(boolean flushFirst) { synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); @@ -3924,7 +3925,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } @Override - public IndexCommitRef acquireSafeIndexCommit() { + public GatedCloseable acquireSafeIndexCommit() { synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); diff --git a/server/src/main/java/org/opensearch/index/shard/LocalShardSnapshot.java b/server/src/main/java/org/opensearch/index/shard/LocalShardSnapshot.java index d62d0358eb7..98556db3ae1 100644 --- a/server/src/main/java/org/opensearch/index/shard/LocalShardSnapshot.java +++ b/server/src/main/java/org/opensearch/index/shard/LocalShardSnapshot.java @@ -32,6 +32,7 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; @@ -39,6 +40,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; import org.apache.lucene.store.NoLockFactory; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.index.Index; import org.opensearch.index.engine.Engine; import org.opensearch.index.store.Store; @@ -52,7 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean; final class LocalShardSnapshot implements Closeable { private final IndexShard shard; private final Store store; - private final Engine.IndexCommitRef indexCommit; + private final GatedCloseable wrappedIndexCommit; private final AtomicBoolean closed = new AtomicBoolean(false); LocalShardSnapshot(IndexShard shard) { @@ -61,7 +63,7 @@ final class LocalShardSnapshot implements Closeable { store.incRef(); boolean success = false; try { - indexCommit = shard.acquireLastIndexCommit(true); + wrappedIndexCommit = shard.acquireLastIndexCommit(true); success = true; } finally { if (success == false) { @@ -88,7 +90,7 @@ final class LocalShardSnapshot implements Closeable { return new FilterDirectory(store.directory()) { @Override public String[] listAll() throws IOException { - Collection fileNames = indexCommit.get().getFileNames(); + Collection fileNames = wrappedIndexCommit.get().getFileNames(); final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]); return fileNameArray; } @@ -143,7 +145,7 @@ final class LocalShardSnapshot implements Closeable { public void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - indexCommit.close(); + wrappedIndexCommit.close(); } finally { store.decRef(); } @@ -156,6 +158,6 @@ final class LocalShardSnapshot implements Closeable { @Override public String toString() { - return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + indexCommit + "]"; + return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + wrappedIndexCommit + "]"; } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 710b01a6709..7899b11330a 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -57,6 +57,7 @@ import org.opensearch.common.CheckedRunnable; import org.opensearch.common.StopWatch; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; @@ -64,11 +65,10 @@ import org.opensearch.common.lucene.store.InputStreamIndexInput; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.common.util.concurrent.ListenableFuture; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.internal.io.IOUtils; -import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; @@ -250,10 +250,10 @@ public class RecoverySourceHandler { sendFileStep.onResponse(SendFileResult.EMPTY); } } else { - final Engine.IndexCommitRef safeCommitRef; + final GatedCloseable wrappedSafeCommit; try { - safeCommitRef = acquireSafeCommit(shard); - resources.add(safeCommitRef); + wrappedSafeCommit = acquireSafeCommit(shard); + resources.add(wrappedSafeCommit); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } @@ -268,16 +268,16 @@ public class RecoverySourceHandler { // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // down. - startingSeqNo = Long.parseLong(safeCommitRef.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; + startingSeqNo = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); try { final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo); final Releasable releaseStore = acquireStore(shard.store()); resources.add(releaseStore); - sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> { + sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> { try { - IOUtils.close(safeCommitRef, releaseStore); + IOUtils.close(wrappedSafeCommit, releaseStore); } catch (final IOException ex) { logger.warn("releasing snapshot caused exception", ex); } @@ -307,7 +307,7 @@ public class RecoverySourceHandler { deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - phase1(safeCommitRef.get(), startingSeqNo, () -> estimateNumOps, sendFileStep); + phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep); }, onFailure); } catch (final Exception e) { @@ -467,12 +467,12 @@ public class RecoverySourceHandler { * with the file systems due to interrupt (see {@link org.apache.lucene.store.NIOFSDirectory} javadocs for more detail). * This method acquires a safe commit and wraps it to make sure that it will be released using the generic thread pool. */ - private Engine.IndexCommitRef acquireSafeCommit(IndexShard shard) { - final Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit(); + private GatedCloseable acquireSafeCommit(IndexShard shard) { + final GatedCloseable wrappedSafeCommit = shard.acquireSafeIndexCommit(); final AtomicBoolean closed = new AtomicBoolean(false); - return new Engine.IndexCommitRef(commitRef.get(), () -> { + return new GatedCloseable<>(wrappedSafeCommit.get(), () -> { if (closed.compareAndSet(false, true)) { - runWithGenericThreadPool(commitRef::close); + runWithGenericThreadPool(wrappedSafeCommit::close); } }); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 06b17c679cb..b6c0b63efe3 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -50,6 +50,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; import org.opensearch.core.internal.io.IOUtils; @@ -368,25 +369,25 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } final Repository repository = repositoriesService.repository(snapshot.getRepository()); - Engine.IndexCommitRef snapshotRef = null; + GatedCloseable wrappedSnapshot = null; try { // we flush first to make sure we get the latest writes snapshotted - snapshotRef = indexShard.acquireLastIndexCommit(true); - final IndexCommit snapshotIndexCommit = snapshotRef.get(); + wrappedSnapshot = indexShard.acquireLastIndexCommit(true); + final IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); repository.snapshotShard( indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, - snapshotRef.get(), + wrappedSnapshot.get(), getShardStateId(indexShard, snapshotIndexCommit), snapshotStatus, version, userMetadata, - ActionListener.runBefore(listener, snapshotRef::close) + ActionListener.runBefore(listener, wrappedSnapshot::close) ); } catch (Exception e) { - IOUtils.close(snapshotRef); + IOUtils.close(wrappedSnapshot); throw e; } } catch (Exception e) { diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 745508135c6..5f98a058405 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -82,6 +82,8 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.SetOnce; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -101,6 +103,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; @@ -154,8 +157,6 @@ import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import java.io.Closeable; import java.io.IOException; @@ -196,15 +197,6 @@ import java.util.stream.LongStream; import static java.util.Collections.emptyMap; import static java.util.Collections.shuffle; -import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET; -import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; -import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; -import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY; -import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; @@ -230,6 +222,15 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET; +import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; +import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; +import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; public class InternalEngineTests extends EngineTestCase { @@ -1086,9 +1087,9 @@ public class InternalEngineTests extends EngineTestCase { final CheckedRunnable checker = () -> { assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0)); assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get())); - try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { + try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( - safeCommit.get().getUserData().entrySet() + wrappedSafeCommit.get().getUserData().entrySet() ); assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint())); } @@ -1504,8 +1505,8 @@ public class InternalEngineTests extends EngineTestCase { globalCheckpoint.set(randomLongBetween(0, localCheckpoint)); engine.syncTranslog(); final long safeCommitCheckpoint; - try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { - safeCommitCheckpoint = Long.parseLong(safeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { + safeCommitCheckpoint = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); } engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); @@ -1594,8 +1595,10 @@ public class InternalEngineTests extends EngineTestCase { globalCheckpoint.set(randomLongBetween(0, engine.getPersistedLocalCheckpoint())); engine.syncTranslog(); final long minSeqNoToRetain; - try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { - long safeCommitLocalCheckpoint = Long.parseLong(safeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { + long safeCommitLocalCheckpoint = Long.parseLong( + wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) + ); minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1); } engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); @@ -2613,7 +2616,7 @@ public class InternalEngineTests extends EngineTestCase { // this test writes documents to the engine while concurrently flushing/commit // and ensuring that the commit points contain the correct sequence number data public void testConcurrentWritesAndCommits() throws Exception { - List commits = new ArrayList<>(); + List> commits = new ArrayList<>(); try ( Store store = createStore(); InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null)) @@ -2668,8 +2671,8 @@ public class InternalEngineTests extends EngineTestCase { // now, verify all the commits have the correct docs according to the user commit data long prevLocalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; long prevMaxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; - for (Engine.IndexCommitRef commitRef : commits) { - final IndexCommit commit = commitRef.get(); + for (GatedCloseable wrappedCommit : commits) { + final IndexCommit commit = wrappedCommit.get(); Map userData = commit.getUserData(); long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ? Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) @@ -5617,7 +5620,7 @@ public class InternalEngineTests extends EngineTestCase { IOUtils.close(engine, store); store = createStore(); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final Engine.IndexCommitRef snapshot; + final GatedCloseable wrappedSnapshot; final boolean closeSnapshotBeforeEngine = randomBoolean(); try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { int numDocs = between(1, 20); @@ -5630,9 +5633,9 @@ public class InternalEngineTests extends EngineTestCase { final boolean flushFirst = randomBoolean(); final boolean safeCommit = randomBoolean(); if (safeCommit) { - snapshot = engine.acquireSafeIndexCommit(); + wrappedSnapshot = engine.acquireSafeIndexCommit(); } else { - snapshot = engine.acquireLastIndexCommit(flushFirst); + wrappedSnapshot = engine.acquireLastIndexCommit(flushFirst); } int moreDocs = between(1, 20); for (int i = 0; i < moreDocs; i++) { @@ -5641,13 +5644,13 @@ public class InternalEngineTests extends EngineTestCase { globalCheckpoint.set(numDocs + moreDocs - 1); engine.flush(); // check that we can still read the commit that we captured - try (IndexReader reader = DirectoryReader.open(snapshot.get())) { + try (IndexReader reader = DirectoryReader.open(wrappedSnapshot.get())) { assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0)); } assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2)); if (closeSnapshotBeforeEngine) { - snapshot.close(); + wrappedSnapshot.close(); // check it's clean up engine.flush(true, true); assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1)); @@ -5655,7 +5658,7 @@ public class InternalEngineTests extends EngineTestCase { } if (closeSnapshotBeforeEngine == false) { - snapshot.close(); // shouldn't throw AlreadyClosedException + wrappedSnapshot.close(); // shouldn't throw AlreadyClosedException } } @@ -5719,7 +5722,7 @@ public class InternalEngineTests extends EngineTestCase { } engine.flush(false, randomBoolean()); int numSnapshots = between(1, 10); - final List snapshots = new ArrayList<>(); + final List> snapshots = new ArrayList<>(); for (int i = 0; i < numSnapshots; i++) { snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit. } @@ -6322,8 +6325,8 @@ public class InternalEngineTests extends EngineTestCase { .collect(Collectors.toSet()); assertThat(actualOps, containsInAnyOrder(expectedOps)); } - try (Engine.IndexCommitRef commitRef = engine.acquireSafeIndexCommit()) { - IndexCommit safeCommit = commitRef.get(); + try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { + IndexCommit safeCommit = wrappedSafeCommit.get(); if (safeCommit.getUserData().containsKey(Engine.MIN_RETAINED_SEQNO)) { lastMinRetainedSeqNo = Long.parseLong(safeCommit.getUserData().get(Engine.MIN_RETAINED_SEQNO)); } diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java index 772cda9efa5..e04bf1a4f20 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java @@ -33,6 +33,7 @@ package org.opensearch.index.engine; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.LockObtainFailedException; @@ -41,6 +42,7 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; @@ -114,8 +116,8 @@ public class NoOpEngineTests extends EngineTestCase { final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); assertThat(noOpEngine.getPersistedLocalCheckpoint(), equalTo(localCheckpoint)); assertThat(noOpEngine.getSeqNoStats(100L).getMaxSeqNo(), equalTo(maxSeqNo)); - try (Engine.IndexCommitRef ref = noOpEngine.acquireLastIndexCommit(false)) { - try (IndexReader reader = DirectoryReader.open(ref.get())) { + try (GatedCloseable wrappedCommit = noOpEngine.acquireLastIndexCommit(false)) { + try (IndexReader reader = DirectoryReader.open(wrappedCommit.get())) { assertThat(reader.numDocs(), equalTo(docs)); } } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 6485861f175..91fb1f9b1ff 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -34,6 +34,7 @@ package org.opensearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; @@ -44,6 +45,7 @@ import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; +import org.junit.Assert; import org.opensearch.Assertions; import org.opensearch.OpenSearchException; import org.opensearch.Version; @@ -72,6 +74,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.lease.Releasable; @@ -142,7 +145,6 @@ import org.opensearch.test.FieldMaskingReader; import org.opensearch.test.VersionUtils; import org.opensearch.test.store.MockFSDirectoryFactory; import org.opensearch.threadpool.ThreadPool; -import org.junit.Assert; import java.io.IOException; import java.nio.charset.Charset; @@ -179,12 +181,6 @@ import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; -import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS; -import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.opensearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; @@ -204,6 +200,12 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.sameInstance; +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; +import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.opensearch.test.hamcrest.RegexMatcher.matches; /** * Simple unit-test IndexShard related operations. @@ -4126,11 +4128,11 @@ public class IndexShardTests extends IndexShardTestCase { try { readyToSnapshotLatch.await(); shard.snapshotStoreMetadata(); - try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(randomBoolean())) { - shard.store().getMetadata(indexCommitRef.get()); + try (GatedCloseable wrappedIndexCommit = shard.acquireLastIndexCommit(randomBoolean())) { + shard.store().getMetadata(wrappedIndexCommit.get()); } - try (Engine.IndexCommitRef indexCommitRef = shard.acquireSafeIndexCommit()) { - shard.store().getMetadata(indexCommitRef.get()); + try (GatedCloseable wrappedSafeCommit = shard.acquireSafeIndexCommit()) { + shard.store().getMetadata(wrappedSafeCommit.get()); } } catch (InterruptedException | IOException e) { throw new AssertionError(e); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index 720356cb495..3890470f966 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -46,6 +46,8 @@ import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.SetOnce; +import org.junit.After; +import org.junit.Before; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -59,6 +61,7 @@ import org.opensearch.common.Randomness; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.store.IndexOutputOutputStream; @@ -93,14 +96,12 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.DummyShardLock; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.io.OutputStream; @@ -650,7 +651,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase { when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.isRelocatedPrimary()).thenReturn(true); - when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class)); + when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class)); doAnswer(invocation -> { ((ActionListener) invocation.getArguments()[0]).onResponse(() -> {}); return null; diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 97d3490db4a..69f7bef90d7 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -61,6 +61,8 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.junit.After; +import org.junit.Before; import org.opensearch.Version; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.replication.ReplicationResponse; @@ -74,6 +76,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.compress.CompressedXContent; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.Settings; @@ -113,12 +116,10 @@ import org.opensearch.index.translog.TranslogConfig; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.test.DummyShardLock; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.nio.charset.Charset; @@ -143,14 +144,14 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.shuffle; -import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; -import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY; -import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; +import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; +import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; public abstract class EngineTestCase extends OpenSearchTestCase { @@ -1388,8 +1389,8 @@ public abstract class EngineTestCase extends OpenSearchTestCase { final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations(); final long seqNoForRecovery; if (engine.config().getIndexSettings().isSoftDeleteEnabled()) { - try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { - seqNoForRecovery = Long.parseLong(safeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; + try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { + seqNoForRecovery = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; } } else { seqNoForRecovery = engine.getMinRetainedSeqNo(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b388ab8835a..09c5dfad486 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -32,6 +32,7 @@ package org.opensearch.index.shard; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.store.Directory; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -52,6 +53,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -113,10 +115,10 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Collectors; -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -1030,13 +1032,13 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase { ); final PlainActionFuture future = PlainActionFuture.newFuture(); final String shardGen; - try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { + try (GatedCloseable wrappedIndexCommit = shard.acquireLastIndexCommit(true)) { repository.snapshotShard( shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, - indexCommitRef.get(), + wrappedIndexCommit.get(), null, snapshotStatus, Version.CURRENT,