[Discovery] Master fault detection and nodes fault detection should take cluster name into account.

Both master fault detection and nodes fault detection request should also send the cluster name, so that on the receiving side the handling of these requests can be failed with an error. This error can be caught on the sending side and for master fault detection the node can fail the master locally and for nodes fault detection the node can be failed.

Note this validation will most likely never fail in a production cluster, but in during automated tests where cluster / nodes are created and destroyed very frequently.
This commit is contained in:
Martijn van Groningen 2014-07-28 16:04:25 +02:00 committed by Boaz Leskes
parent 364374dd03
commit 4b8456e954
4 changed files with 74 additions and 16 deletions

View File

@ -166,10 +166,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.electMaster = new ElectMasterService(settings);
nodeSettingsService.addListener(new ApplySettings());
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD.addListener(new NodeFailureListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);

View File

@ -20,6 +20,8 @@
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@ -58,6 +60,8 @@ public class MasterFaultDetection extends AbstractComponent {
private final DiscoveryNodesProvider nodesProvider;
private final ClusterName clusterName;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
@ -85,11 +89,13 @@ public class MasterFaultDetection extends AbstractComponent {
private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();
public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, DiscoveryNodesProvider nodesProvider) {
public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
DiscoveryNodesProvider nodesProvider, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.clusterName = clusterName;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
@ -270,8 +276,10 @@ public class MasterFaultDetection extends AbstractComponent {
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
return;
}
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
final MasterPingRequest request = new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id(), clusterName);
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override
public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
@ -328,7 +336,7 @@ public class MasterFaultDetection extends AbstractComponent {
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this);
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
}
}
}
@ -351,6 +359,14 @@ public class MasterFaultDetection extends AbstractComponent {
}
static class NotMasterException extends ElasticsearchIllegalStateException {
NotMasterException(String msg) {
super(msg);
}
NotMasterException() {
}
@Override
public Throwable fillInStackTrace() {
return null;
@ -379,6 +395,12 @@ public class MasterFaultDetection extends AbstractComponent {
if (!request.masterNodeId.equals(nodes.localNodeId())) {
throw new NotMasterException();
}
if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]", request.clusterName, clusterName);
throw new NotMasterException("master fault detection ping request is targeted for a different [" + request.clusterName + "] cluster then us [" + clusterName + "]");
}
// if we are no longer master, fail...
if (!nodes.localNodeMaster()) {
throw new NoLongerMasterException();
@ -402,13 +424,15 @@ public class MasterFaultDetection extends AbstractComponent {
private String nodeId;
private String masterNodeId;
private ClusterName clusterName;
private MasterPingRequest() {
}
private MasterPingRequest(String nodeId, String masterNodeId) {
private MasterPingRequest(String nodeId, String masterNodeId, ClusterName clusterName) {
this.nodeId = nodeId;
this.masterNodeId = masterNodeId;
this.clusterName = clusterName;
}
@Override
@ -416,6 +440,9 @@ public class MasterFaultDetection extends AbstractComponent {
super.readFrom(in);
nodeId = in.readString();
masterNodeId = in.readString();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
clusterName = ClusterName.readClusterName(in);
}
}
@Override
@ -423,6 +450,9 @@ public class MasterFaultDetection extends AbstractComponent {
super.writeTo(out);
out.writeString(nodeId);
out.writeString(masterNodeId);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
clusterName.writeTo(out);
}
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@ -54,6 +56,7 @@ public class NodesFaultDetection extends AbstractComponent {
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private final boolean connectOnNetworkDisconnect;
@ -78,10 +81,11 @@ public class NodesFaultDetection extends AbstractComponent {
private volatile boolean running = false;
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
@ -204,8 +208,9 @@ public class NodesFaultDetection extends AbstractComponent {
if (!running) {
return;
}
transportService.sendRequest(node, PING_ACTION_NAME, new PingRequest(node.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() {
final PingRequest pingRequest = new PingRequest(node.id(), clusterName);
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler<PingResponse>() {
@Override
public PingResponse newInstance() {
return new PingResponse();
@ -252,8 +257,7 @@ public class NodesFaultDetection extends AbstractComponent {
}
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PING_ACTION_NAME, new PingRequest(node.id()),
options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this);
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this);
}
}
}
@ -298,6 +302,10 @@ public class NodesFaultDetection extends AbstractComponent {
if (!latestNodes.localNodeId().equals(request.nodeId)) {
throw new ElasticsearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
}
if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
// Don't introduce new exception for bwc reasons
throw new ElasticsearchIllegalStateException("Got pinged with cluster name [" + request.clusterName + "], but I'm part of cluster [" + clusterName + "]");
}
channel.sendResponse(new PingResponse());
}
@ -308,28 +316,45 @@ public class NodesFaultDetection extends AbstractComponent {
}
static class PingRequest extends TransportRequest {
public static class PingRequest extends TransportRequest {
// the (assumed) node id we are pinging
private String nodeId;
private ClusterName clusterName;
PingRequest() {
}
PingRequest(String nodeId) {
PingRequest(String nodeId, ClusterName clusterName) {
this.nodeId = nodeId;
this.clusterName = clusterName;
}
public String nodeId() {
return nodeId;
}
public ClusterName clusterName() {
return clusterName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
clusterName = ClusterName.readClusterName(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
clusterName.writeTo(out);
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.discovery;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -131,7 +132,7 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA);
NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA, new ClusterName("test"));
nodesFD.start();
nodesFD.updateNodes(buildNodesForA(true));
final String[] failureReason = new String[1];
@ -165,6 +166,7 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20));
final DiscoveryNodes nodes = buildNodesForA(false);
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA,
new DiscoveryNodesProvider() {
@ -177,7 +179,8 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
public NodeService nodeService() {
return null;
}
}
},
clusterName
);
masterFD.start(nodeB, "test");