diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 3a2f48ecaa5..fab0b3e1789 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.seqno.SeqNoStats; @@ -12,6 +13,8 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.xpack.ccr.CcrSettings; +import java.io.IOException; + /** * An engine implementation for following shards. */ @@ -34,16 +37,21 @@ public final class FollowingEngine extends InternalEngine { } @Override - protected long doGenerateSeqNoForOperation(final Operation operation) { - assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - : "primary operations on following indices must have an assigned sequence number"; - return operation.seqNo(); + public IndexResult index(final Index index) throws IOException { + preFlight(index); + return super.index(index); } @Override - protected boolean assertOriginPrimarySequenceNumber(final long seqNo) { - assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on following indices must have an assigned sequence number"; - return true; + public DeleteResult delete(final Delete delete) throws IOException { + preFlight(delete); + return super.delete(delete); + } + + private void preFlight(final Operation operation) { + if (operation.origin() == Operation.Origin.PRIMARY) { + throw new IllegalStateException("a following engine does not accept primary operations"); + } } } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index ea68d96ebf9..4ee32baab70 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -29,12 +29,12 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.TranslogHandler; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; @@ -48,7 +48,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; import java.util.Collections; -import java.util.function.Supplier; +import java.util.List; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -95,36 +95,59 @@ public class FollowingEngineTests extends ESTestCase { final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); runIndexTest( seqNo, + 1, + VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, (followingEngine, index) -> { final Engine.IndexResult result = followingEngine.index(index); assertThat(result.getSeqNo(), equalTo(seqNo)); }); } - public void testUnassignedSeqNoAssertionOnSeqNoForIndexOperation() throws IOException { + public void testPrimaryIndexOperationsAreRejected() throws IOException { runIndexTest( - SequenceNumbers.UNASSIGNED_SEQ_NO, + randomNonNegativeLong(), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, (followingEngine, index) -> { - final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(index)); - assertThat( - e, - hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); + final IllegalStateException e = expectThrows(IllegalStateException.class, () -> followingEngine.index(index)); + assertThat(e, hasToString(containsString("a following engine does not accept primary operations"))); }); } - public void testUnassignedSeqNoAssertionOnIndex() throws IOException { - runIndexTest( - SequenceNumbers.UNASSIGNED_SEQ_NO, - (followingEngine, index) -> { - final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.index(index)); - assertThat( - e, - hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); - }); + /* + * A following engine (whether or not it is an engine for a primary or replica shard) needs to maintain ordering semantics as the + * operations presented to it can arrive out of order (while a leader engine that is for a primary shard dictates the order). This test + * ensures that these semantics are maintained. + */ + public void testOutOfOrderDocuments() throws IOException { + final Settings settings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = new FollowingEngine(engineConfig)) { + final VersionType versionType = + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE); + final List ops = EngineTestCase.generateSingleDocHistory(true, versionType, false, 2, 2, 20); + EngineTestCase.assertOpsOnReplica(ops, followingEngine, true, logger); + } + } } public void runIndexTest( - final long seqNo, final CheckedBiConsumer consumer) throws IOException { + final long seqNo, + final long version, + final VersionType versionType, + final Engine.Operation.Origin origin, + final CheckedBiConsumer consumer) throws IOException { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) @@ -165,9 +188,9 @@ public class FollowingEngineTests extends ESTestCase { parsedDocument, seqNo, (long) randomIntBetween(1, 8), - Versions.MATCH_ANY, - VersionType.INTERNAL, - Engine.Operation.Origin.PRIMARY, + version, + versionType, + origin, System.currentTimeMillis(), System.currentTimeMillis(), randomBoolean()); @@ -181,36 +204,32 @@ public class FollowingEngineTests extends ESTestCase { final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); runDeleteTest( seqNo, + 1, + VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, (followingEngine, delete) -> { final Engine.DeleteResult result = followingEngine.delete(delete); assertThat(result.getSeqNo(), equalTo(seqNo)); }); } - public void testUnassignedSeqNoAssertionOnSeqNoForDeleteOperation() throws IOException { + public void testDeleteIndexOperationsAreRejected() throws IOException { runDeleteTest( - SequenceNumbers.UNASSIGNED_SEQ_NO, + randomNonNegativeLong(), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, (followingEngine, delete) -> { - final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(delete)); - assertThat( - e, - hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); - }); - } - - public void testUnassignedSeqNoAssertionOnDelete() throws IOException { - runDeleteTest( - SequenceNumbers.UNASSIGNED_SEQ_NO, - (followingEngine, delete) -> { - final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.delete(delete)); - assertThat( - e, - hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); + final IllegalStateException e = expectThrows(IllegalStateException.class, () -> followingEngine.delete(delete)); + assertThat(e, hasToString(containsString("a following engine does not accept primary operations"))); }); } public void runDeleteTest( final long seqNo, + final long version, + final VersionType versionType, + final Engine.Operation.Origin origin, final CheckedBiConsumer consumer) throws IOException { final Settings settings = Settings.builder() @@ -231,9 +250,9 @@ public class FollowingEngineTests extends ESTestCase { new Term("_id", id), seqNo, randomIntBetween(1, 8), - Versions.MATCH_ANY, - VersionType.INTERNAL, - Engine.Operation.Origin.PRIMARY, + version, + versionType, + origin, System.currentTimeMillis()); consumer.accept(followingEngine, delete);