Override indexing plans
Operations from a leader shard will be indexed into the engine with the origin set to primary. The problem is here is that then we have primary semantics in the engine such as assertions about sequence numbers being unassigned, and we do not have correct semantics for out-of-order delivery of operations (as we should on a following engine, whether or not it is primary since the ordering is determined from the leader). This commit handles this by always using the replica plan for indexing into a following engine, whether or not the engine is for a primary shard. Relates #3000
This commit is contained in:
parent
913936f2b0
commit
8e0b34b507
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ccr.index.engine;
|
||||
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.InternalEngine;
|
||||
|
@ -36,22 +37,47 @@ public final class FollowingEngine extends InternalEngine {
|
|||
return engineConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexResult index(final Index index) throws IOException {
|
||||
preFlight(index);
|
||||
return super.index(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteResult delete(final Delete delete) throws IOException {
|
||||
preFlight(delete);
|
||||
return super.delete(delete);
|
||||
}
|
||||
|
||||
private void preFlight(final Operation operation) {
|
||||
if (operation.origin() == Operation.Origin.PRIMARY) {
|
||||
throw new IllegalStateException("a following engine does not accept primary operations");
|
||||
/*
|
||||
* We assert here so that this goes uncaught in unit tests and fails nodes in standalone tests (we want a harsh failure so that we
|
||||
* do not have a situation where a shard fails and is recovered elsewhere and a test subsequently passes). We throw an exception so
|
||||
* that we also prevent issues in production code.
|
||||
*/
|
||||
assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
if (operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
throw new IllegalStateException("a following engine does not accept operations without an assigned sequence number");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
|
||||
preFlight(index);
|
||||
return planIndexingAsNonPrimary(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
|
||||
preFlight(delete);
|
||||
return planDeletionAsNonPrimary(delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
|
||||
// sequence number should be set when operation origin is primary
|
||||
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on a following index must have an assigned sequence number";
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean assertNonPrimaryOrigin(final Operation operation) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
|
||||
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
|
||||
: "version [" + index.version() + "], type [" + index.versionType() + "]";
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -13,11 +13,11 @@ import org.apache.lucene.index.Term;
|
|||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.CheckedBiConsumer;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
|
@ -95,27 +95,13 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
|
||||
runIndexTest(
|
||||
seqNo,
|
||||
1,
|
||||
VersionType.EXTERNAL,
|
||||
Engine.Operation.Origin.REPLICA,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
(followingEngine, index) -> {
|
||||
final Engine.IndexResult result = followingEngine.index(index);
|
||||
assertThat(result.getSeqNo(), equalTo(seqNo));
|
||||
});
|
||||
}
|
||||
|
||||
public void testPrimaryIndexOperationsAreRejected() throws IOException {
|
||||
runIndexTest(
|
||||
randomNonNegativeLong(),
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
(followingEngine, index) -> {
|
||||
final IllegalStateException e = expectThrows(IllegalStateException.class, () -> followingEngine.index(index));
|
||||
assertThat(e, hasToString(containsString("a following engine does not accept primary operations")));
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* A following engine (whether or not it is an engine for a primary or replica shard) needs to maintain ordering semantics as the
|
||||
* operations presented to it can arrive out of order (while a leader engine that is for a primary shard dictates the order). This test
|
||||
|
@ -144,8 +130,6 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
|
||||
public void runIndexTest(
|
||||
final long seqNo,
|
||||
final long version,
|
||||
final VersionType versionType,
|
||||
final Engine.Operation.Origin origin,
|
||||
final CheckedBiConsumer<FollowingEngine, Engine.Index, IOException> consumer) throws IOException {
|
||||
final Settings settings =
|
||||
|
@ -183,16 +167,25 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
XContentType.JSON,
|
||||
null);
|
||||
|
||||
final long version;
|
||||
final long autoGeneratedIdTimestamp;
|
||||
if (randomBoolean()) {
|
||||
version = 1;
|
||||
autoGeneratedIdTimestamp = System.currentTimeMillis();
|
||||
} else {
|
||||
version = randomNonNegativeLong();
|
||||
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
|
||||
}
|
||||
final Engine.Index index = new Engine.Index(
|
||||
new Term("_id", parsedDocument.id()),
|
||||
parsedDocument,
|
||||
seqNo,
|
||||
(long) randomIntBetween(1, 8),
|
||||
version,
|
||||
versionType,
|
||||
VersionType.EXTERNAL,
|
||||
origin,
|
||||
System.currentTimeMillis(),
|
||||
System.currentTimeMillis(),
|
||||
autoGeneratedIdTimestamp,
|
||||
randomBoolean());
|
||||
|
||||
consumer.accept(followingEngine, index);
|
||||
|
@ -204,31 +197,15 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
|
||||
runDeleteTest(
|
||||
seqNo,
|
||||
1,
|
||||
VersionType.EXTERNAL,
|
||||
Engine.Operation.Origin.REPLICA,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
(followingEngine, delete) -> {
|
||||
final Engine.DeleteResult result = followingEngine.delete(delete);
|
||||
assertThat(result.getSeqNo(), equalTo(seqNo));
|
||||
});
|
||||
}
|
||||
|
||||
public void testDeleteIndexOperationsAreRejected() throws IOException {
|
||||
runDeleteTest(
|
||||
randomNonNegativeLong(),
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
(followingEngine, delete) -> {
|
||||
final IllegalStateException e = expectThrows(IllegalStateException.class, () -> followingEngine.delete(delete));
|
||||
assertThat(e, hasToString(containsString("a following engine does not accept primary operations")));
|
||||
});
|
||||
}
|
||||
|
||||
public void runDeleteTest(
|
||||
final long seqNo,
|
||||
final long version,
|
||||
final VersionType versionType,
|
||||
final Engine.Operation.Origin origin,
|
||||
final CheckedBiConsumer<FollowingEngine, Engine.Delete, IOException> consumer) throws IOException {
|
||||
final Settings settings =
|
||||
|
@ -250,8 +227,8 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
new Term("_id", id),
|
||||
seqNo,
|
||||
randomIntBetween(1, 8),
|
||||
version,
|
||||
versionType,
|
||||
randomNonNegativeLong(),
|
||||
VersionType.EXTERNAL,
|
||||
origin,
|
||||
System.currentTimeMillis());
|
||||
|
||||
|
|
Loading…
Reference in New Issue