Add following engine implementation

This commit is a first step towards a following engine
implementation. Future work will build on this by using this engine to
execute operations on a following engine from another engine (typically
a remote leader engine) that has already assigned sequence numbers to
such operations.

Relates #2776
This commit is contained in:
Jason Tedor 2017-10-30 13:38:02 -04:00 committed by GitHub
parent 41c3dc91c1
commit 769349a9ab
4 changed files with 347 additions and 5 deletions

View File

@ -15,7 +15,7 @@ import java.util.List;
/**
* Container class for CCR settings.
*/
final class CcrSettings {
public final class CcrSettings {
// prevent construction
private CcrSettings() {
@ -30,7 +30,7 @@ final class CcrSettings {
/**
* Index setting for a following index.
*/
static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope);
/**

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.xpack.ccr.CcrSettings;
/**
* An engine implementation for following shards.
*/
public final class FollowingEngine extends InternalEngine {
/**
* Construct a new following engine with the specified engine configuration.
*
* @param engineConfig the engine configuration
*/
FollowingEngine(final EngineConfig engineConfig) {
super(validateEngineConfig(engineConfig));
}
private static EngineConfig validateEngineConfig(final EngineConfig engineConfig) {
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(engineConfig.getIndexSettings().getSettings()) == false) {
throw new IllegalArgumentException("a following engine can not be constructed for a non-following index");
}
return engineConfig;
}
@Override
protected long doGenerateSeqNoForOperation(final Operation operation) {
assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations on following indices must have an assigned sequence number";
return operation.seqNo();
}
@Override
protected boolean assertOriginPrimarySequenceNumber(final long seqNo) {
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on following indices must have an assigned sequence number";
return true;
}
}

View File

@ -13,12 +13,11 @@ import org.elasticsearch.index.engine.InternalEngine;
/**
* An engine factory for following engines.
*/
public class FollowingEngineFactory implements EngineFactory {
public final class FollowingEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(final EngineConfig config) {
// TODO: implement following engine
return new InternalEngine(config);
return new FollowingEngine(config);
}
}

View File

@ -0,0 +1,294 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.TranslogHandler;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
public class FollowingEngineTests extends ESTestCase {
private ThreadPool threadPool;
private Index index;
private ShardId shardId;
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("following-engine-tests");
index = new Index("index", "uuid");
shardId = new ShardId(index, 0);
}
public void tearDown() throws Exception {
terminate(threadPool);
super.tearDown();
}
public void testFollowingEngineRejectsNonFollowingIndex() throws IOException {
final Settings.Builder builder =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT);
if (randomBoolean()) {
builder.put("index.xpack.ccr.following_index", false);
}
final Settings settings = builder.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new FollowingEngine(engineConfig));
assertThat(e, hasToString(containsString("a following engine can not be constructed for a non-following index")));
}
}
public void testIndexSeqNoIsMaintained() throws IOException {
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
runIndexTest(
seqNo,
(followingEngine, index) -> {
final Engine.IndexResult result = followingEngine.index(index);
assertThat(result.getSeqNo(), equalTo(seqNo));
});
}
public void testUnassignedSeqNoAssertionOnSeqNoForIndexOperation() throws IOException {
runIndexTest(
SequenceNumbers.UNASSIGNED_SEQ_NO,
(followingEngine, index) -> {
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(index));
assertThat(
e,
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
});
}
public void testUnassignedSeqNoAssertionOnIndex() throws IOException {
runIndexTest(
SequenceNumbers.UNASSIGNED_SEQ_NO,
(followingEngine, index) -> {
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.index(index));
assertThat(
e,
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
});
}
public void runIndexTest(
final long seqNo, final CheckedBiConsumer<FollowingEngine, Engine.Index, IOException> consumer) throws IOException {
final Settings settings =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = new FollowingEngine(engineConfig)) {
final String id = "id";
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
final String type = "type";
final Field versionField = new NumericDocValuesField("_version", 0);
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
final ParseContext.Document document = new ParseContext.Document();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[]{1});
final ParsedDocument parsedDocument = new ParsedDocument(
versionField,
seqID,
id,
type,
"routing",
Collections.singletonList(document),
source,
XContentType.JSON,
null);
final Engine.Index index = new Engine.Index(
new Term("_id", parsedDocument.id()),
parsedDocument,
seqNo,
(long) randomIntBetween(1, 8),
Versions.MATCH_ANY,
VersionType.INTERNAL,
Engine.Operation.Origin.PRIMARY,
System.currentTimeMillis(),
System.currentTimeMillis(),
randomBoolean());
consumer.accept(followingEngine, index);
}
}
}
public void testDeleteSeqNoIsMaintained() throws IOException {
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
runDeleteTest(
seqNo,
(followingEngine, delete) -> {
final Engine.DeleteResult result = followingEngine.delete(delete);
assertThat(result.getSeqNo(), equalTo(seqNo));
});
}
public void testUnassignedSeqNoAssertionOnSeqNoForDeleteOperation() throws IOException {
runDeleteTest(
SequenceNumbers.UNASSIGNED_SEQ_NO,
(followingEngine, delete) -> {
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(delete));
assertThat(
e,
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
});
}
public void testUnassignedSeqNoAssertionOnDelete() throws IOException {
runDeleteTest(
SequenceNumbers.UNASSIGNED_SEQ_NO,
(followingEngine, delete) -> {
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.delete(delete));
assertThat(
e,
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
});
}
public void runDeleteTest(
final long seqNo,
final CheckedBiConsumer<FollowingEngine, Engine.Delete, IOException> consumer) throws IOException {
final Settings settings =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = new FollowingEngine(engineConfig)) {
final String id = "id";
final Engine.Delete delete = new Engine.Delete(
"type",
id,
new Term("_id", id),
seqNo,
randomIntBetween(1, 8),
Versions.MATCH_ANY,
VersionType.INTERNAL,
Engine.Operation.Origin.PRIMARY,
System.currentTimeMillis());
consumer.accept(followingEngine, delete);
}
}
}
private static EngineConfig engineConfig(
final ShardId shardId,
final IndexSettings indexSettings,
final ThreadPool threadPool,
final Store store,
final Logger logger,
final NamedXContentRegistry xContentRegistry) throws IOException {
final IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
final Path translogPath = createTempDir("translog");
final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
return new EngineConfig(
EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG,
shardId,
"allocation-id",
threadPool,
indexSettings,
null,
store,
newMergePolicy(),
indexWriterConfig.getAnalyzer(),
indexWriterConfig.getSimilarity(),
new CodecService(null, logger),
new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, Exception e) {
}
},
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
randomBoolean(),
translogConfig,
TimeValue.timeValueMinutes(5),
Collections.emptyList(),
null,
new TranslogHandler(
xContentRegistry, IndexSettingsModule.newIndexSettings(shardId.getIndexName(), indexSettings.getSettings())));
}
private static Store createStore(
final ShardId shardId, final IndexSettings indexSettings, final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return directory;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
}
}