diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 5b6d27ce24f..7ed930ab956 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -65,6 +65,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -1149,7 +1150,7 @@ public abstract class Engine implements Closeable { } /** - * Request that this engine throttle incoming indexing requests to one thread. Must be matched by a later call to {@link deactivateThrottling}. + * Request that this engine throttle incoming indexing requests to one thread. Must be matched by a later call to {@link #deactivateThrottling()}. */ public abstract void activateThrottling(); @@ -1157,4 +1158,10 @@ public abstract class Engine implements Closeable { * Reverses a previous {@link #activateThrottling} call. */ public abstract void deactivateThrottling(); + + /** + * Performs recovery from the transaction log. + * This operation will close the engine if the recovery fails. + */ + public abstract Engine recoverFromTranslog() throws IOException; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 47001f40309..24e32c68900 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -64,7 +64,7 @@ public final class EngineConfig { private final Similarity similarity; private final CodecService codecService; private final Engine.EventListener eventListener; - private final boolean forceNewTranslog; + private volatile boolean forceNewTranslog = false; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; @@ -89,9 +89,6 @@ public final class EngineConfig { } }, Property.IndexScope, Property.NodeScope); - /** if set to true the engine will start even if the translog id in the commit point can not be found */ - public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog"; - private TranslogConfig translogConfig; private boolean create = false; @@ -105,7 +102,6 @@ public final class EngineConfig { TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter) { this.shardId = shardId; - final Settings settings = indexSettings.getSettings(); this.indexSettings = indexSettings; this.threadPool = threadPool; this.warmer = warmer == null ? (a) -> {} : warmer; @@ -122,7 +118,6 @@ public final class EngineConfig { // and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high: indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB); this.translogRecoveryPerformer = translogRecoveryPerformer; - this.forceNewTranslog = settings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false); this.queryCache = queryCache; this.queryCachingPolicy = queryCachingPolicy; this.translogConfig = translogConfig; @@ -305,4 +300,10 @@ public final class EngineConfig { */ public TimeValue getFlushMergesAfter() { return flushMergesAfter; } + /** if set to true the engine will start even if the translog id in the commit point can not be found and a new transaction log + * will be created this should be used if recovery from translog should be skipped */ + public void setForceNewTranslog(boolean forceNewTranslog) { + this.forceNewTranslog = forceNewTranslog; + } + } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java b/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java index 77bcc3b28e4..ccb5f2860fa 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java @@ -23,7 +23,7 @@ package org.elasticsearch.index.engine; */ public interface EngineFactory { - public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery); + Engine newReadWriteEngine(EngineConfig config); - public Engine newReadOnlyEngine(EngineConfig config); + Engine newReadOnlyEngine(EngineConfig config); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index aa62f255bb4..b8cdc281165 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -114,7 +114,7 @@ public class InternalEngine extends Engine { // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); - public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException { + public InternalEngine(EngineConfig engineConfig) throws EngineException { super(engineConfig); this.versionMap = new LiveVersionMap(); store.incRef(); @@ -132,14 +132,12 @@ public class InternalEngine extends Engine { } throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); - final Translog.TranslogGeneration translogGeneration; try { final boolean create = engineConfig.isCreate(); writer = createWriter(create); indexWriter = writer; - translog = openTranslog(engineConfig, writer, create || skipInitialTranslogRecovery || engineConfig.forceNewTranslog()); - translogGeneration = translog.getGeneration(); - assert translogGeneration != null; + translog = openTranslog(engineConfig, writer, create || engineConfig.forceNewTranslog()); + assert translog.getGeneration() != null; } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } catch (AssertionError e) { @@ -151,16 +149,15 @@ public class InternalEngine extends Engine { throw e; } } + this.translog = translog; manager = createSearcherManager(); this.searcherManager = manager; this.versionMap.setManager(searcherManager); try { - if (skipInitialTranslogRecovery) { + if (engineConfig.forceNewTranslog()) { // make sure we point at the latest translog from now on.. commitIndexWriter(writer, translog, lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID)); - } else { - recoverFromTranslog(engineConfig, translogGeneration); } } catch (IOException | EngineException ex) { throw new EngineCreationFailureException(shardId, "failed to recover from translog", ex); @@ -179,6 +176,20 @@ public class InternalEngine extends Engine { logger.trace("created new InternalEngine"); } + @Override + public InternalEngine recoverFromTranslog() throws IOException { + boolean success = false; + try { + recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer()); + success = true; + } finally { + if (success == false) { + close(); + } + } + return this; + } + private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException { final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer); final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); @@ -219,9 +230,10 @@ public class InternalEngine extends Engine { return translog; } - protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogGeneration translogGeneration) throws IOException { + + private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException { + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); int opsRecovered = 0; - final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer(); try { Translog.Snapshot snapshot = translog.newSnapshot(); opsRecovered = handler.recoveryFromSnapshot(this, snapshot); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java index 7b458f93904..0d6a1520e63 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java @@ -21,8 +21,8 @@ package org.elasticsearch.index.engine; public class InternalEngineFactory implements EngineFactory { @Override - public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { - return new InternalEngine(config, skipTranslogRecovery); + public Engine newReadWriteEngine(EngineConfig config) { + return new InternalEngine(config); } @Override diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 49fd7328000..c30b2e9bf50 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.translog.Translog; import java.io.IOException; @@ -247,4 +248,9 @@ public class ShadowEngine extends Engine { public void deactivateThrottling() { throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); } + + @Override + public Engine recoverFromTranslog() throws IOException { + throw new UnsupportedOperationException("can't recover on a shadow engine"); + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 97526b4141f..e49aba170c7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; @@ -53,7 +54,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.SuspendableRefContainer; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; @@ -110,7 +110,6 @@ import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -185,7 +184,6 @@ public class IndexShard extends AbstractIndexShardComponent { private static final EnumSet writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); private final IndexSearcherWrapper searcherWrapper; - /** * True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link * IndexingMemoryController}). @@ -239,11 +237,10 @@ public class IndexShard extends AbstractIndexShardComponent { } else { cachingPolicy = new UsageTrackingQueryCachingPolicy(); } - - this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); - this.suspendableRefContainer = new SuspendableRefContainer(); - this.searcherWrapper = indexSearcherWrapper; - this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); + engineConfig = newEngineConfig(translogConfig, cachingPolicy, new IndexShardRecoveryPerformer(shardId, mapperService, logger)); + suspendableRefContainer = new SuspendableRefContainer(); + searcherWrapper = indexSearcherWrapper; + primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); } public Store store() { @@ -859,7 +856,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** * After the store has been recovered, we need to start the engine in order to apply operations */ - public void performTranslogRecovery(boolean indexExists) { + public void performTranslogRecovery(boolean indexExists) throws IOException { if (indexExists == false) { // note: these are set when recovering from the translog final RecoveryState.Translog translogStats = recoveryState().getTranslog(); @@ -870,7 +867,7 @@ public class IndexShard extends AbstractIndexShardComponent { assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } - private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) { + private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -893,7 +890,12 @@ public class IndexShard extends AbstractIndexShardComponent { // we still give sync'd flush a chance to run: active.set(true); } - createNewEngine(skipTranslogRecovery, engineConfig); + engineConfig.setForceNewTranslog(skipTranslogRecovery); + Engine newEngine = createNewEngine(engineConfig); + verifyNotClosed(); + if (skipTranslogRecovery == false) { + newEngine.recoverFromTranslog(); + } } @@ -1313,13 +1315,14 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void createNewEngine(boolean skipTranslogRecovery, EngineConfig config) { + private Engine createNewEngine(EngineConfig config) { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new EngineClosedException(shardId); } assert this.currentEngineReference.get() == null; - this.currentEngineReference.set(newEngine(skipTranslogRecovery, config)); + this.currentEngineReference.set(newEngine(config)); + } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which @@ -1330,10 +1333,11 @@ public class IndexShard extends AbstractIndexShardComponent { if (engine != null) { engine.onSettingsChanged(); } + return engine; } - protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) { - return engineFactory.newReadWriteEngine(config, skipTranslogRecovery); + protected Engine newEngine(EngineConfig config) { + return engineFactory.newReadWriteEngine(config); } /** @@ -1374,33 +1378,7 @@ public class IndexShard extends AbstractIndexShardComponent { return mapperService.documentMapperWithAutoCreate(type); } - private final EngineConfig newEngineConfig(TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy) { - final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, logger) { - @Override - protected void operationProcessed() { - assert recoveryState != null; - recoveryState.getTranslog().incrementRecoveredOperations(); - } - - @Override - public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { - assert recoveryState != null; - RecoveryState.Translog translogStats = recoveryState.getTranslog(); - translogStats.totalOperations(snapshot.totalOperations()); - translogStats.totalOperationsOnStart(snapshot.totalOperations()); - return super.recoveryFromSnapshot(engine, snapshot); - } - - @Override - protected void index(Engine engine, Engine.Index engineIndex) { - IndexShard.this.index(engine, engineIndex); - } - - @Override - protected void delete(Engine engine, Engine.Delete engineDelete) { - IndexShard.this.delete(engine, engineDelete); - } - }; + private final EngineConfig newEngineConfig(TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) { return new EngineConfig(shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, @@ -1535,4 +1513,36 @@ public class IndexShard extends AbstractIndexShardComponent { return getEngine().refreshNeeded(); } + private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer { + + protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, ESLogger logger) { + super(shardId, mapperService, logger); + } + + @Override + protected void operationProcessed() { + assert recoveryState != null; + recoveryState.getTranslog().incrementRecoveredOperations(); + } + + @Override + public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { + assert recoveryState != null; + RecoveryState.Translog translogStats = recoveryState.getTranslog(); + translogStats.totalOperations(snapshot.totalOperations()); + translogStats.totalOperationsOnStart(snapshot.totalOperations()); + return super.recoveryFromSnapshot(engine, snapshot); + } + + @Override + protected void index(Engine engine, Engine.Index engineIndex) { + IndexShard.this.index(engine, engineIndex); + } + + @Override + protected void delete(Engine engine, Engine.Delete engineDelete) { + IndexShard.this.delete(engine, engineDelete); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 54fdd8de5f9..c1f28b93188 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -81,9 +81,8 @@ public final class ShadowIndexShard extends IndexShard { } @Override - protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig config) { + protected Engine newEngine(EngineConfig config) { assert this.shardRouting.primary() == false; - assert skipInitialTranslogRecovery : "can not recover from gateway"; config.setCreate(false); // hardcoded - we always expect an index to be present return engineFactory.newReadOnlyEngine(config); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index d11e6734025..574e68d8cc9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -225,7 +225,7 @@ final class StoreRecovery { indexShard.performTranslogRecovery(indexShouldExists); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); - } catch (EngineException e) { + } catch (EngineException | IOException e) { throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e); } finally { store.decRef(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ab2041baa4a..bd7b9dfffbe 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -79,13 +79,11 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.index.mapper.ParseContext.Document; -import org.elasticsearch.index.mapper.core.StringFieldMapper; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper; import org.elasticsearch.index.shard.IndexSearcherWrapper; -import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; @@ -259,12 +257,13 @@ public class InternalEngineTests extends ESTestCase { return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); } - protected InternalEngine createEngine(Store store, Path translogPath) { + protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { return createEngine(defaultSettings, store, translogPath, newMergePolicy()); } - protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { - return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), false); + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy); + return new InternalEngine(config).recoverFromTranslog(); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { @@ -556,7 +555,7 @@ public class InternalEngineTests extends ESTestCase { InternalEngine engine = createEngine(store, translog); engine.close(); - engine = new InternalEngine(engine.config(), false); + engine = new InternalEngine(engine.config()).recoverFromTranslog(); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); searcher.close(); @@ -794,7 +793,7 @@ public class InternalEngineTests extends ESTestCase { public void testSyncedFlush() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy()), false)) { + new LogByteSizeMergePolicy())).recoverFromTranslog()) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); engine.index(new Engine.Index(newUid("1"), doc)); @@ -821,7 +820,7 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < iters; i++) { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy()), false)) { + new LogDocMergePolicy())).recoverFromTranslog()) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); Engine.Index doc1 = new Engine.Index(newUid("1"), doc); @@ -889,7 +888,10 @@ public class InternalEngineTests extends ESTestCase { } else { engine.flushAndClose(); } - engine = new InternalEngine(config, randomBoolean()); + engine = new InternalEngine(config); + if (randomBoolean()) { + engine.recoverFromTranslog(); + } assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -913,7 +915,7 @@ public class InternalEngineTests extends ESTestCase { directory.setPreventDoubleWrite(false); } config.setCreate(false); - engine = new InternalEngine(config, false); + engine = new InternalEngine(config).recoverFromTranslog(); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -1051,7 +1053,7 @@ public class InternalEngineTests extends ESTestCase { public void testForceMerge() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP + new LogByteSizeMergePolicy())).recoverFromTranslog()) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null); @@ -1489,7 +1491,7 @@ public class InternalEngineTests extends ESTestCase { public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()), false)) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy())).recoverFromTranslog()) { engine.config().setEnableGcDeletes(false); // Add document @@ -1625,9 +1627,9 @@ public class InternalEngineTests extends ESTestCase { // expected } // now it should be OK. - IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), - Settings.builder().put(defaultSettings.getSettings()).put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build()); - engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy()); + EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy()); + config.setForceNewTranslog(true); + engine = new InternalEngine(config); } public void testTranslogReplayWithFailure() throws IOException { @@ -1661,7 +1663,7 @@ public class InternalEngineTests extends ESTestCase { engine = createEngine(store, primaryTranslogDir); started = true; break; - } catch (EngineCreationFailureException ex) { + } catch (EngineException | IOException e) { } } @@ -1702,7 +1704,7 @@ public class InternalEngineTests extends ESTestCase { directory.setPreventDoubleWrite(false); } engine.close(); - engine = new InternalEngine(engine.config(), true); + engine = new InternalEngine(engine.config()); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); @@ -1831,7 +1833,7 @@ public class InternalEngineTests extends ESTestCase { engine.close(); engine.config().setCreate(false); - engine = new InternalEngine(engine.config(), false); // we need to reuse the engine config unless the parser.mappingModified won't work + engine = new InternalEngine(engine.config()).recoverFromTranslog(); // we need to reuse the engine config unless the parser.mappingModified won't work try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); @@ -1966,7 +1968,7 @@ public class InternalEngineTests extends ESTestCase { , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); try { - new InternalEngine(brokenConfig, false); + new InternalEngine(brokenConfig).recoverFromTranslog(); fail("translog belongs to a different engine"); } catch (EngineCreationFailureException ex) { } diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index b432b758ca1..1fbe9e159a1 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -217,7 +217,9 @@ public class ShadowEngineTests extends ESTestCase { } protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { - return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), true); + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy); + config.setForceNewTranslog(true); + return new InternalEngine(config); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c488fc2b35c..cc5374c9e0a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -117,6 +118,8 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -1317,6 +1320,60 @@ public class IndexShardTests extends ESSingleNodeTestCase { } } + public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService(resolveIndex("test")); + IndexShard shard = indexService.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get(); + client().prepareDelete("test", "test", "0").get(); + client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {}; + shard.close("simon says", false); + AtomicReference shardRef = new AtomicReference<>(); + List failures = new ArrayList<>(); + IndexingOperationListener listener = new IndexingOperationListener() { + + @Override + public void postIndex(Engine.Index index, boolean created) { + try { + assertNotNull(shardRef.get()); + // this is all IMC needs to do - check current memory and refresh + assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); + shardRef.get().refresh("test"); + } catch (Throwable t) { + failures.add(t); + throw t; + } + } + + + @Override + public void postDelete(Engine.Delete delete) { + try { + assertNotNull(shardRef.get()); + // this is all IMC needs to do - check current memory and refresh + assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); + shardRef.get().refresh("test"); + } catch (Throwable t) { + failures.add(t); + throw t; + } + } + }; + final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener); + shardRef.set(newShard); + recoverShard(newShard, shard.routingEntry()); + + try { + ExceptionsHelper.rethrowAndSuppress(failures); + } finally { + newShard.close("just do it", randomBoolean()); + } + } + public void testSearchIsReleaseIfWrapperFails() throws IOException { createIndex("test"); ensureGreen(); @@ -1348,12 +1405,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { } private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { - ShardRouting routing = new ShardRouting(shard.routingEntry()); - IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), - shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), - indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners) - ); + IndexShard newShard = newIndexShard(indexService, shard, wrapper, listeners); + return recoverShard(newShard, shard.routingEntry()); + } + + private final IndexShard recoverShard(IndexShard newShard, ShardRouting oldRouting) throws IOException { + ShardRouting routing = new ShardRouting(oldRouting); ShardRoutingHelper.reinit(routing); newShard.updateRoutingEntry(routing, false); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); @@ -1365,6 +1422,15 @@ public class IndexShardTests extends ESSingleNodeTestCase { return newShard; } + private final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { + IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), + shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), + indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, + indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners) + ); + return newShard; + } + public void testTranslogRecoverySyncsTranslog() throws IOException { createIndex("testindexfortranslogsync"); client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject() diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java index 87a12791bc1..9a6747d5301 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java @@ -32,8 +32,8 @@ public final class MockEngineFactory implements EngineFactory { } @Override - public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { - return new MockInternalEngine(config, skipTranslogRecovery, wrapper); + public Engine newReadWriteEngine(EngineConfig config) { + return new MockInternalEngine(config, wrapper); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index c75c41d3038..603907cc03c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -32,8 +32,8 @@ final class MockInternalEngine extends InternalEngine { private final boolean randomizeFlushOnClose; private Class wrapperClass; - MockInternalEngine(EngineConfig config, boolean skipInitialTranslogRecovery, Class wrapper) throws EngineException { - super(config, skipInitialTranslogRecovery); + MockInternalEngine(EngineConfig config, Class wrapper) throws EngineException { + super(config); randomizeFlushOnClose = config.getIndexSettings().isOnSharedFilesystem() == false; wrapperClass = wrapper;