Inline PrimaryPhase#doRun

Relates #16725
This commit is contained in:
Jason Tedor 2016-02-12 21:44:21 -05:00
parent 001c2c6282
commit 373c3dfa57
1 changed files with 26 additions and 17 deletions

View File

@ -677,27 +677,36 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
// closed in finishAsFailed(e) in the case of error
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
if (indexShardReference.isRelocated() == false) {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
ReplicationPhase replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
finishAndMoveToReplication(replicationPhase);
executeLocally();
} else {
// delegate primary phase to relocation target
// it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
// phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
final ShardRouting primary = indexShardReference.routingEntry();
indexShardReference.close();
assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
DiscoveryNode relocatingNode = state.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions,
TransportChannelResponseHandler.responseHandler(logger, TransportReplicationAction.this::newResponseInstance, channel,
"rerouting indexing to target primary " + primary));
executeRemotely();
}
}
private void executeLocally() throws Exception {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
ReplicationPhase replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
finishAndMoveToReplication(replicationPhase);
}
private void executeRemotely() {
// delegate primary phase to relocation target
// it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
// phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
final ShardRouting primary = indexShardReference.routingEntry();
indexShardReference.close();
assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
DiscoveryNode relocatingNode = state.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions,
TransportChannelResponseHandler.responseHandler(logger, TransportReplicationAction.this::newResponseInstance, channel,
"rerouting indexing to target primary " + primary));
}
/**
* checks whether we can perform a write based on the write consistency setting
* returns **null* if OK to proceed, or a string describing the reason to stop