parent
373c3dfa57
commit
fe2a29211b
|
@ -461,43 +461,37 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
protected void doRun() {
|
||||
setPhase(task, "routing");
|
||||
final ClusterState state = observer.observedState();
|
||||
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
|
||||
if (blockException != null) {
|
||||
handleBlockException(blockException);
|
||||
return;
|
||||
}
|
||||
final String concreteIndex = resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request) : request.index();
|
||||
blockException = state.blocks().indexBlockedException(indexBlockLevel(), concreteIndex);
|
||||
if (blockException != null) {
|
||||
handleBlockException(blockException);
|
||||
if (handleBlockExceptions(state)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
|
||||
final String concreteIndex = concreteIndex(state);
|
||||
resolveRequest(state.metaData(), concreteIndex, request);
|
||||
assert request.shardId() != null : "request shardId must be set in resolveRequest";
|
||||
|
||||
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
|
||||
final ShardRouting primary = indexShard.primaryShard();
|
||||
if (primary == null || primary.active() == false) {
|
||||
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version());
|
||||
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
|
||||
return;
|
||||
}
|
||||
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
|
||||
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
|
||||
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
|
||||
final ShardRouting primary = primary(state);
|
||||
if (retryIfUnavailable(state, primary)) {
|
||||
return;
|
||||
}
|
||||
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
|
||||
performLocalAction(state, primary, node);
|
||||
} else {
|
||||
performRemoteAction(state, primary, node);
|
||||
}
|
||||
}
|
||||
|
||||
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
|
||||
setPhase(task, "waiting_on_primary");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
|
||||
}
|
||||
performAction(node, transportPrimaryAction, true);
|
||||
} else {
|
||||
}
|
||||
|
||||
private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
|
||||
if (state.version() < request.routedBasedOnClusterVersion()) {
|
||||
logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
|
||||
retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
|
||||
|
@ -513,6 +507,42 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
setPhase(task, "rerouted");
|
||||
performAction(node, actionName, false);
|
||||
}
|
||||
|
||||
private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
|
||||
if (primary == null || primary.active() == false) {
|
||||
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version());
|
||||
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
|
||||
return true;
|
||||
}
|
||||
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
|
||||
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
|
||||
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private String concreteIndex(ClusterState state) {
|
||||
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request) : request.index();
|
||||
}
|
||||
|
||||
private ShardRouting primary(ClusterState state) {
|
||||
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
|
||||
return indexShard.primaryShard();
|
||||
}
|
||||
|
||||
private boolean handleBlockExceptions(ClusterState state) {
|
||||
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
|
||||
if (blockException != null) {
|
||||
handleBlockException(blockException);
|
||||
return true;
|
||||
}
|
||||
blockException = state.blocks().indexBlockedException(indexBlockLevel(), concreteIndex(state));
|
||||
if (blockException != null) {
|
||||
handleBlockException(blockException);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void handleBlockException(ClusterBlockException blockException) {
|
||||
|
|
Loading…
Reference in New Issue