From 8afe09a7494bb34dec29f3f3cc4391547afdff62 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Sep 2018 11:59:16 -0400 Subject: [PATCH] Pass TranslogRecoveryRunner to engine from outside (#33449) This commit allows us to use different TranslogRecoveryRunner when recovering an engine from its local translog. This change is a prerequisite for the commit-based rollback PR. Relates #32867 --- .../elasticsearch/index/engine/Engine.java | 10 ++- .../index/engine/EngineConfig.java | 21 +----- .../index/engine/InternalEngine.java | 8 +-- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/engine/InternalEngineTests.java | 72 +++++++++---------- .../index/shard/RefreshListenersTests.java | 4 +- .../index/engine/EngineTestCase.java | 18 +++-- .../index/engine/TranslogHandler.java | 2 +- .../index/engine/FollowingEngineTests.java | 6 +- 9 files changed, 67 insertions(+), 78 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0c54fb916f5..a64c3f88eb3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1642,9 +1642,10 @@ public abstract class Engine implements Closeable { * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive). * This operation will close the engine if the recovery fails. * - * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered + * @param translogRecoveryRunner the translog recovery runner + * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered */ - public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException; + public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException; /** * Do not replay translog operations, but make the engine be ready. @@ -1662,4 +1663,9 @@ public abstract class Engine implements Closeable { * Tries to prune buffered deletes from the version map. */ public abstract void maybePruneDeletes(); + + @FunctionalInterface + public interface TranslogRecoveryRunner { + int run(Engine engine, Translog.Snapshot snapshot) throws IOException; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 23a90553f60..f95ba96d343 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -37,13 +37,11 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.List; import java.util.function.LongSupplier; @@ -76,7 +74,6 @@ public final class EngineConfig { private final List internalRefreshListener; @Nullable private final Sort indexSort; - private final TranslogRecoveryRunner translogRecoveryRunner; @Nullable private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; @@ -127,9 +124,8 @@ public final class EngineConfig { TranslogConfig translogConfig, TimeValue flushMergesAfter, List externalRefreshListener, List internalRefreshListener, Sort indexSort, - TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService, - LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, - TombstoneDocSupplier tombstoneDocSupplier) { + CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) { this.shardId = shardId; this.allocationId = allocationId; this.indexSettings = indexSettings; @@ -163,7 +159,6 @@ public final class EngineConfig { this.externalRefreshListener = externalRefreshListener; this.internalRefreshListener = internalRefreshListener; this.indexSort = indexSort; - this.translogRecoveryRunner = translogRecoveryRunner; this.circuitBreakerService = circuitBreakerService; this.globalCheckpointSupplier = globalCheckpointSupplier; this.primaryTermSupplier = primaryTermSupplier; @@ -324,18 +319,6 @@ public final class EngineConfig { */ public TimeValue getFlushMergesAfter() { return flushMergesAfter; } - @FunctionalInterface - public interface TranslogRecoveryRunner { - int run(Engine engine, Translog.Snapshot snapshot) throws IOException; - } - - /** - * Returns a runner that implements the translog recovery from the given snapshot - */ - public TranslogRecoveryRunner getTranslogRecoveryRunner() { - return translogRecoveryRunner; - } - /** * The refresh listeners to add to Lucene for externally visible refreshes */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f902ce07502..ea2b53bea8d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -393,7 +393,7 @@ public class InternalEngine extends Engine { } @Override - public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException { + public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { flushLock.lock(); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); @@ -401,7 +401,7 @@ public class InternalEngine extends Engine { throw new IllegalStateException("Engine has already been recovered"); } try { - recoverFromTranslogInternal(recoverUpToSeqNo); + recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo); } catch (Exception e) { try { pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush @@ -423,13 +423,13 @@ public class InternalEngine extends Engine { pendingTranslogRecovery.set(false); // we are good - now we can commit } - private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException { + private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) { - opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); + opsRecovered = translogRecoveryRunner.run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f1e7dec6995..bceb106aeef 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1314,7 +1314,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl **/ public void openEngineAndRecoverFromTranslog() throws IOException { innerOpenEngineAndTranslog(); - getEngine().recoverFromTranslog(Long.MAX_VALUE); + getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE); } /** @@ -2233,7 +2233,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), - indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier()); + indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier()); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a44829890d5..6d9cdd0f225 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -661,7 +661,7 @@ public class InternalEngineTests extends EngineTestCase { trimUnsafeCommits(engine.config()); engine = new InternalEngine(engine.config()); assertTrue(engine.isRecovering()); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); searcher.close(); @@ -678,7 +678,7 @@ public class InternalEngineTests extends EngineTestCase { engine = new InternalEngine(engine.config()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); @@ -708,7 +708,7 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(engine.config()); try (Engine recoveringEngine = new InternalEngine(engine.config())){ - recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); searcher.searcher().search(new MatchAllDocsQuery(), collector); @@ -744,7 +744,7 @@ public class InternalEngineTests extends EngineTestCase { } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); - recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertTrue(committed.get()); } finally { IOUtils.close(recoveringEngine); @@ -778,7 +778,7 @@ public class InternalEngineTests extends EngineTestCase { initialEngine.close(); trimUnsafeCommits(initialEngine.config()); recoveringEngine = new InternalEngine(initialEngine.config()); - recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs); assertEquals(docs, topDocs.totalHits.value); @@ -811,14 +811,14 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); } trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); - engine.recoverFromTranslog(upToSeqNo); + engine.recoverFromTranslog(translogHandler, upToSeqNo); assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); } @@ -1202,7 +1202,7 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(config); engine = new InternalEngine(config); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1221,7 +1221,7 @@ public class InternalEngineTests extends EngineTestCase { engine.close(); trimUnsafeCommits(config); engine = new InternalEngine(config); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -2187,7 +2187,7 @@ public class InternalEngineTests extends EngineTestCase { trimUnsafeCommits(initialEngine.engineConfig); try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){ - recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertThat( @@ -2508,7 +2508,7 @@ public class InternalEngineTests extends EngineTestCase { try (InternalEngine engine = createEngine(config)) { engine.index(firstIndexRequest); globalCheckpoint.set(engine.getLocalCheckpoint()); - expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(Long.MAX_VALUE)); + expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE)); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); @@ -2530,7 +2530,7 @@ public class InternalEngineTests extends EngineTestCase { assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); @@ -2547,7 +2547,7 @@ public class InternalEngineTests extends EngineTestCase { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); } @@ -2561,7 +2561,7 @@ public class InternalEngineTests extends EngineTestCase { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); @@ -2667,7 +2667,7 @@ public class InternalEngineTests extends EngineTestCase { } } }) { - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); globalCheckpoint.set(engine.getLocalCheckpoint()); @@ -2678,7 +2678,7 @@ public class InternalEngineTests extends EngineTestCase { try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier))) { - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2739,30 +2739,28 @@ public class InternalEngineTests extends EngineTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } assertVisibleCount(engine, numDocs); - - TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - parser.mappingUpdate = dynamicUpdate(); + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); + translogHandler.mappingUpdate = dynamicUpdate(); engine.close(); trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier)); engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, numDocs, false); - parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(numDocs, parser.appliedOperations()); - if (parser.mappingUpdate != null) { - assertEquals(1, parser.getRecoveredTypes().size()); - assertTrue(parser.getRecoveredTypes().containsKey("test")); + assertEquals(numDocs, translogHandler.appliedOperations()); + if (translogHandler.mappingUpdate != null) { + assertEquals(1, translogHandler.getRecoveredTypes().size()); + assertTrue(translogHandler.getRecoveredTypes().containsKey("test")); } else { - assertEquals(0, parser.getRecoveredTypes().size()); + assertEquals(0, translogHandler.getRecoveredTypes().size()); } engine.close(); + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); assertVisibleCount(engine, numDocs, false); - parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(0, parser.appliedOperations()); + assertEquals(0, translogHandler.appliedOperations()); final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); @@ -2786,13 +2784,13 @@ public class InternalEngineTests extends EngineTestCase { } engine.close(); + translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings()); engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1); assertThat(topDocs.totalHits.value, equalTo(numDocs + 1L)); } - parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(flush ? 1 : 2, parser.appliedOperations()); + assertEquals(flush ? 1 : 2, translogHandler.appliedOperations()); engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc), primaryTerm.get())); if (randomBoolean()) { engine.refresh("test"); @@ -2836,7 +2834,7 @@ public class InternalEngineTests extends EngineTestCase { threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -3455,7 +3453,7 @@ public class InternalEngineTests extends EngineTestCase { } try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); @@ -3738,7 +3736,7 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(initialEngine.config()); try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); } @@ -3849,7 +3847,7 @@ public class InternalEngineTests extends EngineTestCase { throw new UnsupportedOperationException(); } }; - noOpEngine.recoverFromTranslog(Long.MAX_VALUE); + noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = "filling gaps"; noOpEngine.noOp(new Engine.NoOp(maxSeqNo + 1, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), reason)); @@ -4127,7 +4125,7 @@ public class InternalEngineTests extends EngineTestCase { trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); - recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); @@ -4163,7 +4161,7 @@ public class InternalEngineTests extends EngineTestCase { if (flushed) { assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } - recoveringEngine.recoverFromTranslog(Long.MAX_VALUE); + recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); assertEquals(0, recoveringEngine.fillSeqNoGaps(3)); @@ -4356,7 +4354,7 @@ public class InternalEngineTests extends EngineTestCase { super.commitIndexWriter(writer, translog, syncId); } }) { - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { ParseContext.Document document = testDocumentWithTextField(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index b93f170174c..a43c7c214ae 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -131,10 +131,10 @@ public class RefreshListenersTests extends ESTestCase { indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, - (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm, + new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); - engine.recoverFromTranslog(Long.MAX_VALUE); + engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index b558cd1ba90..0e22d0a7eda 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -125,6 +125,7 @@ public abstract class EngineTestCase extends ESTestCase { protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; + protected TranslogHandler translogHandler; protected Store store; protected Store storeReplica; @@ -189,6 +190,7 @@ public abstract class EngineTestCase extends ESTestCase { Lucene.cleanLuceneIndex(store.directory()); Lucene.cleanLuceneIndex(storeReplica.directory()); primaryTranslogDir = createTempDir("translog-primary"); + translogHandler = createTranslogHandler(defaultSettings); engine = createEngine(store, primaryTranslogDir); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); @@ -213,7 +215,7 @@ public abstract class EngineTestCase extends ESTestCase { config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), - config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), + config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier(), tombstoneDocSupplier()); } @@ -222,7 +224,7 @@ public abstract class EngineTestCase extends ESTestCase { config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), - config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), + config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } @@ -232,7 +234,7 @@ public abstract class EngineTestCase extends ESTestCase { config.getWarmer(), config.getStore(), mergePolicy, config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), - config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), + config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } @@ -377,6 +379,10 @@ public abstract class EngineTestCase extends ESTestCase { () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTermSupplier); } + protected TranslogHandler createTranslogHandler(IndexSettings indexSettings) { + return new TranslogHandler(xContentRegistry(), indexSettings); + } + protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); } @@ -478,7 +484,7 @@ public abstract class EngineTestCase extends ESTestCase { } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - internalEngine.recoverFromTranslog(Long.MAX_VALUE); + internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return internalEngine; } @@ -553,14 +559,12 @@ public abstract class EngineTestCase extends ESTestCase { // we don't need to notify anybody in this test } }; - final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), - indexSettings.getSettings())); final List refreshListenerList = refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler, + TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) : diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index 9999a3b3748..12785841ef2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -46,7 +46,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static org.elasticsearch.index.mapper.SourceToParse.source; -public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner { +public class TranslogHandler implements Engine.TranslogRecoveryRunner { private final MapperService mapperService; public Mapping mappingUpdate = null; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 677b8955490..6897e3bf3f7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -44,7 +44,6 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -254,8 +253,6 @@ public class FollowingEngineTests extends ESTestCase { Collections.emptyList(), Collections.emptyList(), null, - new TranslogHandler( - xContentRegistry, IndexSettingsModule.newIndexSettings(shardId.getIndexName(), indexSettings.getSettings())), new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm.get(), @@ -280,7 +277,8 @@ public class FollowingEngineTests extends ESTestCase { SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L); store.associateIndexWithNewTranslog(translogUuid); FollowingEngine followingEngine = new FollowingEngine(config); - followingEngine.recoverFromTranslog(Long.MAX_VALUE); + TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); + followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return followingEngine; }