From 769349a9abeb077ebff1e443b66e1944093465c1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 30 Oct 2017 13:38:02 -0400 Subject: [PATCH] Add following engine implementation This commit is a first step towards a following engine implementation. Future work will build on this by using this engine to execute operations on a following engine from another engine (typically a remote leader engine) that has already assigned sequence numbers to such operations. Relates #2776 --- .../elasticsearch/xpack/ccr/CcrSettings.java | 4 +- .../ccr/index/engine/FollowingEngine.java | 49 +++ .../index/engine/FollowingEngineFactory.java | 5 +- .../index/engine/FollowingEngineTests.java | 294 ++++++++++++++++++ 4 files changed, 347 insertions(+), 5 deletions(-) create mode 100644 x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java create mode 100644 x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 5ed365531f1..f6859de7756 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -15,7 +15,7 @@ import java.util.List; /** * Container class for CCR settings. */ -final class CcrSettings { +public final class CcrSettings { // prevent construction private CcrSettings() { @@ -30,7 +30,7 @@ final class CcrSettings { /** * Index setting for a following index. */ - static final Setting CCR_FOLLOWING_INDEX_SETTING = + public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope); /** diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java new file mode 100644 index 00000000000..3a2f48ecaa5 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.index.engine; + +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.xpack.ccr.CcrSettings; + +/** + * An engine implementation for following shards. + */ +public final class FollowingEngine extends InternalEngine { + + /** + * Construct a new following engine with the specified engine configuration. + * + * @param engineConfig the engine configuration + */ + FollowingEngine(final EngineConfig engineConfig) { + super(validateEngineConfig(engineConfig)); + } + + private static EngineConfig validateEngineConfig(final EngineConfig engineConfig) { + if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(engineConfig.getIndexSettings().getSettings()) == false) { + throw new IllegalArgumentException("a following engine can not be constructed for a non-following index"); + } + return engineConfig; + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + : "primary operations on following indices must have an assigned sequence number"; + return operation.seqNo(); + } + + @Override + protected boolean assertOriginPrimarySequenceNumber(final long seqNo) { + assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on following indices must have an assigned sequence number"; + return true; + } + +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineFactory.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineFactory.java index 3619a2794b9..4d20fbe522d 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineFactory.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineFactory.java @@ -13,12 +13,11 @@ import org.elasticsearch.index.engine.InternalEngine; /** * An engine factory for following engines. */ -public class FollowingEngineFactory implements EngineFactory { +public final class FollowingEngineFactory implements EngineFactory { @Override public Engine newReadWriteEngine(final EngineConfig config) { - // TODO: implement following engine - return new InternalEngine(config); + return new FollowingEngine(config); } } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java new file mode 100644 index 00000000000..ea68d96ebf9 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -0,0 +1,294 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.index.engine; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.TranslogHandler; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; + +public class FollowingEngineTests extends ESTestCase { + + private ThreadPool threadPool; + private Index index; + private ShardId shardId; + + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("following-engine-tests"); + index = new Index("index", "uuid"); + shardId = new ShardId(index, 0); + } + + public void tearDown() throws Exception { + terminate(threadPool); + super.tearDown(); + } + + public void testFollowingEngineRejectsNonFollowingIndex() throws IOException { + final Settings.Builder builder = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT); + if (randomBoolean()) { + builder.put("index.xpack.ccr.following_index", false); + } + final Settings settings = builder.build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new FollowingEngine(engineConfig)); + assertThat(e, hasToString(containsString("a following engine can not be constructed for a non-following index"))); + } + } + + public void testIndexSeqNoIsMaintained() throws IOException { + final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); + runIndexTest( + seqNo, + (followingEngine, index) -> { + final Engine.IndexResult result = followingEngine.index(index); + assertThat(result.getSeqNo(), equalTo(seqNo)); + }); + } + + public void testUnassignedSeqNoAssertionOnSeqNoForIndexOperation() throws IOException { + runIndexTest( + SequenceNumbers.UNASSIGNED_SEQ_NO, + (followingEngine, index) -> { + final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(index)); + assertThat( + e, + hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); + }); + } + + public void testUnassignedSeqNoAssertionOnIndex() throws IOException { + runIndexTest( + SequenceNumbers.UNASSIGNED_SEQ_NO, + (followingEngine, index) -> { + final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.index(index)); + assertThat( + e, + hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); + }); + } + + public void runIndexTest( + final long seqNo, final CheckedBiConsumer consumer) throws IOException { + final Settings settings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = new FollowingEngine(engineConfig)) { + final String id = "id"; + final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); + final String type = "type"; + final Field versionField = new NumericDocValuesField("_version", 0); + final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + final ParseContext.Document document = new ParseContext.Document(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + final BytesReference source = new BytesArray(new byte[]{1}); + final ParsedDocument parsedDocument = new ParsedDocument( + versionField, + seqID, + id, + type, + "routing", + Collections.singletonList(document), + source, + XContentType.JSON, + null); + + final Engine.Index index = new Engine.Index( + new Term("_id", parsedDocument.id()), + parsedDocument, + seqNo, + (long) randomIntBetween(1, 8), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis(), + System.currentTimeMillis(), + randomBoolean()); + + consumer.accept(followingEngine, index); + } + } + } + + public void testDeleteSeqNoIsMaintained() throws IOException { + final long seqNo = randomIntBetween(0, Integer.MAX_VALUE); + runDeleteTest( + seqNo, + (followingEngine, delete) -> { + final Engine.DeleteResult result = followingEngine.delete(delete); + assertThat(result.getSeqNo(), equalTo(seqNo)); + }); + } + + public void testUnassignedSeqNoAssertionOnSeqNoForDeleteOperation() throws IOException { + runDeleteTest( + SequenceNumbers.UNASSIGNED_SEQ_NO, + (followingEngine, delete) -> { + final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(delete)); + assertThat( + e, + hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); + }); + } + + public void testUnassignedSeqNoAssertionOnDelete() throws IOException { + runDeleteTest( + SequenceNumbers.UNASSIGNED_SEQ_NO, + (followingEngine, delete) -> { + final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.delete(delete)); + assertThat( + e, + hasToString(containsString("primary operations on following indices must have an assigned sequence number"))); + }); + } + + public void runDeleteTest( + final long seqNo, + final CheckedBiConsumer consumer) throws IOException { + final Settings settings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = new FollowingEngine(engineConfig)) { + final String id = "id"; + final Engine.Delete delete = new Engine.Delete( + "type", + id, + new Term("_id", id), + seqNo, + randomIntBetween(1, 8), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.currentTimeMillis()); + + consumer.accept(followingEngine, delete); + } + } + } + + private static EngineConfig engineConfig( + final ShardId shardId, + final IndexSettings indexSettings, + final ThreadPool threadPool, + final Store store, + final Logger logger, + final NamedXContentRegistry xContentRegistry) throws IOException { + final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); + final Path translogPath = createTempDir("translog"); + final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + return new EngineConfig( + EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, + shardId, + "allocation-id", + threadPool, + indexSettings, + null, + store, + newMergePolicy(), + indexWriterConfig.getAnalyzer(), + indexWriterConfig.getSimilarity(), + new CodecService(null, logger), + new Engine.EventListener() { + @Override + public void onFailedEngine(String reason, Exception e) { + + } + }, + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + randomBoolean(), + translogConfig, + TimeValue.timeValueMinutes(5), + Collections.emptyList(), + null, + new TranslogHandler( + xContentRegistry, IndexSettingsModule.newIndexSettings(shardId.getIndexName(), indexSettings.getSettings()))); + } + + private static Store createStore( + final ShardId shardId, final IndexSettings indexSettings, final Directory directory) throws IOException { + final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { + @Override + public Directory newDirectory() throws IOException { + return directory; + } + }; + return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); + } + +}