diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 778509a97dd..91725899c17 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -135,7 +135,7 @@ public final class EngineConfig { private static final String DEFAULT_CODEC_NAME = "default"; private TranslogConfig translogConfig; - + private boolean create = false; /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} @@ -433,4 +433,20 @@ public final class EngineConfig { public TranslogConfig getTranslogConfig() { return translogConfig; } + + /** + * Iff set to true the engine will create a new lucene index when opening the engine. + * Otherwise the lucene index writer is opened in append mode. The default is false + */ + public void setCreate(boolean create) { + this.create = create; + } + + /** + * Iff true the engine should create a new lucene index when opening the engine. + * Otherwise the lucene index writer should be opened in append mode. The default is false + */ + public boolean isCreate() { + return create; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fe8fdab090f..597cf9b8a05 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -118,14 +118,11 @@ public class InternalEngine extends Engine { for (int i = 0; i < dirtyLocks.length; i++) { dirtyLocks[i] = new Object(); } - throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); final Translog.TranslogGeneration translogGeneration; try { - // TODO: would be better if ES could tell us "from above" whether this shard was already here, instead of using Lucene's API - // (which relies on IO ops, directory listing, and has had scary bugs in the past): - boolean create = !Lucene.indexExists(store.directory()); + final boolean create = engineConfig.isCreate(); writer = createWriter(create); indexWriter = writer; translog = openTranslog(engineConfig, writer, create || skipInitialTranslogRecovery || engineConfig.forceNewTranslog()); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c4eecad452c..bb681586781 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -828,14 +828,13 @@ public class IndexShard extends AbstractIndexShardComponent { /** * After the store has been recovered, we need to start the engine in order to apply operations */ - public Map performTranslogRecovery() { - final Map recoveredTypes = internalPerformTranslogRecovery(false); + public Map performTranslogRecovery(boolean indexExists) { + final Map recoveredTypes = internalPerformTranslogRecovery(false, indexExists); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); return recoveredTypes; - } - private Map internalPerformTranslogRecovery(boolean skipTranslogRecovery) { + private Map internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -852,6 +851,7 @@ public class IndexShard extends AbstractIndexShardComponent { // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering engineConfig.setEnableGcDeletes(false); + engineConfig.setCreate(indexExists == false); createNewEngine(skipTranslogRecovery, engineConfig); return engineConfig.getTranslogRecoveryPerformer().getRecoveredTypes(); } @@ -860,12 +860,10 @@ public class IndexShard extends AbstractIndexShardComponent { * After the store has been recovered, we need to start the engine. This method starts a new engine but skips * the replay of the transaction log which is required in cases where we restore a previous index or recover from * a remote peer. - * - * @param wipeTranslogs if set to true all skipped / uncommitted translogs are removed. */ - public void skipTranslogRecovery(boolean wipeTranslogs) throws IOException { + public void skipTranslogRecovery() throws IOException { assert engineUnsafe() == null : "engine was already created"; - Map recoveredTypes = internalPerformTranslogRecovery(true); + Map recoveredTypes = internalPerformTranslogRecovery(true, true); assert recoveredTypes.isEmpty(); assert recoveryState.getTranslog().recoveredOperations() == 0; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 7224e701751..9e8776d1b1e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -104,6 +104,7 @@ public final class ShadowIndexShard extends IndexShard { protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig config) { assert this.shardRouting.primary() == false; assert skipInitialTranslogRecovery : "can not recover from gateway"; + config.setCreate(false); // hardcoded - we always expect an index to be present return engineFactory.newReadOnlyEngine(config); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java index 14b27efc8e9..e291589d614 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java @@ -246,7 +246,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements recoveryState.getTranslog().totalOperations(0); recoveryState.getTranslog().totalOperationsOnStart(0); } - typesToUpdate = indexShard.performTranslogRecovery(); + typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists); indexShard.finalizeRecovery(); String indexName = indexShard.shardId().index().name(); @@ -318,7 +318,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements snapshotShardId = new ShardId(restoreSource.index(), shardId.id()); } indexShardRepository.restore(restoreSource.snapshotId(), restoreSource.version(), shardId, snapshotShardId, recoveryState); - indexShard.skipTranslogRecovery(true); + indexShard.skipTranslogRecovery(); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId); diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index 8738e0abdf5..46c03de09ce 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -204,7 +204,7 @@ public class TranslogRecoveryPerformer { query = queryParserService.parseQuery(source).query(); } catch (QueryParsingException ex) { // for BWC we try to parse directly the query since pre 1.0.0.Beta2 we didn't require a top level query field - if ( queryParserService.getIndexCreatedVersion().onOrBefore(Version.V_1_0_0_Beta2)) { + if (queryParserService.getIndexCreatedVersion().onOrBefore(Version.V_1_0_0_Beta2)) { try { XContentParser parser = XContentHelper.createParser(source); ParsedQuery parse = queryParserService.parse(parser); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index a953206fa89..4e641b83362 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -274,7 +274,7 @@ public class RecoveryTarget extends AbstractComponent { try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) { final RecoveryStatus recoveryStatus = statusRef.status(); recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps()); - recoveryStatus.indexShard().skipTranslogRecovery(false); + recoveryStatus.indexShard().skipTranslogRecovery(); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 52de4859ffb..bad431f67e5 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -39,6 +39,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -256,7 +257,11 @@ public class InternalEngineTests extends ESTestCase { // we don't need to notify anybody in this test } }, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig); - + try { + config.setCreate(Lucene.indexExists(store.directory()) == false); + } catch (IOException e) { + throw new ElasticsearchException("can't find index?", e); + } return config; } @@ -775,6 +780,7 @@ public class InternalEngineTests extends ESTestCase { // this so we have to disable the check explicitly directory.setPreventDoubleWrite(false); } + config.setCreate(false); engine = new InternalEngine(config, false); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -1869,6 +1875,7 @@ public class InternalEngineTests extends ESTestCase { parser.mappingUpdate = dynamicUpdate(); engine.close(); + engine.config().setCreate(false); engine = new InternalEngine(engine.config(), false); // we need to reuse the engine config unless the parser.mappingModified won't work try (Engine.Searcher searcher = engine.acquireSearcher("test")) { diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 1634d21ee34..7b45a3b90cd 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -29,6 +29,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; @@ -226,6 +227,11 @@ public class ShadowEngineTests extends ESTestCase { public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); + try { + config.setCreate(Lucene.indexExists(store.directory()) == false); + } catch (IOException e) { + throw new ElasticsearchException("can't find index?", e); + } return config; }