diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index e172b53f1a8..91fac0908ef 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -93,12 +93,12 @@ which returns something similar to: { "commit" : { "id" : "3M3zkw2GHMo2Y4h4/KFKCg==", - "generation" : 2, + "generation" : 3, "user_data" : { "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA", "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ", "local_checkpoint" : "-1", - "translog_generation" : "2", + "translog_generation" : "3", "max_seq_no" : "-1", "sync_id" : "AVvFY-071siAOuFGEO9P", <1> "max_unsafe_auto_id_timestamp" : "-1" diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index f26a24c47a6..6f06c310e4c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -46,16 +46,14 @@ import java.util.function.LongSupplier; public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; - private final EngineConfig.OpenMode openMode; private final LongSupplier globalCheckpointSupplier; private final IndexCommit startingCommit; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private volatile IndexCommit lastCommit; // the most recent commit point - CombinedDeletionPolicy(EngineConfig.OpenMode openMode, Logger logger, TranslogDeletionPolicy translogDeletionPolicy, + CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) { - this.openMode = openMode; this.logger = logger; this.translogDeletionPolicy = translogDeletionPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; @@ -65,25 +63,11 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { @Override public synchronized void onInit(List commits) throws IOException { - switch (openMode) { - case CREATE_INDEX_AND_TRANSLOG: - assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]"; - break; - case OPEN_INDEX_CREATE_TRANSLOG: - case OPEN_INDEX_AND_TRANSLOG: - assert commits.isEmpty() == false : "index is opened, but we have no commits"; - assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; " - + "startingCommit [" + startingCommit + "], commit list [" + commits + "]"; - keepOnlyStartingCommitOnInit(commits); - // OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history, - // We therefore should not use that index commit to update the translog deletion policy. - if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - updateTranslogDeletionPolicy(); - } - break; - default: - throw new IllegalArgumentException("unknown openMode [" + openMode + "]"); - } + assert commits.isEmpty() == false : "index is opened, but we have no commits"; + assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; " + + "startingCommit [" + startingCommit + "], commit list [" + commits + "]"; + keepOnlyStartingCommitOnInit(commits); + updateTranslogDeletionPolicy(); } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 30743c18cfe..352c3ba3e62 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -75,7 +75,6 @@ public final class EngineConfig { private final List internalRefreshListener; @Nullable private final Sort indexSort; - private final boolean forceNewHistoryUUID; private final TranslogRecoveryRunner translogRecoveryRunner; @Nullable private final CircuitBreakerService circuitBreakerService; @@ -113,24 +112,20 @@ public final class EngineConfig { Property.IndexScope, Property.Dynamic); private final TranslogConfig translogConfig; - private final OpenMode openMode; /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ - public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool, + public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, IndexSettings indexSettings, Engine.Warmer warmer, Store store, MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter, + TranslogConfig translogConfig, TimeValue flushMergesAfter, List externalRefreshListener, List internalRefreshListener, Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier) { - if (openMode == null) { - throw new IllegalArgumentException("openMode must not be null"); - } this.shardId = shardId; this.allocationId = allocationId; this.indexSettings = indexSettings; @@ -151,8 +146,6 @@ public final class EngineConfig { this.queryCachingPolicy = queryCachingPolicy; this.translogConfig = translogConfig; this.flushMergesAfter = flushMergesAfter; - this.openMode = openMode; - this.forceNewHistoryUUID = forceNewHistoryUUID; this.externalRefreshListener = externalRefreshListener; this.internalRefreshListener = internalRefreshListener; this.indexSort = indexSort; @@ -315,22 +308,6 @@ public final class EngineConfig { */ public TimeValue getFlushMergesAfter() { return flushMergesAfter; } - /** - * Returns the {@link OpenMode} for this engine config. - */ - public OpenMode getOpenMode() { - return openMode; - } - - - /** - * Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing - * one is found. - */ - public boolean getForceNewHistoryUUID() { - return forceNewHistoryUUID; - } - @FunctionalInterface public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; @@ -343,20 +320,6 @@ public final class EngineConfig { return translogRecoveryRunner; } - /** - * Engine open mode defines how the engine should be opened or in other words what the engine should expect - * to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index. - * If the index exists we also have the ability open only the index and create a new transaction log which happens - * during remote recovery since we have already transferred the index files but the translog is replayed from remote. The last - * and safest option opens the lucene index as well as it's referenced transaction log for a translog recovery. - * See also {@link Engine#recoverFromTranslog()} - */ - public enum OpenMode { - CREATE_INDEX_AND_TRANSLOG, - OPEN_INDEX_CREATE_TRANSLOG, - OPEN_INDEX_AND_TRANSLOG; - } - /** * The refresh listeners to add to Lucene for externally visible refreshes */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java b/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java new file mode 100644 index 00000000000..f7f3aa8e9fe --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java @@ -0,0 +1,144 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.store.Directory; +import org.elasticsearch.Assertions; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * This class contains utility methods for mutating the shard lucene index and translog as a preparation to be opened. + */ +public abstract class EngineDiskUtils { + + /** + * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. + */ + public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException { + try (IndexWriter writer = newIndexWriter(true, dir)) { + final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId); + final Map map = new HashMap<>(); + map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); + map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); + map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); + map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); + map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1"); + updateCommitData(writer, map); + } + } + + + /** + * Converts an existing lucene index and marks it with a new history uuid. Also creates a new empty translog file. + * This is used to make sure no existing shard will recovery from this index using ops based recovery. + */ + public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId) + throws IOException { + try (IndexWriter writer = newIndexWriter(false, dir)) { + final Map userData = getUserData(writer); + final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); + final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId); + final Map map = new HashMap<>(); + map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); + map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); + map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); + updateCommitData(writer, map); + } + } + + /** + * Creates a new empty translog and associates it with an existing lucene index. + */ + public static void createNewTranslog(final Directory dir, final Path translogPath, long initialGlobalCheckpoint, final ShardId shardId) + throws IOException { + if (Assertions.ENABLED) { + final List existingCommits = DirectoryReader.listCommits(dir); + assert existingCommits.size() == 1 : "creating a translog translog should have one commit, commits[" + existingCommits + "]"; + SequenceNumbers.CommitInfo commitInfo = Store.loadSeqNoInfo(existingCommits.get(0)); + assert commitInfo.localCheckpoint >= initialGlobalCheckpoint : + "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint [" + + initialGlobalCheckpoint + "]"; + } + + try (IndexWriter writer = newIndexWriter(false, dir)) { + final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId); + final Map map = new HashMap<>(); + map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); + map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); + updateCommitData(writer, map); + } + } + + + /** + * Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed. + */ + public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException { + try (IndexWriter writer = newIndexWriter(false, dir)) { + final Map userData = getUserData(writer); + if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { + updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); + } + } + } + + private static void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { + final Map userData = getUserData(writer); + userData.putAll(keysToUpdate); + writer.setLiveCommitData(userData.entrySet()); + writer.commit(); + } + + private static Map getUserData(IndexWriter writer) { + final Map userData = new HashMap<>(); + writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); + return userData; + } + + private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException { + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); + return new IndexWriter(dir, iwc); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 4ac399eac90..49be68efcad 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -49,7 +49,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.LoggerInfoStream; @@ -72,6 +71,7 @@ import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; @@ -133,7 +133,6 @@ public class InternalEngine extends Engine { // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); - private final EngineConfig.OpenMode openMode; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); @@ -157,7 +156,6 @@ public class InternalEngine extends Engine { final EngineConfig engineConfig, final BiFunction localCheckpointTrackerSupplier) { super(engineConfig); - openMode = engineConfig.getOpenMode(); if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } @@ -183,22 +181,15 @@ public class InternalEngine extends Engine { assert translog.getGeneration() != null; this.translog = translog; final IndexCommit startingCommit = getStartingCommitPoint(); - assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null : - "Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]"; + assert startingCommit != null : "Starting commit should be non-null"; this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit); - this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, logger, translogDeletionPolicy, + this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint, startingCommit); - writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit); + writer = createWriter(startingCommit); updateMaxUnsafeAutoIdTimestampFromWriter(writer); - assert engineConfig.getForceNewHistoryUUID() == false - || openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG - || openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG - : "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID; " + - "openMode [" + openMode + "], forceNewHistoryUUID [" + engineConfig.getForceNewHistoryUUID() + "]"; - historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); + historyUUID = loadOrGenerateHistoryUUID(writer); Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; - updateWriterOnOpen(); } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } catch (AssertionError e) { @@ -217,7 +208,7 @@ public class InternalEngine extends Engine { internalSearcherManager.addListener(versionMap); assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; // don't allow commits until we are done with recovering - pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); + pendingTranslogRecovery.set(true); for (ReferenceManager.RefreshListener listener: engineConfig.getExternalRefreshListener()) { this.externalSearcherManager.addListener(listener); } @@ -241,20 +232,10 @@ public class InternalEngine extends Engine { BiFunction localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException { final long maxSeqNo; final long localCheckpoint; - switch (openMode) { - case CREATE_INDEX_AND_TRANSLOG: - maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; - localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; - break; - case OPEN_INDEX_AND_TRANSLOG: - case OPEN_INDEX_CREATE_TRANSLOG: - final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo(startingCommit); - maxSeqNo = seqNoStats.maxSeqNo; - localCheckpoint = seqNoStats.localCheckpoint; - logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); - break; - default: throw new IllegalArgumentException("unknown type: " + openMode); - } + final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(startingCommit); + maxSeqNo = seqNoStats.maxSeqNo; + localCheckpoint = seqNoStats.localCheckpoint; + logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); } @@ -380,9 +361,6 @@ public class InternalEngine extends Engine { flushLock.lock(); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - throw new IllegalStateException("Can't recover from translog with open mode: " + openMode); - } if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } @@ -405,50 +383,31 @@ public class InternalEngine extends Engine { @Override public void skipTranslogRecovery() { - if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode); - } assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit } private IndexCommit getStartingCommitPoint() throws IOException { final IndexCommit startingIndexCommit; - final List existingCommits; - switch (openMode) { - case CREATE_INDEX_AND_TRANSLOG: - startingIndexCommit = null; - break; - case OPEN_INDEX_CREATE_TRANSLOG: - // Use the last commit - existingCommits = DirectoryReader.listCommits(store.directory()); - startingIndexCommit = existingCommits.get(existingCommits.size() - 1); - break; - case OPEN_INDEX_AND_TRANSLOG: - // Use the safe commit - final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); - final long minRetainedTranslogGen = translog.getMinFileGeneration(); - existingCommits = DirectoryReader.listCommits(store.directory()); - // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog - // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. - // To avoid this issue, we only select index commits whose translog are fully retained. - if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { - final List recoverableCommits = new ArrayList<>(); - for (IndexCommit commit : existingCommits) { - if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { - recoverableCommits.add(commit); - } - } - assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + - "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; - startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); - } else { - // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. - startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); + final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); + final long minRetainedTranslogGen = translog.getMinFileGeneration(); + final List existingCommits = DirectoryReader.listCommits(store.directory()); + // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog + // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. + // To avoid this issue, we only select index commits whose translog are fully retained. + if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { + final List recoverableCommits = new ArrayList<>(); + for (IndexCommit commit : existingCommits) { + if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { + recoverableCommits.add(commit); } - break; - default: - throw new IllegalArgumentException("unknown mode: " + openMode); + } + assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + + "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); + } else { + // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); } return startingIndexCommit; } @@ -469,58 +428,20 @@ public class InternalEngine extends Engine { if (opsRecovered > 0) { logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration()); - flush(true, true); - refresh("translog_recovery"); - } else if (translog.isCurrent(translogGeneration) == false) { - commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); + commitIndexWriter(indexWriter, translog, null); refreshLastCommittedSegmentInfos(); + refresh("translog_recovery"); } - // clean up what's not needed translog.trimUnreferencedReaders(); } private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException { - assert openMode != null; final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final String translogUUID; - switch (openMode) { - case CREATE_INDEX_AND_TRANSLOG: - case OPEN_INDEX_CREATE_TRANSLOG: - translogUUID = - Translog.createEmptyTranslog(translogConfig.getTranslogPath(), globalCheckpointSupplier.getAsLong(), shardId); - break; - case OPEN_INDEX_AND_TRANSLOG: - translogUUID = loadTranslogUUIDFromLastCommit(); - break; - default: - throw new AssertionError("Unknown openMode " + openMode); - } + final String translogUUID = loadTranslogUUIDFromLastCommit(); + // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); } - /** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */ - private void updateWriterOnOpen() throws IOException { - Objects.requireNonNull(historyUUID); - final Map commitUserData = commitDataAsMap(indexWriter); - boolean needsCommit = false; - if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) { - needsCommit = true; - } else { - assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change"; - assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid"; - } - if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { - needsCommit = true; - } else { - assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode; - } - if (needsCommit) { - commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG - ? commitUserData.get(SYNC_COMMIT_ID) : null); - } - } - - @Override public Translog getTranslog() { ensureOpen(); @@ -564,31 +485,20 @@ public class InternalEngine extends Engine { */ @Nullable private String loadTranslogUUIDFromLastCommit() throws IOException { - assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : - "Only reuse existing translogUUID with OPEN_INDEX_AND_TRANSLOG; openMode = [" + openMode + "]"; final Map commitUserData = store.readLastCommittedSegmentsInfo().getUserData(); - if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY)) { - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - return commitUserData.get(Translog.TRANSLOG_UUID_KEY); - } else { - return null; + if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { + throw new IllegalStateException("commit doesn't contain translog generation id"); } + return commitUserData.get(Translog.TRANSLOG_UUID_KEY); } /** - * Reads the current stored history ID from the IW commit data. Generates a new UUID if not found or if generation is forced. + * Reads the current stored history ID from the IW commit data. */ - private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean forceNew) throws IOException { - String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); - if (uuid == null || forceNew) { - assert - forceNew || // recovery from a local store creates an index that doesn't have yet a history_uuid - openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || - config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : - "existing index was created after 6_0_0_rc1 but has no history uuid"; - uuid = UUIDs.randomBase64UUID(); + private String loadOrGenerateHistoryUUID(final IndexWriter writer) throws IOException { + final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); + if (uuid == null) { + throw new IllegalStateException("commit doesn't contain history uuid"); } return uuid; } @@ -1530,6 +1440,8 @@ public class InternalEngine extends Engine { // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL); translog.trimUnreferencedReaders(); + } catch (AlreadyClosedException e) { + throw e; } catch (Exception e) { throw new FlushFailedEngineException(shardId, e); } @@ -1898,9 +1810,9 @@ public class InternalEngine extends Engine { } } - private IndexWriter createWriter(boolean create, IndexCommit startingCommit) throws IOException { + private IndexWriter createWriter(IndexCommit startingCommit) throws IOException { try { - final IndexWriterConfig iwc = getIndexWriterConfig(create, startingCommit); + final IndexWriterConfig iwc = getIndexWriterConfig(startingCommit); return createWriter(store.directory(), iwc); } catch (LockObtainFailedException ex) { logger.warn("could not lock IndexWriter", ex); @@ -1913,10 +1825,10 @@ public class InternalEngine extends Engine { return new IndexWriter(directory, iwc); } - private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit startingCommit) { + private IndexWriterConfig getIndexWriterConfig(IndexCommit startingCommit) { final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); iwc.setCommitOnClose(false); // we by default don't commit on close - iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); + iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); iwc.setIndexCommit(startingCommit); iwc.setIndexDeletionPolicy(combinedDeletionPolicy); // with tests.verbose, lucene sets this up: plumb to align with filesystem stream diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8a45246034a..13708add481 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1283,44 +1283,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return opsRecovered; } - /** creates an empty index and translog and opens the engine **/ - public void createIndexAndTranslog() throws IOException { - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EMPTY_STORE; - assert shardRouting.primary() && shardRouting.isRelocationTarget() == false; - // note: these are set when recovering from the translog - final RecoveryState.Translog translogStats = recoveryState().getTranslog(); - translogStats.totalOperations(0); - translogStats.totalOperationsOnStart(0); - replicationTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created"); - innerOpenEngineAndTranslog(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, false); - } - - /** opens the engine on top of the existing lucene engine but creates an empty translog **/ - public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException { - if (Assertions.ENABLED) { - assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE && - recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE; - SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null); - assert commitInfo.localCheckpoint >= globalCheckpoint : - "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint [" - + globalCheckpoint + "]"; - // This assertion is only guaranteed if all nodes are on 6.2+. - if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_6_2_0)) { - final List existingCommits = DirectoryReader.listCommits(store.directory()); - assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]"; - } - } - replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog"); - innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID); - } - /** * opens the engine on top of the existing lucene engine and translog. * Operations from the translog will be replayed to bring lucene up to date. **/ - public void openIndexAndRecoveryFromTranslog() throws IOException { - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE; - innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false); + public void openEngineAndRecoverFromTranslog() throws IOException { + innerOpenEngineAndTranslog(); getEngine().recoverFromTranslog(); } @@ -1328,13 +1296,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Opens the engine on top of the existing lucene engine and translog. * The translog is kept but its operations won't be replayed. */ - public void openIndexAndSkipTranslogRecovery() throws IOException { - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER; - innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false); + public void openEngineAndSkipTranslogRecovery() throws IOException { + innerOpenEngineAndTranslog(); getEngine().skipTranslogRecovery(); } - private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException { + private void innerOpenEngineAndTranslog() throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1349,29 +1316,25 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - assert openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || assertMaxUnsafeAutoIdInCommit(); - - - final EngineConfig config = newEngineConfig(openMode, forceNewHistoryUUID); + final EngineConfig config = newEngineConfig(); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false); - if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - // we have to set it before we open an engine and recover from the translog because - // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, - // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in. - final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); - replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); - } + // we have to set it before we open an engine and recover from the translog because + // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, + // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in. + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); + replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); + + assertMaxUnsafeAutoIdInCommit(); + createNewEngine(config); verifyNotClosed(); - if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, - // we still give sync'd flush a chance to run: - active.set(true); - } + // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, + // we still give sync'd flush a chance to run: + active.set(true); assertSequenceNumbersInCommit(); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } @@ -1388,15 +1351,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private boolean assertMaxUnsafeAutoIdInCommit() throws IOException { final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); - if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0) && - // TODO: LOCAL_SHARDS need to transfer this information - recoveryState().getRecoverySource().getType() != RecoverySource.Type.LOCAL_SHARDS) { - // as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point. - // This should have baked into the commit by the primary we recover from, regardless of the index age. - assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : - "opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID - + " is not found in commit"; - } + assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : + "opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + + " is not found in commit"; return true; } @@ -2189,12 +2146,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return mapperService.documentMapperWithAutoCreate(type); } - private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) { + private EngineConfig newEngineConfig() { Sort indexSort = indexSortSupplier.get(); - return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(), + return new EngineConfig(shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, - indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig, + indexCache.query(), cachingPolicy, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index c3b4525924a..224ae60a420 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; @@ -389,8 +390,8 @@ final class StoreRecovery { recoveryState.getIndex().updateVersion(version); if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { assert indexShouldExists; - indexShard.openIndexAndCreateTranslog(true, store.loadSeqNoInfo(null).localCheckpoint); - } else { + EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); + } else if (indexShouldExists) { // since we recover from local, just fill the files and size try { final RecoveryState.Index index = recoveryState.getIndex(); @@ -400,13 +401,11 @@ final class StoreRecovery { } catch (IOException e) { logger.debug("failed to list file details", e); } - if (indexShouldExists) { - indexShard.openIndexAndRecoveryFromTranslog(); - indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); - } else { - indexShard.createIndexAndTranslog(); - } + } else { + EngineDiskUtils.createEmpty(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); } + indexShard.openEngineAndRecoverFromTranslog(); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); } catch (EngineException | IOException e) { @@ -446,16 +445,10 @@ final class StoreRecovery { } final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); - final Store store = indexShard.store(); - final long localCheckpoint; - store.incRef(); - try { - localCheckpoint = store.loadSeqNoInfo(null).localCheckpoint; - } finally { - store.decRef(); - } - indexShard.openIndexAndCreateTranslog(true, localCheckpoint); + EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(indexShard.store().directory(), indexShard.shardPath().resolveTranslog(), + shardId); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 7b73a945d6e..be9164cec57 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -218,8 +218,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * @return {@link SequenceNumbers.CommitInfo} containing information about the last commit * @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk */ - public SequenceNumbers.CommitInfo loadSeqNoInfo(final IndexCommit commit) throws IOException { - final Map userData = commit != null ? commit.getUserData() : SegmentInfos.readLatestCommit(directory).getUserData(); + public static SequenceNumbers.CommitInfo loadSeqNoInfo(final IndexCommit commit) throws IOException { + final Map userData = commit.getUserData(); return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 57aa4cf1403..73764249ce1 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -364,7 +364,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID); final List existingCommits = DirectoryReader.listCommits(store.directory()); final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint); - final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo(safeCommit); + final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit); if (logger.isTraceEnabled()) { final StringJoiner descriptionOfExistingCommits = new StringJoiner(","); for (IndexCommit commit : existingCommits) { @@ -406,7 +406,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.target().prepareForTranslogOperations(request.createNewTranslog(), request.totalTranslogOps()); + recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java index 28df2897d97..65ccb078c94 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -33,13 +33,13 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { private final long recoveryId; private final ShardId shardId; private final int totalTranslogOps; - private final boolean createNewTranslog; + private final boolean fileBasedRecovery; - RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean createNewTranslog) { + RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean fileBasedRecovery) { this.recoveryId = recoveryId; this.shardId = shardId; this.totalTranslogOps = totalTranslogOps; - this.createNewTranslog = createNewTranslog; + this.fileBasedRecovery = fileBasedRecovery; } RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException { @@ -51,9 +51,9 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { in.readLong(); // maxUnsafeAutoIdTimestamp } if (in.getVersion().onOrAfter(Version.V_6_2_0)) { - createNewTranslog = in.readBoolean(); + fileBasedRecovery = in.readBoolean(); } else { - createNewTranslog = true; + fileBasedRecovery = true; } } @@ -70,10 +70,10 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { } /** - * Whether or not the recover target should create a new local translog + * Whether or not the recovery is file based */ - boolean createNewTranslog() { - return createNewTranslog; + public boolean isFileBasedRecovery() { + return fileBasedRecovery; } @Override @@ -86,7 +86,7 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp } if (out.getVersion().onOrAfter(Version.V_6_2_0)) { - out.writeBoolean(createNewTranslog); + out.writeBoolean(fileBasedRecovery); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 41df6ec73e0..1b1a2802b52 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -40,6 +41,7 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -362,14 +364,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - if (createNewTranslog) { - // TODO: Assigns the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 - indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); - } else { - indexShard().openIndexAndSkipTranslogRecovery(); - } + indexShard().openEngineAndSkipTranslogRecovery(); } @Override @@ -440,8 +437,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget // to recover from in case of a full cluster shutdown just when this code executes... renameAllTempFiles(); final Store store = store(); + store.incRef(); try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); + if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) { + EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory()); + } + // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 + EngineDiskUtils.createNewTranslog(store.directory(), indexShard.shardPath().resolveTranslog(), + SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are @@ -465,6 +469,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex); fail(rfe, true); throw rfe; + } finally { + store.decRef(); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 9cedfa8039a..4e728a72b30 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -32,10 +32,10 @@ public interface RecoveryTargetHandler { /** * Prepares the target to receive translog operations, after all file have been copied - * @param createNewTranslog whether or not to delete the local translog on the target + * @param fileBasedRecovery whether or not this call is part of an file based recovery * @param totalTranslogOps total translog operations expected to be sent */ - void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException; + void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException; /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 966ed426d48..edf17595350 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -76,9 +76,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } @Override - public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog), + new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 4588010fe9c..67fd385955f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -38,8 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.singletonList; -import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; -import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.doAnswer; @@ -54,8 +52,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); final LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); @@ -94,8 +91,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final AtomicLong globalCheckpoint = new AtomicLong(); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); long lastMaxSeqNo = between(1, 1000); long lastTranslogGen = between(1, 20); int safeIndex = 0; @@ -165,8 +161,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); long legacyTranslogGen = randomNonNegativeLong(); IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); @@ -199,8 +194,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_CREATE_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); @@ -237,8 +231,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong())); } final IndexCommit startingCommit = randomFrom(commitList); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, startingCommit); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, startingCommit); indexPolicy.onInit(commitList); for (IndexCommit commit : commitList) { if (commit.equals(startingCommit) == false) { @@ -256,8 +249,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java new file mode 100644 index 00000000000..c57af9b4486 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java @@ -0,0 +1,207 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.search.IndexSearcher; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.IndexSettingsModule; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; + +public class EngineDiskUtilsTests extends EngineTestCase { + + + public void testHistoryUUIDIsSetIfMissing() throws IOException { + final int numDocs = randomIntBetween(0, 3); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + Engine.IndexResult index = engine.index(firstIndexRequest); + assertThat(index.getVersion(), equalTo(1L)); + } + assertVisibleCount(engine, numDocs); + engine.close(); + + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND); + try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) { + Map newCommitData = new HashMap<>(); + for (Map.Entry entry : writer.getLiveCommitData()) { + if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) { + newCommitData.put(entry.getKey(), entry.getValue()); + } + } + writer.setLiveCommitData(newCommitData.entrySet()); + writer.commit(); + } + + EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory()); + + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1) + .build()); + + EngineConfig config = engine.config(); + EngineConfig newConfig = new EngineConfig( + shardId, allocationId.getId(), + threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), + new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); + engine = new InternalEngine(newConfig); + engine.recoverFromTranslog(); + assertVisibleCount(engine, numDocs, false); + assertThat(engine.getHistoryUUID(), notNullValue()); + } + + public void testCurrentTranslogIDisCommitted() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + + // create + { + EngineDiskUtils.createEmpty(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId); + ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + + try (InternalEngine engine = createEngine(config)) { + engine.index(firstIndexRequest); + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + // open and recover tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(config)) { + assertTrue(engine.isRecovering()); + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + if (i == 0) { + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + } else { + // creating an empty index will create the first translog gen and commit it + // opening the empty index will make the second translog file but not commit it + // opening the engine again (i=0) will make the third translog file, which then be committed + assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + } + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + // open index with new tlog + { + EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, shardId); + try (InternalEngine engine = new InternalEngine(config)) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + assertEquals(2, engine.getTranslog().currentFileGeneration()); + assertEquals(0L, engine.getTranslog().uncommittedOperations()); + } + } + + // open and recover tlog with empty tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(config)) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + } + } + + public void testHistoryUUIDCanBeForced() throws IOException { + final int numDocs = randomIntBetween(0, 3); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + Engine.IndexResult index = engine.index(firstIndexRequest); + assertThat(index.getVersion(), equalTo(1L)); + } + assertVisibleCount(engine, numDocs); + final String oldHistoryUUID = engine.getHistoryUUID(); + engine.close(); + EngineConfig config = engine.config(); + EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId); + + EngineConfig newConfig = new EngineConfig( + shardId, allocationId.getId(), + threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), + new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); + engine = new InternalEngine(newConfig); + engine.recoverFromTranslog(); + assertVisibleCount(engine, 0, false); + assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f0bd1cc389e..2488ca79fe4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -81,7 +81,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; @@ -119,7 +118,6 @@ import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.test.IndexSettingsModule; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -133,7 +131,6 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -149,6 +146,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.function.ToLongBiFunction; @@ -641,7 +639,7 @@ public class InternalEngineTests extends EngineTestCase { InternalEngine engine = createEngine(store, translog); engine.close(); - engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + engine = new InternalEngine(engine.config()); assertTrue(engine.isRecovering()); engine.recoverFromTranslog(); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); @@ -656,7 +654,7 @@ public class InternalEngineTests extends EngineTestCase { engine.index(indexForDoc(doc)); engine.close(); - engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + engine = new InternalEngine(engine.config()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); engine.recoverFromTranslog(); @@ -690,7 +688,7 @@ public class InternalEngineTests extends EngineTestCase { Engine recoveringEngine = null; try { - recoveringEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + recoveringEngine = new InternalEngine(engine.config()); recoveringEngine.recoverFromTranslog(); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -718,20 +716,19 @@ public class InternalEngineTests extends EngineTestCase { Engine recoveringEngine = null; try { - final AtomicBoolean flushed = new AtomicBoolean(); - recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + final AtomicBoolean committed = new AtomicBoolean(); + recoveringEngine = new InternalEngine(initialEngine.config()) { + @Override - public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { - assertThat(getTranslog().uncommittedOperations(), equalTo(docs)); - final CommitId commitId = super.flush(force, waitIfOngoing); - flushed.set(true); - return commitId; + protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { + committed.set(true); + super.commitIndexWriter(writer, translog, syncId); } }; assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs)); recoveringEngine.recoverFromTranslog(); - assertTrue(flushed.get()); + assertTrue(committed.get()); } finally { IOUtils.close(recoveringEngine); } @@ -762,7 +759,7 @@ public class InternalEngineTests extends EngineTestCase { } } initialEngine.close(); - recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + recoveringEngine = new InternalEngine(initialEngine.config()); recoveringEngine.recoverFromTranslog(); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs); @@ -1009,9 +1006,9 @@ public class InternalEngineTests extends EngineTestCase { IOUtils.close(engine, store); final Path translogPath = createTempDir(); store = createStore(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); - engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)); + engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); boolean inSync = randomBoolean(); @@ -1021,17 +1018,17 @@ public class InternalEngineTests extends EngineTestCase { engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 2L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 2L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 2L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L)); assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L)); globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); @@ -1043,7 +1040,7 @@ public class InternalEngineTests extends EngineTestCase { public void testSyncedFlush() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) { + Engine engine = createEngine(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null)) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); @@ -1069,8 +1066,8 @@ public class InternalEngineTests extends EngineTestCase { final int iters = randomIntBetween(2, 5); // run this a couple of times to get some coverage for (int i = 0; i < iters; i++) { try (Store store = createStore(); - InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy(), null))) { + InternalEngine engine = + createEngine(config(defaultSettings, store, createTempDir(), new LogDocMergePolicy(), null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); Engine.Index doc1 = indexForDoc(testParsedDocument("1", null, testDocumentWithTextField(), B_1, null)); engine.index(doc1); @@ -1125,7 +1122,7 @@ public class InternalEngineTests extends EngineTestCase { } public void testSyncedFlushSurvivesEngineRestart() throws IOException { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); IOUtils.close(store, engine); store = createStore(); engine = createEngine(store, primaryTranslogDir, globalCheckpoint::get); @@ -1144,12 +1141,13 @@ public class InternalEngineTests extends EngineTestCase { } else { engine.flushAndClose(); } - engine = new InternalEngine(copy(config, randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))); - - if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG && randomBoolean()) { - engine.recoverFromTranslog(); + if (randomBoolean()) { + EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(), + SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); } - assertEquals(engine.config().getOpenMode().toString(), engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); + engine = new InternalEngine(config); + engine.recoverFromTranslog(); + assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } public void testSyncedFlushVanishesOnReplay() throws IOException { @@ -1165,7 +1163,7 @@ public class InternalEngineTests extends EngineTestCase { engine.index(indexForDoc(doc)); EngineConfig config = engine.config(); engine.close(); - engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + engine = new InternalEngine(config); engine.recoverFromTranslog(); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -1270,7 +1268,7 @@ public class InternalEngineTests extends EngineTestCase { public void testForceMerge() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), + Engine engine = createEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { @@ -2051,7 +2049,7 @@ public class InternalEngineTests extends EngineTestCase { InternalEngine recoveringEngine = null; try { - recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + recoveringEngine = new InternalEngine(initialEngine.config()); recoveringEngine.recoverFromTranslog(); assertEquals(primarySeqNo, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -2078,10 +2076,9 @@ public class InternalEngineTests extends EngineTestCase { // this test writes documents to the engine while concurrently flushing/commit // and ensuring that the commit points contain the correct sequence number data public void testConcurrentWritesAndCommits() throws Exception { + List commits = new ArrayList<>(); try (Store store = createStore(); - InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { - final List commits = new ArrayList<>(); - + InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { final int numIndexingThreads = scaledRandomIntBetween(2, 4); final int numDocsPerThread = randomIntBetween(500, 1000); final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1); @@ -2243,7 +2240,7 @@ public class InternalEngineTests extends EngineTestCase { public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { + Engine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); final BiFunction searcherFactory = engine::acquireSearcher; @@ -2326,7 +2323,7 @@ public class InternalEngineTests extends EngineTestCase { InternalEngine holder; try { holder = createEngine(store, translogPath); - } catch (EngineCreationFailureException ex) { + } catch (EngineCreationFailureException | IOException ex) { assertEquals(store.refCount(), refCount); continue; } @@ -2372,9 +2369,9 @@ public class InternalEngineTests extends EngineTestCase { } catch (EngineCreationFailureException ex) { // expected } - // now it should be OK. - EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null), - EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); + // when a new translog is created it should be ok + EngineDiskUtils.createNewTranslog(store.directory(), primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null); engine = new InternalEngine(config); } @@ -2421,21 +2418,6 @@ public class InternalEngineTests extends EngineTestCase { } } - private static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException { - assertVisibleCount(engine, numDocs, true); - } - - private static void assertVisibleCount(InternalEngine engine, int numDocs, boolean refresh) throws IOException { - if (refresh) { - engine.refresh("test"); - } - try (Searcher searcher = engine.acquireSearcher("test")) { - final TotalHitCountCollector collector = new TotalHitCountCollector(); - searcher.searcher().search(new MatchAllDocsQuery(), collector); - assertThat(collector.getTotalHits(), equalTo(numDocs)); - } - } - public void testTranslogCleanUpPostCommitCrash() throws Exception { IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(), defaultSettings.getScopedSettings()); @@ -2449,8 +2431,9 @@ public class InternalEngineTests extends EngineTestCase { try (Store store = createStore()) { AtomicBoolean throwErrorOnCommit = new AtomicBoolean(); final Path translogPath = createTempDir(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); + EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId); try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)) { @@ -2463,6 +2446,7 @@ public class InternalEngineTests extends EngineTestCase { } } }) { + engine.recoverFromTranslog(); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); @@ -2495,7 +2479,8 @@ public class InternalEngineTests extends EngineTestCase { } assertVisibleCount(engine, numDocs); engine.close(); - engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)); + engine = new InternalEngine(engine.config()); + engine.skipTranslogRecovery(); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); assertThat(topDocs.totalHits, equalTo(0L)); @@ -2535,7 +2520,7 @@ public class InternalEngineTests extends EngineTestCase { parser.mappingUpdate = dynamicUpdate(); engine.close(); - engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work + engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work engine.recoverFromTranslog(); assertVisibleCount(engine, numDocs, false); @@ -2622,10 +2607,10 @@ public class InternalEngineTests extends EngineTestCase { TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE); - EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(), + EngineConfig brokenConfig = new EngineConfig(shardId, allocationId.getId(), threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), + IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); try { @@ -2638,94 +2623,6 @@ public class InternalEngineTests extends EngineTestCase { assertVisibleCount(engine, numDocs, false); } - public void testHistoryUUIDIsSetIfMissing() throws IOException { - final int numDocs = randomIntBetween(0, 3); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult index = engine.index(firstIndexRequest); - assertThat(index.getVersion(), equalTo(1L)); - } - assertVisibleCount(engine, numDocs); - engine.close(); - - IndexWriterConfig iwc = new IndexWriterConfig(null) - .setCommitOnClose(false) - // we don't want merges to happen here - we call maybe merge on the engine - // later once we stared it up otherwise we would need to wait for it here - // we also don't specify a codec here and merges should use the engines for this index - .setMergePolicy(NoMergePolicy.INSTANCE) - .setOpenMode(IndexWriterConfig.OpenMode.APPEND); - try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) { - Map newCommitData = new HashMap<>(); - for (Map.Entry entry: writer.getLiveCommitData()) { - if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) { - newCommitData.put(entry.getKey(), entry.getValue()); - } - } - writer.setLiveCommitData(newCommitData.entrySet()); - writer.commit(); - } - - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1) - .build()); - - EngineConfig config = engine.config(); - - EngineConfig newConfig = new EngineConfig( - randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, - shardId, allocationId.getId(), - threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), - new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); - engine = new InternalEngine(newConfig); - if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - engine.recoverFromTranslog(); - assertVisibleCount(engine, numDocs, false); - } else { - assertVisibleCount(engine, 0, false); - } - assertThat(engine.getHistoryUUID(), notNullValue()); - } - - public void testHistoryUUIDCanBeForced() throws IOException { - final int numDocs = randomIntBetween(0, 3); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult index = engine.index(firstIndexRequest); - assertThat(index.getVersion(), equalTo(1L)); - } - assertVisibleCount(engine, numDocs); - final String oldHistoryUUID = engine.getHistoryUUID(); - engine.close(); - EngineConfig config = engine.config(); - - EngineConfig newConfig = new EngineConfig( - randomBoolean() ? EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, - shardId, allocationId.getId(), - threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), - new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); - if (newConfig.getOpenMode() == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG) { - Lucene.cleanLuceneIndex(store.directory()); - } - engine = new InternalEngine(newConfig); - if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - engine.recoverFromTranslog(); - assertVisibleCount(engine, numDocs, false); - } else { - assertVisibleCount(engine, 0, false); - } - assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); - } - public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException { AtomicReference exception = new AtomicReference<>(); String operation = randomFrom("optimize", "refresh", "flush"); @@ -2818,74 +2715,6 @@ public class InternalEngineTests extends EngineTestCase { } } - public void testCurrentTranslogIDisCommitted() throws IOException { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); - try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - - // create - { - ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){ - assertFalse(engine.isRecovering()); - engine.index(firstIndexRequest); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); - expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - // open and recover tlog - { - for (int i = 0; i < 2; i++) { - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { - assertTrue(engine.isRecovering()); - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - if (i == 0) { - assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } else { - assertEquals("4", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("4", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - } - // open index with new tlog - { - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) { - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); - assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().uncommittedOperations()); - } - } - - // open and recover tlog with empty tlog - { - for (int i = 0; i < 2; i++) { - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("2", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("no changes - nothing to commit", "2", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - } - } - } - private static class ThrowingIndexWriter extends IndexWriter { private AtomicReference> failureToThrow = new AtomicReference<>(); @@ -3367,21 +3196,22 @@ public class InternalEngineTests extends EngineTestCase { public void testEngineMaxTimestampIsInitialized() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final long timestamp1 = Math.abs(randomNonNegativeLong()); final Path storeDir = createTempDir(); final Path translogDir = createTempDir(); final long timestamp2 = randomNonNegativeLong(); final long maxTimestamp12 = Math.max(timestamp1, timestamp2); - try (Store store = createStore(newFSDirectory(storeDir)); - Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) { + final Function configSupplier = + store -> config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = createEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); engine.index(appendOnlyPrimary(doc, true, timestamp1)); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } - try (Store store = createStore(newFSDirectory(storeDir)); - Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) { + try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); engine.recoverFromTranslog(); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); @@ -3389,13 +3219,16 @@ public class InternalEngineTests extends EngineTestCase { new BytesArray("{}".getBytes(Charset.defaultCharset())), null); engine.index(appendOnlyPrimary(doc, true, timestamp2)); assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + globalCheckpoint.set(1); // make sure flush cleans up commits for later. engine.flush(); } - try (Store store = createStore(newFSDirectory(storeDir)); - Engine engine = new InternalEngine( - copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null), - randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) { - assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + try (Store store = createStore(newFSDirectory(storeDir))) { + if (randomBoolean() || true) { + EngineDiskUtils.createNewTranslog(store.directory(), translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId); + } + try (Engine engine = new InternalEngine(configSupplier.apply(store))) { + assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + } } } @@ -3491,7 +3324,7 @@ public class InternalEngineTests extends EngineTestCase { } } }); - InternalEngine internalEngine = new InternalEngine(config); + InternalEngine internalEngine = createEngine(config); int docId = 0; final ParsedDocument doc = testParsedDocument(Integer.toString(docId), null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); @@ -3662,53 +3495,13 @@ public class InternalEngineTests extends EngineTestCase { IOUtils.close(initialEngine); } - try (Engine recoveringEngine = - new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { recoveringEngine.recoverFromTranslog(); recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); } } - public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws IOException { - final long v = 1; - final VersionType t = VersionType.EXTERNAL; - final long ts = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; - final int docs = randomIntBetween(1, 32); - InternalEngine initialEngine = null; - try { - initialEngine = engine; - for (int i = 0; i < docs; i++) { - final String id = Integer.toString(i); - final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); - final Term uid = newUid(doc); - // create a gap at sequence number 3 * i + 1 - initialEngine.index(new Engine.Index(uid, doc, 3 * i, 1, v, t, REPLICA, System.nanoTime(), ts, false)); - initialEngine.delete(new Engine.Delete("type", id, uid, 3 * i + 2, 1, v, t, REPLICA, System.nanoTime())); - } - - // bake the commit with the local checkpoint stuck at 0 and gaps all along the way up to the max sequence number - assertThat(initialEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) 0)); - assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo((long) (3 * (docs - 1) + 2))); - initialEngine.flush(true, true); - - for (int i = 0; i < docs; i++) { - final String id = Integer.toString(i); - final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); - final Term uid = newUid(doc); - initialEngine.index(new Engine.Index(uid, doc, 3 * i + 1, 1, v, t, REPLICA, System.nanoTime(), ts, false)); - } - } finally { - IOUtils.close(initialEngine); - } - - try (Engine recoveringEngine = - new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { - recoveringEngine.recoverFromTranslog(); - recoveringEngine.fillSeqNoGaps(1); - assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1))); - } - } /** java docs */ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOException { @@ -3803,7 +3596,7 @@ public class InternalEngineTests extends EngineTestCase { final BiFunction supplier = (ms, lcp) -> new LocalCheckpointTracker( maxSeqNo, localCheckpoint); - noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier) { + noOpEngine = new InternalEngine(engine.config(), supplier) { @Override protected long doGenerateSeqNoForOperation(Operation operation) { throw new UnsupportedOperationException(); @@ -3950,7 +3743,7 @@ public class InternalEngineTests extends EngineTestCase { completedSeqNos.add(seqNo); } }; - actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier); + actualEngine = new InternalEngine(engine.config(), supplier); final int operations = randomIntBetween(0, 1024); final Set expectedCompletedSeqNos = new HashSet<>(); for (int i = 0; i < operations; i++) { @@ -4013,15 +3806,14 @@ public class InternalEngineTests extends EngineTestCase { boolean flushed = false; - AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); Engine recoveringEngine = null; try { assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint()); assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); - recoveringEngine = new InternalEngine(copy( - replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4054,8 +3846,7 @@ public class InternalEngineTests extends EngineTestCase { // now do it again to make sure we preserve values etc. try { - recoveringEngine = new InternalEngine( - copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); } @@ -4234,10 +4025,11 @@ public class InternalEngineTests extends EngineTestCase { final Path translogPath = createTempDir(); store = createStore(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get()); + EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId); try (Engine engine = new InternalEngine(engineConfig) { @Override protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { @@ -4249,6 +4041,7 @@ public class InternalEngineTests extends EngineTestCase { super.commitIndexWriter(writer, translog, syncId); } }) { + engine.recoverFromTranslog(); int numDocs = scaledRandomIntBetween(10, 100); final String translogUUID = engine.getTranslog().getTranslogUUID(); for (int docId = 0; docId < numDocs; docId++) { @@ -4340,7 +4133,7 @@ public class InternalEngineTests extends EngineTestCase { public void testAcquireIndexCommit() throws Exception { IOUtils.close(engine, store); store = createStore(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { int numDocs = between(1, 20); for (int i = 0; i < numDocs; i++) { @@ -4377,10 +4170,10 @@ public class InternalEngineTests extends EngineTestCase { public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception { IOUtils.close(engine); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); - final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final EngineConfig config = copy(engine.config(), globalCheckpoint::get); final IndexCommit safeCommit; - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) { + try (InternalEngine engine = createEngine(config)) { final int numDocs = between(5, 50); for (int i = 0; i < numDocs; i++) { index(engine, i); @@ -4394,44 +4187,16 @@ public class InternalEngineTests extends EngineTestCase { globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO))); engine.getTranslog().sync(); } - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + try (InternalEngine engine = new InternalEngine(config)) { final List existingCommits = DirectoryReader.listCommits(engine.store.directory()); - assertThat("OPEN_INDEX_AND_TRANSLOG should keep only safe commit", existingCommits, contains(safeCommit)); - } - } - - public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception { - IOUtils.close(engine); - final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); - final Map lastCommit; - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { - engine.skipTranslogRecovery(); - final int numDocs = between(5, 50); - for (int i = 0; i < numDocs; i++) { - index(engine, i); - if (randomBoolean()) { - engine.flush(); - } - } - final List commits = DirectoryReader.listCommits(engine.store.directory()); - lastCommit = commits.get(commits.size() - 1).getUserData(); - } - try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) { - final List existingCommits = DirectoryReader.listCommits(engine.store.directory()); - assertThat("OPEN_INDEX_CREATE_TRANSLOG should keep only last commit", existingCommits, hasSize(1)); - final Map userData = existingCommits.get(0).getUserData(); - assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(lastCommit.get(SequenceNumbers.MAX_SEQ_NO))); - assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(lastCommit.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))); - // Translog tags should be fresh. - assertThat(userData.get(Translog.TRANSLOG_UUID_KEY), not(equalTo(lastCommit.get(Translog.TRANSLOG_UUID_KEY)))); - assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("2")); + assertThat("safe commit should be kept", existingCommits, contains(safeCommit)); } } public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { IOUtils.close(engine, store); store = createStore(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { final int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { @@ -4456,7 +4221,7 @@ public class InternalEngineTests extends EngineTestCase { public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { IOUtils.close(engine, store); store = createStore(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { final int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 3049c3a4579..72813cf2637 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -101,8 +101,8 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -import static org.hamcrest.Matchers.containsString; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -333,7 +333,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { assertFalse(shard.shouldPeriodicallyFlush()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(117 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); + new ByteSizeValue(160 /* size of the operation + two generations header&footer*/, ByteSizeUnit.BYTES)).build()).get(); client().prepareIndex("test", "test", "0") .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertFalse(shard.shouldPeriodicallyFlush()); @@ -407,15 +407,15 @@ public class IndexShardIT extends ESSingleNodeTestCase { IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); assertFalse(shard.shouldPeriodicallyFlush()); - final String key; final boolean flush = randomBoolean(); + final Settings settings; if (flush) { - key = "index.translog.flush_threshold_size"; + // size of the operation plus two generations of overhead. + settings = Settings.builder().put("index.translog.flush_threshold_size", "180b").build(); } else { - key = "index.translog.generation_threshold_size"; + // size of the operation plus header and footer + settings = Settings.builder().put("index.translog.generation_threshold_size", "117b").build(); } - // size of the operation plus header and footer - final Settings settings = Settings.builder().put(key, "117b").build(); client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); client().prepareIndex("test", "test", "0") .setSource("{}", XContentType.JSON) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 6bd378bdb85..f05fdc60c5c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2111,7 +2111,7 @@ public class IndexShardTests extends IndexShardTestCase { shard.prepareForIndexRecovery(); // Shard is still inactive since we haven't started recovering yet assertFalse(shard.isActive()); - shard.openIndexAndRecoveryFromTranslog(); + shard.openEngineAndRecoverFromTranslog(); // Shard should now be active since we did recover: assertTrue(shard.isActive()); closeShards(shard); @@ -2138,14 +2138,6 @@ public class IndexShardTests extends IndexShardTestCase { recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { - @Override - public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { - super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps); - // Shard is still inactive since we haven't started recovering yet - assertFalse(replica.isActive()); - - } - @Override public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); @@ -2188,8 +2180,8 @@ public class IndexShardTests extends IndexShardTestCase { }) { // we're only checking that listeners are called when the engine is open, before there is no point @Override - public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { - super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps); + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps); assertListenerCalled.accept(replica); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 25b307e7d30..1f9c5ae6df3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -56,9 +57,9 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; import org.junit.After; import org.junit.Before; @@ -120,13 +121,14 @@ public class RefreshListenersTests extends ESTestCase { // we don't need to notify anybody in this test } }; - EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, - indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), - eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, null, - new NoneCircuitBreakerService(), - () -> SequenceNumbers.UNASSIGNED_SEQ_NO); + EngineDiskUtils.createEmpty(store.directory(), translogConfig.getTranslogPath(), shardId); + EngineConfig config = new EngineConfig(shardId, allocationId, threadPool, + indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), + eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, + (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); engine = new InternalEngine(config); + engine.recoverFromTranslog(); listeners.setTranslog(engine.getTranslog()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 117dfe430c8..c75e469f7af 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -31,8 +31,10 @@ import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.elasticsearch.core.internal.io.IOUtils; @@ -89,6 +91,7 @@ import java.util.function.ToLongBiFunction; import static java.util.Collections.emptyList; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; +import static org.hamcrest.Matchers.equalTo; public abstract class EngineTestCase extends ESTestCase { @@ -109,6 +112,21 @@ public abstract class EngineTestCase extends ESTestCase { protected Path primaryTranslogDir; protected Path replicaTranslogDir; + protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException { + assertVisibleCount(engine, numDocs, true); + } + + protected static void assertVisibleCount(Engine engine, int numDocs, boolean refresh) throws IOException { + if (refresh) { + engine.refresh("test"); + } + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new MatchAllDocsQuery(), collector); + assertThat(collector.getTotalHits(), equalTo(numDocs)); + } + } + @Override @Before public void setUp() throws Exception { @@ -155,24 +173,20 @@ public abstract class EngineTestCase extends ESTestCase { } } - public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { - return copy(config, openMode, config.getAnalyzer()); - } - - public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, LongSupplier globalCheckpointSupplier) { - return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), + public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSupplier) { + return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService(), globalCheckpointSupplier); } - public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { - return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), + public EngineConfig copy(EngineConfig config, Analyzer analyzer) { + return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier()); } @@ -253,9 +267,9 @@ public abstract class EngineTestCase extends ESTestCase { protected Translog createTranslog(Path translogPath) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - final String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId); - return new Translog(translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), - () -> SequenceNumbers.NO_OPS_PERFORMED); + String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId); + return new Translog( + translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -338,10 +352,23 @@ public abstract class EngineTestCase extends ESTestCase { @Nullable Sort indexSort, @Nullable LongSupplier globalCheckpointSupplier) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort, globalCheckpointSupplier); - InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - internalEngine.recoverFromTranslog(); + return createEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); + } + + protected InternalEngine createEngine(EngineConfig config) throws IOException { + return createEngine(null, null, null, config); + } + + private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction localCheckpointTrackerSupplier, + @Nullable ToLongBiFunction seqNoForOperation, + EngineConfig config) throws IOException { + final Directory directory = config.getStore().directory(); + if (Lucene.indexExists(directory) == false) { + EngineDiskUtils.createEmpty(directory, config.getTranslogConfig().getTranslogPath(), config.getShardId()); } + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); + internalEngine.recoverFromTranslog(); return internalEngine; } @@ -394,23 +421,13 @@ public abstract class EngineTestCase extends ESTestCase { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener) { - return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, () -> SequenceNumbers.UNASSIGNED_SEQ_NO); + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, () -> SequenceNumbers.NO_OPS_PERFORMED); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - final EngineConfig.OpenMode openMode; - try { - if (Lucene.indexExists(store.directory()) == false) { - openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG; - } else { - openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; - } - } catch (IOException e) { - throw new ElasticsearchException("can't find index?", e); - } Engine.EventListener listener = new Engine.EventListener() { @Override public void onFailedEngine(String reason, @Nullable Exception e) { @@ -421,14 +438,14 @@ public abstract class EngineTestCase extends ESTestCase { indexSettings.getSettings())); final List refreshListenerList = refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); - EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, + EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? - new ReplicationTracker(shardId, allocationId.getId(), indexSettings, - SequenceNumbers.UNASSIGNED_SEQ_NO) : globalCheckpointSupplier); + new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) : + globalCheckpointSupplier); return config; }