diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 5eba1a29fc6..ac065583868 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -278,7 +278,7 @@ public abstract class Engine implements Closeable { } /** returns the translog for this engine */ - public abstract Translog translog(); + public abstract Translog getTranslog(); protected void ensureOpen() { if (isClosed.get()) { diff --git a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index b886673f977..bb8006ad6b9 100644 --- a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -29,6 +29,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArray; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; @@ -39,9 +41,12 @@ import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.nio.file.Path; import java.util.concurrent.TimeUnit; /* @@ -77,6 +82,8 @@ public final class EngineConfig { private final boolean ignoreUnknownTranslog; private final QueryCache filterCache; private final QueryCachingPolicy filterCachingPolicy; + private final BigArrays bigArrays; + private final Path translogPath; /** * Index setting for index concurrency / number of threadstates in the indexwriter. @@ -139,10 +146,10 @@ public final class EngineConfig { * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService, - IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, - MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer, - Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, - TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy) { + IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, + MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer, + Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, + TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy, BigArrays bigArrays, Path translogPath) { this.shardId = shardId; this.threadPool = threadPool; this.indexingService = indexingService; @@ -156,6 +163,8 @@ public final class EngineConfig { this.similarity = similarity; this.codecService = codecService; this.failedEngineListener = failedEngineListener; + this.bigArrays = bigArrays; + this.translogPath = translogPath; Settings indexSettings = indexSettingsService.getSettings(); this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false); this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); @@ -421,4 +430,25 @@ public final class EngineConfig { public QueryCachingPolicy getFilterCachingPolicy() { return filterCachingPolicy; } + + /** + * Returns a BigArrays instance for this engine + */ + public BigArrays getBigArrays() { + return bigArrays; + } + + /** + * Returns the translog path for this engine + */ + public Path getTranslogPath() { + return translogPath; + } + + /** + * Returns the {@link org.elasticsearch.index.settings.IndexSettingsService} for this engine. + */ + public IndexSettingsService getIndesSettingService() { + return indexSettingsService; + } } diff --git a/src/main/java/org/elasticsearch/index/engine/EngineFactory.java b/src/main/java/org/elasticsearch/index/engine/EngineFactory.java index 72a80306160..b29148edff5 100644 --- a/src/main/java/org/elasticsearch/index/engine/EngineFactory.java +++ b/src/main/java/org/elasticsearch/index/engine/EngineFactory.java @@ -25,7 +25,7 @@ import org.elasticsearch.index.translog.fs.FsTranslog; */ public interface EngineFactory { - public Engine newReadWriteEngine(EngineConfig config, FsTranslog translog, boolean skipTranslogRecovery); + public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery); public Engine newReadOnlyEngine(EngineConfig config); } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e03cb7b97f6..0ee9cbc7772 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -104,19 +104,18 @@ public class InternalEngine extends Engine { private final IndexThrottle throttle; - public InternalEngine(EngineConfig engineConfig, FsTranslog translog, boolean skipInitialTranslogRecovery) throws EngineException { + public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException { super(engineConfig); - Preconditions.checkNotNull(translog, "Translog must be provided to the engine"); this.versionMap = new LiveVersionMap(); store.incRef(); IndexWriter writer = null; + FsTranslog translog = null; SearcherManager manager = null; boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); this.indexingService = engineConfig.getIndexingService(); this.warmer = engineConfig.getWarmer(); - this.translog = translog; this.mergePolicyProvider = engineConfig.getMergePolicyProvider(); this.mergeScheduler = engineConfig.getMergeScheduler(); this.dirtyLocks = new Object[engineConfig.getIndexConcurrency() * 50]; // we multiply it to have enough... @@ -130,10 +129,12 @@ public class InternalEngine extends Engine { try { writer = createWriter(); indexWriter = writer; + translog = new FsTranslog(engineConfig.getShardId(), engineConfig.getIndesSettingService(), engineConfig.getBigArrays(), engineConfig.getTranslogPath(), engineConfig.getThreadPool()); committedTranslogId = loadCommittedTranslogId(writer, translog); } catch (IOException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } + this.translog = translog; manager = createSearcherManager(); this.searcherManager = manager; this.versionMap.setManager(searcherManager); @@ -154,7 +155,7 @@ public class InternalEngine extends Engine { success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(writer, manager); + IOUtils.closeWhileHandlingException(writer, translog, manager); versionMap.clear(); if (isClosed.get() == false) { // failure we need to dec the store reference @@ -166,7 +167,7 @@ public class InternalEngine extends Engine { } @Override - public Translog translog() { + public Translog getTranslog() { ensureOpen(); return translog; } @@ -913,7 +914,6 @@ public class InternalEngine extends Engine { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { this.versionMap.clear(); - logger.trace("close searcherManager"); try { IOUtils.close(searcherManager); } catch (Throwable t) { diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java b/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java index 6ac375bc02b..c9c13e3d879 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngineFactory.java @@ -22,8 +22,8 @@ import org.elasticsearch.index.translog.fs.FsTranslog; public class InternalEngineFactory implements EngineFactory { @Override - public Engine newReadWriteEngine(EngineConfig config, FsTranslog translog, boolean skipTranslogRecovery) { - return new InternalEngine(config, translog, skipTranslogRecovery); + public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { + return new InternalEngine(config, skipTranslogRecovery); } @Override diff --git a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 1e68e7bcdf2..d967d3f8b8d 100644 --- a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -169,8 +169,8 @@ public class ShadowEngine extends Engine { } @Override - public Translog translog() { - throw new UnsupportedOperationException("shard engines don't have translogs"); + public Translog getTranslog() { + throw new UnsupportedOperationException("shadow engines don't have translogs"); } @Override diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 79fd9024ee1..56b63f4ca44 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -101,7 +101,6 @@ import org.elasticsearch.index.suggest.stats.SuggestStats; import org.elasticsearch.index.termvectors.ShardTermVectorsService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; -import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndicesLifecycle; @@ -259,8 +258,8 @@ public class IndexShard extends AbstractIndexShardComponent { return true; } - public Translog translog() { - return engine().translog(); + public Translog.View newTranslogView() { + return engine().getTranslog().newView(); } public ShardIndexingService indexingService() { @@ -655,7 +654,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public TranslogStats translogStats() { - return engine().translog().stats(); + return engine().getTranslog().stats(); } public SuggestStats suggestStats() { @@ -817,12 +816,18 @@ public class IndexShard extends AbstractIndexShardComponent { * After the store has been recovered, we need to start the engine. This method starts a new engine but skips * the replay of the transaction log which is required in cases where we restore a previous index or recover from * a remote peer. + * + * @param wipeTranslogs if set to true all skipped / uncommitted translogs are removed. */ - public void skipTranslogRecovery() { + public void skipTranslogRecovery(boolean wipeTranslogs) throws IOException { assert engineUnsafe() == null : "engine was already created"; Map recoveredTypes = internalPerformTranslogRecovery(true); assert recoveredTypes.isEmpty(); assert recoveryState.getTranslog().recoveredOperations() == 0; + if (wipeTranslogs) { + final Translog translog = engine().getTranslog(); + translog.markCommitted(translog.currentId()); + } } /** called if recovery has to be restarted after network error / delay ** */ @@ -964,7 +969,7 @@ public class IndexShard extends AbstractIndexShardComponent { } Engine engine = engineUnsafe(); if (engine != null) { - engine.translog().updateBuffer(shardTranslogBufferSize); + engine.getTranslog().updateBuffer(shardTranslogBufferSize); } } @@ -1218,21 +1223,7 @@ public class IndexShard extends AbstractIndexShardComponent { } protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) { - final FsTranslog translog; - try { - translog = new FsTranslog(shardId, indexSettingsService, bigArrays, path, threadPool); - } catch (IOException e) { - throw new EngineCreationFailureException(shardId, "failed to create translog", e); - } - Engine engine = null; - try { - engine = engineFactory.newReadWriteEngine(config, translog, skipTranslogRecovery); - } finally { - if (engine == null) { - IOUtils.closeWhileHandlingException(translog); - } - } - return engine; + return engineFactory.newReadWriteEngine(config, skipTranslogRecovery); } /** @@ -1293,6 +1284,6 @@ public class IndexShard extends AbstractIndexShardComponent { }; return new EngineConfig(shardId, threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, mergePolicyProvider, mergeScheduler, - mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy()); + mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy(), bigArrays, shardPath().resolveTranslog()); } } diff --git a/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 7b6109a34d4..9f5537856e6 100644 --- a/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -121,10 +121,4 @@ public final class ShadowIndexShard extends IndexShard { public boolean allowsPrimaryPromotion() { return false; } - - @Override - @Nullable - public Translog translog() { - throw new UnsupportedOperationException("shadow shards don't have a translog"); - } } diff --git a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java index 7c698457605..de183c02e9f 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java +++ b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java @@ -125,8 +125,7 @@ public class IndexShardSnapshotAndRestoreService extends AbstractIndexShardCompo snapshotShardId = new ShardId(restoreSource.index(), shardId.id()); } indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState); - indexShard.skipTranslogRecovery(); - indexShard.translog().markCommitted(indexShard.translog().currentId()); + indexShard.skipTranslogRecovery(true); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId); diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/src/main/java/org/elasticsearch/index/translog/TranslogService.java index 724f9854bf3..633aeae2e83 100644 --- a/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -142,7 +142,7 @@ public class TranslogService extends AbstractIndexShardComponent implements Clos return; } - if (indexShard.translog() == null) { + if (indexShard.engine().getTranslog() == null) { reschedule(); return; } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 3fab0e20d8c..4fbca30cec1 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -115,8 +115,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog, private final AtomicBoolean closed = new AtomicBoolean(); public FsTranslog(ShardId shardId, IndexSettingsService indexSettingsService, - BigArrays bigArrays, ShardPath shardPath, ThreadPool threadPool) throws IOException { - this(shardId, indexSettingsService.getSettings(), indexSettingsService, bigArrays, shardPath.resolveTranslog(), threadPool); + BigArrays bigArrays, Path location, ThreadPool threadPool) throws IOException { + this(shardId, indexSettingsService.getSettings(), indexSettingsService, bigArrays, location, threadPool); } public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 6d1796cb4db..426f4cf89d8 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -196,7 +196,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent 0); InternalEngine holder; try { - holder = createEngine(store, translog); + holder = createEngine(store, translogPath); } catch (EngineCreationFailureException ex) { assertEquals(store.refCount(), refCount); continue; @@ -1387,7 +1387,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { try { assertEquals(store.refCount(), refCount + 1); holder.close(); - holder = createEngine(store, translog); + holder = createEngine(store, translogPath); assertEquals(store.refCount(), refCount + 1); } catch (EngineCreationFailureException ex) { // all is fine @@ -1508,8 +1508,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build(); IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), indexSettings); try (Store store = createStore(); - Engine engine = new InternalEngine(config(indexSettingsService, store, createMergeScheduler(indexSettingsService)), - createTranslog(createTempDir()), false)) { + Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService)), + false)) { for (int i = 0; i < 100; i++) { String id = Integer.toString(i); ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null); @@ -1550,9 +1550,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { translog.markCommitted(translog.currentId()); // we have to re-open the translog because o.w. it will complain about commit information going backwards, which is OK as we did a fake markComitted translog.close(); - translog = createTranslog(); try { - engine = createEngine(store, translog); + engine = createEngine(store, primaryTranslogDir); fail("engine shouldn't start without a valid translog id"); } catch (EngineCreationFailureException ex) { // expected @@ -1560,7 +1559,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { // now it should be OK. IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings) .put(EngineConfig.INDEX_IGNORE_UNKNOWN_TRANSLOG, true).build()); - engine = createEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService)); + engine = createEngine(indexSettingsService, store, primaryTranslogDir, createMergeScheduler(indexSettingsService)); } @TestLogging("index.translog:TRACE") @@ -1593,13 +1592,11 @@ public class InternalEngineTests extends ElasticsearchTestCase { directory.setRandomIOExceptionRate(randomDouble()); directory.setFailOnOpenInput(randomBoolean()); directory.setAllowRandomFileNotFoundException(randomBoolean()); - final FsTranslog translog = createTranslog(); try { - engine = createEngine(store, translog); + engine = createEngine(store, primaryTranslogDir); started = true; break; } catch (EngineCreationFailureException ex) { - translog.close(); } } @@ -1608,11 +1605,11 @@ public class InternalEngineTests extends ElasticsearchTestCase { directory.setFailOnOpenInput(false); directory.setAllowRandomFileNotFoundException(false); if (started == false) { - engine = createEngine(store, createTranslog()); + engine = createEngine(store, primaryTranslogDir); } } else { // no mock directory, no fun. - engine = createEngine(store, createTranslog()); + engine = createEngine(store, primaryTranslogDir); } try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); @@ -1643,7 +1640,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { directory.setPreventDoubleWrite(false); } engine.close(); - engine = new InternalEngine(engine.config(), createTranslog(), true); + engine = new InternalEngine(engine.config(), true); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); @@ -1684,7 +1681,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { parser.mappingUpdate = dynamicUpdate(); engine.close(); - engine = new InternalEngine(engine.config(), createTranslog(), false); // we need to reuse the engine config unless the parser.mappingModified won't work + engine = new InternalEngine(engine.config(), false); // we need to reuse the engine config unless the parser.mappingModified won't work try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); @@ -1700,7 +1697,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } engine.close(); - engine = createEngine(store, createTranslog()); + engine = createEngine(store, primaryTranslogDir); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); assertThat(topDocs.totalHits, equalTo(numDocs)); @@ -1730,7 +1727,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } engine.close(); - engine = createEngine(store, createTranslog()); + engine = createEngine(store, primaryTranslogDir); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1); assertThat(topDocs.totalHits, equalTo(numDocs + 1)); @@ -1742,7 +1739,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.refresh("test"); } else { engine.close(); - engine = createEngine(store, createTranslog()); + engine = createEngine(store, primaryTranslogDir); } try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); diff --git a/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index f5eb610b695..21a9d851eed 100644 --- a/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -94,7 +94,6 @@ public class ShadowEngineTests extends ElasticsearchTestCase { private Store store; private Store storeReplica; - protected FsTranslog translog; protected Engine primaryEngine; protected Engine replicaEngine; @@ -130,8 +129,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase { storeReplica = createStore(dirPath); Lucene.cleanLuceneIndex(store.directory()); Lucene.cleanLuceneIndex(storeReplica.directory()); - translog = createTranslog(); - primaryEngine = createInternalEngine(store, translog); + primaryEngine = createInternalEngine(store, createTempDir("translog-primary")); LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)primaryEngine).getCurrentIndexWriterConfig(); assertEquals(primaryEngine.config().getCodec().getName(), codecService.codec(codecName).getName()); @@ -155,7 +153,6 @@ public class ShadowEngineTests extends ElasticsearchTestCase { replicaEngine.close(); storeReplica.close(); primaryEngine.close(); - translog.close(); store.close(); terminate(threadPool); } @@ -198,10 +195,6 @@ public class ShadowEngineTests extends ElasticsearchTestCase { return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId)); } - protected FsTranslog createTranslog() throws IOException { - return new FsTranslog(shardId, EMPTY_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, createTempDir("translog-primary")); - } - protected IndexDeletionPolicy createIndexDeletionPolicy() { return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS); } @@ -223,20 +216,20 @@ public class ShadowEngineTests extends ElasticsearchTestCase { return createShadowEngine(indexSettingsService, store, createMergeScheduler(indexSettingsService)); } - protected InternalEngine createInternalEngine(Store store, FsTranslog translog) { + protected InternalEngine createInternalEngine(Store store, Path translogPath) { IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); - return createInternalEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService)); + return createInternalEngine(indexSettingsService, store, translogPath, createMergeScheduler(indexSettingsService)); } protected ShadowEngine createShadowEngine(IndexSettingsService indexSettingsService, Store store, MergeSchedulerProvider mergeSchedulerProvider) { - return new ShadowEngine(config(indexSettingsService, store, mergeSchedulerProvider)); + return new ShadowEngine(config(indexSettingsService, store, null, mergeSchedulerProvider)); } - protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, FsTranslog translog, MergeSchedulerProvider mergeSchedulerProvider) { - return new InternalEngine(config(indexSettingsService, store, mergeSchedulerProvider), translog, true); + protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider) { + return new InternalEngine(config(indexSettingsService, store, translogPath, mergeSchedulerProvider), true); } - public EngineConfig config(IndexSettingsService indexSettingsService, Store store, MergeSchedulerProvider mergeSchedulerProvider) { + public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider) { IndexWriterConfig iwc = newIndexWriterConfig(); EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService , null, store, createSnapshotDeletionPolicy(), createMergePolicy(), mergeSchedulerProvider, @@ -244,7 +237,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase { @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test - }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()); + }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), BigArrays.NON_RECYCLING_INSTANCE, translogPath); return config; } @@ -918,7 +911,6 @@ public class ShadowEngineTests extends ElasticsearchTestCase { break; } } - translog.close(); holder.close(); assertEquals(store.refCount(), refCount); } @@ -967,8 +959,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase { // Create an InternalEngine, which creates the index so the shadow // replica will handle it correctly Store pStore = createStore(srDir); - FsTranslog pTranslog = createTranslog(); - InternalEngine pEngine = createInternalEngine(pStore, pTranslog); + InternalEngine pEngine = createInternalEngine(pStore, createTempDir("translog-primary")); // create a document ParseContext.Document document = testDocumentWithTextField(); @@ -980,6 +971,6 @@ public class ShadowEngineTests extends ElasticsearchTestCase { t.join(); assertTrue("ShadowEngine should have been able to be created", succeeded.get()); // (shadow engine is already shut down in the try-with-resources) - IOUtils.close(srStore, pTranslog, pEngine, pStore); + IOUtils.close(srStore, pEngine, pStore); } } diff --git a/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java b/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java index 84baa67d2fb..0adb6380e77 100644 --- a/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java +++ b/src/test/java/org/elasticsearch/test/engine/MockEngineFactory.java @@ -28,8 +28,8 @@ import org.elasticsearch.index.translog.fs.FsTranslog; */ public final class MockEngineFactory implements EngineFactory { @Override - public Engine newReadWriteEngine(EngineConfig config, FsTranslog translog, boolean skipTranslogRecovery) { - return new MockInternalEngine(config, translog, skipTranslogRecovery); + public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) { + return new MockInternalEngine(config, skipTranslogRecovery); } @Override diff --git a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java index 29b813c4499..7c7bd96a31f 100644 --- a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -33,8 +33,8 @@ final class MockInternalEngine extends InternalEngine { private final boolean randomizeFlushOnClose; - MockInternalEngine(EngineConfig config, FsTranslog translog, boolean skipInitialTranslogRecovery) throws EngineException { - super(config, translog, skipInitialTranslogRecovery); + MockInternalEngine(EngineConfig config, boolean skipInitialTranslogRecovery) throws EngineException { + super(config, skipInitialTranslogRecovery); randomizeFlushOnClose = IndexMetaData.isOnSharedFilesystem(config.getIndexSettings()) == false; }