Always use primary term from primary to index docs on replica (#47583)

Ensures that we always use the primary term established by the primary to index docs on the
replica. Makes the logic around replication less brittle by always using the operation primary
term on the replica that is coming from the primary.
This commit is contained in:
Yannick Welsch 2019-11-13 11:42:13 +01:00
parent 999d66fc87
commit 2dfa0133d5
11 changed files with 96 additions and 75 deletions

View File

@ -176,6 +176,7 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
private final Exception cause;
private final RestStatus status;
private final long seqNo;
private final long term;
private final boolean aborted;
public static ConstructingObjectParser<Failure, Void> PARSER =
@ -198,33 +199,36 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
/**
* For write failures before operation was assigned a sequence number.
*
* use @{link {@link #Failure(String, String, String, Exception, long)}}
* use @{link {@link #Failure(String, String, String, Exception, long, long)}}
* to record operation sequence no with failure
*/
public Failure(String index, String type, String id, Exception cause) {
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO, false);
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
}
public Failure(String index, String type, String id, Exception cause, boolean aborted) {
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO, aborted);
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM, aborted);
}
public Failure(String index, String type, String id, Exception cause, RestStatus status) {
this(index, type, id, cause, status, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
this(index, type, id, cause, status, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
}
/** For write failures after operation was assigned a sequence number. */
public Failure(String index, String type, String id, Exception cause, long seqNo) {
this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo, false);
public Failure(String index, String type, String id, Exception cause, long seqNo, long term) {
this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo, term, false);
}
public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo, boolean aborted) {
private Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo, long term, boolean aborted) {
this.index = index;
this.type = type;
this.id = id;
this.cause = cause;
this.status = status;
this.seqNo = seqNo;
this.term = term;
this.aborted = aborted;
}
@ -237,35 +241,26 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
id = in.readOptionalString();
cause = in.readException();
status = ExceptionsHelper.status(cause);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
seqNo = in.readZLong();
seqNo = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
term = in.readVLong();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (supportsAbortedFlag(in.getVersion())) {
aborted = in.readBoolean();
} else {
aborted = false;
term = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}
aborted = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(getIndex());
out.writeString(getType());
out.writeOptionalString(getId());
out.writeException(getCause());
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(getSeqNo());
out.writeString(index);
out.writeString(type);
out.writeOptionalString(id);
out.writeException(cause);
out.writeZLong(seqNo);
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeVLong(term);
}
if (supportsAbortedFlag(out.getVersion())) {
out.writeBoolean(aborted);
}
}
private static boolean supportsAbortedFlag(Version version) {
// The "aborted" flag was not in 6.0.0-beta2
return version.after(Version.V_6_0_0_beta2);
out.writeBoolean(aborted);
}
/**
@ -319,6 +314,15 @@ public class BulkItemResponse implements Writeable, StatusToXContentObject {
return seqNo;
}
/**
* The operation primary term of the primary
* NOTE: {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM}
* indicates primary term was not assigned by primary
*/
public long getTerm() {
return term;
}
/**
* Whether this failure is the result of an <em>abort</em>.
* If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}.

View File

@ -274,7 +274,7 @@ class BulkPrimaryExecutionContext {
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(),
result.getFailure(), result.getSeqNo()));
result.getFailure(), result.getSeqNo(), result.getTerm()));
break;
default:
throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType());

View File

@ -413,7 +413,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
if (response.getFailure().getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue; // ignore replication as we didn't generate a sequence number for this request.
}
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), response.getFailure().getMessage());
final long primaryTerm;
if (response.getFailure().getTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
// primary is on older version, just take the current primary term
primaryTerm = replica.getOperationPrimaryTerm();
} else {
primaryTerm = response.getFailure().getTerm();
}
operationResult = replica.markSeqNoAsNoop(response.getFailure().getSeqNo(), primaryTerm,
response.getFailure().getMessage());
} else {
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
@ -437,13 +446,13 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
final ShardId shardId = replica.shardId();
final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.type(), indexRequest.id(),
indexRequest.source(), indexRequest.getContentType(), indexRequest.routing());
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(), indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
break;
case DELETE:
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
deleteRequest.type(), deleteRequest.id());
result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(), deleteRequest.type(), deleteRequest.id());
break;
default:
assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;

View File

@ -728,10 +728,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
ifPrimaryTerm, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
boolean isRetry, SourceToParse sourceToParse)
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long opPrimaryTerm, long version, long autoGeneratedTimeStamp,
boolean isRetry, SourceToParse sourceToParse)
throws IOException {
return applyIndexOperation(getEngine(), seqNo, getOperationPrimaryTerm(), version, null, UNASSIGNED_SEQ_NO, 0,
return applyIndexOperation(getEngine(), seqNo, opPrimaryTerm, version, null, UNASSIGNED_SEQ_NO, 0,
autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse);
}
@ -817,8 +817,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return result;
}
public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException {
return markSeqNoAsNoop(getEngine(), seqNo, getOperationPrimaryTerm(), reason, Engine.Operation.Origin.REPLICA);
public Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason) throws IOException {
return markSeqNoAsNoop(getEngine(), seqNo, opPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
}
private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason,
@ -855,9 +855,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
ifSeqNo, ifPrimaryTerm, Engine.Operation.Origin.PRIMARY);
}
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String type, String id)
throws IOException {
return applyDeleteOperation(
getEngine(), seqNo, getOperationPrimaryTerm(), version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA);
getEngine(), seqNo, opPrimaryTerm, version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA);
}
private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,

View File

@ -747,15 +747,14 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
DocWriteRequest.OpType.DELETE,
DocWriteRequest.OpType.INDEX
),
new BulkItemResponse.Failure("index", "_doc", "1",
exception, 1L)
new BulkItemResponse.Failure("index", "_doc", "1", exception, 1L, 1L)
));
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
itemRequests[0] = itemRequest;
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shard.shardId(), RefreshPolicy.NONE, itemRequests);
TransportShardBulkAction.performOnReplica(bulkShardRequest, shard);
verify(shard, times(1)).markSeqNoAsNoop(1, exception.toString());
verify(shard, times(1)).markSeqNoAsNoop(1, 1, exception.toString());
closeShards(shard);
}

View File

@ -195,6 +195,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
// slip the extra document into the replica
remainingReplica.applyIndexOperationOnReplica(
remainingReplica.getLocalCheckpoint() + 1,
remainingReplica.getOperationPrimaryTerm(),
1,
randomNonNegativeLong(),
false,

View File

@ -936,7 +936,7 @@ public class IndexShardTests extends IndexShardTestCase {
long localCheckPoint = indexShard.getLastKnownGlobalCheckpoint() + randomInt(100);
// advance local checkpoint
for (int i = 0; i <= localCheckPoint; i++) {
indexShard.markSeqNoAsNoop(i, "dummy doc");
indexShard.markSeqNoAsNoop(i, indexShard.getOperationPrimaryTerm(), "dummy doc");
}
indexShard.sync(); // advance local checkpoint
newGlobalCheckPoint = randomIntBetween((int) indexShard.getLastKnownGlobalCheckpoint(), (int) localCheckPoint);
@ -2016,18 +2016,19 @@ public class IndexShardTests extends IndexShardTestCase {
* - If flush and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed.
*/
final IndexShard shard = newStartedShard(false);
long primaryTerm = shard.getOperationPrimaryTerm();
shard.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id");
shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "_doc", "id");
shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation
shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(0, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(shard.shardId().getIndexName(), "_doc", "id", new BytesArray("{}"), XContentType.JSON));
shard.applyIndexOperationOnReplica(3, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(3, primaryTerm, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(shard.shardId().getIndexName(), "_doc", "id-3", new BytesArray("{}"), XContentType.JSON));
// Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery.
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
shard.applyIndexOperationOnReplica(2, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(2, primaryTerm, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(shard.shardId().getIndexName(), "_doc", "id-2", new BytesArray("{}"), XContentType.JSON));
shard.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(5, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(shard.shardId().getIndexName(), "_doc", "id-5", new BytesArray("{}"), XContentType.JSON));
shard.sync(); // advance local checkpoint
@ -2148,7 +2149,7 @@ public class IndexShardTests extends IndexShardTestCase {
updateMappings(otherShard, shard.indexSettings().getIndexMetaData());
SourceToParse sourceToParse = new SourceToParse(shard.shardId().getIndexName(), "_doc", "1",
new BytesArray("{}"), XContentType.JSON);
otherShard.applyIndexOperationOnReplica(1, 1,
otherShard.applyIndexOperationOnReplica(1, otherShard.getOperationPrimaryTerm(), 1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
final ShardRouting primaryShardRouting = shard.routingEntry();
@ -2266,17 +2267,17 @@ public class IndexShardTests extends IndexShardTestCase {
final IndexShard shard = newStartedShard(false);
final String indexName = shard.shardId().getIndexName();
// Index #0, index #1
shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(0, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "_doc", "doc-0", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
shard.applyIndexOperationOnReplica(1, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(1, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
shard.getEngine().rollTranslogGeneration();
shard.markSeqNoAsNoop(1, "test");
shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.markSeqNoAsNoop(1, primaryTerm, "test");
shard.applyIndexOperationOnReplica(2, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2"));
@ -3305,7 +3306,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
SourceToParse sourceToParse = new SourceToParse(indexShard.shardId().getIndexName(), "_doc", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, 1,
indexShard.applyIndexOperationOnReplica(i, indexShard.getOperationPrimaryTerm(), 1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
if (!gap && i == localCheckpoint + 1) {
localCheckpoint++;
@ -4118,8 +4119,8 @@ public class IndexShardTests extends IndexShardTestCase {
if (rarely()) {
seqNo++; // create gaps in sequence numbers
}
shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(shard.shardId.getIndexName(), "_doc", Long.toString(i), new BytesArray("{}"), XContentType.JSON));
shard.applyIndexOperationOnReplica(seqNo, shard.getOperationPrimaryTerm(), 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false, new SourceToParse(shard.shardId.getIndexName(), "_doc", Long.toString(i), new BytesArray("{}"), XContentType.JSON));
shard.updateGlobalCheckpointOnReplica(shard.getLocalCheckpoint(), "test");
if (randomInt(100) < 10) {
shard.flush(new FlushRequest());

View File

@ -142,8 +142,9 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
Randomness.shuffle(seqNos);
for (long seqNo : seqNos) {
shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
shard.applyIndexOperationOnReplica(seqNo, 1, shard.getOperationPrimaryTerm(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false, new SourceToParse(shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(),
new BytesArray("{}"), XContentType.JSON));
if (randomInt(100) < 5) {
shard.flush(new FlushRequest().waitIfOngoing(true));
}

View File

@ -138,26 +138,27 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
// create out of order delete and index op on replica
final IndexShard orgReplica = shards.getReplicas().get(0);
final String indexName = orgReplica.shardId().getIndexName();
final long primaryTerm = orgReplica.getOperationPrimaryTerm();
// delete #1
orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
orgReplica.applyDeleteOperationOnReplica(1, primaryTerm, 2, "type", "id");
getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation
// index #0
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(0, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
// index #3
orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(3, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON));
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(2, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON));
orgReplica.sync(); // advance local checkpoint
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
// index #5 -> force NoOp #4.
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(5, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));
final int translogOps;
@ -203,26 +204,27 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
// create out of order delete and index op on replica
final IndexShard orgReplica = shards.getReplicas().get(0);
final String indexName = orgReplica.shardId().getIndexName();
final long primaryTerm = orgReplica.getOperationPrimaryTerm();
// delete #1
orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
orgReplica.applyDeleteOperationOnReplica(1, primaryTerm, 2, "type", "id");
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
// index #0
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(0, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
// index #3
orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(3, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON));
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(2, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON));
orgReplica.sync(); // advance local checkpoint
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
// index #5 -> force NoOp #4.
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(5, primaryTerm, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
new SourceToParse(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));
if (randomBoolean()) {

View File

@ -751,7 +751,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
} else {
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
result = shard.applyIndexOperationOnReplica(seqNo, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
result = shard.applyIndexOperationOnReplica(seqNo, shard.getOperationPrimaryTerm(), 0,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
shard.sync(); // advance local checkpoint
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new TransportReplicationAction.RetryOnReplicaException(shard.shardId,
@ -778,7 +779,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
} else {
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
result = shard.applyDeleteOperationOnReplica(seqNo, 0L, type, id);
result = shard.applyDeleteOperationOnReplica(seqNo, shard.getOperationPrimaryTerm(), 0L, type, id);
shard.sync(); // advance local checkpoint
}
return result;

View File

@ -61,13 +61,15 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
final String id = Long.toString(i);
SourceToParse sourceToParse = new SourceToParse(indexShard.shardId().getIndexName(), "_doc", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(++seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
indexShard.applyIndexOperationOnReplica(++seqNo, indexShard.getOperationPrimaryTerm(), 1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
}
long seqNoBeforeGap = seqNo;
seqNo += 8;
SourceToParse sourceToParse = new SourceToParse(indexShard.shardId().getIndexName(), "_doc", "9",
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
indexShard.applyIndexOperationOnReplica(seqNo, indexShard.getOperationPrimaryTerm(), 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false, sourceToParse);
// promote the replica to primary:
final ShardRouting replicaRouting = indexShard.routingEntry();