CCR: Expose the operation primary term

Relates #32442
This commit is contained in:
Nhat Nguyen 2018-08-06 10:55:37 -04:00
parent 5881322b3f
commit c394eb9ae9
4 changed files with 11 additions and 6 deletions

View File

@ -380,6 +380,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return this.pendingPrimaryTerm; return this.pendingPrimaryTerm;
} }
/** Returns the primary term that is currently being used to assign to operations */
public long getOperationPrimaryTerm() {
return this.operationPrimaryTerm;
}
/** /**
* Returns the latest cluster routing entry received with this shard. * Returns the latest cluster routing entry received with this shard.
*/ */

View File

@ -78,7 +78,7 @@ public class TransportBulkShardOperationsAction
index.type(), index.type(),
index.id(), index.id(),
index.seqNo(), index.seqNo(),
primary.getPrimaryTerm(), primary.getOperationPrimaryTerm(),
index.version(), index.version(),
BytesReference.toBytes(index.source()), BytesReference.toBytes(index.source()),
index.routing(), index.routing(),
@ -91,12 +91,12 @@ public class TransportBulkShardOperationsAction
delete.id(), delete.id(),
delete.uid(), delete.uid(),
delete.seqNo(), delete.seqNo(),
primary.getPrimaryTerm(), primary.getOperationPrimaryTerm(),
delete.version()); delete.version());
break; break;
case NO_OP: case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation; final Translog.NoOp noOp = (Translog.NoOp) operation;
operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getPrimaryTerm(), noOp.reason()); operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getOperationPrimaryTerm(), noOp.reason());
break; break;
default: default:
throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");

View File

@ -65,12 +65,12 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat(snapshot.totalOperations(), equalTo(operations.size()));
Translog.Operation operation; Translog.Operation operation;
while ((operation = snapshot.next()) != null) { while ((operation = snapshot.next()) != null) {
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm())); assertThat(operation.primaryTerm(), equalTo(followerPrimary.getOperationPrimaryTerm()));
} }
} }
for (final Translog.Operation operation : result.replicaRequest().getOperations()) { for (final Translog.Operation operation : result.replicaRequest().getOperations()) {
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm())); assertThat(operation.primaryTerm(), equalTo(followerPrimary.getOperationPrimaryTerm()));
} }
closeShards(followerPrimary); closeShards(followerPrimary);

View File

@ -57,7 +57,7 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
true, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED,
replicaRouting.allocationId()); replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, indexShard.updateShardState(primaryRouting, indexShard.getOperationPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()), 0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet()); new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());