From 4b8456e9540bfb351c1ed9acdf4c292f4b10931e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 28 Jul 2014 16:04:25 +0200 Subject: [PATCH] [Discovery] Master fault detection and nodes fault detection should take cluster name into account. Both master fault detection and nodes fault detection requests should also send the cluster name, so that the receiving side can fail the handling of these requests with an error. This error can be caught on the sending side: for master fault detection the node can fail the master locally, and for nodes fault detection the node can be failed. Note that this validation will most likely never fail in a production cluster, but it can fail during automated tests, where clusters / nodes are created and destroyed very frequently. --- .../discovery/zen/ZenDiscovery.java | 4 +- .../zen/fd/MasterFaultDetection.java | 40 ++++++++++++++++--- .../discovery/zen/fd/NodesFaultDetection.java | 39 ++++++++++++++---- .../discovery/ZenFaultDetectionTests.java | 7 +++- 4 files changed, 74 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7da058ca2af..07028f19a36 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -166,10 +166,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.electMaster = new ElectMasterService(settings); nodeSettingsService.addListener(new ApplySettings()); - this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this); + this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName); this.masterFD.addListener(new MasterNodeFailureListener()); - this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService); + this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, 
clusterName); this.nodesFD.addListener(new NodeFailureListener()); this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings); diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index b601884002c..b4f635184e7 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -20,6 +20,8 @@ package org.elasticsearch.discovery.zen.fd; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; @@ -58,6 +60,8 @@ public class MasterFaultDetection extends AbstractComponent { private final DiscoveryNodesProvider nodesProvider; + private final ClusterName clusterName; + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @@ -85,11 +89,13 @@ public class MasterFaultDetection extends AbstractComponent { private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean(); - public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, DiscoveryNodesProvider nodesProvider) { + public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, + DiscoveryNodesProvider nodesProvider, ClusterName clusterName) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.nodesProvider = nodesProvider; + this.clusterName = clusterName; this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false); this.pingInterval = 
componentSettings.getAsTime("ping_interval", timeValueSeconds(1)); @@ -270,8 +276,10 @@ public class MasterFaultDetection extends AbstractComponent { threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); return; } - transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), - new BaseTransportResponseHandler() { + final MasterPingRequest request = new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id(), clusterName); + final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); + transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler() { + @Override public MasterPingResponseResponse newInstance() { return new MasterPingResponseResponse(); @@ -328,7 +336,7 @@ public class MasterFaultDetection extends AbstractComponent { notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this); + transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this); } } } @@ -351,6 +359,14 @@ public class MasterFaultDetection extends AbstractComponent { } static class NotMasterException extends ElasticsearchIllegalStateException { + + NotMasterException(String msg) { + super(msg); + } + + NotMasterException() { + } + @Override public Throwable fillInStackTrace() { return null; @@ -379,6 +395,12 @@ public class 
MasterFaultDetection extends AbstractComponent { if (!request.masterNodeId.equals(nodes.localNodeId())) { throw new NotMasterException(); } + + if (request.clusterName != null && !request.clusterName.equals(clusterName)) { + logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]", request.clusterName, clusterName); + throw new NotMasterException("master fault detection ping request is targeted for a different [" + request.clusterName + "] cluster then us [" + clusterName + "]"); + } + // if we are no longer master, fail... if (!nodes.localNodeMaster()) { throw new NoLongerMasterException(); @@ -402,13 +424,15 @@ public class MasterFaultDetection extends AbstractComponent { private String nodeId; private String masterNodeId; + private ClusterName clusterName; private MasterPingRequest() { } - private MasterPingRequest(String nodeId, String masterNodeId) { + private MasterPingRequest(String nodeId, String masterNodeId, ClusterName clusterName) { this.nodeId = nodeId; this.masterNodeId = masterNodeId; + this.clusterName = clusterName; } @Override @@ -416,6 +440,9 @@ public class MasterFaultDetection extends AbstractComponent { super.readFrom(in); nodeId = in.readString(); masterNodeId = in.readString(); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + clusterName = ClusterName.readClusterName(in); + } } @Override @@ -423,6 +450,9 @@ public class MasterFaultDetection extends AbstractComponent { super.writeTo(out); out.writeString(nodeId); out.writeString(masterNodeId); + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + clusterName.writeTo(out); + } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index b808e080f21..87dfe2ba2f7 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ 
-20,6 +20,8 @@ package org.elasticsearch.discovery.zen.fd; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; @@ -54,6 +56,7 @@ public class NodesFaultDetection extends AbstractComponent { private final ThreadPool threadPool; private final TransportService transportService; + private final ClusterName clusterName; private final boolean connectOnNetworkDisconnect; @@ -78,10 +81,11 @@ public class NodesFaultDetection extends AbstractComponent { private volatile boolean running = false; - public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) { + public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { super(settings); this.threadPool = threadPool; this.transportService = transportService; + this.clusterName = clusterName; this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false); this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1)); @@ -204,8 +208,9 @@ public class NodesFaultDetection extends AbstractComponent { if (!running) { return; } - transportService.sendRequest(node, PING_ACTION_NAME, new PingRequest(node.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), - new BaseTransportResponseHandler() { + final PingRequest pingRequest = new PingRequest(node.id(), clusterName); + final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); + transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler() { @Override public PingResponse newInstance() { return new 
PingResponse(); @@ -252,8 +257,7 @@ public class NodesFaultDetection extends AbstractComponent { } } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(node, PING_ACTION_NAME, new PingRequest(node.id()), - options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this); + transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this); } } } @@ -298,6 +302,10 @@ public class NodesFaultDetection extends AbstractComponent { if (!latestNodes.localNodeId().equals(request.nodeId)) { throw new ElasticsearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]"); } + if (request.clusterName != null && !request.clusterName.equals(clusterName)) { + // Don't introduce new exception for bwc reasons + throw new ElasticsearchIllegalStateException("Got pinged with cluster name [" + request.clusterName + "], but I'm part of cluster [" + clusterName + "]"); + } channel.sendResponse(new PingResponse()); } @@ -308,28 +316,45 @@ public class NodesFaultDetection extends AbstractComponent { } - static class PingRequest extends TransportRequest { + public static class PingRequest extends TransportRequest { // the (assumed) node id we are pinging private String nodeId; + private ClusterName clusterName; + PingRequest() { } - PingRequest(String nodeId) { + PingRequest(String nodeId, ClusterName clusterName) { this.nodeId = nodeId; + this.clusterName = clusterName; + } + + public String nodeId() { + return nodeId; + } + + public ClusterName clusterName() { + return clusterName; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); nodeId = in.readString(); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + clusterName = ClusterName.readClusterName(in); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(nodeId); + if 
(out.getVersion().onOrAfter(Version.V_1_4_0)) { + clusterName.writeTo(out); + } } } diff --git a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index 3f65ed1591e..457e7a5b4cd 100644 --- a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery; import com.google.common.collect.ImmutableMap; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ImmutableSettings; @@ -131,7 +132,7 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase { boolean shouldRetry = randomBoolean(); // make sure we don't ping settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m"); - NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA); + NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA, new ClusterName("test")); nodesFD.start(); nodesFD.updateNodes(buildNodesForA(true)); final String[] failureReason = new String[1]; @@ -165,6 +166,7 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase { boolean shouldRetry = randomBoolean(); // make sure we don't ping settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m"); + ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); final DiscoveryNodes nodes = buildNodesForA(false); MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, new DiscoveryNodesProvider() { @@ -177,7 +179,8 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase 
{ public NodeService nodeService() { return null; } - } + }, + clusterName ); masterFD.start(nodeB, "test");