Wait for new master when failing shard
This commit handles the situation where we are failing a shard and either no master is known, or the known master left while the shard-failed request was in flight. In that case we wait for a new master to be elected, and then resend the shard-failed request to the new master.
commit 703ff2b5ab (parent 120210d024)
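The mechanism this diff adds — send the shard-failed request to the current master, and if no master is known or the master leaves while the request is in flight, wait for the next master election and resend unless the shard has already been failed — can be sketched in isolation. The sketch below is a minimal, self-contained illustration under assumed names: MasterObserver, sendShardFailedToMaster, and shardAlreadyFailed are hypothetical stand-ins, not Elasticsearch types; the real implementation uses ClusterStateObserver together with MasterNodeChangePredicate, as the diff shows.

import java.util.function.BooleanSupplier;

// Hypothetical stand-in for ClusterStateObserver + MasterNodeChangePredicate:
// runs the callback once a new master has been elected.
interface MasterObserver {
    void onNewMaster(Runnable callback);
}

final class ShardFailedRetry {
    private final MasterObserver observer;
    private final Runnable sendShardFailedToMaster;   // the shard-failed request
    private final BooleanSupplier shardAlreadyFailed; // shard gone from the observed cluster state?

    ShardFailedRetry(MasterObserver observer,
                     Runnable sendShardFailedToMaster,
                     BooleanSupplier shardAlreadyFailed) {
        this.observer = observer;
        this.sendShardFailedToMaster = sendShardFailedToMaster;
        this.shardAlreadyFailed = shardAlreadyFailed;
    }

    // Called when the request could not be delivered because no master is
    // known, or because the known master left while the request was in flight.
    void waitForNewMasterAndRetry() {
        observer.onNewMaster(() -> {
            if (shardAlreadyFailed.getAsBoolean()) {
                // a master already marked the shard as failed; resending would be redundant
                return;
            }
            sendShardFailedToMaster.run();
        });
    }
}

The shardAlreadyFailed check corresponds to isFailed(...) in the diff: if the replica's node has left the cluster state, or the same allocation is no longer in the routing table, a master has already applied the failure and resending is unnecessary; this is what keeps the retry idempotent.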
@@ -31,6 +31,8 @@ import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.MasterNodeChangePredicate;
+import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -42,6 +44,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
@@ -76,6 +79,7 @@ import org.elasticsearch.transport.TransportService;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -882,14 +886,27 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                     if (ignoreReplicaException(exp)) {
                         onReplicaFailure(nodeId, exp);
                     } else {
-                        logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
-                        shardStateAction.shardFailed(clusterService.state(), shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
+                        ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
+                        String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
+                        ReplicationFailedShardStateListener listener = new ReplicationFailedShardStateListener(observer, shard, exp, message, nodeId);
+                        shardFailed(observer.observedState(), shard, exp, message, listener);
                     }
                 }
             }
             );
         }
+
+        private void shardFailed(ClusterState clusterState, ShardRouting shard, TransportException exp, String message, ShardStateAction.Listener listener) {
+            logger.warn("{} {}", exp, shardId, message);
+            shardStateAction.shardFailed(
+                clusterState,
+                shard,
+                indexUUID,
+                message,
+                exp,
+                shardFailedTimeout,
+                listener);
+        }
 
         void onReplicaFailure(String nodeId, @Nullable Throwable e) {
             // Only version conflict should be ignored from being put into the _shards header?
@@ -957,30 +974,86 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
         }
 
         public class ReplicationFailedShardStateListener implements ShardStateAction.Listener {
+            private final ClusterStateObserver observer;
+            private final ShardRouting shard;
+            private final TransportException exp;
+            private final String message;
             private final String nodeId;
-            private Throwable failure;
 
-            public ReplicationFailedShardStateListener(String nodeId, Throwable failure) {
+            public ReplicationFailedShardStateListener(
+                ClusterStateObserver observer, ShardRouting shard, TransportException exp,
+                String message,
+                String nodeId) {
+                this.observer = observer;
+                this.shard = shard;
+                this.exp = exp;
+                this.message = message;
                 this.nodeId = nodeId;
-                this.failure = failure;
             }
 
             @Override
             public void onSuccess() {
-                onReplicaFailure(nodeId, failure);
+                // TODO: validate the cluster state and retry?
+                onReplicaFailure(nodeId, exp);
             }
 
             @Override
             public void onShardFailedNoMaster() {
-                onReplicaFailure(nodeId, failure);
+                waitForNewMasterAndRetry();
             }
 
             @Override
             public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
                 if (e instanceof ReceiveTimeoutTransportException) {
                     logger.trace("timeout sending shard failure to master [{}]", e, master);
+                    // TODO: recheck the cluster state and retry indefinitely?
+                    onReplicaFailure(nodeId, exp);
+                } else if (e.getCause() instanceof NotMasterException) {
+                    waitForNewMasterAndRetry();
                 }
-                onReplicaFailure(nodeId, failure);
             }
+
+            private void waitForNewMasterAndRetry() {
+                observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                    @Override
+                    public void onNewClusterState(ClusterState state) {
+                        retry(state);
+                    }
+
+                    @Override
+                    public void onClusterServiceClose() {
+                        logger.error("{} node closed while handling failed shard [{}]", exp, shard.shardId(), shard);
+                        forceFinishAsFailed(new NodeClosedException(clusterService.localNode()));
+                    }
+
+                    @Override
+                    public void onTimeout(TimeValue timeout) {
+                        // we wait indefinitely for a new master
+                        assert false;
+                    }
+                }, MasterNodeChangePredicate.INSTANCE);
+            }
+
+            private void retry(ClusterState clusterState) {
+                if (!isFailed(shard, clusterState)) {
+                    shardFailed(clusterState, shard, exp, message, this);
+                } else {
+                    // the shard has already been failed, so just signal replica failure
+                    onReplicaFailure(nodeId, exp);
+                }
+            }
+
+            private boolean isFailed(ShardRouting shardRouting, ClusterState clusterState) {
+                // verify that the shard we requested to fail is no longer in the cluster state
+                RoutingNode routingNode = clusterState.getRoutingNodes().node(shardRouting.currentNodeId());
+                if (routingNode == null) {
+                    // the node left
+                    return true;
+                } else {
+                    // the same shard is gone
+                    ShardRouting sr = routingNode.get(shardRouting.getId());
+                    return sr == null || !sr.isSameAllocation(shardRouting);
+                }
+            }
         }
     }