Do not start a recovery process if the primary shard is currently allocated on a node which is not part of the cluster state
If a source node disconnect during recover, the target node will respond by canceling the recovery. Typically the master will respond by removing the disconnected node from the cluster state, promoting another shard to become primary. This is sent it to all nodes and the target node will start recovering from the new primary. However, if the drop of a node caused the node count to go bellow min_master_node, the master will step down and will not promote shard immediately. When a new master is elected we may publish a new cluster state (who's point is to notify of a new master) which is not yet updated. This caused the node to start a recovery to a non existent node. Before we aborted the recovery without cleaning up the shard, causing subsequent correct cluster states to be ignored. We should not start the recovery process but wait for another cluster state to come in. Closes #6024
This commit is contained in:
parent
b55d8ed2e3
commit
694bf287d6
|
@ -367,7 +367,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
if (sendRefreshMapping) {
|
||||
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
|
||||
new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, indexMetaData.uuid(),
|
||||
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()));
|
||||
typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId())
|
||||
);
|
||||
}
|
||||
}
|
||||
// go over and remove mappings
|
||||
|
@ -489,7 +490,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
}
|
||||
|
||||
RoutingTable routingTable = event.state().routingTable();
|
||||
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());;
|
||||
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
|
||||
|
||||
if (routingNode == null) {
|
||||
failedShards.clear();
|
||||
return;
|
||||
|
@ -551,7 +553,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
|
||||
private void cleanFailedShards(final ClusterChangedEvent event) {
|
||||
RoutingTable routingTable = event.state().routingTable();
|
||||
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());;
|
||||
RoutingNodes.RoutingNodeIterator routingNode = event.state().readOnlyRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
|
||||
|
||||
if (routingNode == null) {
|
||||
failedShards.clear();
|
||||
return;
|
||||
|
@ -611,10 +614,42 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
return;
|
||||
} else {
|
||||
if (indexShard.ignoreRecoveryAttempt()) {
|
||||
logger.trace("ignoring recovery instruction for an existing shard {} (shard state: [{}])", indexShard.shardId(), indexShard.state());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// figure out where to recover from (node or disk, in which case sourceNode is null)
|
||||
DiscoveryNode sourceNode = null;
|
||||
if (!shardRouting.primary()) {
|
||||
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
|
||||
for (ShardRouting entry : shardRoutingTable) {
|
||||
if (entry.primary() && entry.started()) {
|
||||
// only recover from started primary, if we can't find one, we will do it next round
|
||||
sourceNode = nodes.get(entry.currentNodeId());
|
||||
if (sourceNode == null) {
|
||||
logger.trace("can't recover replica because primary shard {} is assigned to an unknown node. ignoring.", entry);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (sourceNode == null) {
|
||||
logger.trace("can't recover replica for {} because a primary shard can not be found. ignoring.", shardRouting.shardId());
|
||||
return;
|
||||
}
|
||||
|
||||
} else if (shardRouting.relocatingNodeId() != null) {
|
||||
sourceNode = nodes.get(shardRouting.relocatingNodeId());
|
||||
if (sourceNode == null) {
|
||||
logger.trace("can't recover from remote primary shard {} because it is assigned to an unknown node [{}]. ignoring.", shardRouting.shardId(), shardRouting.relocatingNodeId());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// if there is no shard, create it
|
||||
if (!indexService.hasShard(shardId)) {
|
||||
if (failedShards.containsKey(shardRouting.shardId())) {
|
||||
|
@ -650,63 +685,45 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
if (indexShard.ignoreRecoveryAttempt()) {
|
||||
// we are already recovering (we can get to this state since the cluster event can happen several
|
||||
// times while we recover)
|
||||
logger.trace("ignoring recovery instruction for shard {} (shard state: [{}])", indexShard.shardId(), indexShard.state());
|
||||
return;
|
||||
}
|
||||
|
||||
if (sourceNode != null) {
|
||||
try {
|
||||
// we don't mark this one as relocated at the end.
|
||||
// For primaries: requests in any case are routed to both when its relocating and that way we handle
|
||||
// the edge case where its mark as relocated, and we might need to roll it back...
|
||||
// For replicas: we are recovering a backup from a primary
|
||||
|
||||
if (!shardRouting.primary()) {
|
||||
// recovery from primary
|
||||
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
|
||||
for (ShardRouting entry : shardRoutingTable) {
|
||||
if (entry.primary() && entry.started()) {
|
||||
// only recover from started primary, if we can't find one, we will do it next round
|
||||
final DiscoveryNode sourceNode = nodes.get(entry.currentNodeId());
|
||||
try {
|
||||
// we are recovering a backup from a primary, so no need to mark it as relocated
|
||||
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
|
||||
false, indexShard.store().list(), RecoveryState.Type.REPLICA);
|
||||
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
|
||||
} catch (Throwable e) {
|
||||
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
|
||||
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
|
||||
false, indexShard.store().list(), type);
|
||||
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
|
||||
|
||||
} catch (Throwable e) {
|
||||
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
|
||||
}
|
||||
} else {
|
||||
if (shardRouting.relocatingNodeId() == null) {
|
||||
// we are the first primary, recover from the gateway
|
||||
// if its post api allocation, the index should exists
|
||||
boolean indexShouldExists = indexShardRouting.primaryAllocatedPostApi();
|
||||
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
|
||||
shardGatewayService.recover(indexShouldExists, new IndexShardGatewayService.RecoveryListener() {
|
||||
@Override
|
||||
public void onRecoveryDone() {
|
||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery from gateway");
|
||||
}
|
||||
// we are the first primary, recover from the gateway
|
||||
// if its post api allocation, the index should exists
|
||||
boolean indexShouldExists = indexShardRouting.primaryAllocatedPostApi();
|
||||
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
|
||||
shardGatewayService.recover(indexShouldExists, new IndexShardGatewayService.RecoveryListener() {
|
||||
@Override
|
||||
public void onRecoveryDone() {
|
||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery from gateway");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onIgnoreRecovery(String reason) {
|
||||
}
|
||||
@Override
|
||||
public void onIgnoreRecovery(String reason) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
|
||||
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// relocating primaries, recovery from the relocating shard
|
||||
final DiscoveryNode sourceNode = nodes.get(shardRouting.relocatingNodeId());
|
||||
try {
|
||||
// we don't mark this one as relocated at the end, requests in any case are routed to both when its relocating
|
||||
// and that way we handle the edge case where its mark as relocated, and we might need to roll it back...
|
||||
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
|
||||
false, indexShard.store().list(), RecoveryState.Type.RELOCATION);
|
||||
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
|
||||
} catch (Throwable e) {
|
||||
@Override
|
||||
public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
|
||||
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -187,10 +187,9 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) {
|
||||
if (request.sourceNode() == null) {
|
||||
listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update");
|
||||
return;
|
||||
}
|
||||
|
||||
assert request.sourceNode() != null : "can't do a recovery without a source node";
|
||||
|
||||
final InternalIndexShard shard = recoveryStatus.indexShard;
|
||||
if (shard == null) {
|
||||
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
|
||||
|
|
|
@ -161,7 +161,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@TestLogging("cluster.service:TRACE,discovery:TRACE")
|
||||
@TestLogging("cluster.service:TRACE,discovery:TRACE,indices.cluster:TRACE")
|
||||
public void multipleNodesShutdownNonMasterNodes() throws Exception {
|
||||
Settings settings = settingsBuilder()
|
||||
.put("discovery.type", "zen")
|
||||
|
|
Loading…
Reference in New Issue