From d5451b20371b31c49cb1f93a9af17eede796fd16 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 6 Nov 2017 17:55:11 -0500 Subject: [PATCH] Die with dignity while merging If an out of memory error is thrown while merging, today we quietly rewrap it into a merge exception and the out of memory error is lost. Instead, we need to rethrow out of memory errors, and in fact any fatal error here, and let those go uncaught so that the node is torn down. This commit causes this to be the case. Relates #27265 --- ...ElasticsearchUncaughtExceptionHandler.java | 7 +- .../index/engine/InternalEngine.java | 45 +- ...icsearchUncaughtExceptionHandlerTests.java | 2 - .../engine/CombinedDeletionPolicyTests.java | 2 +- .../index/engine/InternalEngineTests.java | 348 +------------- .../translog/TranslogDeletionPolicyTests.java | 12 - .../index/translog/TranslogTests.java | 2 +- .../indices/recovery/RecoveryTests.java | 2 +- .../index/engine/EvilInternalEngineTests.java | 107 +++++ .../index/engine/EngineTestCase.java | 441 ++++++++++++++++++ .../translog/TranslogDeletionPolicies.java | 39 ++ 11 files changed, 624 insertions(+), 383 deletions(-) create mode 100644 qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java create mode 100644 test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java create mode 100644 test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java diff --git a/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java b/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java index b1df4f5ccc0..c6692cec08b 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java @@ -21,7 +21,6 @@ package org.elasticsearch.bootstrap; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.MergePolicy; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.logging.Loggers; @@ -68,11 +67,7 @@ class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionH // visible for testing static boolean isFatalUncaught(Throwable e) { - return isFatalCause(e) || (e instanceof MergePolicy.MergeException && isFatalCause(e.getCause())); - } - - private static boolean isFatalCause(Throwable cause) { - return cause instanceof Error; + return e instanceof Error; } // visible for testing diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ac020999873..cb07cf5e696 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1579,23 +1579,15 @@ public class InternalEngine extends Engine { } } - @SuppressWarnings("finally") private boolean failOnTragicEvent(AlreadyClosedException ex) { final boolean engineFailed; // if we are already closed due to some tragic exception // we need to fail the engine. it might have already been failed before // but we are double-checking it's failed and closed if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) { - if (indexWriter.getTragicException() instanceof Error) { - try { - logger.error("tragic event in index writer", ex); - } finally { - throw (Error) indexWriter.getTragicException(); - } - } else { - failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException()); - engineFailed = true; - } + maybeDie("tragic event in index writer", indexWriter.getTragicException()); + failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException()); + engineFailed = true; } else if (translog.isOpen() == false && translog.getTragicException() != null) { failEngine("already closed by tragic event on the translog", translog.getTragicException()); engineFailed = true; @@ -1916,7 +1908,6 @@ public class InternalEngine extends Engine { @Override protected void handleMergeException(final Directory dir, final Throwable exc) { - logger.error("failed to merge", exc); engineConfig.getThreadPool().generic().execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -1925,13 +1916,39 @@ public class InternalEngine extends Engine { @Override protected void doRun() throws Exception { - MergePolicy.MergeException e = new MergePolicy.MergeException(exc, dir); - failEngine("merge failed", e); + /* + * We do this on another thread rather than the merge thread that we are initially called on so that we have complete + * confidence that the call stack does not contain catch statements that would cause the error that might be thrown + * here from being caught and never reaching the uncaught exception handler. + */ + maybeDie("fatal error while merging", exc); + logger.error("failed to merge", exc); + failEngine("merge failed", new MergePolicy.MergeException(exc, dir)); } }); } } + /** + * If the specified throwable is a fatal error, this throwable will be thrown. Callers should ensure that there are no catch statements + * that would catch an error in the stack as the fatal error here should go uncaught and be handled by the uncaught exception handler + * that we install during bootstrap. If the specified throwable is indeed a fatal error, the specified message will attempt to be logged + * before throwing the fatal error. If the specified throwable is not a fatal error, this method is a no-op. + * + * @param maybeMessage the message to maybe log + * @param maybeFatal the throwable that is maybe fatal + */ + @SuppressWarnings("finally") + private void maybeDie(final String maybeMessage, final Throwable maybeFatal) { + if (maybeFatal instanceof Error) { + try { + logger.error(maybeMessage, maybeFatal); + } finally { + throw (Error) maybeFatal; + } + } + } + /** * Commits the specified index writer. * diff --git a/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java b/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java index 6e40153b467..e2bf07b7d0b 100644 --- a/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.bootstrap; -import org.apache.lucene.index.MergePolicy; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -131,7 +130,6 @@ public class ElasticsearchUncaughtExceptionHandlerTests extends ESTestCase { } public void testIsFatalCause() { - assertFatal(new MergePolicy.MergeException(new OutOfMemoryError(), null)); assertFatal(new OutOfMemoryError()); assertFatal(new StackOverflowError()); assertFatal(new InternalError()); diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index d1eef05c2ef..5d4385cbd38 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,7 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f6d2d996580..e196c6b4d0b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -26,8 +26,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; @@ -45,7 +43,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.LogDocMergePolicy; -import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; @@ -72,19 +69,16 @@ import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; @@ -95,7 +89,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -119,20 +112,13 @@ import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.test.DummyShardLock; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.io.UncheckedIOException; @@ -166,14 +152,13 @@ import java.util.function.ToLongBiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.shuffle; import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -185,313 +170,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -public class InternalEngineTests extends ESTestCase { - - protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); - protected final AllocationId allocationId = AllocationId.newInitializing(); - private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); - - protected ThreadPool threadPool; - - private Store store; - private Store storeReplica; - - protected InternalEngine engine; - protected InternalEngine replicaEngine; - - private IndexSettings defaultSettings; - private String codecName; - private Path primaryTranslogDir; - private Path replicaTranslogDir; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - - CodecService codecService = new CodecService(null, logger); - String name = Codec.getDefault().getName(); - if (Arrays.asList(codecService.availableCodecs()).contains(name)) { - // some codecs are read only so we only take the ones that we have in the service and randomly - // selected by lucene test case. - codecName = name; - } else { - codecName = "default"; - } - defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us - .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), - between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY))) - .build()); // TODO randomize more settings - threadPool = new TestThreadPool(getClass().getName()); - store = createStore(); - storeReplica = createStore(); - Lucene.cleanLuceneIndex(store.directory()); - Lucene.cleanLuceneIndex(storeReplica.directory()); - primaryTranslogDir = createTempDir("translog-primary"); - engine = createEngine(store, primaryTranslogDir); - LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); - - assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName()); - assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); - if (randomBoolean()) { - engine.config().setEnableGcDeletes(false); - } - replicaTranslogDir = createTempDir("translog-replica"); - replicaEngine = createEngine(storeReplica, replicaTranslogDir); - currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig(); - - assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); - assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); - if (randomBoolean()) { - engine.config().setEnableGcDeletes(false); - } - } - - public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { - return copy(config, openMode, config.getAnalyzer()); - } - - public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { - return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), - config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getIndexSort(), config.getTranslogRecoveryRunner()); - } - - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); - if (engine != null && engine.isClosed.get() == false) { - engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - } - if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - } - IOUtils.close( - replicaEngine, storeReplica, - engine, store); - terminate(threadPool); - } - - - private static Document testDocumentWithTextField() { - return testDocumentWithTextField("test"); - } - - private static Document testDocumentWithTextField(String value) { - Document document = testDocument(); - document.add(new TextField("value", value, Field.Store.YES)); - return document; - } - - - private static Document testDocument() { - return new Document(); - } - - public static ParsedDocument createParsedDoc(String id, String routing) { - return testParsedDocument(id, routing, testDocumentWithTextField(), new BytesArray("{ \"value\" : \"test\" }"), null); - } - - private static ParsedDocument testParsedDocument(String id, String routing, Document document, BytesReference source, Mapping mappingUpdate) { - Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); - Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - document.add(uidField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - BytesRef ref = source.toBytesRef(); - document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); - return new ParsedDocument(versionField, seqID, id, "test", routing, Arrays.asList(document), source, XContentType.JSON, - mappingUpdate); - } - - protected Store createStore() throws IOException { - return createStore(newDirectory()); - } - - protected Store createStore(final Directory directory) throws IOException { - return createStore(INDEX_SETTINGS, directory); - } - - protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException { - final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { - @Override - public Directory newDirectory() throws IOException { - return directory; - } - }; - return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); - } - - protected Translog createTranslog() throws IOException { - return createTranslog(primaryTranslogDir); - } - - protected Translog createTranslog(Path translogPath) throws IOException { - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); - } - - protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { - return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); - } - - protected InternalEngine createEngine(Store store, - Path translogPath, - BiFunction sequenceNumbersServiceSupplier) throws IOException { - return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier); - } - - protected InternalEngine createEngine(Store store, - Path translogPath, - BiFunction sequenceNumbersServiceSupplier, - ToLongBiFunction seqNoForOperation) throws IOException { - return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null); - } - - protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, null); - - } - - protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null); - } - - protected InternalEngine createEngine( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory, - @Nullable BiFunction sequenceNumbersServiceSupplier) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null); - } - - protected InternalEngine createEngine( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory, - @Nullable BiFunction sequenceNumbersServiceSupplier, - @Nullable ToLongBiFunction seqNoForOperation) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null); - } - - protected InternalEngine createEngine( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory, - @Nullable BiFunction sequenceNumbersServiceSupplier, - @Nullable ToLongBiFunction seqNoForOperation, - @Nullable Sort indexSort) throws IOException { - EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); - InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config); - if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - internalEngine.recoverFromTranslog(); - } - return internalEngine; - } - - @FunctionalInterface - public interface IndexWriterFactory { - - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException; - } - - public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory, - @Nullable final BiFunction sequenceNumbersServiceSupplier, - @Nullable final ToLongBiFunction seqNoForOperation, - final EngineConfig config) { - if (sequenceNumbersServiceSupplier == null) { - return new InternalEngine(config) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } - - @Override - protected long doGenerateSeqNoForOperation(final Operation operation) { - return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); - } - }; - } else { - return new InternalEngine(config, sequenceNumbersServiceSupplier) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } - - @Override - protected long doGenerateSeqNoForOperation(final Operation operation) { - return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); - } - }; - } - - } - - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - ReferenceManager.RefreshListener refreshListener) { - return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null); - } - - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - ReferenceManager.RefreshListener refreshListener, Sort indexSort) { - IndexWriterConfig iwc = newIndexWriterConfig(); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - final EngineConfig.OpenMode openMode; - try { - if (Lucene.indexExists(store.directory()) == false) { - openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG; - } else { - openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; - } - } catch (IOException e) { - throw new ElasticsearchException("can't find index?", e); - } - Engine.EventListener listener = new Engine.EventListener() { - @Override - public void onFailedEngine(String reason, @Nullable Exception e) { - // we don't need to notify anybody in this test - } - }; - final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), - indexSettings.getSettings())); - final List refreshListenerList = - refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); - EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, - mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); - - return config; - } - - private static final BytesReference B_1 = new BytesArray(new byte[]{1}); - private static final BytesReference B_2 = new BytesArray(new byte[]{2}); - private static final BytesReference B_3 = new BytesArray(new byte[]{3}); - private static final BytesArray SOURCE = bytesArray("{}"); - - private static BytesArray bytesArray(String string) { - return new BytesArray(string.getBytes(Charset.defaultCharset())); - } +public class InternalEngineTests extends EngineTestCase { public void testSegments() throws Exception { try (Store store = createStore(); @@ -2487,29 +2166,6 @@ public class InternalEngineTests extends ESTestCase { } } - protected Term newUid(String id) { - return new Term("_id", Uid.encodeId(id)); - } - - protected Term newUid(ParsedDocument doc) { - return newUid(doc.id()); - } - - protected Engine.Get newGet(boolean realtime, ParsedDocument doc) { - return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc)); - } - - private Engine.Index indexForDoc(ParsedDocument doc) { - return new Engine.Index(newUid(doc), doc); - } - - private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, - boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, - Engine.Operation.Origin.REPLICA, System.nanoTime(), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); - } - public void testExtractShardId() { try (Engine.Searcher test = this.engine.acquireSearcher("test")) { ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader()); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 94a631906fc..f62d292730e 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -43,18 +43,6 @@ import static org.hamcrest.Matchers.equalTo; public class TranslogDeletionPolicyTests extends ESTestCase { - public static TranslogDeletionPolicy createTranslogDeletionPolicy() { - return new TranslogDeletionPolicy( - IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), - IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() - ); - } - - public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { - return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), - indexSettings.getTranslogRetentionAge().getMillis()); - } - public void testNoRetention() throws IOException { long now = System.currentTimeMillis(); Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 771302a903f..78ed6697b22 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -107,7 +107,7 @@ import java.util.stream.LongStream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index e2314cff014..55b7e22eb8a 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -44,7 +44,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java new file mode 100644 index 00000000000..c32b3ab2020 --- /dev/null +++ b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.SegmentCommitInfo; +import org.elasticsearch.index.mapper.ParsedDocument; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; + +public class EvilInternalEngineTests extends EngineTestCase { + + public void testOutOfMemoryErrorWhileMergingIsRethrownAndIsUncaught() throws IOException, InterruptedException { + engine.close(); + final AtomicReference maybeFatal = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); + try { + /* + * We want to test that the out of memory error thrown from the merge goes uncaught; this gives us confidence that an out of + * memory error thrown while merging will lead to the node being torn down. + */ + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + maybeFatal.set(e); + latch.countDown(); + }); + final AtomicReference> segmentsReference = new AtomicReference<>(); + + try (Engine e = createEngine( + defaultSettings, + store, + primaryTranslogDir, + newMergePolicy(), + (directory, iwc) -> new IndexWriter(directory, iwc) { + @Override + public void merge(final MergePolicy.OneMerge merge) throws IOException { + throw new OutOfMemoryError("640K ought to be enough for anybody"); + } + + @Override + public synchronized MergePolicy.OneMerge getNextMerge() { + /* + * This will be called when we flush when we will not be ready to return the segments. After the segments are on + * disk, we can only return them from here once or the merge scheduler will be stuck in a loop repeatedly + * peeling off the same segments to schedule for merging. + */ + if (segmentsReference.get() == null) { + return super.getNextMerge(); + } else { + final List segments = segmentsReference.getAndSet(null); + return new MergePolicy.OneMerge(segments); + } + } + }, + null)) { + // force segments to exist on disk + final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + e.index(indexForDoc(doc1)); + e.flush(); + final List segments = + StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList()); + segmentsReference.set(segments); + // trigger a background merge that will be managed by the concurrent merge scheduler + e.forceMerge(randomBoolean(), 0, false, false, false); + /* + * Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have + * to wait for these events to finish. + */ + latch.await(); + assertNotNull(maybeFatal.get()); + assertThat(maybeFatal.get(), instanceOf(OutOfMemoryError.class)); + assertThat(maybeFatal.get(), hasToString(containsString("640K ought to be enough for anybody"))); + } + } finally { + Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); + } + } + + +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java new file mode 100644 index 00000000000..5c2ef977b16 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -0,0 +1,441 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LiveIndexWriterConfig; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.Sort; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.ToLongBiFunction; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; + +public abstract class EngineTestCase extends ESTestCase { + + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); + protected final AllocationId allocationId = AllocationId.newInitializing(); + protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); + + protected ThreadPool threadPool; + + protected Store store; + protected Store storeReplica; + + protected InternalEngine engine; + protected InternalEngine replicaEngine; + + protected IndexSettings defaultSettings; + protected String codecName; + protected Path primaryTranslogDir; + protected Path replicaTranslogDir; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + CodecService codecService = new CodecService(null, logger); + String name = Codec.getDefault().getName(); + if (Arrays.asList(codecService.availableCodecs()).contains(name)) { + // some codecs are read only so we only take the ones that we have in the service and randomly + // selected by lucene test case. + codecName = name; + } else { + codecName = "default"; + } + defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), + between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY))) + .build()); // TODO randomize more settings + threadPool = new TestThreadPool(getClass().getName()); + store = createStore(); + storeReplica = createStore(); + Lucene.cleanLuceneIndex(store.directory()); + Lucene.cleanLuceneIndex(storeReplica.directory()); + primaryTranslogDir = createTempDir("translog-primary"); + engine = createEngine(store, primaryTranslogDir); + LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); + + assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); + if (randomBoolean()) { + engine.config().setEnableGcDeletes(false); + } + replicaTranslogDir = createTempDir("translog-replica"); + replicaEngine = createEngine(storeReplica, replicaTranslogDir); + currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig(); + + assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); + if (randomBoolean()) { + engine.config().setEnableGcDeletes(false); + } + } + + public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { + return copy(config, openMode, config.getAnalyzer()); + } + + public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { + return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), + config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), + config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), + config.getIndexSort(), config.getTranslogRecoveryRunner()); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + if (engine != null && engine.isClosed.get() == false) { + engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + } + if (replicaEngine != null && replicaEngine.isClosed.get() == false) { + replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + } + IOUtils.close( + replicaEngine, storeReplica, + engine, store); + terminate(threadPool); + } + + + protected static ParseContext.Document testDocumentWithTextField() { + return testDocumentWithTextField("test"); + } + + protected static ParseContext.Document testDocumentWithTextField(String value) { + ParseContext.Document document = testDocument(); + document.add(new TextField("value", value, Field.Store.YES)); + return document; + } + + + protected static ParseContext.Document testDocument() { + return new ParseContext.Document(); + } + + public static ParsedDocument createParsedDoc(String id, String routing) { + return testParsedDocument(id, routing, testDocumentWithTextField(), new BytesArray("{ \"value\" : \"test\" }"), null); + } + + protected static ParsedDocument testParsedDocument( + String id, String routing, ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { + Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); + Field versionField = new NumericDocValuesField("_version", 0); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + BytesRef ref = source.toBytesRef(); + document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); + return new ParsedDocument(versionField, seqID, id, "test", routing, Arrays.asList(document), source, XContentType.JSON, + mappingUpdate); + } + + protected Store createStore() throws IOException { + return createStore(newDirectory()); + } + + protected Store createStore(final Directory directory) throws IOException { + return createStore(INDEX_SETTINGS, directory); + } + + protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException { + final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { + @Override + public Directory newDirectory() throws IOException { + return directory; + } + }; + return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); + } + + protected Translog createTranslog() throws IOException { + return createTranslog(primaryTranslogDir); + } + + protected Translog createTranslog(Path translogPath) throws IOException { + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); + return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); + } + + protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); + } + + protected InternalEngine createEngine( + Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier); + } + + protected InternalEngine createEngine( + Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier, + ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine( + defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, null); + + } + + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier) throws IOException { + return createEngine( + indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine( + indexSettings, + store, + translogPath, + mergePolicy, + indexWriterFactory, + sequenceNumbersServiceSupplier, + seqNoForOperation, + null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation, + @Nullable Sort indexSort) throws IOException { + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config); + if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + internalEngine.recoverFromTranslog(); + } + return internalEngine; + } + + @FunctionalInterface + public interface IndexWriterFactory { + + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException; + } + + public static InternalEngine createInternalEngine( + @Nullable final IndexWriterFactory indexWriterFactory, + @Nullable final BiFunction sequenceNumbersServiceSupplier, + @Nullable final ToLongBiFunction seqNoForOperation, + final EngineConfig config) { + if (sequenceNumbersServiceSupplier == null) { + return new InternalEngine(config) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null + ? seqNoForOperation.applyAsLong(this, operation) + : super.doGenerateSeqNoForOperation(operation); + } + }; + } else { + return new InternalEngine(config, sequenceNumbersServiceSupplier) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null + ? seqNoForOperation.applyAsLong(this, operation) + : super.doGenerateSeqNoForOperation(operation); + } + }; + } + + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener) { + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener, Sort indexSort) { + IndexWriterConfig iwc = newIndexWriterConfig(); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + final EngineConfig.OpenMode openMode; + try { + if (Lucene.indexExists(store.directory()) == false) { + openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG; + } else { + openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; + } + } catch (IOException e) { + throw new ElasticsearchException("can't find index?", e); + } + Engine.EventListener listener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, @Nullable Exception e) { + // we don't need to notify anybody in this test + } + }; + final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), + indexSettings.getSettings())); + final List refreshListenerList = + refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); + EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, + mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, + TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); + + return config; + } + + protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); + protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); + protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); + protected static final BytesArray SOURCE = bytesArray("{}"); + + protected static BytesArray bytesArray(String string) { + return new BytesArray(string.getBytes(Charset.defaultCharset())); + } + + protected Term newUid(String id) { + return new Term("_id", Uid.encodeId(id)); + } + + protected Term newUid(ParsedDocument doc) { + return newUid(doc.id()); + } + + protected Engine.Get newGet(boolean realtime, ParsedDocument doc) { + return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc)); + } + + protected Engine.Index indexForDoc(ParsedDocument doc) { + return new Engine.Index(newUid(doc), doc); + } + + protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, + boolean isRetry) { + return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, System.nanoTime(), + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java b/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java new file mode 100644 index 00000000000..3ab55b687bd --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; + +public class TranslogDeletionPolicies { + + public static TranslogDeletionPolicy createTranslogDeletionPolicy() { + return new TranslogDeletionPolicy( + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() + ); + } + + public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { + return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), + indexSettings.getTranslogRetentionAge().getMillis()); + } + +}