diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index d4464267d41..bd61fb92781 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -134,6 +134,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen } localNode = new DiscoveryNode(settings.get("name"), UUID.randomUUID().toString(), transportService.boundAddress().publishAddress(), nodeAttributes); latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build(); + nodesFD.updateNodes(latestDiscoNodes); pingService.start(); if (nodeAttributes.containsKey("zen.master") && nodeAttributes.get("zen.master").equals("false")) { @@ -260,7 +261,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen try { membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout); } catch (Exception e) { - logger.warn("Failed to send join request to master [{}], retrying...", e, masterNode); + if (e instanceof ElasticSearchException) { + logger.info("Failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticSearchException) e).getDetailedMessage()); + } else { + logger.info("Failed to send join request to master [{}], reason [{}]", masterNode, e.getMessage()); + } + if (logger.isTraceEnabled()) { + logger.trace("Detailed failed reason", e); + } // failed to send the join request, retry retry = true; continue; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index f348ca6601a..fa065517995 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen.fd; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; @@ -35,7 +36,6 @@ import java.io.IOException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.cluster.node.DiscoveryNode.*; import static org.elasticsearch.util.TimeValue.*; /** @@ -188,7 +188,7 @@ public class MasterFaultDetection extends AbstractComponent { @Override public void run() { if (masterNode != null) { final DiscoveryNode sentToNode = masterNode; - transportService.sendRequest(masterNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode()), pingRetryTimeout, + transportService.sendRequest(masterNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), sentToNode.id()), pingRetryTimeout, new BaseTransportResponseHandler<MasterPingResponseResponse>() { @Override public MasterPingResponseResponse newInstance() { return new MasterPingResponseResponse(); @@ -218,7 +218,8 @@ public class MasterFaultDetection extends AbstractComponent { // not good, failure notifyMasterFailure(sentToNode, "Failed on ping, tried [" + pingRetryCount + "] times, each with [" + pingRetryTimeout + "] timeout"); } else { - transportService.sendRequest(sentToNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode()), pingRetryTimeout, this); + // resend the request, not reschedule, rely on send timeout + transportService.sendRequest(sentToNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), sentToNode.id()), pingRetryTimeout, this); } } } @@ -237,28 +238,39 @@ public class MasterFaultDetection extends AbstractComponent { @Override public void messageReceived(MasterPingRequest request, TransportChannel channel) throws Exception { DiscoveryNodes nodes = nodesProvider.nodes(); - channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.node.id()))); + // check if we are really the same master as the one we seemed to be think we are + // this can happen if the master got "kill -9" and then another node started using the same port + if (!request.masterNodeId.equals(nodes.localNodeId())) { + throw new ElasticSearchIllegalStateException("Got ping as master with id [" + request.masterNodeId + "], but not master and no id"); + } + // send a response, and note if we are connected to the master or not + channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId))); } } private static class MasterPingRequest implements Streamable { - private DiscoveryNode node; + private String nodeId; + + private String masterNodeId; private MasterPingRequest() { } - private MasterPingRequest(DiscoveryNode node) { - this.node = node; + private MasterPingRequest(String nodeId, String masterNodeId) { + this.nodeId = nodeId; + this.masterNodeId = masterNodeId; } @Override public void readFrom(StreamInput in) throws IOException { - node = readNode(in); + nodeId = in.readUTF(); + masterNodeId = in.readUTF(); } @Override public void writeTo(StreamOutput out) throws IOException { - node.writeTo(out); + out.writeUTF(nodeId); + out.writeUTF(masterNodeId); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 024319ee842..2eb5852418d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen.fd; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.threadpool.ThreadPool; @@ -186,7 +187,7 @@ public class NodesFaultDetection extends AbstractComponent { if (!running) { return; } - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(), pingRetryTimeout, + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout, new BaseTransportResponseHandler<PingResponse>() { @Override public PingResponse newInstance() { return new PingResponse(); @@ -216,8 +217,8 @@ public class NodesFaultDetection extends AbstractComponent { notifyNodeFailure(node, "Failed on ping, tried [" + pingRetryCount + "] times, each with [" + pingRetryTimeout + "] timeout"); } } else { - // resend the request - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(), pingRetryTimeout, this); + // resend the request, not reschedule, rely on send timeout + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout, this); } } } @@ -240,7 +241,7 @@ public class NodesFaultDetection extends AbstractComponent { } - private class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> { + class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> { public static final String ACTION = "discovery/zen/fd/ping"; @@ -249,20 +250,34 @@ public class NodesFaultDetection extends AbstractComponent { } @Override public void messageReceived(PingRequest request, TransportChannel channel) throws Exception { + // if we are not the node we are supposed to be pinged, send an exception + // this can happen when a kill -9 is sent, and another node is started using the same port + if (!latestNodes.localNodeId().equals(request.nodeId)) { + throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]"); + } channel.sendResponse(new PingResponse()); } } - private static class PingRequest implements Streamable { + static class PingRequest implements Streamable { - private PingRequest() { + // the (assumed) node id we are pinging + private String nodeId; + + PingRequest() { + } + + PingRequest(String nodeId) { + this.nodeId = nodeId; } @Override public void readFrom(StreamInput in) throws IOException { + nodeId = in.readUTF(); } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(nodeId); } }