Remove the IndexCommitRef class (#2421)

This inner class is no longer required because its functionality has been moved to the generic GatedCloseable class.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
Authored by Kartik on 2022-03-10 10:12:17 -08:00; committed by GitHub
parent c8d80090f4
commit 9cfa395128
15 changed files with 135 additions and 124 deletions
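
At call sites the acquire/use/close pattern is unchanged; only the wrapper type differs. The sketch below is illustrative rather than code from this commit: it uses only the API visible in the diff (acquireLastIndexCommit(), GatedCloseable.get(), try-with-resources close(), and Engine.FORCE_MERGE_UUID_KEY), while the class name, method name, and shard parameter are made up for the example.

import java.io.IOException;

import org.apache.lucene.index.IndexCommit;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;

// Hypothetical helper, only to illustrate the new call-site shape.
class SafeCommitExample {
    // Before: try (Engine.IndexCommitRef ref = shard.acquireLastIndexCommit(true)) { ... ref.get() ... }
    // After: the same try-with-resources shape, with the generic wrapper in place of the removed inner class.
    static String forceMergeUuidOf(IndexShard shard) throws IOException {
        try (GatedCloseable<IndexCommit> wrappedIndexCommit = shard.acquireLastIndexCommit(true)) {
            // get() exposes the wrapped IndexCommit; closing the wrapper runs the gated release hook.
            return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
        }
    }
}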

View File

@@ -32,11 +32,13 @@
 package org.opensearch.action.admin.indices.forcemerge;
+import org.apache.lucene.index.IndexCommit;
 import org.opensearch.action.admin.indices.flush.FlushResponse;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.routing.IndexRoutingTable;
 import org.opensearch.cluster.routing.IndexShardRoutingTable;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.Index;
 import org.opensearch.index.engine.Engine;
@@ -99,8 +101,8 @@ public class ForceMergeIT extends OpenSearchIntegTestCase {
     }
     private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
-        try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) {
-            return indexCommitRef.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
+        try (GatedCloseable<IndexCommit> wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) {
+            return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
         }
     }
 }

View File

@@ -33,8 +33,8 @@
 package org.opensearch.indices.recovery;
 import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.util.SetOnce;
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -75,6 +75,7 @@ import org.opensearch.common.Priority;
 import org.opensearch.common.Strings;
 import org.opensearch.common.breaker.CircuitBreaker;
 import org.opensearch.common.breaker.CircuitBreakingException;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.ByteSizeUnit;
 import org.opensearch.common.unit.ByteSizeValue;
@@ -88,7 +89,6 @@ import org.opensearch.index.IndexSettings;
 import org.opensearch.index.MockEngineFactoryPlugin;
 import org.opensearch.index.analysis.AbstractTokenFilterFactory;
 import org.opensearch.index.analysis.TokenFilterFactory;
-import org.opensearch.index.engine.Engine;
 import org.opensearch.index.mapper.MapperParsingException;
 import org.opensearch.index.recovery.RecoveryStats;
 import org.opensearch.index.seqno.ReplicationTracker;
@@ -114,11 +114,11 @@ import org.opensearch.snapshots.Snapshot;
 import org.opensearch.snapshots.SnapshotState;
 import org.opensearch.tasks.Task;
 import org.opensearch.test.BackgroundIndexer;
+import org.opensearch.test.InternalSettingsPlugin;
+import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
 import org.opensearch.test.OpenSearchIntegTestCase.Scope;
-import org.opensearch.test.InternalSettingsPlugin;
-import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.engine.MockEngineSupport;
 import org.opensearch.test.store.MockFSIndexStore;
 import org.opensearch.test.transport.MockTransportService;
@@ -151,12 +151,6 @@ import java.util.stream.StreamSupport;
 import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
-import static org.opensearch.action.DocWriteResponse.Result.CREATED;
-import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
-import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
-import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
-import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
@@ -167,6 +161,11 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
+import static org.opensearch.action.DocWriteResponse.Result.CREATED;
+import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
+import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
 @ClusterScope(scope = Scope.TEST, numDataNodes = 0)
 public class IndexRecoveryIT extends OpenSearchIntegTestCase {
@@ -1599,9 +1598,9 @@ public class IndexRecoveryIT extends OpenSearchIntegTestCase {
             .getShardOrNull(new ShardId(resolveIndex(indexName), 0));
         final long lastSyncedGlobalCheckpoint = shard.getLastSyncedGlobalCheckpoint();
         final long localCheckpointOfSafeCommit;
-        try (Engine.IndexCommitRef safeCommitRef = shard.acquireSafeIndexCommit()) {
+        try (GatedCloseable<IndexCommit> wrappedSafeCommit = shard.acquireSafeIndexCommit()) {
             localCheckpointOfSafeCommit = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
-                safeCommitRef.get().getUserData().entrySet()
+                wrappedSafeCommit.get().getUserData().entrySet()
             ).localCheckpoint;
         }
         final long maxSeqNo = shard.seqNoStats().getMaxSeqNo();

View File

@@ -55,7 +55,6 @@ import org.apache.lucene.util.Accountables;
 import org.apache.lucene.util.SetOnce;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.index.IndexRequest;
-import org.opensearch.common.CheckedRunnable;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.collect.ImmutableOpenMap;
@@ -1109,12 +1108,12 @@
      *
      * @param flushFirst indicates whether the engine should flush before returning the snapshot
      */
-    public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;
+    public abstract GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException;
     /**
      * Snapshots the most recent safe index commit from the engine.
      */
-    public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;
+    public abstract GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException;
     /**
      * @return a summary of the contents of the current safe commit
@@ -1829,12 +1828,6 @@
         }
     }
-    public static class IndexCommitRef extends GatedCloseable<IndexCommit> {
-        public IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
-            super(indexCommit, onClose);
-        }
-    }
     public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
     }

View File

@@ -72,6 +72,7 @@ import org.opensearch.action.index.IndexRequest;
 import org.opensearch.common.Booleans;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.SuppressForbidden;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lucene.LoggerInfoStream;
 import org.opensearch.common.lucene.Lucene;
@@ -103,10 +104,10 @@ import org.opensearch.index.seqno.SeqNoStats;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.OpenSearchMergePolicy;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.index.translog.TranslogConfig;
 import org.opensearch.index.translog.TranslogCorruptedException;
-import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
 import org.opensearch.index.translog.TranslogDeletionPolicy;
 import org.opensearch.index.translog.TranslogStats;
 import org.opensearch.search.suggest.completion.CompletionStats;
@@ -2193,7 +2194,7 @@
     }
     @Override
-    public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
+    public GatedCloseable<IndexCommit> acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
         // we have to flush outside of the readlock otherwise we might have a problem upgrading
         // the to a write lock when we fail the engine in this operation
         if (flushFirst) {
@@ -2202,13 +2203,13 @@
             logger.trace("finish flush for snapshot");
         }
         final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
-        return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
+        return new GatedCloseable<>(lastCommit, () -> releaseIndexCommit(lastCommit));
     }
     @Override
-    public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
+    public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
         final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
-        return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
+        return new GatedCloseable<>(safeCommit, () -> releaseIndexCommit(safeCommit));
     }
     private void releaseIndexCommit(IndexCommit snapshot) throws IOException {

View File

@@ -41,6 +41,7 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Lock;
 import org.opensearch.LegacyESVersion;
 import org.opensearch.Version;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 import org.opensearch.common.util.concurrent.ReleasableLock;
@@ -49,9 +50,9 @@ import org.opensearch.index.mapper.MapperService;
 import org.opensearch.index.seqno.SeqNoStats;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.store.Store;
+import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.index.translog.TranslogConfig;
-import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
 import org.opensearch.index.translog.TranslogDeletionPolicy;
 import org.opensearch.index.translog.TranslogStats;
 import org.opensearch.search.suggest.completion.CompletionStats;
@@ -413,13 +414,13 @@
     ) {}
     @Override
-    public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
+    public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) {
         store.incRef();
-        return new IndexCommitRef(indexCommit, store::decRef);
+        return new GatedCloseable<>(indexCommit, store::decRef);
     }
     @Override
-    public IndexCommitRef acquireSafeIndexCommit() {
+    public GatedCloseable<IndexCommit> acquireSafeIndexCommit() {
         return acquireLastIndexCommit(false);
     }

View File

@@ -51,9 +51,9 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.SetOnce;
 import org.apache.lucene.util.ThreadInterruptedException;
 import org.opensearch.Assertions;
+import org.opensearch.ExceptionsHelper;
 import org.opensearch.LegacyESVersion;
 import org.opensearch.OpenSearchException;
-import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.ActionRunnable;
 import org.opensearch.action.admin.indices.flush.FlushRequest;
@@ -73,6 +73,7 @@ import org.opensearch.common.CheckedFunction;
 import org.opensearch.common.CheckedRunnable;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.collect.Tuple;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lease.Releasables;
@@ -1409,7 +1410,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      *
      * @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
      */
-    public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
+    public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
         final IndexShardState state = this.state; // one time volatile read
         // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
         if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
@@ -1423,7 +1424,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * Snapshots the most recent safe index commit from the currently running engine.
      * All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
      */
-    public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
+    public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
         final IndexShardState state = this.state; // one time volatile read
         // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
         if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
@@ -1448,7 +1449,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      */
     public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
         assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex";
-        Engine.IndexCommitRef indexCommit = null;
+        GatedCloseable<IndexCommit> wrappedIndexCommit = null;
         store.incRef();
         try {
             synchronized (engineMutex) {
@@ -1456,16 +1457,16 @@ import org.opensearch.index.shard.IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
                 final Engine engine = getEngineOrNull();
                 if (engine != null) {
-                    indexCommit = engine.acquireLastIndexCommit(false);
+                    wrappedIndexCommit = engine.acquireLastIndexCommit(false);
                 }
-                if (indexCommit == null) {
+                if (wrappedIndexCommit == null) {
                     return store.getMetadata(null, true);
                 }
             }
-            return store.getMetadata(indexCommit.get());
+            return store.getMetadata(wrappedIndexCommit.get());
         } finally {
             store.decRef();
-            IOUtils.close(indexCommit);
+            IOUtils.close(wrappedIndexCommit);
         }
     }
@@ -3913,7 +3914,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             true
         ) {
             @Override
-            public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
+            public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) {
                 synchronized (engineMutex) {
                     if (newEngineReference.get() == null) {
                         throw new AlreadyClosedException("engine was closed");
@@ -3924,7 +3925,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             }
             @Override
-            public IndexCommitRef acquireSafeIndexCommit() {
+            public GatedCloseable<IndexCommit> acquireSafeIndexCommit() {
                 synchronized (engineMutex) {
                     if (newEngineReference.get() == null) {
                         throw new AlreadyClosedException("engine was closed");

View File

@@ -32,6 +32,7 @@
 package org.opensearch.index.shard;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
@@ -39,6 +40,7 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.NoLockFactory;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.index.Index;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.store.Store;
@@ -52,7 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 final class LocalShardSnapshot implements Closeable {
     private final IndexShard shard;
     private final Store store;
-    private final Engine.IndexCommitRef indexCommit;
+    private final GatedCloseable<IndexCommit> wrappedIndexCommit;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     LocalShardSnapshot(IndexShard shard) {
@@ -61,7 +63,7 @@ final class LocalShardSnapshot implements Closeable {
         store.incRef();
         boolean success = false;
         try {
-            indexCommit = shard.acquireLastIndexCommit(true);
+            wrappedIndexCommit = shard.acquireLastIndexCommit(true);
             success = true;
         } finally {
             if (success == false) {
@@ -88,7 +90,7 @@ final class LocalShardSnapshot implements Closeable {
         return new FilterDirectory(store.directory()) {
             @Override
             public String[] listAll() throws IOException {
-                Collection<String> fileNames = indexCommit.get().getFileNames();
+                Collection<String> fileNames = wrappedIndexCommit.get().getFileNames();
                 final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]);
                 return fileNameArray;
             }
@@ -143,7 +145,7 @@ final class LocalShardSnapshot implements Closeable {
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
             try {
-                indexCommit.close();
+                wrappedIndexCommit.close();
             } finally {
                 store.decRef();
             }
@@ -156,6 +158,6 @@ final class LocalShardSnapshot implements Closeable {
     @Override
     public String toString() {
-        return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + indexCommit + "]";
+        return "local_shard_snapshot:[" + shard.shardId() + " indexCommit: " + wrappedIndexCommit + "]";
     }
 }

View File

@@ -57,6 +57,7 @@ import org.opensearch.common.CheckedRunnable;
 import org.opensearch.common.StopWatch;
 import org.opensearch.common.bytes.BytesArray;
 import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.logging.Loggers;
@@ -64,11 +65,10 @@ import org.opensearch.common.lucene.store.InputStreamIndexInput;
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.CancellableThreads;
-import org.opensearch.common.util.concurrent.OpenSearchExecutors;
 import org.opensearch.common.util.concurrent.FutureUtils;
 import org.opensearch.common.util.concurrent.ListenableFuture;
+import org.opensearch.common.util.concurrent.OpenSearchExecutors;
 import org.opensearch.core.internal.io.IOUtils;
-import org.opensearch.index.engine.Engine;
 import org.opensearch.index.engine.RecoveryEngineException;
 import org.opensearch.index.seqno.ReplicationTracker;
 import org.opensearch.index.seqno.RetentionLease;
@@ -250,10 +250,10 @@ public class RecoverySourceHandler {
                 sendFileStep.onResponse(SendFileResult.EMPTY);
             }
         } else {
-            final Engine.IndexCommitRef safeCommitRef;
+            final GatedCloseable<IndexCommit> wrappedSafeCommit;
             try {
-                safeCommitRef = acquireSafeCommit(shard);
-                resources.add(safeCommitRef);
+                wrappedSafeCommit = acquireSafeCommit(shard);
+                resources.add(wrappedSafeCommit);
             } catch (final Exception e) {
                 throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
             }
@@ -268,16 +268,16 @@
            // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
            // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
            // down.
-            startingSeqNo = Long.parseLong(safeCommitRef.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;
+            startingSeqNo = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;
             logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
             try {
                 final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
                 final Releasable releaseStore = acquireStore(shard.store());
                 resources.add(releaseStore);
-                sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
+                sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> {
                     try {
-                        IOUtils.close(safeCommitRef, releaseStore);
+                        IOUtils.close(wrappedSafeCommit, releaseStore);
                     } catch (final IOException ex) {
                         logger.warn("releasing snapshot caused exception", ex);
                     }
@@ -307,7 +307,7 @@ public class RecoverySourceHandler {
                 deleteRetentionLeaseStep.whenComplete(ignored -> {
                     assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
-                    phase1(safeCommitRef.get(), startingSeqNo, () -> estimateNumOps, sendFileStep);
+                    phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep);
                 }, onFailure);
             } catch (final Exception e) {
@@ -467,12 +467,12 @@
      * with the file systems due to interrupt (see {@link org.apache.lucene.store.NIOFSDirectory} javadocs for more detail).
      * This method acquires a safe commit and wraps it to make sure that it will be released using the generic thread pool.
      */
-    private Engine.IndexCommitRef acquireSafeCommit(IndexShard shard) {
-        final Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit();
+    private GatedCloseable<IndexCommit> acquireSafeCommit(IndexShard shard) {
+        final GatedCloseable<IndexCommit> wrappedSafeCommit = shard.acquireSafeIndexCommit();
         final AtomicBoolean closed = new AtomicBoolean(false);
-        return new Engine.IndexCommitRef(commitRef.get(), () -> {
+        return new GatedCloseable<>(wrappedSafeCommit.get(), () -> {
             if (closed.compareAndSet(false, true)) {
-                runWithGenericThreadPool(commitRef::close);
+                runWithGenericThreadPool(wrappedSafeCommit::close);
             }
         });
     }

View File

@@ -50,6 +50,7 @@ import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.collect.ImmutableOpenMap;
 import org.opensearch.common.component.AbstractLifecycleComponent;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.internal.io.IOUtils;
@@ -368,25 +369,25 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
             }
             final Repository repository = repositoriesService.repository(snapshot.getRepository());
-            Engine.IndexCommitRef snapshotRef = null;
+            GatedCloseable<IndexCommit> wrappedSnapshot = null;
             try {
                 // we flush first to make sure we get the latest writes snapshotted
-                snapshotRef = indexShard.acquireLastIndexCommit(true);
-                final IndexCommit snapshotIndexCommit = snapshotRef.get();
+                wrappedSnapshot = indexShard.acquireLastIndexCommit(true);
+                final IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
                 repository.snapshotShard(
                     indexShard.store(),
                     indexShard.mapperService(),
                     snapshot.getSnapshotId(),
                     indexId,
-                    snapshotRef.get(),
+                    wrappedSnapshot.get(),
                     getShardStateId(indexShard, snapshotIndexCommit),
                     snapshotStatus,
                     version,
                     userMetadata,
-                    ActionListener.runBefore(listener, snapshotRef::close)
+                    ActionListener.runBefore(listener, wrappedSnapshot::close)
                 );
             } catch (Exception e) {
-                IOUtils.close(snapshotRef);
+                IOUtils.close(wrappedSnapshot);
                 throw e;
             }
         } catch (Exception e) {

View File

@@ -82,6 +82,8 @@ import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.SetOnce;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
 import org.opensearch.action.ActionListener;
@@ -101,6 +103,7 @@ import org.opensearch.common.UUIDs;
 import org.opensearch.common.bytes.BytesArray;
 import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.collect.Tuple;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.logging.Loggers;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
@@ -154,8 +157,6 @@ import org.opensearch.indices.breaker.NoneCircuitBreakerService;
 import org.opensearch.test.IndexSettingsModule;
 import org.opensearch.test.VersionUtils;
 import org.opensearch.threadpool.ThreadPool;
-import org.hamcrest.MatcherAssert;
-import org.hamcrest.Matchers;
 import java.io.Closeable;
 import java.io.IOException;
@@ -196,15 +197,6 @@ import java.util.stream.LongStream;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.shuffle;
-import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET;
-import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
-import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
-import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY;
-import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA;
-import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
-import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
-import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
-import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.Matchers.contains;
@@ -230,6 +222,15 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET;
+import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
+import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
+import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY;
+import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA;
+import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
+import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 public class InternalEngineTests extends EngineTestCase {
@@ -1086,9 +1087,9 @@ public class InternalEngineTests extends EngineTestCase {
         final CheckedRunnable<IOException> checker = () -> {
             assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0));
             assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get()));
-            try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
+            try (GatedCloseable<IndexCommit> wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
                 SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
-                    safeCommit.get().getUserData().entrySet()
+                    wrappedSafeCommit.get().getUserData().entrySet()
                 );
                 assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint()));
             }
@@ -1504,8 +1505,8 @@ public class InternalEngineTests extends EngineTestCase {
         globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
         engine.syncTranslog();
         final long safeCommitCheckpoint;
-        try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
-            safeCommitCheckpoint = Long.parseLong(safeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+        try (GatedCloseable<IndexCommit> wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
+            safeCommitCheckpoint = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
         }
         engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
         assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
@@ -1594,8 +1595,10 @@ public class InternalEngineTests extends EngineTestCase {
         globalCheckpoint.set(randomLongBetween(0, engine.getPersistedLocalCheckpoint()));
         engine.syncTranslog();
         final long minSeqNoToRetain;
-        try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
-            long safeCommitLocalCheckpoint = Long.parseLong(safeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+        try (GatedCloseable<IndexCommit> wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
+            long safeCommitLocalCheckpoint = Long.parseLong(
+                wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
+            );
             minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
         }
         engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
@@ -2613,7 +2616,7 @@ public class InternalEngineTests extends EngineTestCase {
     // this test writes documents to the engine while concurrently flushing/commit
     // and ensuring that the commit points contain the correct sequence number data
     public void testConcurrentWritesAndCommits() throws Exception {
-        List<Engine.IndexCommitRef> commits = new ArrayList<>();
+        List<GatedCloseable<IndexCommit>> commits = new ArrayList<>();
         try (
             Store store = createStore();
             InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))
@@ -2668,8 +2671,8 @@ public class InternalEngineTests extends EngineTestCase {
             // now, verify all the commits have the correct docs according to the user commit data
            long prevLocalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
            long prevMaxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
-            for (Engine.IndexCommitRef commitRef : commits) {
-                final IndexCommit commit = commitRef.get();
+            for (GatedCloseable<IndexCommit> wrappedCommit : commits) {
+                final IndexCommit commit = wrappedCommit.get();
                 Map<String, String> userData = commit.getUserData();
                 long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
                     ? Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))
@@ -5617,7 +5620,7 @@ public class InternalEngineTests extends EngineTestCase {
         IOUtils.close(engine, store);
         store = createStore();
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
-        final Engine.IndexCommitRef snapshot;
+        final GatedCloseable<IndexCommit> wrappedSnapshot;
         final boolean closeSnapshotBeforeEngine = randomBoolean();
         try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
             int numDocs = between(1, 20);
@@ -5630,9 +5633,9 @@ public class InternalEngineTests extends EngineTestCase {
             final boolean flushFirst = randomBoolean();
             final boolean safeCommit = randomBoolean();
             if (safeCommit) {
-                snapshot = engine.acquireSafeIndexCommit();
+                wrappedSnapshot = engine.acquireSafeIndexCommit();
             } else {
-                snapshot = engine.acquireLastIndexCommit(flushFirst);
+                wrappedSnapshot = engine.acquireLastIndexCommit(flushFirst);
             }
             int moreDocs = between(1, 20);
             for (int i = 0; i < moreDocs; i++) {
@@ -5641,13 +5644,13 @@ public class InternalEngineTests extends EngineTestCase {
             globalCheckpoint.set(numDocs + moreDocs - 1);
             engine.flush();
             // check that we can still read the commit that we captured
-            try (IndexReader reader = DirectoryReader.open(snapshot.get())) {
+            try (IndexReader reader = DirectoryReader.open(wrappedSnapshot.get())) {
                 assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0));
             }
             assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
             if (closeSnapshotBeforeEngine) {
-                snapshot.close();
+                wrappedSnapshot.close();
                 // check it's clean up
                 engine.flush(true, true);
                 assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
@@ -5655,7 +5658,7 @@ public class InternalEngineTests extends EngineTestCase {
         }
         if (closeSnapshotBeforeEngine == false) {
-            snapshot.close(); // shouldn't throw AlreadyClosedException
+            wrappedSnapshot.close(); // shouldn't throw AlreadyClosedException
         }
     }
@@ -5719,7 +5722,7 @@ public class InternalEngineTests extends EngineTestCase {
             }
             engine.flush(false, randomBoolean());
             int numSnapshots = between(1, 10);
-            final List<Engine.IndexCommitRef> snapshots = new ArrayList<>();
+            final List<GatedCloseable<IndexCommit>> snapshots = new ArrayList<>();
             for (int i = 0; i < numSnapshots; i++) {
                 snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit.
             }
@@ -6322,8 +6325,8 @@ public class InternalEngineTests extends EngineTestCase {
                 .collect(Collectors.toSet());
             assertThat(actualOps, containsInAnyOrder(expectedOps));
         }
-        try (Engine.IndexCommitRef commitRef = engine.acquireSafeIndexCommit()) {
-            IndexCommit safeCommit = commitRef.get();
+        try (GatedCloseable<IndexCommit> wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
+            IndexCommit safeCommit = wrappedSafeCommit.get();
             if (safeCommit.getUserData().containsKey(Engine.MIN_RETAINED_SEQNO)) {
                 lastMinRetainedSeqNo = Long.parseLong(safeCommit.getUserData().get(Engine.MIN_RETAINED_SEQNO));
             }

View File

@@ -33,6 +33,7 @@
 package org.opensearch.index.engine;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.store.LockObtainFailedException;
@@ -41,6 +42,7 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.ShardRoutingState;
 import org.opensearch.cluster.routing.TestShardRouting;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.common.unit.TimeValue;
@@ -114,8 +116,8 @@ public class NoOpEngineTests extends EngineTestCase {
         final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
         assertThat(noOpEngine.getPersistedLocalCheckpoint(), equalTo(localCheckpoint));
         assertThat(noOpEngine.getSeqNoStats(100L).getMaxSeqNo(), equalTo(maxSeqNo));
-        try (Engine.IndexCommitRef ref = noOpEngine.acquireLastIndexCommit(false)) {
-            try (IndexReader reader = DirectoryReader.open(ref.get())) {
+        try (GatedCloseable<IndexCommit> wrappedCommit = noOpEngine.acquireLastIndexCommit(false)) {
+            try (IndexReader reader = DirectoryReader.open(wrappedCommit.get())) {
                 assertThat(reader.numDocs(), equalTo(docs));
             }
         }

View File

@@ -34,6 +34,7 @@ package org.opensearch.index.shard;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
@@ -44,6 +45,7 @@ import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Constants;
+import org.junit.Assert;
 import org.opensearch.Assertions;
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
@@ -72,6 +74,7 @@ import org.opensearch.common.Strings;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.bytes.BytesArray;
 import org.opensearch.common.collect.Tuple;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.lease.Releasable;
@@ -142,7 +145,6 @@ import org.opensearch.test.FieldMaskingReader;
 import org.opensearch.test.VersionUtils;
 import org.opensearch.test.store.MockFSDirectoryFactory;
 import org.opensearch.threadpool.ThreadPool;
-import org.junit.Assert;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -179,12 +181,6 @@ import java.util.stream.Stream;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
-import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
-import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex;
-import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS;
-import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
-import static org.opensearch.test.hamcrest.RegexMatcher.matches;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.either;
@@ -204,6 +200,12 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.oneOf;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
+import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex;
+import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS;
+import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+import static org.opensearch.test.hamcrest.RegexMatcher.matches;
 /**
  * Simple unit-test IndexShard related operations.
@@ -4126,11 +4128,11 @@ public class IndexShardTests extends IndexShardTestCase {
             try {
                 readyToSnapshotLatch.await();
                 shard.snapshotStoreMetadata();
-                try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(randomBoolean())) {
-                    shard.store().getMetadata(indexCommitRef.get());
+                try (GatedCloseable<IndexCommit> wrappedIndexCommit = shard.acquireLastIndexCommit(randomBoolean())) {
+                    shard.store().getMetadata(wrappedIndexCommit.get());
                 }
-                try (Engine.IndexCommitRef indexCommitRef = shard.acquireSafeIndexCommit()) {
-                    shard.store().getMetadata(indexCommitRef.get());
+                try (GatedCloseable<IndexCommit> wrappedSafeCommit = shard.acquireSafeIndexCommit()) {
+                    shard.store().getMetadata(wrappedSafeCommit.get());
                 }
             } catch (InterruptedException | IOException e) {
                 throw new AssertionError(e);

View File

@@ -46,6 +46,8 @@ import org.apache.lucene.store.BaseDirectoryWrapper;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.SetOnce;
+import org.junit.After;
+import org.junit.Before;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.Version;
 import org.opensearch.action.ActionListener;
@@ -59,6 +61,7 @@ import org.opensearch.common.Randomness;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.bytes.BytesArray;
 import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.io.FileSystemUtils;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lucene.store.IndexOutputOutputStream;
@@ -93,14 +96,12 @@ import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.test.CorruptionUtils;
 import org.opensearch.test.DummyShardLock;
-import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.test.IndexSettingsModule;
+import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.test.VersionUtils;
 import org.opensearch.threadpool.FixedExecutorBuilder;
 import org.opensearch.threadpool.TestThreadPool;
 import org.opensearch.threadpool.ThreadPool;
-import org.junit.After;
-import org.junit.Before;

 import java.io.IOException;
 import java.io.OutputStream;
@@ -650,7 +651,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
 when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
 when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
 when(shard.isRelocatedPrimary()).thenReturn(true);
-when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
+when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class));
 doAnswer(invocation -> {
     ((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
     return null;
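Because GatedCloseable is generic, the stub above uses the raw type. If a typed stub is preferred, one hedged alternative is to wrap a mocked IndexCommit directly, assuming the (value, onClose) constructor shape shown in the earlier sketch; the helper name below is illustrative and not part of this commit.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.lucene.index.IndexCommit;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.shard.IndexShard;

public final class SafeCommitStubbing {

    // Illustrative helper: stubs acquireSafeIndexCommit() on a mocked shard with a
    // typed wrapper around a mocked commit and a no-op release action.
    static void stubSafeIndexCommit(IndexShard shard) {
        IndexCommit safeCommit = mock(IndexCommit.class);
        when(shard.acquireSafeIndexCommit()).thenReturn(new GatedCloseable<>(safeCommit, () -> {}));
    }
}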

View File

@@ -61,6 +61,8 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.junit.After;
+import org.junit.Before;
 import org.opensearch.Version;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.support.replication.ReplicationResponse;
@@ -74,6 +76,7 @@ import org.opensearch.common.Strings;
 import org.opensearch.common.bytes.BytesArray;
 import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.compress.CompressedXContent;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.uid.Versions;
 import org.opensearch.common.settings.Settings;
@@ -113,12 +116,10 @@ import org.opensearch.index.translog.TranslogConfig;
 import org.opensearch.indices.breaker.CircuitBreakerService;
 import org.opensearch.indices.breaker.NoneCircuitBreakerService;
 import org.opensearch.test.DummyShardLock;
-import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.test.IndexSettingsModule;
+import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.threadpool.TestThreadPool;
 import org.opensearch.threadpool.ThreadPool;
-import org.junit.After;
-import org.junit.Before;

 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -143,14 +144,14 @@ import java.util.stream.Collectors;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.shuffle;
-import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
-import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY;
-import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA;
-import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
+import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY;
+import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA;
+import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;

 public abstract class EngineTestCase extends OpenSearchTestCase {
@@ -1388,8 +1389,8 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
 final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
 final long seqNoForRecovery;
 if (engine.config().getIndexSettings().isSoftDeleteEnabled()) {
-    try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
-        seqNoForRecovery = Long.parseLong(safeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
+    try (GatedCloseable<IndexCommit> wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
+        seqNoForRecovery = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
     }
 } else {
     seqNoForRecovery = engine.getMinRetainedSeqNo();
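The pattern above generalizes to any place that needs commit user data: acquire the commit through the wrapper, read what is needed, and let try-with-resources release it. A small illustrative helper showing that shape; the class and method names here are not from this commit.

import java.io.IOException;

import org.apache.lucene.index.IndexCommit;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.seqno.SequenceNumbers;

public final class SafeCommitReads {

    private SafeCommitReads() {}

    // Reads the local checkpoint recorded in the safe commit and returns the first
    // sequence number that still needs recovery; the commit is released as soon as
    // the try block exits.
    static long seqNoForRecoveryFromSafeCommit(Engine engine) throws IOException {
        try (GatedCloseable<IndexCommit> wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
            return Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
        }
    }
}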

View File

@@ -32,6 +32,7 @@
 package org.opensearch.index.shard;

 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.opensearch.Version;
 import org.opensearch.action.admin.indices.flush.FlushRequest;
@@ -52,6 +53,7 @@ import org.opensearch.common.CheckedFunction;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.bytes.BytesArray;
+import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lucene.uid.Versions;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
@@ -113,10 +115,10 @@ import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;

 /**
  * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily,
@@ -1030,13 +1032,13 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase {
 );
 final PlainActionFuture<String> future = PlainActionFuture.newFuture();
 final String shardGen;
-try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
+try (GatedCloseable<IndexCommit> wrappedIndexCommit = shard.acquireLastIndexCommit(true)) {
     repository.snapshotShard(
         shard.store(),
         shard.mapperService(),
         snapshot.getSnapshotId(),
         indexId,
-        indexCommitRef.get(),
+        wrappedIndexCommit.get(),
         null,
         snapshotStatus,
         Version.CURRENT,
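Taken together, the call sites above all follow the same shape: the producer hands back a commit paired with its release action, and the consumer uses try-with-resources. A self-contained demonstration of that shape against a throwaway Lucene index, assuming the (value, onClose) constructor from the earlier sketch; Lucene's SnapshotDeletionPolicy stands in here for the engine's commit bookkeeping and is only an illustration, not the engine's actual implementation.

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.opensearch.common.concurrent.GatedCloseable;

public final class GatedCommitDemo {

    public static void main(String[] args) throws IOException {
        SnapshotDeletionPolicy policy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
        IndexWriterConfig config = new IndexWriterConfig().setIndexDeletionPolicy(policy);

        try (Directory directory = new ByteBuffersDirectory(); IndexWriter writer = new IndexWriter(directory, config)) {
            // Create one commit so there is something to snapshot.
            writer.addDocument(new Document());
            writer.commit();

            // Producer side: pair the snapshotted commit with the action that releases it.
            IndexCommit commit = policy.snapshot();
            try (GatedCloseable<IndexCommit> wrappedCommit = new GatedCloseable<>(commit, () -> policy.release(commit))) {
                // Consumer side: read commit metadata, exactly as the tests above do.
                Map<String, String> userData = wrappedCommit.get().getUserData();
                System.out.println("commit user data: " + userData);
            }
            // Leaving the inner block released the snapshot before the writer closes.
        }
    }
}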