Zen Discovery: ungraceful shutdown of the master and start of replacement node might cause the cluster not to elect a new master, closes #200.

This commit is contained in:
kimchy 2010-06-01 22:29:38 +03:00
parent ec662cc019
commit a7ad295f63
3 changed files with 51 additions and 16 deletions

View File

@ -134,6 +134,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
localNode = new DiscoveryNode(settings.get("name"), UUID.randomUUID().toString(), transportService.boundAddress().publishAddress(), nodeAttributes);
latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
nodesFD.updateNodes(latestDiscoNodes);
pingService.start();
if (nodeAttributes.containsKey("zen.master") && nodeAttributes.get("zen.master").equals("false")) {
@ -260,7 +261,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
try {
membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout);
} catch (Exception e) {
logger.warn("Failed to send join request to master [{}], retrying...", e, masterNode);
if (e instanceof ElasticSearchException) {
logger.info("Failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticSearchException) e).getDetailedMessage());
} else {
logger.info("Failed to send join request to master [{}], reason [{}]", masterNode, e.getMessage());
}
if (logger.isTraceEnabled()) {
logger.trace("Detailed failed reason", e);
}
// failed to send the join request, retry
retry = true;
continue;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
@ -35,7 +36,6 @@ import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
import static org.elasticsearch.util.TimeValue.*;
/**
@ -188,7 +188,7 @@ public class MasterFaultDetection extends AbstractComponent {
@Override public void run() {
if (masterNode != null) {
final DiscoveryNode sentToNode = masterNode;
transportService.sendRequest(masterNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode()), pingRetryTimeout,
transportService.sendRequest(masterNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), sentToNode.id()), pingRetryTimeout,
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
@ -218,7 +218,8 @@ public class MasterFaultDetection extends AbstractComponent {
// not good, failure
notifyMasterFailure(sentToNode, "Failed on ping, tried [" + pingRetryCount + "] times, each with [" + pingRetryTimeout + "] timeout");
} else {
transportService.sendRequest(sentToNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode()), pingRetryTimeout, this);
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(sentToNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), sentToNode.id()), pingRetryTimeout, this);
}
}
}
@ -237,28 +238,39 @@ public class MasterFaultDetection extends AbstractComponent {
@Override public void messageReceived(MasterPingRequest request, TransportChannel channel) throws Exception {
DiscoveryNodes nodes = nodesProvider.nodes();
channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.node.id())));
// check if we are really the same master as the one we seemed to be think we are
// this can happen if the master got "kill -9" and then another node started using the same port
if (!request.masterNodeId.equals(nodes.localNodeId())) {
throw new ElasticSearchIllegalStateException("Got ping as master with id [" + request.masterNodeId + "], but not master and no id");
}
// send a response, and note if we are connected to the master or not
channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId)));
}
}
private static class MasterPingRequest implements Streamable {
private DiscoveryNode node;
private String nodeId;
private String masterNodeId;
private MasterPingRequest() {
}
private MasterPingRequest(DiscoveryNode node) {
this.node = node;
private MasterPingRequest(String nodeId, String masterNodeId) {
this.nodeId = nodeId;
this.masterNodeId = masterNodeId;
}
@Override public void readFrom(StreamInput in) throws IOException {
node = readNode(in);
nodeId = in.readUTF();
masterNodeId = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeUTF(nodeId);
out.writeUTF(masterNodeId);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.threadpool.ThreadPool;
@ -186,7 +187,7 @@ public class NodesFaultDetection extends AbstractComponent {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(), pingRetryTimeout,
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout,
new BaseTransportResponseHandler<PingResponse>() {
@Override public PingResponse newInstance() {
return new PingResponse();
@ -216,8 +217,8 @@ public class NodesFaultDetection extends AbstractComponent {
notifyNodeFailure(node, "Failed on ping, tried [" + pingRetryCount + "] times, each with [" + pingRetryTimeout + "] timeout");
}
} else {
// resend the request
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(), pingRetryTimeout, this);
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout, this);
}
}
}
@ -240,7 +241,7 @@ public class NodesFaultDetection extends AbstractComponent {
}
private class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
public static final String ACTION = "discovery/zen/fd/ping";
@ -249,20 +250,34 @@ public class NodesFaultDetection extends AbstractComponent {
}
@Override public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
// if we are not the node we are supposed to be pinged, send an exception
// this can happen when a kill -9 is sent, and another node is started using the same port
if (!latestNodes.localNodeId().equals(request.nodeId)) {
throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
}
channel.sendResponse(new PingResponse());
}
}
private static class PingRequest implements Streamable {
static class PingRequest implements Streamable {
private PingRequest() {
// the (assumed) node id we are pinging
private String nodeId;
PingRequest() {
}
PingRequest(String nodeId) {
this.nodeId = nodeId;
}
@Override public void readFrom(StreamInput in) throws IOException {
nodeId = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(nodeId);
}
}