Maintain order of operations semantics on follower
A following engine even for a primary shard needs to maintain order of operations semantics as if it were behaving like a replica. That is, rather than assuming that the order of operations presented to the engine is the de facto order of operations as is the case for a leader engine for a primary shard, a following engine must behave like all replicas behave which is that they resolve order of operations based on sequence numbers. This commit causes this to be the case for following engines. Relates #2931
This commit is contained in:
parent
bf4e18fdfc
commit
913936f2b0
|
@ -5,6 +5,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ccr.index.engine;
|
package org.elasticsearch.xpack.ccr.index.engine;
|
||||||
|
|
||||||
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.engine.InternalEngine;
|
import org.elasticsearch.index.engine.InternalEngine;
|
||||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||||
|
@ -12,6 +13,8 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An engine implementation for following shards.
|
* An engine implementation for following shards.
|
||||||
*/
|
*/
|
||||||
|
@ -34,16 +37,21 @@ public final class FollowingEngine extends InternalEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long doGenerateSeqNoForOperation(final Operation operation) {
|
public IndexResult index(final Index index) throws IOException {
|
||||||
assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
|
preFlight(index);
|
||||||
: "primary operations on following indices must have an assigned sequence number";
|
return super.index(index);
|
||||||
return operation.seqNo();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean assertOriginPrimarySequenceNumber(final long seqNo) {
|
public DeleteResult delete(final Delete delete) throws IOException {
|
||||||
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on following indices must have an assigned sequence number";
|
preFlight(delete);
|
||||||
return true;
|
return super.delete(delete);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void preFlight(final Operation operation) {
|
||||||
|
if (operation.origin() == Operation.Origin.PRIMARY) {
|
||||||
|
throw new IllegalStateException("a following engine does not accept primary operations");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,12 +29,12 @@ import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.index.codec.CodecService;
|
import org.elasticsearch.index.codec.CodecService;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
|
import org.elasticsearch.index.engine.EngineTestCase;
|
||||||
import org.elasticsearch.index.engine.TranslogHandler;
|
import org.elasticsearch.index.engine.TranslogHandler;
|
||||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.ParseContext;
|
import org.elasticsearch.index.mapper.ParseContext;
|
||||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.DirectoryService;
|
import org.elasticsearch.index.store.DirectoryService;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
|
@ -48,7 +48,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.function.Supplier;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -95,36 +95,59 @@ public class FollowingEngineTests extends ESTestCase {
|
||||||
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
|
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
|
||||||
runIndexTest(
|
runIndexTest(
|
||||||
seqNo,
|
seqNo,
|
||||||
|
1,
|
||||||
|
VersionType.EXTERNAL,
|
||||||
|
Engine.Operation.Origin.REPLICA,
|
||||||
(followingEngine, index) -> {
|
(followingEngine, index) -> {
|
||||||
final Engine.IndexResult result = followingEngine.index(index);
|
final Engine.IndexResult result = followingEngine.index(index);
|
||||||
assertThat(result.getSeqNo(), equalTo(seqNo));
|
assertThat(result.getSeqNo(), equalTo(seqNo));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUnassignedSeqNoAssertionOnSeqNoForIndexOperation() throws IOException {
|
public void testPrimaryIndexOperationsAreRejected() throws IOException {
|
||||||
runIndexTest(
|
runIndexTest(
|
||||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
randomNonNegativeLong(),
|
||||||
|
Versions.MATCH_ANY,
|
||||||
|
VersionType.INTERNAL,
|
||||||
|
Engine.Operation.Origin.PRIMARY,
|
||||||
(followingEngine, index) -> {
|
(followingEngine, index) -> {
|
||||||
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(index));
|
final IllegalStateException e = expectThrows(IllegalStateException.class, () -> followingEngine.index(index));
|
||||||
assertThat(
|
assertThat(e, hasToString(containsString("a following engine does not accept primary operations")));
|
||||||
e,
|
|
||||||
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUnassignedSeqNoAssertionOnIndex() throws IOException {
|
/*
|
||||||
runIndexTest(
|
* A following engine (whether or not it is an engine for a primary or replica shard) needs to maintain ordering semantics as the
|
||||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
* operations presented to it can arrive out of order (while a leader engine that is for a primary shard dictates the order). This test
|
||||||
(followingEngine, index) -> {
|
* ensures that these semantics are maintained.
|
||||||
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.index(index));
|
*/
|
||||||
assertThat(
|
public void testOutOfOrderDocuments() throws IOException {
|
||||||
e,
|
final Settings settings =
|
||||||
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
|
Settings.builder()
|
||||||
});
|
.put("index.number_of_shards", 1)
|
||||||
|
.put("index.number_of_replicas", 0)
|
||||||
|
.put("index.version.created", Version.CURRENT)
|
||||||
|
.put("index.xpack.ccr.following_index", true)
|
||||||
|
.build();
|
||||||
|
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
|
||||||
|
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
|
||||||
|
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
|
||||||
|
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
|
||||||
|
try (FollowingEngine followingEngine = new FollowingEngine(engineConfig)) {
|
||||||
|
final VersionType versionType =
|
||||||
|
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE);
|
||||||
|
final List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, versionType, false, 2, 2, 20);
|
||||||
|
EngineTestCase.assertOpsOnReplica(ops, followingEngine, true, logger);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runIndexTest(
|
public void runIndexTest(
|
||||||
final long seqNo, final CheckedBiConsumer<FollowingEngine, Engine.Index, IOException> consumer) throws IOException {
|
final long seqNo,
|
||||||
|
final long version,
|
||||||
|
final VersionType versionType,
|
||||||
|
final Engine.Operation.Origin origin,
|
||||||
|
final CheckedBiConsumer<FollowingEngine, Engine.Index, IOException> consumer) throws IOException {
|
||||||
final Settings settings =
|
final Settings settings =
|
||||||
Settings.builder()
|
Settings.builder()
|
||||||
.put("index.number_of_shards", 1)
|
.put("index.number_of_shards", 1)
|
||||||
|
@ -165,9 +188,9 @@ public class FollowingEngineTests extends ESTestCase {
|
||||||
parsedDocument,
|
parsedDocument,
|
||||||
seqNo,
|
seqNo,
|
||||||
(long) randomIntBetween(1, 8),
|
(long) randomIntBetween(1, 8),
|
||||||
Versions.MATCH_ANY,
|
version,
|
||||||
VersionType.INTERNAL,
|
versionType,
|
||||||
Engine.Operation.Origin.PRIMARY,
|
origin,
|
||||||
System.currentTimeMillis(),
|
System.currentTimeMillis(),
|
||||||
System.currentTimeMillis(),
|
System.currentTimeMillis(),
|
||||||
randomBoolean());
|
randomBoolean());
|
||||||
|
@ -181,36 +204,32 @@ public class FollowingEngineTests extends ESTestCase {
|
||||||
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
|
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
|
||||||
runDeleteTest(
|
runDeleteTest(
|
||||||
seqNo,
|
seqNo,
|
||||||
|
1,
|
||||||
|
VersionType.EXTERNAL,
|
||||||
|
Engine.Operation.Origin.REPLICA,
|
||||||
(followingEngine, delete) -> {
|
(followingEngine, delete) -> {
|
||||||
final Engine.DeleteResult result = followingEngine.delete(delete);
|
final Engine.DeleteResult result = followingEngine.delete(delete);
|
||||||
assertThat(result.getSeqNo(), equalTo(seqNo));
|
assertThat(result.getSeqNo(), equalTo(seqNo));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUnassignedSeqNoAssertionOnSeqNoForDeleteOperation() throws IOException {
|
public void testDeleteIndexOperationsAreRejected() throws IOException {
|
||||||
runDeleteTest(
|
runDeleteTest(
|
||||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
randomNonNegativeLong(),
|
||||||
|
Versions.MATCH_ANY,
|
||||||
|
VersionType.INTERNAL,
|
||||||
|
Engine.Operation.Origin.PRIMARY,
|
||||||
(followingEngine, delete) -> {
|
(followingEngine, delete) -> {
|
||||||
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.doGenerateSeqNoForOperation(delete));
|
final IllegalStateException e = expectThrows(IllegalStateException.class, () -> followingEngine.delete(delete));
|
||||||
assertThat(
|
assertThat(e, hasToString(containsString("a following engine does not accept primary operations")));
|
||||||
e,
|
|
||||||
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testUnassignedSeqNoAssertionOnDelete() throws IOException {
|
|
||||||
runDeleteTest(
|
|
||||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
|
||||||
(followingEngine, delete) -> {
|
|
||||||
final AssertionError e = expectThrows(AssertionError.class, () -> followingEngine.delete(delete));
|
|
||||||
assertThat(
|
|
||||||
e,
|
|
||||||
hasToString(containsString("primary operations on following indices must have an assigned sequence number")));
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runDeleteTest(
|
public void runDeleteTest(
|
||||||
final long seqNo,
|
final long seqNo,
|
||||||
|
final long version,
|
||||||
|
final VersionType versionType,
|
||||||
|
final Engine.Operation.Origin origin,
|
||||||
final CheckedBiConsumer<FollowingEngine, Engine.Delete, IOException> consumer) throws IOException {
|
final CheckedBiConsumer<FollowingEngine, Engine.Delete, IOException> consumer) throws IOException {
|
||||||
final Settings settings =
|
final Settings settings =
|
||||||
Settings.builder()
|
Settings.builder()
|
||||||
|
@ -231,9 +250,9 @@ public class FollowingEngineTests extends ESTestCase {
|
||||||
new Term("_id", id),
|
new Term("_id", id),
|
||||||
seqNo,
|
seqNo,
|
||||||
randomIntBetween(1, 8),
|
randomIntBetween(1, 8),
|
||||||
Versions.MATCH_ANY,
|
version,
|
||||||
VersionType.INTERNAL,
|
versionType,
|
||||||
Engine.Operation.Origin.PRIMARY,
|
origin,
|
||||||
System.currentTimeMillis());
|
System.currentTimeMillis());
|
||||||
|
|
||||||
consumer.accept(followingEngine, delete);
|
consumer.accept(followingEngine, delete);
|
||||||
|
|
Loading…
Reference in New Issue