diff --git a/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java b/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java index b1df4f5ccc0..c6692cec08b 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java @@ -21,7 +21,6 @@ package org.elasticsearch.bootstrap; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.MergePolicy; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.logging.Loggers; @@ -68,11 +67,7 @@ class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionH // visible for testing static boolean isFatalUncaught(Throwable e) { - return isFatalCause(e) || (e instanceof MergePolicy.MergeException && isFatalCause(e.getCause())); - } - - private static boolean isFatalCause(Throwable cause) { - return cause instanceof Error; + return e instanceof Error; } // visible for testing diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ac020999873..cb07cf5e696 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1579,23 +1579,15 @@ public class InternalEngine extends Engine { } } - @SuppressWarnings("finally") private boolean failOnTragicEvent(AlreadyClosedException ex) { final boolean engineFailed; // if we are already closed due to some tragic exception // we need to fail the engine. it might have already been failed before // but we are double-checking it's failed and closed if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) { - if (indexWriter.getTragicException() instanceof Error) { - try { - logger.error("tragic event in index writer", ex); - } finally { - throw (Error) indexWriter.getTragicException(); - } - } else { - failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException()); - engineFailed = true; - } + maybeDie("tragic event in index writer", indexWriter.getTragicException()); + failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException()); + engineFailed = true; } else if (translog.isOpen() == false && translog.getTragicException() != null) { failEngine("already closed by tragic event on the translog", translog.getTragicException()); engineFailed = true; @@ -1916,7 +1908,6 @@ public class InternalEngine extends Engine { @Override protected void handleMergeException(final Directory dir, final Throwable exc) { - logger.error("failed to merge", exc); engineConfig.getThreadPool().generic().execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -1925,13 +1916,39 @@ public class InternalEngine extends Engine { @Override protected void doRun() throws Exception { - MergePolicy.MergeException e = new MergePolicy.MergeException(exc, dir); - failEngine("merge failed", e); + /* + * We do this on another thread rather than the merge thread that we are initially called on so that we have complete + * confidence that the call stack does not contain catch statements that would cause the error that might be thrown + * here from being caught and never reaching the uncaught exception handler. + */ + maybeDie("fatal error while merging", exc); + logger.error("failed to merge", exc); + failEngine("merge failed", new MergePolicy.MergeException(exc, dir)); } }); } } + /** + * If the specified throwable is a fatal error, this throwable will be thrown. Callers should ensure that there are no catch statements + * that would catch an error in the stack as the fatal error here should go uncaught and be handled by the uncaught exception handler + * that we install during bootstrap. If the specified throwable is indeed a fatal error, the specified message will attempt to be logged + * before throwing the fatal error. If the specified throwable is not a fatal error, this method is a no-op. + * + * @param maybeMessage the message to maybe log + * @param maybeFatal the throwable that is maybe fatal + */ + @SuppressWarnings("finally") + private void maybeDie(final String maybeMessage, final Throwable maybeFatal) { + if (maybeFatal instanceof Error) { + try { + logger.error(maybeMessage, maybeFatal); + } finally { + throw (Error) maybeFatal; + } + } + } + /** * Commits the specified index writer. * diff --git a/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java b/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java index 6e40153b467..e2bf07b7d0b 100644 --- a/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.bootstrap; -import org.apache.lucene.index.MergePolicy; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -131,7 +130,6 @@ public class ElasticsearchUncaughtExceptionHandlerTests extends ESTestCase { } public void testIsFatalCause() { - assertFatal(new MergePolicy.MergeException(new OutOfMemoryError(), null)); assertFatal(new OutOfMemoryError()); assertFatal(new StackOverflowError()); assertFatal(new InternalError()); diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index d1eef05c2ef..5d4385cbd38 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,7 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f6d2d996580..e196c6b4d0b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -26,8 +26,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; @@ -45,7 +43,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.LogDocMergePolicy; -import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; @@ -72,19 +69,16 @@ import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; @@ -95,7 +89,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -119,20 +112,13 @@ import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.test.DummyShardLock; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.io.UncheckedIOException; @@ -166,14 +152,13 @@ import java.util.function.ToLongBiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.shuffle; import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -185,313 +170,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -public class InternalEngineTests extends ESTestCase { - - protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); - protected final AllocationId allocationId = AllocationId.newInitializing(); - private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); - - protected ThreadPool threadPool; - - private Store store; - private Store storeReplica; - - protected InternalEngine engine; - protected InternalEngine replicaEngine; - - private IndexSettings defaultSettings; - private String codecName; - private Path primaryTranslogDir; - private Path replicaTranslogDir; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - - CodecService codecService = new CodecService(null, logger); - String name = Codec.getDefault().getName(); - if (Arrays.asList(codecService.availableCodecs()).contains(name)) { - // some codecs are read only so we only take the ones that we have in the service and randomly - // selected by lucene test case. - codecName = name; - } else { - codecName = "default"; - } - defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us - .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), - between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY))) - .build()); // TODO randomize more settings - threadPool = new TestThreadPool(getClass().getName()); - store = createStore(); - storeReplica = createStore(); - Lucene.cleanLuceneIndex(store.directory()); - Lucene.cleanLuceneIndex(storeReplica.directory()); - primaryTranslogDir = createTempDir("translog-primary"); - engine = createEngine(store, primaryTranslogDir); - LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); - - assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName()); - assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); - if (randomBoolean()) { - engine.config().setEnableGcDeletes(false); - } - replicaTranslogDir = createTempDir("translog-replica"); - replicaEngine = createEngine(storeReplica, replicaTranslogDir); - currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig(); - - assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); - assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); - if (randomBoolean()) { - engine.config().setEnableGcDeletes(false); - } - } - - public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { - return copy(config, openMode, config.getAnalyzer()); - } - - public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { - return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), - config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getIndexSort(), config.getTranslogRecoveryRunner()); - } - - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); - if (engine != null && engine.isClosed.get() == false) { - engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - } - if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - } - IOUtils.close( - replicaEngine, storeReplica, - engine, store); - terminate(threadPool); - } - - - private static Document testDocumentWithTextField() { - return testDocumentWithTextField("test"); - } - - private static Document testDocumentWithTextField(String value) { - Document document = testDocument(); - document.add(new TextField("value", value, Field.Store.YES)); - return document; - } - - - private static Document testDocument() { - return new Document(); - } - - public static ParsedDocument createParsedDoc(String id, String routing) { - return testParsedDocument(id, routing, testDocumentWithTextField(), new BytesArray("{ \"value\" : \"test\" }"), null); - } - - private static ParsedDocument testParsedDocument(String id, String routing, Document document, BytesReference source, Mapping mappingUpdate) { - Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); - Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - document.add(uidField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - BytesRef ref = source.toBytesRef(); - document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); - return new ParsedDocument(versionField, seqID, id, "test", routing, Arrays.asList(document), source, XContentType.JSON, - mappingUpdate); - } - - protected Store createStore() throws IOException { - return createStore(newDirectory()); - } - - protected Store createStore(final Directory directory) throws IOException { - return createStore(INDEX_SETTINGS, directory); - } - - protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException { - final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { - @Override - public Directory newDirectory() throws IOException { - return directory; - } - }; - return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); - } - - protected Translog createTranslog() throws IOException { - return createTranslog(primaryTranslogDir); - } - - protected Translog createTranslog(Path translogPath) throws IOException { - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); - } - - protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { - return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); - } - - protected InternalEngine createEngine(Store store, - Path translogPath, - BiFunction sequenceNumbersServiceSupplier) throws IOException { - return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier); - } - - protected InternalEngine createEngine(Store store, - Path translogPath, - BiFunction sequenceNumbersServiceSupplier, - ToLongBiFunction seqNoForOperation) throws IOException { - return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null); - } - - protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, null); - - } - - protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null); - } - - protected InternalEngine createEngine( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory, - @Nullable BiFunction sequenceNumbersServiceSupplier) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null); - } - - protected InternalEngine createEngine( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory, - @Nullable BiFunction sequenceNumbersServiceSupplier, - @Nullable ToLongBiFunction seqNoForOperation) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null); - } - - protected InternalEngine createEngine( - IndexSettings indexSettings, - Store store, - Path translogPath, - MergePolicy mergePolicy, - @Nullable IndexWriterFactory indexWriterFactory, - @Nullable BiFunction sequenceNumbersServiceSupplier, - @Nullable ToLongBiFunction seqNoForOperation, - @Nullable Sort indexSort) throws IOException { - EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); - InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config); - if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - internalEngine.recoverFromTranslog(); - } - return internalEngine; - } - - @FunctionalInterface - public interface IndexWriterFactory { - - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException; - } - - public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory, - @Nullable final BiFunction sequenceNumbersServiceSupplier, - @Nullable final ToLongBiFunction seqNoForOperation, - final EngineConfig config) { - if (sequenceNumbersServiceSupplier == null) { - return new InternalEngine(config) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } - - @Override - protected long doGenerateSeqNoForOperation(final Operation operation) { - return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); - } - }; - } else { - return new InternalEngine(config, sequenceNumbersServiceSupplier) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } - - @Override - protected long doGenerateSeqNoForOperation(final Operation operation) { - return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); - } - }; - } - - } - - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - ReferenceManager.RefreshListener refreshListener) { - return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null); - } - - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - ReferenceManager.RefreshListener refreshListener, Sort indexSort) { - IndexWriterConfig iwc = newIndexWriterConfig(); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - final EngineConfig.OpenMode openMode; - try { - if (Lucene.indexExists(store.directory()) == false) { - openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG; - } else { - openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; - } - } catch (IOException e) { - throw new ElasticsearchException("can't find index?", e); - } - Engine.EventListener listener = new Engine.EventListener() { - @Override - public void onFailedEngine(String reason, @Nullable Exception e) { - // we don't need to notify anybody in this test - } - }; - final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), - indexSettings.getSettings())); - final List refreshListenerList = - refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); - EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, - mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); - - return config; - } - - private static final BytesReference B_1 = new BytesArray(new byte[]{1}); - private static final BytesReference B_2 = new BytesArray(new byte[]{2}); - private static final BytesReference B_3 = new BytesArray(new byte[]{3}); - private static final BytesArray SOURCE = bytesArray("{}"); - - private static BytesArray bytesArray(String string) { - return new BytesArray(string.getBytes(Charset.defaultCharset())); - } +public class InternalEngineTests extends EngineTestCase { public void testSegments() throws Exception { try (Store store = createStore(); @@ -2487,29 +2166,6 @@ public class InternalEngineTests extends ESTestCase { } } - protected Term newUid(String id) { - return new Term("_id", Uid.encodeId(id)); - } - - protected Term newUid(ParsedDocument doc) { - return newUid(doc.id()); - } - - protected Engine.Get newGet(boolean realtime, ParsedDocument doc) { - return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc)); - } - - private Engine.Index indexForDoc(ParsedDocument doc) { - return new Engine.Index(newUid(doc), doc); - } - - private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, - boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, - Engine.Operation.Origin.REPLICA, System.nanoTime(), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); - } - public void testExtractShardId() { try (Engine.Searcher test = this.engine.acquireSearcher("test")) { ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader()); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 94a631906fc..f62d292730e 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -43,18 +43,6 @@ import static org.hamcrest.Matchers.equalTo; public class TranslogDeletionPolicyTests extends ESTestCase { - public static TranslogDeletionPolicy createTranslogDeletionPolicy() { - return new TranslogDeletionPolicy( - IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), - IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() - ); - } - - public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { - return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), - indexSettings.getTranslogRetentionAge().getMillis()); - } - public void testNoRetention() throws IOException { long now = System.currentTimeMillis(); Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 771302a903f..78ed6697b22 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -107,7 +107,7 @@ import java.util.stream.LongStream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index e2314cff014..55b7e22eb8a 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -44,7 +44,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java new file mode 100644 index 00000000000..c32b3ab2020 --- /dev/null +++ b/qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.SegmentCommitInfo; +import org.elasticsearch.index.mapper.ParsedDocument; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; + +public class EvilInternalEngineTests extends EngineTestCase { + + public void testOutOfMemoryErrorWhileMergingIsRethrownAndIsUncaught() throws IOException, InterruptedException { + engine.close(); + final AtomicReference maybeFatal = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); + try { + /* + * We want to test that the out of memory error thrown from the merge goes uncaught; this gives us confidence that an out of + * memory error thrown while merging will lead to the node being torn down. + */ + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + maybeFatal.set(e); + latch.countDown(); + }); + final AtomicReference> segmentsReference = new AtomicReference<>(); + + try (Engine e = createEngine( + defaultSettings, + store, + primaryTranslogDir, + newMergePolicy(), + (directory, iwc) -> new IndexWriter(directory, iwc) { + @Override + public void merge(final MergePolicy.OneMerge merge) throws IOException { + throw new OutOfMemoryError("640K ought to be enough for anybody"); + } + + @Override + public synchronized MergePolicy.OneMerge getNextMerge() { + /* + * This will be called when we flush when we will not be ready to return the segments. After the segments are on + * disk, we can only return them from here once or the merge scheduler will be stuck in a loop repeatedly + * peeling off the same segments to schedule for merging. + */ + if (segmentsReference.get() == null) { + return super.getNextMerge(); + } else { + final List segments = segmentsReference.getAndSet(null); + return new MergePolicy.OneMerge(segments); + } + } + }, + null)) { + // force segments to exist on disk + final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + e.index(indexForDoc(doc1)); + e.flush(); + final List segments = + StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList()); + segmentsReference.set(segments); + // trigger a background merge that will be managed by the concurrent merge scheduler + e.forceMerge(randomBoolean(), 0, false, false, false); + /* + * Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have + * to wait for these events to finish. + */ + latch.await(); + assertNotNull(maybeFatal.get()); + assertThat(maybeFatal.get(), instanceOf(OutOfMemoryError.class)); + assertThat(maybeFatal.get(), hasToString(containsString("640K ought to be enough for anybody"))); + } + } finally { + Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); + } + } + + +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java new file mode 100644 index 00000000000..5c2ef977b16 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -0,0 +1,441 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LiveIndexWriterConfig; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.Sort; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.ToLongBiFunction; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; + +public abstract class EngineTestCase extends ESTestCase { + + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); + protected final AllocationId allocationId = AllocationId.newInitializing(); + protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); + + protected ThreadPool threadPool; + + protected Store store; + protected Store storeReplica; + + protected InternalEngine engine; + protected InternalEngine replicaEngine; + + protected IndexSettings defaultSettings; + protected String codecName; + protected Path primaryTranslogDir; + protected Path replicaTranslogDir; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + CodecService codecService = new CodecService(null, logger); + String name = Codec.getDefault().getName(); + if (Arrays.asList(codecService.availableCodecs()).contains(name)) { + // some codecs are read only so we only take the ones that we have in the service and randomly + // selected by lucene test case. + codecName = name; + } else { + codecName = "default"; + } + defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), + between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY))) + .build()); // TODO randomize more settings + threadPool = new TestThreadPool(getClass().getName()); + store = createStore(); + storeReplica = createStore(); + Lucene.cleanLuceneIndex(store.directory()); + Lucene.cleanLuceneIndex(storeReplica.directory()); + primaryTranslogDir = createTempDir("translog-primary"); + engine = createEngine(store, primaryTranslogDir); + LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); + + assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); + if (randomBoolean()) { + engine.config().setEnableGcDeletes(false); + } + replicaTranslogDir = createTempDir("translog-replica"); + replicaEngine = createEngine(storeReplica, replicaTranslogDir); + currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig(); + + assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); + assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); + if (randomBoolean()) { + engine.config().setEnableGcDeletes(false); + } + } + + public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { + return copy(config, openMode, config.getAnalyzer()); + } + + public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { + return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), + config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), + config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), + config.getIndexSort(), config.getTranslogRecoveryRunner()); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + if (engine != null && engine.isClosed.get() == false) { + engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + } + if (replicaEngine != null && replicaEngine.isClosed.get() == false) { + replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + } + IOUtils.close( + replicaEngine, storeReplica, + engine, store); + terminate(threadPool); + } + + + protected static ParseContext.Document testDocumentWithTextField() { + return testDocumentWithTextField("test"); + } + + protected static ParseContext.Document testDocumentWithTextField(String value) { + ParseContext.Document document = testDocument(); + document.add(new TextField("value", value, Field.Store.YES)); + return document; + } + + + protected static ParseContext.Document testDocument() { + return new ParseContext.Document(); + } + + public static ParsedDocument createParsedDoc(String id, String routing) { + return testParsedDocument(id, routing, testDocumentWithTextField(), new BytesArray("{ \"value\" : \"test\" }"), null); + } + + protected static ParsedDocument testParsedDocument( + String id, String routing, ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { + Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); + Field versionField = new NumericDocValuesField("_version", 0); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + BytesRef ref = source.toBytesRef(); + document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); + return new ParsedDocument(versionField, seqID, id, "test", routing, Arrays.asList(document), source, XContentType.JSON, + mappingUpdate); + } + + protected Store createStore() throws IOException { + return createStore(newDirectory()); + } + + protected Store createStore(final Directory directory) throws IOException { + return createStore(INDEX_SETTINGS, directory); + } + + protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException { + final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { + @Override + public Directory newDirectory() throws IOException { + return directory; + } + }; + return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); + } + + protected Translog createTranslog() throws IOException { + return createTranslog(primaryTranslogDir); + } + + protected Translog createTranslog(Path translogPath) throws IOException { + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); + return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); + } + + protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); + } + + protected InternalEngine createEngine( + Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier); + } + + protected InternalEngine createEngine( + Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier, + ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine( + defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, null); + + } + + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier) throws IOException { + return createEngine( + indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine( + indexSettings, + store, + translogPath, + mergePolicy, + indexWriterFactory, + sequenceNumbersServiceSupplier, + seqNoForOperation, + null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation, + @Nullable Sort indexSort) throws IOException { + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config); + if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + internalEngine.recoverFromTranslog(); + } + return internalEngine; + } + + @FunctionalInterface + public interface IndexWriterFactory { + + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException; + } + + public static InternalEngine createInternalEngine( + @Nullable final IndexWriterFactory indexWriterFactory, + @Nullable final BiFunction sequenceNumbersServiceSupplier, + @Nullable final ToLongBiFunction seqNoForOperation, + final EngineConfig config) { + if (sequenceNumbersServiceSupplier == null) { + return new InternalEngine(config) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null + ? seqNoForOperation.applyAsLong(this, operation) + : super.doGenerateSeqNoForOperation(operation); + } + }; + } else { + return new InternalEngine(config, sequenceNumbersServiceSupplier) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null + ? seqNoForOperation.applyAsLong(this, operation) + : super.doGenerateSeqNoForOperation(operation); + } + }; + } + + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener) { + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener, Sort indexSort) { + IndexWriterConfig iwc = newIndexWriterConfig(); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + final EngineConfig.OpenMode openMode; + try { + if (Lucene.indexExists(store.directory()) == false) { + openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG; + } else { + openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; + } + } catch (IOException e) { + throw new ElasticsearchException("can't find index?", e); + } + Engine.EventListener listener = new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, @Nullable Exception e) { + // we don't need to notify anybody in this test + } + }; + final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), + indexSettings.getSettings())); + final List refreshListenerList = + refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); + EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, + mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, + TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); + + return config; + } + + protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); + protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); + protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); + protected static final BytesArray SOURCE = bytesArray("{}"); + + protected static BytesArray bytesArray(String string) { + return new BytesArray(string.getBytes(Charset.defaultCharset())); + } + + protected Term newUid(String id) { + return new Term("_id", Uid.encodeId(id)); + } + + protected Term newUid(ParsedDocument doc) { + return newUid(doc.id()); + } + + protected Engine.Get newGet(boolean realtime, ParsedDocument doc) { + return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc)); + } + + protected Engine.Index indexForDoc(ParsedDocument doc) { + return new Engine.Index(newUid(doc), doc); + } + + protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, + boolean isRetry) { + return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, System.nanoTime(), + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java b/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java new file mode 100644 index 00000000000..3ab55b687bd --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; + +public class TranslogDeletionPolicies { + + public static TranslogDeletionPolicy createTranslogDeletionPolicy() { + return new TranslogDeletionPolicy( + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() + ); + } + + public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { + return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), + indexSettings.getTranslogRetentionAge().getMillis()); + } + +}