diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 295f559da16..70ec03c09fd 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -44,6 +44,7 @@ import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.Accountables;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -63,7 +64,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.shard.ShardId;
@@ -100,7 +100,6 @@ public abstract class Engine implements Closeable {
     protected final Store store;
     protected final AtomicBoolean isClosed = new AtomicBoolean(false);
     protected final EventListener eventListener;
-    protected final SnapshotDeletionPolicy deletionPolicy;
     protected final ReentrantLock failEngineLock = new ReentrantLock();
     protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
@@ -121,7 +120,6 @@ public abstract class Engine implements Closeable {
 
     protected Engine(EngineConfig engineConfig) {
         Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
-        Objects.requireNonNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
 
         this.engineConfig = engineConfig;
         this.shardId = engineConfig.getShardId();
@@ -129,7 +127,6 @@ public abstract class Engine implements Closeable {
         this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
             engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId());
         this.eventListener = engineConfig.getEventListener();
-        this.deletionPolicy = engineConfig.getDeletionPolicy();
     }
 
     /** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
@@ -828,7 +825,7 @@ public abstract class Engine implements Closeable {
      *
      * @param flushFirst indicates whether the engine should flush before returning the snapshot
      */
-    public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException;
+    public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;
 
     /**
     * fail engine due to some error. the engine will also be closed.
@@ -1387,6 +1384,28 @@ public abstract class Engine implements Closeable {
         }
     }
 
+    public static class IndexCommitRef implements Closeable {
+        private final AtomicBoolean closed = new AtomicBoolean();
+        private final CheckedRunnable<IOException> onClose;
+        private final IndexCommit indexCommit;
+
+        IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
+            indexCommit = deletionPolicy.snapshot();
+            onClose = () -> deletionPolicy.release(indexCommit);
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (closed.compareAndSet(false, true)) {
+                onClose.run();
+            }
+        }
+
+        public IndexCommit getIndexCommit() {
+            return indexCommit;
+        }
+    }
+
     public void onSettingsChanged() {
 
     }
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index d22a93273c7..19ec3e036e5 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -21,7 +21,6 @@ package org.elasticsearch.index.engine;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.ReferenceManager;
@@ -58,7 +57,6 @@ public final class EngineConfig {
     private final ThreadPool threadPool;
     private final Engine.Warmer warmer;
     private final Store store;
-    private final SnapshotDeletionPolicy deletionPolicy;
     private final MergePolicy mergePolicy;
     private final Analyzer analyzer;
     private final Similarity similarity;
@@ -109,7 +107,7 @@
     /**
      * Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
      */
     public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
-                        IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
+                        IndexSettings indexSettings, Engine.Warmer warmer, Store store,
                         MergePolicy mergePolicy, Analyzer analyzer,
                         Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
                         TranslogRecoveryPerformer translogRecoveryPerformer,
                         QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
@@ -123,7 +121,6 @@
         this.threadPool = threadPool;
         this.warmer = warmer == null ? (a) -> {} : warmer;
         this.store = store;
-        this.deletionPolicy = deletionPolicy;
         this.mergePolicy = mergePolicy;
         this.analyzer = analyzer;
         this.similarity = similarity;
@@ -214,14 +211,6 @@
         return store;
     }
 
-    /**
-     * Returns a {@link SnapshotDeletionPolicy} used in the engines
-     * {@link org.apache.lucene.index.IndexWriter}.
-     */
-    public SnapshotDeletionPolicy getDeletionPolicy() {
-        return deletionPolicy;
-    }
-
     /**
      * Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
      */
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index ca653ff4124..40c5c0af5e3 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -21,16 +21,17 @@ package org.elasticsearch.index.engine;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.SearcherFactory;
@@ -126,6 +127,8 @@ public class InternalEngine extends Engine {
 
     private final String uidField;
 
+    private final SnapshotDeletionPolicy deletionPolicy;
+
     // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
     // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
     // incoming indexing ops to a single thread:
@@ -137,12 +140,14 @@
     private final CounterMetric numVersionLookups = new CounterMetric();
     private final CounterMetric numIndexVersionsLookups = new CounterMetric();
 
+
     public InternalEngine(EngineConfig engineConfig) throws EngineException {
         super(engineConfig);
         openMode = engineConfig.getOpenMode();
         if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
             maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
         }
+        deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
         this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
         this.versionMap = new LiveVersionMap();
         store.incRef();
@@ -1414,7 +1419,7 @@
     }
 
     @Override
-    public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException {
+    public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
         // we have to flush outside of the readlock otherwise we might have a problem upgrading
         // the to a write lock when we fail the engine in this operation
         if (flushFirst) {
@@ -1425,7 +1430,7 @@
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
             logger.trace("pulling snapshot");
-            return deletionPolicy.snapshot();
+            return new IndexCommitRef(deletionPolicy);
         } catch (IOException e) {
             throw new SnapshotFailedEngineException(shardId, e);
         }
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 35ab1db1418..5f1f94f72f7 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -23,9 +23,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexOptions;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.Sort;
@@ -161,7 +159,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     private final String checkIndexOnStartup;
     private final CodecService codecService;
     private final Engine.Warmer warmer;
-    private final SnapshotDeletionPolicy deletionPolicy;
     private final SimilarityService similarityService;
     private final TranslogConfig translogConfig;
     private final IndexEventListener indexEventListener;
@@ -230,7 +227,6 @@
         final Settings settings = indexSettings.getSettings();
         this.codecService = new CodecService(mapperService, logger);
         this.warmer = warmer;
-        this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
         this.similarityService = similarityService;
         Objects.requireNonNull(store, "Store must be provided to the index shard");
         this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
@@ -878,11 +874,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
     /**
      * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
-     * commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}.
+     * commit won't be freed until the commit / snapshot is closed.
      *
      * @param flushFirst true if the index should first be flushed to disk / a low level lucene commit should be executed
      */
-    public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
+    public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
         IndexShardState state = this.state; // one time volatile read
         // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
         if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
@@ -893,14 +889,6 @@
     }
 
-    /**
-     * Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources
-     * referenced by the given snapshot {@link IndexCommit}.
-     */
-    public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
-        deletionPolicy.release(snapshot);
-    }
-
     /**
      * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
      * without having to worry about the current state of the engine and concurrent flushes.
@@ -915,25 +903,24 @@
      * @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
      */
     public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
-        IndexCommit indexCommit = null;
+        Engine.IndexCommitRef indexCommit = null;
         store.incRef();
         try {
+            Engine engine;
             synchronized (mutex) {
                 // if the engine is not running, we can access the store directly, but we need to make sure no one starts
                 // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
                 // That can be done out of mutex, since the engine can be closed half way.
-                Engine engine = getEngineOrNull();
+                engine = getEngineOrNull();
                 if (engine == null) {
                     return store.getMetadata(null, true);
                 }
             }
-            indexCommit = deletionPolicy.snapshot();
-            return store.getMetadata(indexCommit);
+            indexCommit = engine.acquireIndexCommit(false);
+            return store.getMetadata(indexCommit.getIndexCommit());
         } finally {
             store.decRef();
-            if (indexCommit != null) {
-                deletionPolicy.release(indexCommit);
-            }
+            IOUtils.close(indexCommit);
         }
     }
 
@@ -1838,7 +1825,7 @@
         final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
         Sort indexSort = indexSortSupplier.get();
         return new EngineConfig(openMode, shardId,
-            threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
+            threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
            mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer,
            indexCache.query(), cachingPolicy, translogConfig,
            IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
     }
diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
index 302cdda4804..c2019e8c52a 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
@@ -19,7 +19,6 @@
 
 package org.elasticsearch.index.shard;
 
-import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
@@ -28,6 +27,7 @@ import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.NoLockFactory;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.store.Store;
 
 import java.io.Closeable;
@@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 final class LocalShardSnapshot implements Closeable {
     private final IndexShard shard;
     private final Store store;
-    private final IndexCommit indexCommit;
+    private final Engine.IndexCommitRef indexCommit;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     LocalShardSnapshot(IndexShard shard) {
@@ -66,7 +66,7 @@ final class LocalShardSnapshot implements Closeable {
         return new FilterDirectory(store.directory()) {
             @Override
             public String[] listAll() throws IOException {
-                Collection<String> fileNames = indexCommit.getFileNames();
+                Collection<String> fileNames = indexCommit.getIndexCommit().getFileNames();
                 final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]);
                 return fileNameArray;
             }
@@ -115,7 +115,7 @@ final class LocalShardSnapshot implements Closeable {
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
             try {
-                shard.releaseIndexCommit(indexCommit);
+                indexCommit.close();
             } finally {
                 store.decRef();
             }
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 4c8c31779ac..5c7787999da 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
@@ -135,7 +136,7 @@ public class RecoverySourceHandler {
         if (isSequenceNumberBasedRecoveryPossible) {
             logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
         } else {
-            final IndexCommit phase1Snapshot;
+            final Engine.IndexCommitRef phase1Snapshot;
             try {
                 phase1Snapshot = shard.acquireIndexCommit(false);
             } catch (final Exception e) {
@@ -143,12 +144,12 @@
                 throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
             }
             try {
-                phase1(phase1Snapshot, translogView);
+                phase1(phase1Snapshot.getIndexCommit(), translogView);
             } catch (final Exception e) {
                 throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
             } finally {
                 try {
-                    shard.releaseIndexCommit(phase1Snapshot);
+                    IOUtils.close(phase1Snapshot);
                 } catch (final IOException ex) {
                     logger.warn("releasing snapshot caused exception", ex);
                 }
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index 7cbe10fad3b..259136ca9cc 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -22,7 +21,6 @@ package org.elasticsearch.snapshots;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
-import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -45,6 +44,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.SnapshotFailedEngineException;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -376,17 +376,14 @@
 
         try {
             // we flush first to make sure we get the latest writes snapshotted
-            IndexCommit snapshotIndexCommit = indexShard.acquireIndexCommit(true);
-            try {
-                repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
+            try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
+                repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
                 if (logger.isDebugEnabled()) {
                     StringBuilder sb = new StringBuilder();
                     sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
                     logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository, TimeValue.timeValueMillis(snapshotStatus.time()), sb);
                 }
-            } finally {
-                indexShard.releaseIndexCommit(snapshotIndexCommit);
             }
         } catch (SnapshotFailedEngineException e) {
             throw e;
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 8bf948240b5..8f97da31a6b 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -40,17 +40,14 @@ import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.LogDocMergePolicy;
 import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.NoDeletionPolicy;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.NumericDocValues;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
@@ -260,7 +257,7 @@
     public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
         return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
-            config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), analyzer, config.getSimilarity(),
+            config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
             new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
             config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
             config.getIndexSort());
@@ -337,10 +334,6 @@
         return new Translog(translogConfig, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
     }
 
-    protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
-        return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
-    }
-
     protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
         return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
     }
@@ -406,22 +399,11 @@
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener refreshListener) {
-        return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, null);
+        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null);
     }
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
-        return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, indexSort);
-    }
-
-    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
-                               SnapshotDeletionPolicy deletionPolicy, ReferenceManager.RefreshListener refreshListener) {
-        return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, refreshListener, null);
-    }
-
-    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
-                               SnapshotDeletionPolicy deletionPolicy,
-                               ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
         IndexWriterConfig iwc = newIndexWriterConfig();
         TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
         final EngineConfig.OpenMode openMode;
@@ -440,8 +422,8 @@
             // we don't need to notify anybody in this test
             }
         };
-        EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy,
-            mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
+        EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
+            mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
             new TranslogHandler(xContentRegistry(), shardId.getIndexName(), indexSettings.getSettings(), logger),
             IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
             TimeValue.timeValueMinutes(5), refreshListener, indexSort);
@@ -2127,11 +2109,11 @@
     // this test writes documents to the engine while concurrently flushing/commit
     // and ensuring that the commit points contain the correct sequence number data
     public void testConcurrentWritesAndCommits() throws Exception {
+        List<Engine.IndexCommitRef> commits = new ArrayList<>();
         try (Store store = createStore();
-             InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
-                 new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), null))) {
+             InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
 
-            final int numIndexingThreads = scaledRandomIntBetween(3, 6);
+            final int numIndexingThreads = scaledRandomIntBetween(2, 4);
             final int numDocsPerThread = randomIntBetween(500, 1000);
             final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
             final List<Thread> indexingThreads = new ArrayList<>();
@@ -2164,13 +2146,14 @@
             boolean doneIndexing;
             do {
                 doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0;
-                //engine.flush(); // flush and commit
+                commits.add(engine.acquireIndexCommit(true));
             } while (doneIndexing == false);
 
             // now, verify all the commits have the correct docs according to the user commit data
             long prevLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
             long prevMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
-            for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
+            for (Engine.IndexCommitRef commitRef : commits) {
+                final IndexCommit commit = commitRef.getIndexCommit();
                 Map<String, String> userData = commit.getUserData();
                 long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ?
                    Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) :
@@ -2202,6 +2185,8 @@
                 prevLocalCheckpoint = localCheckpoint;
                 prevMaxSeqNo = maxSeqNo;
             }
+        } finally {
+            IOUtils.close(commits);
         }
     }
 
@@ -2739,7 +2724,7 @@
         TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
 
         EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool,
-            config.getIndexSettings(), null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getAnalyzer(),
+            config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(),
             config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(),
             IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
             TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null);
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index a113132351b..6dce3dab3a9 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -562,17 +562,17 @@
             indexDoc(shard, "type", "id_" + i);
         }
         final boolean flushFirst = randomBoolean();
-        IndexCommit commit = shard.acquireIndexCommit(flushFirst);
+        Engine.IndexCommitRef commit = shard.acquireIndexCommit(flushFirst);
         int moreDocs = randomInt(20);
         for (int i = 0; i < moreDocs; i++) {
             indexDoc(shard, "type", "id_" + numDocs + i);
         }
         flushShard(shard);
         // check that we can still read the commit that we captured
-        try (IndexReader reader = DirectoryReader.open(commit)) {
+        try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
             assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0));
         }
-        shard.releaseIndexCommit(commit);
+        commit.close();
         flushShard(shard, true);
 
         // check it's clean up
diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index 7ddd229a117..f99de847238 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -23,8 +23,6 @@ import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
@@ -119,7 +117,7 @@
         };
         TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), shardId.getIndexName(), Settings.EMPTY, logger);
         EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, threadPool, indexSettings, null,
-            store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMergePolicy(), iwc.getAnalyzer(),
+            store, newMergePolicy(), iwc.getAnalyzer(),
             iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler,
             IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
             TimeValue.timeValueMinutes(5), listeners, null);
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 6a059b7484a..09a787ce0d3 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -375,6 +375,7 @@
         final Translog.View translogView = mock(Translog.View.class);
         when(shard.acquireTranslogView()).thenReturn(translogView);
         when(shard.state()).thenReturn(IndexShardState.RELOCATED);
+        when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
         final AtomicBoolean phase1Called = new AtomicBoolean();
         final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
         final AtomicBoolean phase2Called = new AtomicBoolean();
@@ -448,6 +449,7 @@
             assertTrue(recoveriesDelayed.get());
             return null;
         }).when(shard).relocated(any(String.class));
+        when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
 
         final Supplier<Long> currentClusterStateVersionSupplier = () -> {
             assertFalse(ensureClusterStateVersionCalled.get());
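
Usage note (reviewer sketch, not part of the patch): the central change above is that Engine#acquireIndexCommit and IndexShard#acquireIndexCommit now hand back an Engine.IndexCommitRef — a Closeable that pins the commit and releases the underlying deletion-policy snapshot exactly once on close() — instead of a bare IndexCommit paired with a separate releaseIndexCommit(IndexCommit) call. A minimal caller-side sketch of the intended try-with-resources pattern follows; the class and method names (IndexCommitRefUsageSketch, filesOfLatestCommit) are illustrative only and assume a started IndexShard.

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;

import java.io.IOException;
import java.util.Collection;

final class IndexCommitRefUsageSketch {

    // Hypothetical caller: list the files referenced by the latest commit while it is pinned.
    static Collection<String> filesOfLatestCommit(IndexShard shard) throws IOException {
        // flushFirst = true: flush before the snapshot so the commit reflects the latest writes
        try (Engine.IndexCommitRef commitRef = shard.acquireIndexCommit(true)) {
            IndexCommit commit = commitRef.getIndexCommit();
            // the referenced files are only guaranteed to exist on disk while the ref is open
            return commit.getFileNames();
        } // close() releases the SnapshotDeletionPolicy snapshot; a second close() is a no-op
    }
}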
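
For call sites that cannot wrap the acquisition in try-with-resources (the snapshotStoreMetadata and RecoverySourceHandler changes above), the patch falls back to a nullable ref closed in a finally block. A sketch of that shape under the same assumptions; it relies on Lucene's org.apache.lucene.util.IOUtils, whose close(...) ignores null arguments, and copyCommitFiles is a hypothetical placeholder.

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;

import java.io.IOException;

final class CommitRefFinallySketch {

    static void copyLatestCommit(IndexShard shard) throws IOException {
        Engine.IndexCommitRef commitRef = null;
        try {
            commitRef = shard.acquireIndexCommit(false); // may throw before the ref is assigned
            copyCommitFiles(commitRef.getIndexCommit()); // work while the commit is pinned
        } finally {
            IOUtils.close(commitRef); // null-safe; releases the snapshot only if it was acquired
        }
    }

    private static void copyCommitFiles(IndexCommit commit) throws IOException {
        // placeholder: a real caller would stream the files named by commit.getFileNames()
        commit.getFileNames();
    }
}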