When sending a shard started/failed message due to a cluster state change, use the master indicated in the new state rather than the current one
This commit also adds extra protection in other cases against a master node being de-elected and thus being null. Closes #6189
This commit is contained in:
parent
84593f0d7c
commit
1f28cd0ba8
|
@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
|
@ -76,37 +76,51 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) throws ElasticsearchException {
|
// New-side overload: looks up the master node from the current cluster state
// and delegates to the four-argument shardFailed overload with it.
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) throws ElasticsearchException {
|
||||||
|
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
||||||
|
if (masterNode == null) {
|
||||||
|
logger.debug("can't send shard failed for {}. no master known.", shardRouting);
// NOTE(review): nothing returns after this log call, so the delegation below
// still runs with a null masterNode — confirm whether an early return was intended.
|
||||||
|
}
|
||||||
|
shardFailed(shardRouting, indexUUID, reason, masterNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
// New-side overload taking an explicit master node. Builds a ShardRoutingEntry and
// either handles it locally via innerShardFailed (when the local node equals
// masterNode) or sends it through transportService; send failures are only logged.
// Old-side and new-side diff rows are interleaved below.
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) throws ElasticsearchException {
|
||||||
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
|
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
|
||||||
logger.warn("{} sending failed shard for {}", shardRouting.shardId(), shardRoutingEntry);
|
logger.warn("{} sending failed shard for {}", shardRouting.shardId(), shardRoutingEntry);
|
||||||
DiscoveryNodes nodes = clusterService.state().nodes();
|
if (clusterService.localNode().equals(masterNode)) {
|
||||||
if (nodes.localNodeMaster()) {
|
|
||||||
innerShardFailed(shardRoutingEntry);
|
innerShardFailed(shardRoutingEntry);
|
||||||
} else {
|
} else {
|
||||||
transportService.sendRequest(clusterService.state().nodes().masterNode(),
|
// NOTE(review): this new-side row still reads the master from the current cluster
// state instead of using the masterNode parameter, unlike the shardStarted
// counterpart, which passes masterNode — confirm whether that is intended.
transportService.sendRequest(clusterService.state().nodes().masterNode(),
|
||||||
ShardFailedTransportHandler.ACTION, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
ShardFailedTransportHandler.ACTION, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
logger.warn("failed to send failed shard to [{}]", exp, clusterService.state().nodes().masterNode());
|
logger.warn("failed to send failed shard to {}", exp, masterNode);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException {
|
// New-side overload: looks up the master node from the current cluster state
// and delegates to the four-argument shardStarted overload with it.
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException {
|
||||||
|
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
||||||
|
if (masterNode == null) {
|
||||||
|
logger.debug("can't send shard started for {}. no master known.", shardRouting);
// NOTE(review): nothing returns after this log call, so the delegation below
// still runs with a null masterNode — confirm whether an early return was intended.
|
||||||
|
}
|
||||||
|
shardStarted(shardRouting, indexUUID, reason, masterNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) throws ElasticsearchException {
|
||||||
|
|
||||||
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
|
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
|
||||||
|
|
||||||
logger.debug("sending shard started for {}", shardRoutingEntry);
|
logger.debug("sending shard started for {}", shardRoutingEntry);
|
||||||
|
|
||||||
DiscoveryNodes nodes = clusterService.state().nodes();
|
if (clusterService.localNode().equals(masterNode)) {
|
||||||
if (nodes.localNodeMaster()) {
|
|
||||||
innerShardStarted(shardRoutingEntry);
|
innerShardStarted(shardRoutingEntry);
|
||||||
} else {
|
} else {
|
||||||
transportService.sendRequest(clusterService.state().nodes().masterNode(),
|
transportService.sendRequest(masterNode,
|
||||||
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
ShardStartedTransportHandler.ACTION, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
logger.warn("failed to send shard started to [{}]", exp, clusterService.state().nodes().masterNode());
|
logger.warn("failed to send shard started to [{}]", exp, masterNode);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -517,8 +517,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
// the master thinks we are started, but we don't have this shard at all, mark it as failed
|
// the master thinks we are started, but we don't have this shard at all, mark it as failed
|
||||||
logger.warn("[{}][{}] master [{}] marked shard as started, but shard has not been created, mark shard as failed", shardRouting.index(), shardId, nodes.masterNode());
|
logger.warn("[{}][{}] master [{}] marked shard as started, but shard has not been created, mark shard as failed", shardRouting.index(), shardId, nodes.masterNode());
|
||||||
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
|
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
|
||||||
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(),
|
if (nodes.masterNode() != null) {
|
||||||
"master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed");
|
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(),
|
||||||
|
"master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed",
|
||||||
|
nodes.masterNode()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -606,11 +610,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
// for master to confirm a shard started message (either master failover, or a cluster event before
|
// for master to confirm a shard started message (either master failover, or a cluster event before
|
||||||
// we managed to tell the master we started), mark us as started
|
// we managed to tell the master we started), mark us as started
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started",
|
logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}",
|
||||||
indexShard.shardId(), indexShard.state());
|
indexShard.shardId(), indexShard.state(), nodes.masterNode());
|
||||||
|
}
|
||||||
|
if (nodes.masterNode() != null) {
|
||||||
|
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(),
|
||||||
|
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started",
|
||||||
|
nodes.masterNode());
|
||||||
}
|
}
|
||||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(),
|
|
||||||
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started");
|
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
if (indexShard.ignoreRecoveryAttempt()) {
|
if (indexShard.ignoreRecoveryAttempt()) {
|
||||||
|
@ -676,7 +683,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
|
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
|
||||||
}
|
}
|
||||||
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
|
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
|
||||||
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to create shard, message [" + detailedMessage(e) + "]");
|
if (nodes.masterNode() != null) {
|
||||||
|
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to create shard, message [" + detailedMessage(e) + "]",
|
||||||
|
nodes.masterNode()
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
logger.debug("can't send shard failed for {} as there is no current master", shardRouting.shardId());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue