diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index 44adb995863..8d56562b5ea 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -23,6 +23,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.IndexAnalyzers; @@ -117,20 +118,23 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner { return opsRecovered; } - private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. + final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; switch (operation.opType()) { case INDEX: final Translog.Index index = (Translog.Index) operation; final String indexName = mapperService.index().getName(); final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), new SourceToParse(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source()), - index.routing()), index.seqNo(), index.primaryTerm(), - index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0); + index.routing()), index.seqNo(), index.primaryTerm(), index.version(), versionType, origin, + index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); return engineIndex; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), null, origin, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); + delete.primaryTerm(), delete.version(), versionType, origin, System.nanoTime(), + SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); return engineDelete; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 58bc89f39d8..4f4699b163b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -16,6 +16,7 @@ import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.VersionType; @@ -117,7 +118,12 @@ public final class FollowingEngine extends InternalEngine { @Override protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { - assert getMaxSeqNoOfUpdatesOrDeletes() >= seqNo : seqNo + " < " + getMaxSeqNoOfUpdatesOrDeletes(); + if (Assertions.ENABLED) { + final long localCheckpoint = getProcessedLocalCheckpoint(); + final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); + assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo : + "maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo; + } super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index e6fba3f5741..801a8cfc9a0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -484,7 +484,7 @@ public class FollowingEngineTests extends ESTestCase { for (Thread thread : threads) { thread.join(); } - assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); + assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); EngineTestCase.assertConsistentHistoryBetweenTranslogAndLuceneIndex(follower, createMapperService("test")); EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(follower); @@ -535,7 +535,12 @@ public class FollowingEngineTests extends ESTestCase { try (Translog.Snapshot snapshot = shuffleSnapshot(leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true))) { follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes()); - translogHandler.run(follower, snapshot); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + EngineTestCase.applyOperation(follower, + translogHandler.convertToEngineOp(op, randomFrom(Engine.Operation.Origin.values()))); + } + follower.syncTranslog(); } } }