Move the IndexDeletionPolicy to be engine internal (#24930)

The `IndexDeletionPolicy` is currently instantiated by `IndexShard` and is then passed through to the engine as a parameter. That's a shame as it is really just an implementation detail and the engine already has a method to acquire a commit.

This is preparing for a follow up PR that will we connect the index deletion policy with a new translog deletion policy.

Relates to #10708
This commit is contained in:
Boaz Leskes 2017-05-29 15:56:30 +02:00 committed by GitHub
parent 5741005812
commit dfdf496c1a
11 changed files with 72 additions and 89 deletions

View File

@ -44,6 +44,7 @@ import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -63,7 +64,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
@ -100,7 +100,6 @@ public abstract class Engine implements Closeable {
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final EventListener eventListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
@ -121,7 +120,6 @@ public abstract class Engine implements Closeable {
protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
Objects.requireNonNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
@ -129,7 +127,6 @@ public abstract class Engine implements Closeable {
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings().getSettings(), engineConfig.getShardId());
this.eventListener = engineConfig.getEventListener();
this.deletionPolicy = engineConfig.getDeletionPolicy();
}
/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
@ -828,7 +825,7 @@ public abstract class Engine implements Closeable {
*
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;
/**
* fail engine due to some error. the engine will also be closed.
@ -1387,6 +1384,28 @@ public abstract class Engine implements Closeable {
}
}
public static class IndexCommitRef implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean();
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;
IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
indexCommit = deletionPolicy.snapshot();
onClose = () -> deletionPolicy.release(indexCommit);
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
onClose.run();
}
}
public IndexCommit getIndexCommit() {
return indexCommit;
}
}
public void onSettingsChanged() {
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
@ -58,7 +57,6 @@ public final class EngineConfig {
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final MergePolicy mergePolicy;
private final Analyzer analyzer;
private final Similarity similarity;
@ -109,7 +107,7 @@ public final class EngineConfig {
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
@ -123,7 +121,6 @@ public final class EngineConfig {
this.threadPool = threadPool;
this.warmer = warmer == null ? (a) -> {} : warmer;
this.store = store;
this.deletionPolicy = deletionPolicy;
this.mergePolicy = mergePolicy;
this.analyzer = analyzer;
this.similarity = similarity;
@ -214,14 +211,6 @@ public final class EngineConfig {
return store;
}
/**
* Returns a {@link SnapshotDeletionPolicy} used in the engines
* {@link org.apache.lucene.index.IndexWriter}.
*/
public SnapshotDeletionPolicy getDeletionPolicy() {
return deletionPolicy;
}
/**
* Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
*/

View File

@ -21,16 +21,17 @@ package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
@ -126,6 +127,8 @@ public class InternalEngine extends Engine {
private final String uidField;
private final SnapshotDeletionPolicy deletionPolicy;
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
@ -137,12 +140,14 @@ public class InternalEngine extends Engine {
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
store.incRef();
@ -1414,7 +1419,7 @@ public class InternalEngine extends Engine {
}
@Override
public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
@ -1425,7 +1430,7 @@ public class InternalEngine extends Engine {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
logger.trace("pulling snapshot");
return deletionPolicy.snapshot();
return new IndexCommitRef(deletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
}

View File

@ -23,9 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Sort;
@ -161,7 +159,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final String checkIndexOnStartup;
private final CodecService codecService;
private final Engine.Warmer warmer;
private final SnapshotDeletionPolicy deletionPolicy;
private final SimilarityService similarityService;
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
@ -230,7 +227,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger);
this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
@ -878,11 +874,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}.
* commit won't be freed until the commit / snapshot is closed.
*
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
@ -893,14 +889,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources
* referenced by the given snapshot {@link IndexCommit}.
*/
public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
deletionPolicy.release(snapshot);
}
/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
* without having to worry about the current state of the engine and concurrent flushes.
@ -915,25 +903,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
IndexCommit indexCommit = null;
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
Engine engine;
synchronized (mutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
// That can be done out of mutex, since the engine can be closed half way.
Engine engine = getEngineOrNull();
engine = getEngineOrNull();
if (engine == null) {
return store.getMetadata(null, true);
}
}
indexCommit = deletionPolicy.snapshot();
return store.getMetadata(indexCommit);
indexCommit = engine.acquireIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
if (indexCommit != null) {
deletionPolicy.release(indexCommit);
}
IOUtils.close(indexCommit);
}
}
@ -1838,7 +1825,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
@ -28,6 +27,7 @@ import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NoLockFactory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.store.Store;
import java.io.Closeable;
@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
final class LocalShardSnapshot implements Closeable {
private final IndexShard shard;
private final Store store;
private final IndexCommit indexCommit;
private final Engine.IndexCommitRef indexCommit;
private final AtomicBoolean closed = new AtomicBoolean(false);
LocalShardSnapshot(IndexShard shard) {
@ -66,7 +66,7 @@ final class LocalShardSnapshot implements Closeable {
return new FilterDirectory(store.directory()) {
@Override
public String[] listAll() throws IOException {
Collection<String> fileNames = indexCommit.getFileNames();
Collection<String> fileNames = indexCommit.getIndexCommit().getFileNames();
final String[] fileNameArray = fileNames.toArray(new String[fileNames.size()]);
return fileNameArray;
}
@ -115,7 +115,7 @@ final class LocalShardSnapshot implements Closeable {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
shard.releaseIndexCommit(indexCommit);
indexCommit.close();
} finally {
store.decRef();
}

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbersService;
@ -135,7 +136,7 @@ public class RecoverySourceHandler {
if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
} else {
final IndexCommit phase1Snapshot;
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (final Exception e) {
@ -143,12 +144,12 @@ public class RecoverySourceHandler {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
try {
phase1(phase1Snapshot, translogView);
phase1(phase1Snapshot.getIndexCommit(), translogView);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
shard.releaseIndexCommit(phase1Snapshot);
IOUtils.close(phase1Snapshot);
} catch (final IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -45,6 +44,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -376,17 +376,14 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
try {
// we flush first to make sure we get the latest writes snapshotted
IndexCommit snapshotIndexCommit = indexShard.acquireIndexCommit(true);
try {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository,
TimeValue.timeValueMillis(snapshotStatus.time()), sb);
}
} finally {
indexShard.releaseIndexCommit(snapshotIndexCommit);
}
} catch (SnapshotFailedEngineException e) {
throw e;

View File

@ -40,17 +40,14 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoDeletionPolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
@ -260,7 +257,7 @@ public class InternalEngineTests extends ESTestCase {
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), analyzer, config.getSimilarity(),
config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
config.getIndexSort());
@ -337,10 +334,6 @@ public class InternalEngineTests extends ESTestCase {
return new Translog(translogConfig, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
}
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
}
protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
}
@ -406,22 +399,11 @@ public class InternalEngineTests extends ESTestCase {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, null);
return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, indexSort);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
SnapshotDeletionPolicy deletionPolicy, ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, refreshListener, null);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
SnapshotDeletionPolicy deletionPolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
final EngineConfig.OpenMode openMode;
@ -440,7 +422,7 @@ public class InternalEngineTests extends ESTestCase {
// we don't need to notify anybody in this test
}
};
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy,
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
new TranslogHandler(xContentRegistry(), shardId.getIndexName(), indexSettings.getSettings(), logger),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
@ -2127,11 +2109,11 @@ public class InternalEngineTests extends ESTestCase {
// this test writes documents to the engine while concurrently flushing/commit
// and ensuring that the commit points contain the correct sequence number data
public void testConcurrentWritesAndCommits() throws Exception {
List<Engine.IndexCommitRef> commits = new ArrayList<>();
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), null))) {
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
final int numIndexingThreads = scaledRandomIntBetween(3, 6);
final int numIndexingThreads = scaledRandomIntBetween(2, 4);
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
final List<Thread> indexingThreads = new ArrayList<>();
@ -2164,13 +2146,14 @@ public class InternalEngineTests extends ESTestCase {
boolean doneIndexing;
do {
doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0;
//engine.flush(); // flush and commit
commits.add(engine.acquireIndexCommit(true));
} while (doneIndexing == false);
// now, verify all the commits have the correct docs according to the user commit data
long prevLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long prevMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
for (Engine.IndexCommitRef commitRef : commits) {
final IndexCommit commit = commitRef.getIndexCommit();
Map<String, String> userData = commit.getUserData();
long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ?
Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) :
@ -2202,6 +2185,8 @@ public class InternalEngineTests extends ESTestCase {
prevLocalCheckpoint = localCheckpoint;
prevMaxSeqNo = maxSeqNo;
}
} finally {
IOUtils.close(commits);
}
}
@ -2739,7 +2724,7 @@ public class InternalEngineTests extends ESTestCase {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool,
config.getIndexSettings(), null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getAnalyzer(),
config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null);

View File

@ -562,17 +562,17 @@ public class IndexShardTests extends IndexShardTestCase {
indexDoc(shard, "type", "id_" + i);
}
final boolean flushFirst = randomBoolean();
IndexCommit commit = shard.acquireIndexCommit(flushFirst);
Engine.IndexCommitRef commit = shard.acquireIndexCommit(flushFirst);
int moreDocs = randomInt(20);
for (int i = 0; i < moreDocs; i++) {
indexDoc(shard, "type", "id_" + numDocs + i);
}
flushShard(shard);
// check that we can still read the commit that we captured
try (IndexReader reader = DirectoryReader.open(commit)) {
try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0));
}
shard.releaseIndexCommit(commit);
commit.close();
flushShard(shard, true);
// check it's clean up

View File

@ -23,8 +23,6 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
@ -119,7 +117,7 @@ public class RefreshListenersTests extends ESTestCase {
};
TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), shardId.getIndexName(), Settings.EMPTY, logger);
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, threadPool, indexSettings, null,
store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMergePolicy(), iwc.getAnalyzer(),
store, newMergePolicy(), iwc.getAnalyzer(),
iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), listeners, null);

View File

@ -375,6 +375,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final Translog.View translogView = mock(Translog.View.class);
when(shard.acquireTranslogView()).thenReturn(translogView);
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
@ -448,6 +449,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
assertTrue(recoveriesDelayed.get());
return null;
}).when(shard).relocated(any(String.class));
when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
final Supplier<Long> currentClusterStateVersionSupplier = () -> {
assertFalse(ensureClusterStateVersionCalled.get());