Use correct primary term for replicating NOOPs (#25128)

NOOPs should be, same as for indexing operations, written on the replica using the original operation term instead of the current term of the replica.
This commit is contained in:
Yannick Welsch 2017-06-08 14:20:26 +02:00 committed by GitHub
parent 326fa33d4e
commit cd57395c98
7 changed files with 17 additions and 11 deletions

View File

@ -477,7 +477,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = executeFailureNoOpOnReplica(failure, replica);
operationResult = executeFailureNoOpOnReplica(failure, primaryTerm, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
@ -673,9 +673,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return replica.delete(delete);
}
private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, IndexShard replica) throws IOException {
final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOp(
primaryFailure.getSeqNo(), primaryFailure.getMessage());
private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, long primaryTerm,
IndexShard replica) throws IOException {
final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOpOnReplica(
primaryFailure.getSeqNo(), primaryTerm, primaryFailure.getMessage());
return replica.markSeqNoAsNoOp(noOp);
}

View File

@ -628,10 +628,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return result;
}
public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) {
public Engine.NoOp prepareMarkingSeqNoAsNoOpOnReplica(long seqNo, long opPrimaryTerm, String reason) {
verifyReplicationTarget();
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
long startTime = System.nanoTime();
return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason);
return new Engine.NoOp(seqNo, opPrimaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason);
}
public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException {

View File

@ -541,11 +541,13 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
itemRequests[0] = itemRequest;
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shard.shardId(), RefreshPolicy.NONE, itemRequests);
bulkShardRequest.primaryTerm(randomIntBetween(1, (int) shard.getPrimaryTerm()));
TransportShardBulkAction.performOnReplica(bulkShardRequest, shard);
ArgumentCaptor<Engine.NoOp> noOp = ArgumentCaptor.forClass(Engine.NoOp.class);
verify(shard, times(1)).markSeqNoAsNoOp(noOp.capture());
final Engine.NoOp noOpValue = noOp.getValue();
assertThat(noOpValue.seqNo(), equalTo(1L));
assertThat(noOpValue.primaryTerm(), equalTo(bulkShardRequest.primaryTerm()));
assertThat(noOpValue.reason(), containsString(failureMessage));
closeShards(shard);
}

View File

@ -105,7 +105,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
.build();
IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName())
.settings(settings)
.primaryTerm(0, 1);
.primaryTerm(0, randomIntBetween(1, 100));
for (Map.Entry<String, String> typeMapping : mappings.entrySet()) {
metaData.putMapping(typeMapping.getKey(), typeMapping.getValue());
}

View File

@ -238,7 +238,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
.source("{}", XContentType.JSON)
);
assertTrue(response.isFailed());
assertNoOpTranslogOperationForDocumentFailure(shards, 1, failureMessage);
assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPrimaryTerm(), failureMessage);
shards.assertAllEqual(0);
// add some replicas
@ -252,7 +252,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
.source("{}", XContentType.JSON)
);
assertTrue(response.isFailed());
assertNoOpTranslogOperationForDocumentFailure(shards, 2, failureMessage);
assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPrimaryTerm(), failureMessage);
shards.assertAllEqual(0);
}
}
@ -323,6 +323,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
private static void assertNoOpTranslogOperationForDocumentFailure(
Iterable<IndexShard> replicationGroup,
int expectedOperation,
long expectedPrimaryTerm,
String failureMessage) throws IOException {
for (IndexShard indexShard : replicationGroup) {
try(Translog.View view = indexShard.acquireTranslogView()) {
@ -333,6 +334,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
do {
assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP));
assertThat(op.seqNo(), equalTo(expectedSeqNo));
assertThat(op.primaryTerm(), equalTo(expectedPrimaryTerm));
assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage));
op = snapshot.next();
expectedSeqNo++;

View File

@ -1281,7 +1281,7 @@ public class IndexShardTests extends IndexShardTestCase {
while((operation = snapshot.next()) != null) {
if (operation.opType() == Translog.Operation.Type.NO_OP) {
numNoops++;
assertEquals(1, operation.primaryTerm());
assertEquals(newShard.getPrimaryTerm(), operation.primaryTerm());
assertEquals(0, operation.seqNo());
}
}

View File

@ -159,7 +159,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
.build();
IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings)
.primaryTerm(0, 1);
.primaryTerm(0, randomIntBetween(1, 100));
return newShard(shardRouting, metaData.build(), listeners);
}