Untangle Engine Constructor logic (#28245)

Currently we have fairly complicated logic in the engine constructor to deal with all the
various ways we want to mutate the lucene index and translog we're opening.

We can:
1) Create an empty index
2) Use the existing lucene index but create a new translog
3) Use both the existing index and translog
4) Force a new history uuid in all cases.

This leads to complicated code flows which make it harder and harder to make sure we cover all the
corner cases. This PR takes another approach. Constructing an InternalEngine always opens
things as they are, and all needed modifications are done by static methods directly on the
directory, one at a time.
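
As a rough sketch of the resulting flow (EngineDiskUtils, its methods, and the constructor change are from this PR; store, translogPath, shardId, globalCheckpoint, and engineConfig are hypothetical placeholders), a caller now prepares the on-disk state first and then constructs the engine:

    // pick exactly the mutation the recovery scenario needs, one static call at a time
    EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId);                            // brand new shard
    // EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), translogPath, shardId);  // existing index, new history uuid
    // EngineDiskUtils.createNewTranslog(store.directory(), translogPath, globalCheckpoint, shardId); // existing index, fresh translog

    // the constructor no longer takes an OpenMode - it simply opens whatever is on disk
    InternalEngine engine = new InternalEngine(engineConfig);
    engine.recoverFromTranslog(); // or engine.skipTranslogRecovery()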
Boaz Leskes 2018-03-14 20:59:47 +01:00 committed by GitHub
parent 8e8fdc4f0e
commit bf65cb4914
20 changed files with 620 additions and 686 deletions

View File

@ -93,12 +93,12 @@ which returns something similar to:
{
"commit" : {
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
"generation" : 2,
"generation" : 3,
"user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
"local_checkpoint" : "-1",
"translog_generation" : "2",
"translog_generation" : "3",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"

View File

@ -46,16 +46,14 @@ import java.util.function.LongSupplier;
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
this.openMode = openMode;
this.logger = logger;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
@ -65,25 +63,11 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
@Override
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
break;
case OPEN_INDEX_CREATE_TRANSLOG:
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
// OPEN_INDEX_CREATE_TRANSLOG can open an index commit from another shard with a different translog history,
// we therefore should not use that index commit to update the translog deletion policy.
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
updateTranslogDeletionPolicy();
}
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
}
assert commits.isEmpty() == false : "index is opened, but we have no commits";
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
updateTranslogDeletionPolicy();
}
/**

View File

@ -75,7 +75,6 @@ public final class EngineConfig {
private final List<ReferenceManager.RefreshListener> internalRefreshListener;
@Nullable
private final Sort indexSort;
private final boolean forceNewHistoryUUID;
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
@ -113,24 +112,20 @@ public final class EngineConfig {
Property.IndexScope, Property.Dynamic);
private final TranslogConfig translogConfig;
private final OpenMode openMode;
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool,
public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
@ -151,8 +146,6 @@ public final class EngineConfig {
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
this.flushMergesAfter = flushMergesAfter;
this.openMode = openMode;
this.forceNewHistoryUUID = forceNewHistoryUUID;
this.externalRefreshListener = externalRefreshListener;
this.internalRefreshListener = internalRefreshListener;
this.indexSort = indexSort;
@ -315,22 +308,6 @@ public final class EngineConfig {
*/
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
/**
* Returns the {@link OpenMode} for this engine config.
*/
public OpenMode getOpenMode() {
return openMode;
}
/**
* Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing
* one is found.
*/
public boolean getForceNewHistoryUUID() {
return forceNewHistoryUUID;
}
@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
@ -343,20 +320,6 @@ public final class EngineConfig {
return translogRecoveryRunner;
}
/**
* Engine open mode defines how the engine should be opened, or in other words what the engine should expect
* to recover from. We either create a brand new engine with a new index and translog, or we recover from an existing index.
* If the index exists, we also have the ability to open only the index and create a new transaction log, which happens
* during remote recovery since we have already transferred the index files but the translog is replayed from the remote. The last
* and safest option opens the lucene index as well as its referenced transaction log for a translog recovery.
* See also {@link Engine#recoverFromTranslog()}
*/
public enum OpenMode {
CREATE_INDEX_AND_TRANSLOG,
OPEN_INDEX_CREATE_TRANSLOG,
OPEN_INDEX_AND_TRANSLOG;
}
/**
* The refresh listeners to add to Lucene for externally visible refreshes
*/

View File

@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class contains utility methods for mutating the shard lucene index and translog in preparation for being opened.
*/
public abstract class EngineDiskUtils {
/**
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
*/
public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException {
try (IndexWriter writer = newIndexWriter(true, dir)) {
final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1");
updateCommitData(writer, map);
}
}
/**
* Converts an existing lucene index and marks it with a new history uuid. Also creates a new empty translog file.
* This is used to make sure no existing shard will recover from this index using ops based recovery.
*/
public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId)
throws IOException {
try (IndexWriter writer = newIndexWriter(false, dir)) {
final Map<String, String> userData = getUserData(writer);
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId);
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
updateCommitData(writer, map);
}
}
/**
* Creates a new empty translog and associates it with an existing lucene index.
*/
public static void createNewTranslog(final Directory dir, final Path translogPath, long initialGlobalCheckpoint, final ShardId shardId)
throws IOException {
if (Assertions.ENABLED) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(dir);
assert existingCommits.size() == 1 : "creating a new translog should have one commit, commits[" + existingCommits + "]";
SequenceNumbers.CommitInfo commitInfo = Store.loadSeqNoInfo(existingCommits.get(0));
assert commitInfo.localCheckpoint >= initialGlobalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+ initialGlobalCheckpoint + "]";
}
try (IndexWriter writer = newIndexWriter(false, dir)) {
final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId);
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
updateCommitData(writer, map);
}
}
/**
* Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed.
*/
public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException {
try (IndexWriter writer = newIndexWriter(false, dir)) {
final Map<String, String> userData = getUserData(writer);
if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) {
updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
}
}
}
private static void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
final Map<String, String> userData = getUserData(writer);
userData.putAll(keysToUpdate);
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
private static Map<String, String> getUserData(IndexWriter writer) {
final Map<String, String> userData = new HashMap<>();
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
return userData;
}
private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException {
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we have started it up; otherwise we would need to wait for it here.
// we also don't specify a codec here; merges should use the engine's codec for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
return new IndexWriter(dir, iwc);
}
}
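
For illustration, a minimal sketch of what createEmpty leaves behind in the Lucene commit user data (the paths and shard id below are made up; the keys match the docs snippet at the top of this commit):

    Path indexPath = Files.createTempDirectory("index");
    Path translogPath = Files.createTempDirectory("translog");
    try (Directory dir = FSDirectory.open(indexPath)) {
        EngineDiskUtils.createEmpty(dir, translogPath, new ShardId("test", "_na_", 0));
        Map<String, String> userData = SegmentInfos.readLatestCommit(dir).getUserData();
        // expected keys: translog_uuid, translog_generation=1, history_uuid,
        // local_checkpoint=-1, max_seq_no=-1, max_unsafe_auto_id_timestamp=-1
    }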

View File

@ -49,7 +49,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.LoggerInfoStream;
@ -72,6 +71,7 @@ import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
@ -133,7 +133,6 @@ public class InternalEngine extends Engine {
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
@ -157,7 +156,6 @@ public class InternalEngine extends Engine {
final EngineConfig engineConfig,
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
@ -183,22 +181,15 @@ public class InternalEngine extends Engine {
assert translog.getGeneration() != null;
this.translog = translog;
final IndexCommit startingCommit = getStartingCommitPoint();
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
assert startingCommit != null : "Starting commit should be non-null";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, logger, translogDeletionPolicy,
this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
writer = createWriter(startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
|| openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
: "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID; " +
"openMode [" + openMode + "], forceNewHistoryUUID [" + engineConfig.getForceNewHistoryUUID() + "]";
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
historyUUID = loadOrGenerateHistoryUUID(writer);
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
updateWriterOnOpen();
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
} catch (AssertionError e) {
@ -217,7 +208,7 @@ public class InternalEngine extends Engine {
internalSearcherManager.addListener(versionMap);
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
pendingTranslogRecovery.set(true);
for (ReferenceManager.RefreshListener listener: engineConfig.getExternalRefreshListener()) {
this.externalSearcherManager.addListener(listener);
}
@ -241,20 +232,10 @@ public class InternalEngine extends Engine {
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
break;
case OPEN_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG:
final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo(startingCommit);
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
break;
default: throw new IllegalArgumentException("unknown type: " + openMode);
}
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(startingCommit);
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}
@ -380,9 +361,6 @@ public class InternalEngine extends Engine {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
}
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
@ -405,50 +383,31 @@ public class InternalEngine extends Engine {
@Override
public void skipTranslogRecovery() {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
}
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}
private IndexCommit getStartingCommitPoint() throws IOException {
final IndexCommit startingIndexCommit;
final List<IndexCommit> existingCommits;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
startingIndexCommit = null;
break;
case OPEN_INDEX_CREATE_TRANSLOG:
// Use the last commit
existingCommits = DirectoryReader.listCommits(store.directory());
startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
break;
case OPEN_INDEX_AND_TRANSLOG:
// Use the safe commit
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
// is not retained but whose max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog is fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Assert that the starting commit is a safe commit once peer-recovery sets the global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
// is not retained but whose max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog is fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
break;
default:
throw new IllegalArgumentException("unknown mode: " + openMode);
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Assert that the starting commit is a safe commit once peer-recovery sets the global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
return startingIndexCommit;
}
@ -469,58 +428,20 @@ public class InternalEngine extends Engine {
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
flush(true, true);
refresh("translog_recovery");
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
commitIndexWriter(indexWriter, translog, null);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
}
// clean up what's not needed
translog.trimUnreferencedReaders();
}
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
assert openMode != null;
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG:
translogUUID =
Translog.createEmptyTranslog(translogConfig.getTranslogPath(), globalCheckpointSupplier.getAsLong(), shardId);
break;
case OPEN_INDEX_AND_TRANSLOG:
translogUUID = loadTranslogUUIDFromLastCommit();
break;
default:
throw new AssertionError("Unknown openMode " + openMode);
}
final String translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
}
/** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */
private void updateWriterOnOpen() throws IOException {
Objects.requireNonNull(historyUUID);
final Map<String, String> commitUserData = commitDataAsMap(indexWriter);
boolean needsCommit = false;
if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) {
needsCommit = true;
} else {
assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change";
assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid";
}
if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
needsCommit = true;
} else {
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode;
}
if (needsCommit) {
commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? commitUserData.get(SYNC_COMMIT_ID) : null);
}
}
@Override
public Translog getTranslog() {
ensureOpen();
@ -564,31 +485,20 @@ public class InternalEngine extends Engine {
*/
@Nullable
private String loadTranslogUUIDFromLastCommit() throws IOException {
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG :
"Only reuse existing translogUUID with OPEN_INDEX_AND_TRANSLOG; openMode = [" + openMode + "]";
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY)) {
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
} else {
return null;
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}
/**
* Reads the current stored history ID from the IW commit data. Generates a new UUID if not found or if generation is forced.
* Reads the current stored history ID from the IW commit data.
*/
private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean forceNew) throws IOException {
String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null || forceNew) {
assert
forceNew || // recovery from a local store creates an index that doesn't yet have a history_uuid
openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG ||
config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
"existing index was created after 6_0_0_rc1 but has no history uuid";
uuid = UUIDs.randomBase64UUID();
private String loadOrGenerateHistoryUUID(final IndexWriter writer) throws IOException {
final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
return uuid;
}
@ -1530,6 +1440,8 @@ public class InternalEngine extends Engine {
// we need to refresh in order to clear older version values
refresh("version_table_flush", SearcherScope.INTERNAL);
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
throw e;
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
@ -1898,9 +1810,9 @@ public class InternalEngine extends Engine {
}
}
private IndexWriter createWriter(boolean create, IndexCommit startingCommit) throws IOException {
private IndexWriter createWriter(IndexCommit startingCommit) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(create, startingCommit);
final IndexWriterConfig iwc = getIndexWriterConfig(startingCommit);
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
@ -1913,10 +1825,10 @@ public class InternalEngine extends Engine {
return new IndexWriter(directory, iwc);
}
private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit startingCommit) {
private IndexWriterConfig getIndexWriterConfig(IndexCommit startingCommit) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream

View File

@ -1283,44 +1283,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return opsRecovered;
}
/** creates an empty index and translog and opens the engine **/
public void createIndexAndTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EMPTY_STORE;
assert shardRouting.primary() && shardRouting.isRelocationTarget() == false;
// note: these are set when recovering from the translog
final RecoveryState.Translog translogStats = recoveryState().getTranslog();
translogStats.totalOperations(0);
translogStats.totalOperationsOnStart(0);
replicationTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created");
innerOpenEngineAndTranslog(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, false);
}
/** opens the engine on top of the existing lucene index but creates an empty translog **/
public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
if (Assertions.ENABLED) {
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
assert commitInfo.localCheckpoint >= globalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+ globalCheckpoint + "]";
// This assertion is only guaranteed if all nodes are on 6.2+.
if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_6_2_0)) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]";
}
}
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
}
/**
* opens the engine on top of the existing lucene index and translog.
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openIndexAndRecoveryFromTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
public void openEngineAndRecoverFromTranslog() throws IOException {
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog();
}
@ -1328,13 +1296,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Opens the engine on top of the existing lucene index and translog.
* The translog is kept but its operations won't be replayed.
*/
public void openIndexAndSkipTranslogRecovery() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
public void openEngineAndSkipTranslogRecovery() throws IOException {
innerOpenEngineAndTranslog();
getEngine().skipTranslogRecovery();
}
private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
private void innerOpenEngineAndTranslog() throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
@ -1349,29 +1316,25 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
assert openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || assertMaxUnsafeAutoIdInCommit();
final EngineConfig config = newEngineConfig(openMode, forceNewHistoryUUID);
final EngineConfig config = newEngineConfig();
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't lose deletes until we are done recovering
config.setEnableGcDeletes(false);
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// we have to set it before we open an engine and recover from the translog because
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
}
// we have to set it before we open an engine and recover from the translog because
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
assertMaxUnsafeAutoIdInCommit();
createNewEngine(config);
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
}
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}
@ -1388,15 +1351,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0) &&
// TODO: LOCAL_SHARDS need to transfer this information
recoveryState().getRecoverySource().getType() != RecoverySource.Type.LOCAL_SHARDS) {
// as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point.
// This should have been baked into the commit by the primary we recover from, regardless of the index age.
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID
+ " is not found in commit";
}
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID
+ " is not found in commit";
return true;
}
@ -2189,12 +2146,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return mapperService.documentMapperWithAutoCreate(type);
}
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) {
private EngineConfig newEngineConfig() {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig,
indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineDiskUtils;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
@ -389,8 +390,8 @@ final class StoreRecovery {
recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
indexShard.openIndexAndCreateTranslog(true, store.loadSeqNoInfo(null).localCheckpoint);
} else {
EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), indexShard.shardPath().resolveTranslog(), shardId);
} else if (indexShouldExists) {
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
@ -400,13 +401,11 @@ final class StoreRecovery {
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
if (indexShouldExists) {
indexShard.openIndexAndRecoveryFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} else {
indexShard.createIndexAndTranslog();
}
} else {
EngineDiskUtils.createEmpty(store.directory(), indexShard.shardPath().resolveTranslog(), shardId);
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
} catch (EngineException | IOException e) {
@ -446,16 +445,10 @@ final class StoreRecovery {
}
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
final Store store = indexShard.store();
final long localCheckpoint;
store.incRef();
try {
localCheckpoint = store.loadSeqNoInfo(null).localCheckpoint;
} finally {
store.decRef();
}
indexShard.openIndexAndCreateTranslog(true, localCheckpoint);
EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(indexShard.store().directory(), indexShard.shardPath().resolveTranslog(),
shardId);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");

View File

@ -218,8 +218,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* @return {@link SequenceNumbers.CommitInfo} containing information about the last commit
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public SequenceNumbers.CommitInfo loadSeqNoInfo(final IndexCommit commit) throws IOException {
final Map<String, String> userData = commit != null ? commit.getUserData() : SegmentInfos.readLatestCommit(directory).getUserData();
public static SequenceNumbers.CommitInfo loadSeqNoInfo(final IndexCommit commit) throws IOException {
final Map<String, String> userData = commit.getUserData();
return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
}

View File

@ -364,7 +364,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo(safeCommit);
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
if (logger.isTraceEnabled()) {
final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
for (IndexCommit commit : existingCommits) {
@ -406,7 +406,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.createNewTranslog(), request.totalTranslogOps());
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -33,13 +33,13 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean createNewTranslog;
private final boolean fileBasedRecovery;
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean createNewTranslog) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean fileBasedRecovery) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.createNewTranslog = createNewTranslog;
this.fileBasedRecovery = fileBasedRecovery;
}
RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
@ -51,9 +51,9 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
createNewTranslog = in.readBoolean();
fileBasedRecovery = in.readBoolean();
} else {
createNewTranslog = true;
fileBasedRecovery = true;
}
}
@ -70,10 +70,10 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
}
/**
* Whether or not the recovery target should create a new local translog
* Whether or not the recovery is file based
*/
boolean createNewTranslog() {
return createNewTranslog;
public boolean isFileBasedRecovery() {
return fileBasedRecovery;
}
@Override
@ -86,7 +86,7 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeBoolean(createNewTranslog);
out.writeBoolean(fileBasedRecovery);
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
@ -40,6 +41,7 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineDiskUtils;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -362,14 +364,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
/*** Implementation of {@link RecoveryTargetHandler } */
@Override
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
if (createNewTranslog) {
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
} else {
indexShard().openIndexAndSkipTranslogRecovery();
}
indexShard().openEngineAndSkipTranslogRecovery();
}
@Override
@ -440,8 +437,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// to recover from in case of a full cluster shutdown just when this code executes...
renameAllTempFiles();
final Store store = store();
store.incRef();
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory());
}
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
EngineDiskUtils.createNewTranslog(store.directory(), indexShard.shardPath().resolveTranslog(),
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not been checksummed and they are
@ -465,6 +469,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
fail(rfe, true);
throw rfe;
} finally {
store.decRef();
}
}

View File

@ -32,10 +32,10 @@ public interface RecoveryTargetHandler {
/**
* Prepares the target to receive translog operations, after all files have been copied
* @param createNewTranslog whether or not to delete the local translog on the target
* @param fileBasedRecovery whether or not this call is part of a file-based recovery
* @param totalTranslogOps total translog operations expected to be sent
*/
void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException;
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException;
/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and

View File

@ -76,9 +76,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog),
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}

View File

@ -38,8 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.singletonList;
import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.doAnswer;
@ -54,8 +52,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
final LongArrayList maxSeqNoList = new LongArrayList();
final LongArrayList translogGenList = new LongArrayList();
@ -94,8 +91,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final AtomicLong globalCheckpoint = new AtomicLong();
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
long lastMaxSeqNo = between(1, 1000);
long lastTranslogGen = between(1, 20);
int safeIndex = 0;
@ -165,8 +161,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
@ -199,8 +194,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_CREATE_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
final int invalidCommits = between(1, 10);
final List<IndexCommit> commitList = new ArrayList<>();
@ -237,8 +231,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong()));
}
final IndexCommit startingCommit = randomFrom(commitList);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, startingCommit);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, startingCommit);
indexPolicy.onInit(commitList);
for (IndexCommit commit : commitList) {
if (commit.equals(startingCommit) == false) {
@ -256,8 +249,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);

View File

@ -0,0 +1,207 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
public class EngineDiskUtilsTests extends EngineTestCase {
public void testHistoryUUIDIsSetIfMissing() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
engine.close();
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we have started it up; otherwise we would need to wait for it here.
// we also don't specify a codec here; merges should use the engine's codec for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) {
Map<String, String> newCommitData = new HashMap<>();
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) {
newCommitData.put(entry.getKey(), entry.getValue());
}
}
writer.setLiveCommitData(newCommitData.entrySet());
writer.commit();
}
EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory());
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
.build());
EngineConfig config = engine.config();
EngineConfig newConfig = new EngineConfig(
shardId, allocationId.getId(),
threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED);
engine = new InternalEngine(newConfig);
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
assertThat(engine.getHistoryUUID(), notNullValue());
}
public void testCurrentTranslogIDisCommitted() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
// create
{
EngineDiskUtils.createEmpty(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId);
ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
try (InternalEngine engine = createEngine(config)) {
engine.index(firstIndexRequest);
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
} else {
// creating an empty index will create the first translog gen and commit it
// opening the empty index will make the second translog file but not commit it
// opening the engine again (i=0) will make the third translog file, which is then committed
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
// open index with new tlog
{
EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId);
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().uncommittedOperations());
}
}
// open and recover tlog with empty tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
}
}
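The generation bookkeeping asserted above can be read straight off the last Lucene commit. A minimal sketch, assuming an open Store; DirectoryReader.listCommits and IndexCommit.getUserData are standard Lucene APIs, and the keys are the same Translog constants the test uses:
// Sketch: inspect the translog bookkeeping that the assertions above check.
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
Map<String, String> userData = commits.get(commits.size() - 1).getUserData();
String generation = userData.get(Translog.TRANSLOG_GENERATION_KEY); // e.g. "3" once recovery has committed
String uuid = userData.get(Translog.TRANSLOG_UUID_KEY);             // must match the live translog's uuid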
public void testHistoryUUIDCanBeForced() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
final String oldHistoryUUID = engine.getHistoryUUID();
engine.close();
EngineConfig config = engine.config();
EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId);
EngineConfig newConfig = new EngineConfig(
shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED);
engine = new InternalEngine(newConfig);
engine.recoverFromTranslog();
assertVisibleCount(engine, 0, false);
assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID)));
}
}
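Taken together, these tests exercise the static helpers that now perform all directory mutation before an engine is constructed. A minimal sketch of how a caller might choose between them; the four EngineDiskUtils signatures are the ones exercised above, while the method name and the boolean flags are hypothetical (imports elided):
// Sketch only (not part of the PR): routing logic a caller might use.
static void prepareDirectory(Directory dir, Path translogPath, ShardId shardId,
                             boolean emptyShard, boolean newTranslog, boolean newHistory) throws IOException {
    if (emptyShard) {
        // fresh shard: empty index plus a brand new, committed translog
        EngineDiskUtils.createEmpty(dir, translogPath, shardId);
    } else if (newHistory) {
        // keep the Lucene data but start a new history uuid and a new translog
        EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(dir, translogPath, shardId);
    } else if (newTranslog) {
        // keep the index, point its last commit at a fresh translog
        EngineDiskUtils.createNewTranslog(dir, translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
    } else {
        // older commit without a history uuid: add one in place
        EngineDiskUtils.ensureIndexHasHistoryUUID(dir);
    }
}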


@ -81,7 +81,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
@ -119,7 +118,6 @@ import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
@ -133,7 +131,6 @@ import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@ -149,6 +146,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongBiFunction;
@ -641,7 +639,7 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine engine = createEngine(store, translog);
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
@ -656,7 +654,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
@ -690,7 +688,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine recoveringEngine = null;
try {
recoveringEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine = new InternalEngine(engine.config());
recoveringEngine.recoverFromTranslog();
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
@ -718,20 +716,19 @@ public class InternalEngineTests extends EngineTestCase {
Engine recoveringEngine = null;
try {
final AtomicBoolean flushed = new AtomicBoolean();
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
final AtomicBoolean committed = new AtomicBoolean();
recoveringEngine = new InternalEngine(initialEngine.config()) {
@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
assertThat(getTranslog().uncommittedOperations(), equalTo(docs));
final CommitId commitId = super.flush(force, waitIfOngoing);
flushed.set(true);
return commitId;
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
committed.set(true);
super.commitIndexWriter(writer, translog, syncId);
}
};
assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs));
recoveringEngine.recoverFromTranslog();
assertTrue(flushed.get());
assertTrue(committed.get());
} finally {
IOUtils.close(recoveringEngine);
}
@ -762,7 +759,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
initialEngine.close();
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.recoverFromTranslog();
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
@ -1009,9 +1006,9 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(engine, store);
final Path translogPath = createTempDir();
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get();
engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier));
engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier));
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
boolean inSync = randomBoolean();
@ -1021,17 +1018,17 @@ public class InternalEngineTests extends EngineTestCase {
engine.flush();
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 2L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
engine.flush();
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 2L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 2L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
@ -1043,7 +1040,7 @@ public class InternalEngineTests extends EngineTestCase {
public void testSyncedFlush() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
Engine engine = createEngine(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
@ -1069,8 +1066,8 @@ public class InternalEngineTests extends EngineTestCase {
final int iters = randomIntBetween(2, 5); // run this a couple of times to get some coverage
for (int i = 0; i < iters; i++) {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogDocMergePolicy(), null))) {
InternalEngine engine =
createEngine(config(defaultSettings, store, createTempDir(), new LogDocMergePolicy(), null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
Engine.Index doc1 = indexForDoc(testParsedDocument("1", null, testDocumentWithTextField(), B_1, null));
engine.index(doc1);
@ -1125,7 +1122,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testSyncedFlushSurvivesEngineRestart() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
IOUtils.close(store, engine);
store = createStore();
engine = createEngine(store, primaryTranslogDir, globalCheckpoint::get);
@ -1144,12 +1141,13 @@ public class InternalEngineTests extends EngineTestCase {
} else {
engine.flushAndClose();
}
engine = new InternalEngine(copy(config, randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)));
if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG && randomBoolean()) {
engine.recoverFromTranslog();
if (randomBoolean()) {
EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
}
assertEquals(engine.config().getOpenMode().toString(), engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
public void testSyncedFlushVanishesOnReplay() throws IOException {
@ -1165,7 +1163,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
EngineConfig config = engine.config();
engine.close();
engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
engine = new InternalEngine(config);
engine.recoverFromTranslog();
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
}
@ -1270,7 +1268,7 @@ public class InternalEngineTests extends EngineTestCase {
public void testForceMerge() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
Engine engine = createEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
@ -2051,7 +2049,7 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine recoveringEngine = null;
try {
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.recoverFromTranslog();
assertEquals(primarySeqNo, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@ -2078,10 +2076,9 @@ public class InternalEngineTests extends EngineTestCase {
// this test writes documents to the engine while concurrently flushing/commit
// and ensuring that the commit points contain the correct sequence number data
public void testConcurrentWritesAndCommits() throws Exception {
List<Engine.IndexCommitRef> commits = new ArrayList<>();
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
final List<Engine.IndexCommitRef> commits = new ArrayList<>();
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
final int numIndexingThreads = scaledRandomIntBetween(2, 4);
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
@ -2243,7 +2240,7 @@ public class InternalEngineTests extends EngineTestCase {
public void testEnableGcDeletes() throws Exception {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
Engine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
engine.config().setEnableGcDeletes(false);
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
@ -2326,7 +2323,7 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine holder;
try {
holder = createEngine(store, translogPath);
} catch (EngineCreationFailureException ex) {
} catch (EngineCreationFailureException | IOException ex) {
assertEquals(store.refCount(), refCount);
continue;
}
@ -2372,9 +2369,9 @@ public class InternalEngineTests extends EngineTestCase {
} catch (EngineCreationFailureException ex) {
// expected
}
// now it should be OK.
EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null),
EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
// when a new translog is created it should be OK
EngineDiskUtils.createNewTranslog(store.directory(), primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null);
engine = new InternalEngine(config);
}
@ -2421,21 +2418,6 @@ public class InternalEngineTests extends EngineTestCase {
}
}
private static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException {
assertVisibleCount(engine, numDocs, true);
}
private static void assertVisibleCount(InternalEngine engine, int numDocs, boolean refresh) throws IOException {
if (refresh) {
engine.refresh("test");
}
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new MatchAllDocsQuery(), collector);
assertThat(collector.getTotalHits(), equalTo(numDocs));
}
}
public void testTranslogCleanUpPostCommitCrash() throws Exception {
IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(),
defaultSettings.getScopedSettings());
@ -2449,8 +2431,9 @@ public class InternalEngineTests extends EngineTestCase {
try (Store store = createStore()) {
AtomicBoolean throwErrorOnCommit = new AtomicBoolean();
final Path translogPath = createTempDir();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get();
EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId);
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier)) {
@ -2463,6 +2446,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
}) {
engine.recoverFromTranslog();
final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc1));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
@ -2495,7 +2479,8 @@ public class InternalEngineTests extends EngineTestCase {
}
assertVisibleCount(engine, numDocs);
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG));
engine = new InternalEngine(engine.config());
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits, equalTo(0L));
@ -2535,7 +2520,7 @@ public class InternalEngineTests extends EngineTestCase {
parser.mappingUpdate = dynamicUpdate();
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config, otherwise parser.mappingUpdate won't work
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
@ -2622,10 +2607,10 @@ public class InternalEngineTests extends EngineTestCase {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(),
BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(),
EngineConfig brokenConfig = new EngineConfig(shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
try {
@ -2638,94 +2623,6 @@ public class InternalEngineTests extends EngineTestCase {
assertVisibleCount(engine, numDocs, false);
}
public void testHistoryUUIDIsSetIfMissing() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
engine.close();
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) {
Map<String, String> newCommitData = new HashMap<>();
for (Map.Entry<String, String> entry: writer.getLiveCommitData()) {
if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) {
newCommitData.put(entry.getKey(), entry.getValue());
}
}
writer.setLiveCommitData(newCommitData.entrySet());
writer.commit();
}
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
.build());
EngineConfig config = engine.config();
EngineConfig newConfig = new EngineConfig(
randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
shardId, allocationId.getId(),
threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
engine = new InternalEngine(newConfig);
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
} else {
assertVisibleCount(engine, 0, false);
}
assertThat(engine.getHistoryUUID(), notNullValue());
}
public void testHistoryUUIDCanBeForced() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
final String oldHistoryUUID = engine.getHistoryUUID();
engine.close();
EngineConfig config = engine.config();
EngineConfig newConfig = new EngineConfig(
randomBoolean() ? EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
if (newConfig.getOpenMode() == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG) {
Lucene.cleanLuceneIndex(store.directory());
}
engine = new InternalEngine(newConfig);
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
} else {
assertVisibleCount(engine, 0, false);
}
assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID)));
}
public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
String operation = randomFrom("optimize", "refresh", "flush");
@ -2818,74 +2715,6 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testCurrentTranslogIDisCommitted() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
// create
{
ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
assertFalse(engine.isRecovering());
engine.index(firstIndexRequest);
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
} else {
assertEquals("4", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("4", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
// open index with new tlog
{
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().uncommittedOperations());
}
}
// open and recover tlog with empty tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "2", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
}
}
private static class ThrowingIndexWriter extends IndexWriter {
private AtomicReference<Supplier<Exception>> failureToThrow = new AtomicReference<>();
@ -3367,21 +3196,22 @@ public class InternalEngineTests extends EngineTestCase {
public void testEngineMaxTimestampIsInitialized() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final long timestamp1 = Math.abs(randomNonNegativeLong());
final Path storeDir = createTempDir();
final Path translogDir = createTempDir();
final long timestamp2 = randomNonNegativeLong();
final long maxTimestamp12 = Math.max(timestamp1, timestamp2);
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) {
final Function<Store, EngineConfig> configSupplier =
store -> config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = createEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp1));
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) {
try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.recoverFromTranslog();
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
@ -3389,13 +3219,16 @@ public class InternalEngineTests extends EngineTestCase {
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp2));
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
globalCheckpoint.set(1); // make sure flush cleans up commits for later.
engine.flush();
}
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null),
randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
try (Store store = createStore(newFSDirectory(storeDir))) {
if (randomBoolean()) {
EngineDiskUtils.createNewTranslog(store.directory(), translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId);
}
try (Engine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
}
}
@ -3491,7 +3324,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
});
InternalEngine internalEngine = new InternalEngine(config);
InternalEngine internalEngine = createEngine(config);
int docId = 0;
final ParsedDocument doc = testParsedDocument(Integer.toString(docId), null,
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
@ -3662,53 +3495,13 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(initialEngine);
}
try (Engine recoveringEngine =
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
}
}
public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws IOException {
final long v = 1;
final VersionType t = VersionType.EXTERNAL;
final long ts = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
final int docs = randomIntBetween(1, 32);
InternalEngine initialEngine = null;
try {
initialEngine = engine;
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
final Term uid = newUid(doc);
// create a gap at sequence number 3 * i + 1
initialEngine.index(new Engine.Index(uid, doc, 3 * i, 1, v, t, REPLICA, System.nanoTime(), ts, false));
initialEngine.delete(new Engine.Delete("type", id, uid, 3 * i + 2, 1, v, t, REPLICA, System.nanoTime()));
}
// bake the commit with the local checkpoint stuck at 0 and gaps all along the way up to the max sequence number
assertThat(initialEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) 0));
assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo((long) (3 * (docs - 1) + 2)));
initialEngine.flush(true, true);
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
final Term uid = newUid(doc);
initialEngine.index(new Engine.Index(uid, doc, 3 * i + 1, 1, v, t, REPLICA, System.nanoTime(), ts, false));
}
} finally {
IOUtils.close(initialEngine);
}
try (Engine recoveringEngine =
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(1);
assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1)));
}
}
/** java docs */
public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOException {
@ -3803,7 +3596,7 @@ public class InternalEngineTests extends EngineTestCase {
final BiFunction<Long, Long, LocalCheckpointTracker> supplier = (ms, lcp) -> new LocalCheckpointTracker(
maxSeqNo,
localCheckpoint);
noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier) {
noOpEngine = new InternalEngine(engine.config(), supplier) {
@Override
protected long doGenerateSeqNoForOperation(Operation operation) {
throw new UnsupportedOperationException();
@ -3950,7 +3743,7 @@ public class InternalEngineTests extends EngineTestCase {
completedSeqNos.add(seqNo);
}
};
actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier);
actualEngine = new InternalEngine(engine.config(), supplier);
final int operations = randomIntBetween(0, 1024);
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
for (int i = 0; i < operations; i++) {
@ -4013,15 +3806,14 @@ public class InternalEngineTests extends EngineTestCase {
boolean flushed = false;
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Engine recoveringEngine = null;
try {
assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
recoveringEngine = new InternalEngine(copy(
replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@ -4054,8 +3846,7 @@ public class InternalEngineTests extends EngineTestCase {
// now do it again to make sure we preserve values etc.
try {
recoveringEngine = new InternalEngine(
copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
if (flushed) {
assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
}
@ -4234,10 +4025,11 @@ public class InternalEngineTests extends EngineTestCase {
final Path translogPath = createTempDir();
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null,
() -> globalCheckpoint.get());
EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId);
try (Engine engine = new InternalEngine(engineConfig) {
@Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
@ -4249,6 +4041,7 @@ public class InternalEngineTests extends EngineTestCase {
super.commitIndexWriter(writer, translog, syncId);
}
}) {
engine.recoverFromTranslog();
int numDocs = scaledRandomIntBetween(10, 100);
final String translogUUID = engine.getTranslog().getTranslogUUID();
for (int docId = 0; docId < numDocs; docId++) {
@ -4340,7 +4133,7 @@ public class InternalEngineTests extends EngineTestCase {
public void testAcquireIndexCommit() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
int numDocs = between(1, 20);
for (int i = 0; i < numDocs; i++) {
@ -4377,10 +4170,10 @@ public class InternalEngineTests extends EngineTestCase {
public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {
IOUtils.close(engine);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final EngineConfig config = copy(engine.config(), globalCheckpoint::get);
final IndexCommit safeCommit;
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
try (InternalEngine engine = createEngine(config)) {
final int numDocs = between(5, 50);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
@ -4394,44 +4187,16 @@ public class InternalEngineTests extends EngineTestCase {
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
engine.getTranslog().sync();
}
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
try (InternalEngine engine = new InternalEngine(config)) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
assertThat("OPEN_INDEX_AND_TRANSLOG should keep only safe commit", existingCommits, contains(safeCommit));
}
}
public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
IOUtils.close(engine);
final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
final Map<String, String> lastCommit;
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
engine.skipTranslogRecovery();
final int numDocs = between(5, 50);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
if (randomBoolean()) {
engine.flush();
}
}
final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
lastCommit = commits.get(commits.size() - 1).getUserData();
}
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
assertThat("OPEN_INDEX_CREATE_TRANSLOG should keep only last commit", existingCommits, hasSize(1));
final Map<String, String> userData = existingCommits.get(0).getUserData();
assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(lastCommit.get(SequenceNumbers.MAX_SEQ_NO)));
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(lastCommit.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
// Translog tags should be fresh.
assertThat(userData.get(Translog.TRANSLOG_UUID_KEY), not(equalTo(lastCommit.get(Translog.TRANSLOG_UUID_KEY))));
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("2"));
assertThat("safe commit should be kept", existingCommits, contains(safeCommit));
}
}
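The assertion above relies on the deletion policy keeping, in this scenario, exactly the safe commit: the newest commit whose max sequence number is at or below the persisted global checkpoint. A hypothetical selector with that behavior, not the actual CombinedDeletionPolicy code; listCommits returns commits oldest first:
// Sketch (assumed logic, matching what the test asserts): pick the newest
// commit whose max_seq_no does not exceed the global checkpoint.
IndexCommit safe = null;
for (IndexCommit commit : DirectoryReader.listCommits(dir)) {
    long maxSeqNo = Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
    if (maxSeqNo <= globalCheckpoint) {
        safe = commit;
    }
}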
public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
@ -4456,7 +4221,7 @@ public class InternalEngineTests extends EngineTestCase {
public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {


@ -101,8 +101,8 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -333,7 +333,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
assertFalse(shard.shouldPeriodicallyFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(117 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
new ByteSizeValue(160 /* size of the operation + the header and footer of two translog generations */, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0")
.setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertFalse(shard.shouldPeriodicallyFlush());
@ -407,15 +407,15 @@ public class IndexShardIT extends ESSingleNodeTestCase {
IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldPeriodicallyFlush());
final String key;
final boolean flush = randomBoolean();
final Settings settings;
if (flush) {
key = "index.translog.flush_threshold_size";
// size of the operation plus two generations of overhead.
settings = Settings.builder().put("index.translog.flush_threshold_size", "180b").build();
} else {
key = "index.translog.generation_threshold_size";
// size of the operation plus header and footer
settings = Settings.builder().put("index.translog.generation_threshold_size", "117b").build();
}
// size of the operation plus header and footer
final Settings settings = Settings.builder().put(key, "117b").build();
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
client().prepareIndex("test", "test", "0")
.setSource("{}", XContentType.JSON)


@ -2111,7 +2111,7 @@ public class IndexShardTests extends IndexShardTestCase {
shard.prepareForIndexRecovery();
// Shard is still inactive since we haven't started recovering yet
assertFalse(shard.isActive());
shard.openIndexAndRecoveryFromTranslog();
shard.openEngineAndRecoverFromTranslog();
// Shard should now be active since we did recover:
assertTrue(shard.isActive());
closeShards(shard);
@ -2138,14 +2138,6 @@ public class IndexShardTests extends IndexShardTestCase {
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps);
// Shard is still inactive since we haven't started recovering yet
assertFalse(replica.isActive());
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
@ -2188,8 +2180,8 @@ public class IndexShardTests extends IndexShardTestCase {
}) {
// we're only checking that listeners are called when the engine is open, before there is no point
@Override
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps);
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
assertListenerCalled.accept(replica);
}


@ -42,6 +42,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineDiskUtils;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
@ -56,9 +57,9 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.junit.After;
import org.junit.Before;
@ -120,13 +121,14 @@ public class RefreshListenersTests extends ESTestCase {
// we don't need to notify anybody in this test
}
};
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool,
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, null,
new NoneCircuitBreakerService(),
() -> SequenceNumbers.UNASSIGNED_SEQ_NO);
EngineDiskUtils.createEmpty(store.directory(), translogConfig.getTranslogPath(), shardId);
EngineConfig config = new EngineConfig(shardId, allocationId, threadPool,
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
(e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
listeners.setTranslog(engine.getTranslog());
}


@ -31,8 +31,10 @@ import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
@ -89,6 +91,7 @@ import java.util.function.ToLongBiFunction;
import static java.util.Collections.emptyList;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
public abstract class EngineTestCase extends ESTestCase {
@ -109,6 +112,21 @@ public abstract class EngineTestCase extends ESTestCase {
protected Path primaryTranslogDir;
protected Path replicaTranslogDir;
protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
assertVisibleCount(engine, numDocs, true);
}
protected static void assertVisibleCount(Engine engine, int numDocs, boolean refresh) throws IOException {
if (refresh) {
engine.refresh("test");
}
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new MatchAllDocsQuery(), collector);
assertThat(collector.getTotalHits(), equalTo(numDocs));
}
}
@Override
@Before
public void setUp() throws Exception {
@ -155,24 +173,20 @@ public abstract class EngineTestCase extends ESTestCase {
}
}
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
return copy(config, openMode, config.getAnalyzer());
}
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, LongSupplier globalCheckpointSupplier) {
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSupplier) {
return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
config.getCircuitBreakerService(), globalCheckpointSupplier);
}
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier());
}
@ -253,9 +267,9 @@ public abstract class EngineTestCase extends ESTestCase {
protected Translog createTranslog(Path translogPath) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
final String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
return new Translog(translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS),
() -> SequenceNumbers.NO_OPS_PERFORMED);
String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
return new Translog(
translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED);
}
protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
@@ -338,10 +352,23 @@ public abstract class EngineTestCase extends ESTestCase {
                                          @Nullable Sort indexSort,
                                          @Nullable LongSupplier globalCheckpointSupplier) throws IOException {
         EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort, globalCheckpointSupplier);
-        InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
-        if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
-            internalEngine.recoverFromTranslog();
-        }
+        return createEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
+    }
+
+    protected InternalEngine createEngine(EngineConfig config) throws IOException {
+        return createEngine(null, null, null, config);
+    }
+
+    private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory,
+                                        @Nullable BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,
+                                        @Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
+                                        EngineConfig config) throws IOException {
+        final Directory directory = config.getStore().directory();
+        if (Lucene.indexExists(directory) == false) {
+            EngineDiskUtils.createEmpty(directory, config.getTranslogConfig().getTranslogPath(), config.getShardId());
+        }
+        InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
+        internalEngine.recoverFromTranslog();
         return internalEngine;
     }
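This hunk is the PR's thesis in miniature: rather than telling the engine how to open via an OpenMode, the harness mutates the directory first with a static helper and then constructs the engine against whatever is on disk, unconditionally replaying the translog. A condensed sketch of the resulting lifecycle, error handling elided, assuming the EngineDiskUtils helper introduced elsewhere in this commit:

    // Prepare-then-open: all directory mutation happens before the engine exists.
    Directory dir = config.getStore().directory();
    if (Lucene.indexExists(dir) == false) {
        // Write an empty index commit plus a matching empty translog so the
        // constructor below finds a consistent store.
        EngineDiskUtils.createEmpty(dir, config.getTranslogConfig().getTranslogPath(), config.getShardId());
    }
    InternalEngine engine = new InternalEngine(config); // opens things "as they are"
    engine.recoverFromTranslog();                       // no longer gated on an OpenMode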
@@ -394,23 +421,13 @@ public abstract class EngineTestCase extends ESTestCase {
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener refreshListener) {
-        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
+        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, () -> SequenceNumbers.NO_OPS_PERFORMED);
     }
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) {
         IndexWriterConfig iwc = newIndexWriterConfig();
         TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
-        final EngineConfig.OpenMode openMode;
-        try {
-            if (Lucene.indexExists(store.directory()) == false) {
-                openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
-            } else {
-                openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
-            }
-        } catch (IOException e) {
-            throw new ElasticsearchException("can't find index?", e);
-        }
         Engine.EventListener listener = new Engine.EventListener() {
             @Override
             public void onFailedEngine(String reason, @Nullable Exception e) {
@@ -421,14 +438,14 @@ public abstract class EngineTestCase extends ESTestCase {
                 indexSettings.getSettings()));
         final List<ReferenceManager.RefreshListener> refreshListenerList =
             refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
-        EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
+        EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store,
             mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
-            IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
+            IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
             TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler,
             new NoneCircuitBreakerService(),
             globalCheckpointSupplier == null ?
-                new ReplicationTracker(shardId, allocationId.getId(), indexSettings,
-                    SequenceNumbers.UNASSIGNED_SEQ_NO) : globalCheckpointSupplier);
+                new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) :
+                globalCheckpointSupplier);
         return config;
     }
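One semantic shift worth flagging: the fallback global checkpoint for tests moves from UNASSIGNED_SEQ_NO to NO_OPS_PERFORMED. As defined in SequenceNumbers, these are distinct sentinels:

    // UNASSIGNED_SEQ_NO (-2): no checkpoint information exists at all.
    // NO_OPS_PERFORMED  (-1): a valid history that contains zero operations.
    LongSupplier defaultGlobalCheckpoint = () -> SequenceNumbers.NO_OPS_PERFORMED;

A plausible reading of the change: now that every engine opens over a real store, possibly one just created empty, an empty-but-valid checkpoint is the honest default.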