From 1e06139584186bc58df1323d259f1289ec8a1839 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 30 Mar 2016 21:02:06 +0200
Subject: [PATCH] Move translog recovery outside of the engine

We changed the way we manage engine memory buffers to an open model where
each shard can essentially have infinite memory. The indexing memory
controller (IMC) is responsible for moving memory to disk when needed. Yet,
this doesn't work today when we recover from store/translog, since the engine
is not fully initialized: the IMC has no access to the engine, neither to its
memory buffer, nor can it move data to disk. The biggest issue is that
translog recovery happens inside the Engine constructor, which is problematic
by itself since it might take minutes and performs write operations on an
engine that is not yet fully initialized. This change detaches translog
recovery from engine construction and makes it the responsibility of the
caller to run it once the engine is fully constructed, or to skip it if it is
not necessary. (A usage sketch of the new calling pattern follows the diff
below.)
---
 .../elasticsearch/index/engine/Engine.java    |  9 +-
 .../index/engine/EngineConfig.java            | 13 +--
 .../index/engine/EngineFactory.java           |  4 +-
 .../index/engine/InternalEngine.java          | 32 +++++--
 .../index/engine/InternalEngineFactory.java   |  4 +-
 .../index/engine/ShadowEngine.java            |  6 ++
 .../elasticsearch/index/shard/IndexShard.java | 94 ++++++++++---------
 .../index/shard/ShadowIndexShard.java         |  3 +-
 .../index/shard/StoreRecovery.java            |  2 +-
 .../index/engine/InternalEngineTests.java     | 40 ++++----
 .../index/engine/ShadowEngineTests.java       |  4 +-
 .../index/shard/IndexShardTests.java          | 78 +++++++++++++--
 .../test/engine/MockEngineFactory.java        |  4 +-
 .../test/engine/MockInternalEngine.java       |  4 +-
 14 files changed, 201 insertions(+), 96 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 5b6d27ce24f..7ed930ab956 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -65,6 +65,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -1149,7 +1150,7 @@ public abstract class Engine implements Closeable { } /** - * Request that this engine throttle incoming indexing requests to one thread. Must be matched by a later call to {@link deactivateThrottling}. + * Request that this engine throttle incoming indexing requests to one thread. Must be matched by a later call to {@link #deactivateThrottling()}. */ public abstract void activateThrottling(); @@ -1157,4 +1158,10 @@ public abstract class Engine implements Closeable { * Reverses a previous {@link #activateThrottling} call. */ public abstract void deactivateThrottling(); + + /** + * Performs recovery from the transaction log. + * This operation will close the engine if the recovery fails.
+ */ + public abstract Engine recoverFromTranslog() throws IOException; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 47001f40309..24e32c68900 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -64,7 +64,7 @@ public final class EngineConfig { private final Similarity similarity; private final CodecService codecService; private final Engine.EventListener eventListener; - private final boolean forceNewTranslog; + private volatile boolean forceNewTranslog = false; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; @@ -89,9 +89,6 @@ public final class EngineConfig { } }, Property.IndexScope, Property.NodeScope); - /** if set to true the engine will start even if the translog id in the commit point can not be found */ - public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog"; - private TranslogConfig translogConfig; private boolean create = false; @@ -105,7 +102,6 @@ public final class EngineConfig { TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter) { this.shardId = shardId; - final Settings settings = indexSettings.getSettings(); this.indexSettings = indexSettings; this.threadPool = threadPool; this.warmer = warmer == null ? (a) -> {} : warmer; @@ -122,7 +118,6 @@ public final class EngineConfig { // and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high: indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB); this.translogRecoveryPerformer = translogRecoveryPerformer; - this.forceNewTranslog = settings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false); this.queryCache = queryCache; this.queryCachingPolicy = queryCachingPolicy; this.translogConfig = translogConfig; @@ -305,4 +300,10 @@ public final class EngineConfig { */ public TimeValue getFlushMergesAfter() { return flushMergesAfter; } + /** if set to true the engine will start even if the translog id in the commit point can not be found and a new transaction log + * will be created this should be used if recovery from translog should be skipped */ + public void setForceNewTranslog(boolean forceNewTranslog) { + this.forceNewTranslog = forceNewTranslog; + } + } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java b/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java index 77bcc3b28e4..ccb5f2860fa 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineFactory.java @@ -23,7 +23,7 @@ package org.elasticsearch.index.engine; */ public interface EngineFactory { - public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery); + Engine newReadWriteEngine(EngineConfig config); - public Engine newReadOnlyEngine(EngineConfig config); + Engine newReadOnlyEngine(EngineConfig config); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index aa62f255bb4..b8cdc281165 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -114,7 +114,7 
@@ public class InternalEngine extends Engine { // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); - public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException { + public InternalEngine(EngineConfig engineConfig) throws EngineException { super(engineConfig); this.versionMap = new LiveVersionMap(); store.incRef(); @@ -132,14 +132,12 @@ public class InternalEngine extends Engine { } throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); - final Translog.TranslogGeneration translogGeneration; try { final boolean create = engineConfig.isCreate(); writer = createWriter(create); indexWriter = writer; - translog = openTranslog(engineConfig, writer, create || skipInitialTranslogRecovery || engineConfig.forceNewTranslog()); - translogGeneration = translog.getGeneration(); - assert translogGeneration != null; + translog = openTranslog(engineConfig, writer, create || engineConfig.forceNewTranslog()); + assert translog.getGeneration() != null; } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } catch (AssertionError e) { @@ -151,16 +149,15 @@ public class InternalEngine extends Engine { throw e; } } + this.translog = translog; manager = createSearcherManager(); this.searcherManager = manager; this.versionMap.setManager(searcherManager); try { - if (skipInitialTranslogRecovery) { + if (engineConfig.forceNewTranslog()) { // make sure we point at the latest translog from now on.. commitIndexWriter(writer, translog, lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID)); - } else { - recoverFromTranslog(engineConfig, translogGeneration); } } catch (IOException | EngineException ex) { throw new EngineCreationFailureException(shardId, "failed to recover from translog", ex); @@ -179,6 +176,20 @@ public class InternalEngine extends Engine { logger.trace("created new InternalEngine"); } + @Override + public InternalEngine recoverFromTranslog() throws IOException { + boolean success = false; + try { + recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer()); + success = true; + } finally { + if (success == false) { + close(); + } + } + return this; + } + private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException { final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer); final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); @@ -219,9 +230,10 @@ public class InternalEngine extends Engine { return translog; } - protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogGeneration translogGeneration) throws IOException { + + private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException { + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); int opsRecovered = 0; - final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer(); try { Translog.Snapshot snapshot = translog.newSnapshot(); opsRecovered = handler.recoveryFromSnapshot(this, snapshot); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java index 7b458f93904..0d6a1520e63 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java +++ 
b/core/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java @@ -21,8 +21,8 @@ package org.elasticsearch.index.engine; public class InternalEngineFactory implements EngineFactory { @Override - public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { - return new InternalEngine(config, skipTranslogRecovery); + public Engine newReadWriteEngine(EngineConfig config) { + return new InternalEngine(config); } @Override diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 49fd7328000..c30b2e9bf50 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.translog.Translog; import java.io.IOException; @@ -247,4 +248,9 @@ public class ShadowEngine extends Engine { public void deactivateThrottling() { throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); } + + @Override + public Engine recoverFromTranslog() throws IOException { + throw new UnsupportedOperationException("can't recover on a shadow engine"); + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 97526b4141f..e49aba170c7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; @@ -53,7 +54,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.SuspendableRefContainer; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; @@ -110,7 +110,6 @@ import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -185,7 +184,6 @@ public class IndexShard extends AbstractIndexShardComponent { private static final EnumSet writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); private final IndexSearcherWrapper searcherWrapper; - /** * True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link * IndexingMemoryController}). 
@@ -239,11 +237,10 @@ public class IndexShard extends AbstractIndexShardComponent { } else { cachingPolicy = new UsageTrackingQueryCachingPolicy(); } - - this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); - this.suspendableRefContainer = new SuspendableRefContainer(); - this.searcherWrapper = indexSearcherWrapper; - this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); + engineConfig = newEngineConfig(translogConfig, cachingPolicy, new IndexShardRecoveryPerformer(shardId, mapperService, logger)); + suspendableRefContainer = new SuspendableRefContainer(); + searcherWrapper = indexSearcherWrapper; + primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); } public Store store() { @@ -859,7 +856,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** * After the store has been recovered, we need to start the engine in order to apply operations */ - public void performTranslogRecovery(boolean indexExists) { + public void performTranslogRecovery(boolean indexExists) throws IOException { if (indexExists == false) { // note: these are set when recovering from the translog final RecoveryState.Translog translogStats = recoveryState().getTranslog(); @@ -870,7 +867,7 @@ public class IndexShard extends AbstractIndexShardComponent { assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } - private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) { + private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -893,7 +890,12 @@ public class IndexShard extends AbstractIndexShardComponent { // we still give sync'd flush a chance to run: active.set(true); } - createNewEngine(skipTranslogRecovery, engineConfig); + engineConfig.setForceNewTranslog(skipTranslogRecovery); + Engine newEngine = createNewEngine(engineConfig); + verifyNotClosed(); + if (skipTranslogRecovery == false) { + newEngine.recoverFromTranslog(); + } } @@ -1313,13 +1315,14 @@ public class IndexShard extends AbstractIndexShardComponent { } } - private void createNewEngine(boolean skipTranslogRecovery, EngineConfig config) { + private Engine createNewEngine(EngineConfig config) { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new EngineClosedException(shardId); } assert this.currentEngineReference.get() == null; - this.currentEngineReference.set(newEngine(skipTranslogRecovery, config)); + this.currentEngineReference.set(newEngine(config)); + } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which @@ -1330,10 +1333,11 @@ public class IndexShard extends AbstractIndexShardComponent { if (engine != null) { engine.onSettingsChanged(); } + return engine; } - protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) { - return engineFactory.newReadWriteEngine(config, skipTranslogRecovery); + protected Engine newEngine(EngineConfig config) { + return engineFactory.newReadWriteEngine(config); } /** @@ -1374,33 +1378,7 @@ public class IndexShard extends AbstractIndexShardComponent { return mapperService.documentMapperWithAutoCreate(type); } - private final EngineConfig newEngineConfig(TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy) { - final TranslogRecoveryPerformer 
translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, logger) { - @Override - protected void operationProcessed() { - assert recoveryState != null; - recoveryState.getTranslog().incrementRecoveredOperations(); - } - - @Override - public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { - assert recoveryState != null; - RecoveryState.Translog translogStats = recoveryState.getTranslog(); - translogStats.totalOperations(snapshot.totalOperations()); - translogStats.totalOperationsOnStart(snapshot.totalOperations()); - return super.recoveryFromSnapshot(engine, snapshot); - } - - @Override - protected void index(Engine engine, Engine.Index engineIndex) { - IndexShard.this.index(engine, engineIndex); - } - - @Override - protected void delete(Engine engine, Engine.Delete engineDelete) { - IndexShard.this.delete(engine, engineDelete); - } - }; + private final EngineConfig newEngineConfig(TranslogConfig translogConfig, QueryCachingPolicy cachingPolicy, TranslogRecoveryPerformer translogRecoveryPerformer) { return new EngineConfig(shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, @@ -1535,4 +1513,36 @@ public class IndexShard extends AbstractIndexShardComponent { return getEngine().refreshNeeded(); } + private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer { + + protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, ESLogger logger) { + super(shardId, mapperService, logger); + } + + @Override + protected void operationProcessed() { + assert recoveryState != null; + recoveryState.getTranslog().incrementRecoveredOperations(); + } + + @Override + public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { + assert recoveryState != null; + RecoveryState.Translog translogStats = recoveryState.getTranslog(); + translogStats.totalOperations(snapshot.totalOperations()); + translogStats.totalOperationsOnStart(snapshot.totalOperations()); + return super.recoveryFromSnapshot(engine, snapshot); + } + + @Override + protected void index(Engine engine, Engine.Index engineIndex) { + IndexShard.this.index(engine, engineIndex); + } + + @Override + protected void delete(Engine engine, Engine.Delete engineDelete) { + IndexShard.this.delete(engine, engineDelete); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 54fdd8de5f9..c1f28b93188 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -81,9 +81,8 @@ public final class ShadowIndexShard extends IndexShard { } @Override - protected Engine newEngine(boolean skipInitialTranslogRecovery, EngineConfig config) { + protected Engine newEngine(EngineConfig config) { assert this.shardRouting.primary() == false; - assert skipInitialTranslogRecovery : "can not recover from gateway"; config.setCreate(false); // hardcoded - we always expect an index to be present return engineFactory.newReadOnlyEngine(config); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 
d11e6734025..574e68d8cc9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -225,7 +225,7 @@ final class StoreRecovery { indexShard.performTranslogRecovery(indexShouldExists); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); - } catch (EngineException e) { + } catch (EngineException | IOException e) { throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e); } finally { store.decRef(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ab2041baa4a..bd7b9dfffbe 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -79,13 +79,11 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.index.mapper.ParseContext.Document; -import org.elasticsearch.index.mapper.core.StringFieldMapper; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper; import org.elasticsearch.index.shard.IndexSearcherWrapper; -import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; @@ -259,12 +257,13 @@ public class InternalEngineTests extends ESTestCase { return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); } - protected InternalEngine createEngine(Store store, Path translogPath) { + protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { return createEngine(defaultSettings, store, translogPath, newMergePolicy()); } - protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { - return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), false); + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy); + return new InternalEngine(config).recoverFromTranslog(); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { @@ -556,7 +555,7 @@ public class InternalEngineTests extends ESTestCase { InternalEngine engine = createEngine(store, translog); engine.close(); - engine = new InternalEngine(engine.config(), false); + engine = new InternalEngine(engine.config()).recoverFromTranslog(); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); searcher.close(); @@ -794,7 +793,7 @@ public class InternalEngineTests extends ESTestCase { public void testSyncedFlush() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy()), false)) { + new LogByteSizeMergePolicy())).recoverFromTranslog()) { final String syncId = 
randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); engine.index(new Engine.Index(newUid("1"), doc)); @@ -821,7 +820,7 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < iters; i++) { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy()), false)) { + new LogDocMergePolicy())).recoverFromTranslog()) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); Engine.Index doc1 = new Engine.Index(newUid("1"), doc); @@ -889,7 +888,10 @@ public class InternalEngineTests extends ESTestCase { } else { engine.flushAndClose(); } - engine = new InternalEngine(config, randomBoolean()); + engine = new InternalEngine(config); + if (randomBoolean()) { + engine.recoverFromTranslog(); + } assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -913,7 +915,7 @@ public class InternalEngineTests extends ESTestCase { directory.setPreventDoubleWrite(false); } config.setCreate(false); - engine = new InternalEngine(config, false); + engine = new InternalEngine(config).recoverFromTranslog(); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -1051,7 +1053,7 @@ public class InternalEngineTests extends ESTestCase { public void testForceMerge() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP + new LogByteSizeMergePolicy())).recoverFromTranslog()) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null); @@ -1489,7 +1491,7 @@ public class InternalEngineTests extends ESTestCase { public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()), false)) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy())).recoverFromTranslog()) { engine.config().setEnableGcDeletes(false); // Add document @@ -1625,9 +1627,9 @@ public class InternalEngineTests extends ESTestCase { // expected } // now it should be OK. 
- IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), - Settings.builder().put(defaultSettings.getSettings()).put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build()); - engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy()); + EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy()); + config.setForceNewTranslog(true); + engine = new InternalEngine(config); } public void testTranslogReplayWithFailure() throws IOException { @@ -1661,7 +1663,7 @@ public class InternalEngineTests extends ESTestCase { engine = createEngine(store, primaryTranslogDir); started = true; break; - } catch (EngineCreationFailureException ex) { + } catch (EngineException | IOException e) { } } @@ -1702,7 +1704,7 @@ public class InternalEngineTests extends ESTestCase { directory.setPreventDoubleWrite(false); } engine.close(); - engine = new InternalEngine(engine.config(), true); + engine = new InternalEngine(engine.config()); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); @@ -1831,7 +1833,7 @@ public class InternalEngineTests extends ESTestCase { engine.close(); engine.config().setCreate(false); - engine = new InternalEngine(engine.config(), false); // we need to reuse the engine config unless the parser.mappingModified won't work + engine = new InternalEngine(engine.config()).recoverFromTranslog(); // we need to reuse the engine config unless the parser.mappingModified won't work try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); @@ -1966,7 +1968,7 @@ public class InternalEngineTests extends ESTestCase { , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); try { - new InternalEngine(brokenConfig, false); + new InternalEngine(brokenConfig).recoverFromTranslog(); fail("translog belongs to a different engine"); } catch (EngineCreationFailureException ex) { } diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index b432b758ca1..1fbe9e159a1 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -217,7 +217,9 @@ public class ShadowEngineTests extends ESTestCase { } protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { - return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), true); + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy); + config.setForceNewTranslog(true); + return new InternalEngine(config); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c488fc2b35c..cc5374c9e0a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -31,6 +31,7 @@ import 
org.apache.lucene.store.IOContext; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -117,6 +118,8 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -1317,6 +1320,60 @@ public class IndexShardTests extends ESSingleNodeTestCase { } } + public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService(resolveIndex("test")); + IndexShard shard = indexService.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get(); + client().prepareDelete("test", "test", "0").get(); + client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get(); + + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {}; + shard.close("simon says", false); + AtomicReference shardRef = new AtomicReference<>(); + List failures = new ArrayList<>(); + IndexingOperationListener listener = new IndexingOperationListener() { + + @Override + public void postIndex(Engine.Index index, boolean created) { + try { + assertNotNull(shardRef.get()); + // this is all IMC needs to do - check current memory and refresh + assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); + shardRef.get().refresh("test"); + } catch (Throwable t) { + failures.add(t); + throw t; + } + } + + + @Override + public void postDelete(Engine.Delete delete) { + try { + assertNotNull(shardRef.get()); + // this is all IMC needs to do - check current memory and refresh + assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); + shardRef.get().refresh("test"); + } catch (Throwable t) { + failures.add(t); + throw t; + } + } + }; + final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener); + shardRef.set(newShard); + recoverShard(newShard, shard.routingEntry()); + + try { + ExceptionsHelper.rethrowAndSuppress(failures); + } finally { + newShard.close("just do it", randomBoolean()); + } + } + public void testSearchIsReleaseIfWrapperFails() throws IOException { createIndex("test"); ensureGreen(); @@ -1348,12 +1405,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { } private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... 
listeners) throws IOException { - ShardRouting routing = new ShardRouting(shard.routingEntry()); - IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), - shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), - indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners) - ); + IndexShard newShard = newIndexShard(indexService, shard, wrapper, listeners); + return recoverShard(newShard, shard.routingEntry()); + } + + private final IndexShard recoverShard(IndexShard newShard, ShardRouting oldRouting) throws IOException { + ShardRouting routing = new ShardRouting(oldRouting); ShardRoutingHelper.reinit(routing); newShard.updateRoutingEntry(routing, false); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); @@ -1365,6 +1422,15 @@ public class IndexShardTests extends ESSingleNodeTestCase { return newShard; } + private final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { + IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), + shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), + indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, + indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners) + ); + return newShard; + } + public void testTranslogRecoverySyncsTranslog() throws IOException { createIndex("testindexfortranslogsync"); client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject() diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java index 87a12791bc1..9a6747d5301 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineFactory.java @@ -32,8 +32,8 @@ public final class MockEngineFactory implements EngineFactory { } @Override - public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { - return new MockInternalEngine(config, skipTranslogRecovery, wrapper); + public Engine newReadWriteEngine(EngineConfig config) { + return new MockInternalEngine(config, wrapper); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index c75c41d3038..603907cc03c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -32,8 +32,8 @@ final class MockInternalEngine extends InternalEngine { private final boolean randomizeFlushOnClose; private Class wrapperClass; - MockInternalEngine(EngineConfig config, boolean skipInitialTranslogRecovery, Class wrapper) throws EngineException { - super(config, skipInitialTranslogRecovery); + MockInternalEngine(EngineConfig config, Class wrapper) 
throws EngineException { + super(config); randomizeFlushOnClose = config.getIndexSettings().isOnSharedFilesystem() == false; wrapperClass = wrapper;
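Usage sketch (not part of the patch): the calling pattern this change establishes, modeled on the new IndexShard#internalPerformTranslogRecovery above. The wrapper class and method below are hypothetical illustrations; only the Engine, EngineConfig, and EngineFactory methods shown in the diff are assumed to exist.

import java.io.IOException;

import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;

// Hypothetical helper, for illustration only: shows how a caller creates an engine and then
// decides whether to replay the translog, now that the Engine constructor no longer does it.
final class EngineRecoveryExample {

    static Engine createAndRecover(EngineFactory factory, EngineConfig config,
                                   boolean skipTranslogRecovery) throws IOException {
        // If replay should be skipped, ask the engine to open a brand-new translog instead of
        // resolving the generation referenced by the last commit point (EngineConfig#setForceNewTranslog).
        config.setForceNewTranslog(skipTranslogRecovery);

        // The constructor now only opens the index writer and translog; it performs no replay.
        Engine engine = factory.newReadWriteEngine(config);

        if (skipTranslogRecovery == false) {
            // Caller-driven replay; per the new Engine#recoverFromTranslog javadoc, the engine
            // closes itself if this step fails.
            engine.recoverFromTranslog();
        }
        return engine;
    }
}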