diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7390f7b6029..a2d1fbdb3ff 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -22,7 +22,10 @@ package org.elasticsearch.discovery.zen; import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.elasticsearch.*; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -180,7 +183,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen nodeSettingsService.addListener(new ApplySettings()); - this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName); + this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterName, clusterService); this.masterFD.addListener(new MasterNodeFailureListener()); this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 8bb8f452023..21e9b21022b 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -19,16 +19,20 @@ package org.elasticsearch.discovery.zen.fd; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -54,8 +58,7 @@ public class MasterFaultDetection extends FaultDetection { void notListedOnMaster(); } - private final DiscoveryNodesProvider nodesProvider; - + private final ClusterService clusterService; private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); private volatile MasterPinger masterPinger; @@ -69,9 +72,9 @@ public class MasterFaultDetection extends FaultDetection { private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean(); public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, - DiscoveryNodesProvider nodesProvider, ClusterName clusterName) { + ClusterName clusterName, ClusterService clusterService) { super(settings, threadPool, transportService, clusterName); - this.nodesProvider = nodesProvider; + this.clusterService = clusterService; logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); @@ -231,7 +234,7 @@ public class MasterFaultDetection extends FaultDetection { threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); return; } - final MasterPingRequest request = new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id(), clusterName); + final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName); final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler() { @@ -343,8 +346,8 @@ public class MasterFaultDetection extends FaultDetection { } @Override - public void messageReceived(MasterPingRequest request, TransportChannel channel) throws Exception { - DiscoveryNodes nodes = nodesProvider.nodes(); + public void messageReceived(final MasterPingRequest request, final TransportChannel channel) throws Exception { + final DiscoveryNodes nodes = clusterService.state().nodes(); // check if we are really the same master as the one we seemed to be think we are // this can happen if the master got "kill -9" and then another node started using the same port if (!request.masterNodeId.equals(nodes.localNodeId())) { @@ -357,15 +360,57 @@ public class MasterFaultDetection extends FaultDetection { throw new NotMasterException("master fault detection ping request is targeted for a different [" + request.clusterName + "] cluster then us [" + clusterName + "]"); } - // if we are no longer master, fail... - if (!nodes.localNodeMaster()) { - throw new NoLongerMasterException(); + // when we are elected as master or when a node joins, we use a cluster state update thread + // to incorporate that information in the cluster state. That cluster state is published + // before we make it available locally. This means that a master ping can come from a node + // that has already processed the new CS but it is not known locally. + // Therefore, if we fail we have to check again under a cluster state thread to make sure + // all processing is finished. + // + + + if (!nodes.localNodeMaster() || !nodes.nodeExists(request.nodeId)) { + logger.trace("checking ping from [{}] under a cluster state thread", request.nodeId); + clusterService.submitStateUpdateTask("master ping (from: [" + request.nodeId + "])", new ProcessedClusterStateNonMasterUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // if we are no longer master, fail... + DiscoveryNodes nodes = currentState.nodes(); + if (!nodes.localNodeMaster()) { + throw new NoLongerMasterException(); + } + if (!nodes.nodeExists(request.nodeId)) { + throw new NodeDoesNotExistOnMasterException(); + } + return currentState; + } + + @Override + public void onFailure(String source, @Nullable Throwable t) { + if (t == null) { + t = new ElasticsearchException("unknown error while processing ping"); + } + try { + channel.sendResponse(t); + } catch (IOException e) { + logger.warn("error while sending ping response", e); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { + channel.sendResponse(new MasterPingResponseResponse(true)); + } catch (IOException e) { + logger.warn("error while sending ping response", e); + } + } + }); + } else { + // send a response, and note if we are connected to the master or not + channel.sendResponse(new MasterPingResponseResponse(true)); } - if (!nodes.nodeExists(request.nodeId)) { - throw new NodeDoesNotExistOnMasterException(); - } - // send a response, and note if we are connected to the master or not - channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId))); } @Override diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java index 6d0288506c8..80625c477e7 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java @@ -107,11 +107,36 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes } private List startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { - if (randomBoolean()) { - return startMulticastCluster(numberOfNodes, minimumMasterNode); - } else { - return startUnicastCluster(numberOfNodes, null, minimumMasterNode); + configureCluster(numberOfNodes, minimumMasterNode); + List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); + ensureStableCluster(numberOfNodes); + + // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results + for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { + for (ZenPing zenPing : pingService.zenPings()) { + if (zenPing instanceof UnicastZenPing) { + ((UnicastZenPing) zenPing).clearTemporalResponses(); + } + } } + return nodes; + } + + + private List startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { + configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); + List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); + ensureStableCluster(numberOfNodes); + + // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results + for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { + for (ZenPing zenPing : pingService.zenPings()) { + if (zenPing instanceof UnicastZenPing) { + ((UnicastZenPing) zenPing).clearTemporalResponses(); + } + } + } + return nodes; } final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder() @@ -124,7 +149,16 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) .build(); - private List startMulticastCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { + private void configureCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { + if (randomBoolean()) { + configureMulticastCluster(numberOfNodes, minimumMasterNode); + } else { + configureUnicastCluster(numberOfNodes, null, minimumMasterNode); + } + + } + + private void configureMulticastCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { if (minimumMasterNode < 0) { minimumMasterNode = numberOfNodes / 2 + 1; } @@ -137,13 +171,9 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes if (discoveryConfig == null) { discoveryConfig = new ClusterDiscoveryConfiguration(numberOfNodes, settings); } - List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); - ensureStableCluster(numberOfNodes); - - return nodes; } - private List startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { + private void configureUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { if (minimumMasterNode < 0) { minimumMasterNode = numberOfNodes / 2 + 1; } @@ -160,19 +190,6 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals); } } - List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); - ensureStableCluster(numberOfNodes); - - // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results - for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { - for (ZenPing zenPing : pingService.zenPings()) { - if (zenPing instanceof UnicastZenPing) { - ((UnicastZenPing) zenPing).clearTemporalResponses(); - } - } - } - - return nodes; } @@ -763,6 +780,68 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes } + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE") + public void testClusterFormingWithASlowNode() throws Exception { + configureCluster(3, 2); + + SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(getRandom(), 0, 0, 5000, 6000); + + // don't wait for initial state, wat want to add the disruption while the cluster is forming.. + internalCluster().startNodesAsync(3, + ImmutableSettings.builder() + .put(DiscoveryService.SETTING_INITIAL_STATE_TIMEOUT, "1ms") + .put(DiscoverySettings.PUBLISH_TIMEOUT, "3s") + .build()).get(); + + logger.info("applying disruption while cluster is forming ..."); + + internalCluster().setDisruptionScheme(disruption); + disruption.startDisrupting(); + + ensureStableCluster(3); + } + + /** + * Adds an asymetric break between a master and one of the nodes and makes + * sure that the node is removed form the cluster, that the node start pinging and that + * the cluster reforms when healed. + */ + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE") + public void testNodeNotReachableFromMaster() throws Exception { + startCluster(3); + + String masterNode = internalCluster().getMasterName(); + String nonMasterNode = null; + while (nonMasterNode == null) { + nonMasterNode = randomFrom(internalCluster().getNodeNames()); + if (nonMasterNode.equals(masterNode)) { + masterNode = null; + } + } + + logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode); + MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); + if (randomBoolean()) { + masterTransportService.addUnresponsiveRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); + } else { + masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); + } + + logger.info("waiting for [{}] to be removed from cluster", nonMasterNode); + ensureStableCluster(2, masterNode); + + logger.info("waiting for [{}] to have no master", nonMasterNode); + assertNoMaster(nonMasterNode); + + logger.info("healing partition and checking cluster reforms"); + masterTransportService.clearAllRules(); + + ensureStableCluster(3); + } + + protected NetworkPartition addRandomPartition() { NetworkPartition partition; if (randomBoolean()) { diff --git a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index fd73ce17b45..8f6b44b4339 100644 --- a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -27,12 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; -import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.cluster.NoopClusterService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportConnectionListener; @@ -172,21 +171,9 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase { settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry) .put(FaultDetection.SETTING_PING_INTERVAL, "5m"); ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); - final DiscoveryNodes nodes = buildNodesForA(false); - MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, - new DiscoveryNodesProvider() { - @Override - public DiscoveryNodes nodes() { - return nodes; - } - - @Override - public NodeService nodeService() { - return null; - } - }, - clusterName - ); + final ClusterState state = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build(); + MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName, + new NoopClusterService(state)); masterFD.start(nodeB, "test"); final String[] failureReason = new String[1]; diff --git a/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java b/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java index 4821ac1f9d7..54d29e06b50 100644 --- a/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java +++ b/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java @@ -20,28 +20,50 @@ package org.elasticsearch.test.cluster; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; import java.util.List; public class NoopClusterService implements ClusterService { + final ClusterState state; + + public NoopClusterService() { + this(ClusterState.builder(new ClusterName("noop")).build()); + } + + public NoopClusterService(ClusterState state) { + if (state.getNodes().size() == 0) { + state = ClusterState.builder(state).nodes( + DiscoveryNodes.builder() + .put(new DiscoveryNode("noop_id", DummyTransportAddress.INSTANCE, Version.CURRENT)) + .localNodeId("noop_id")).build(); + } + + assert state.getNodes().localNode() != null; + this.state = state; + + } + @Override public DiscoveryNode localNode() { - return null; + return state.getNodes().localNode(); } @Override public ClusterState state() { - return null; + return state; } @Override diff --git a/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java index 46ae0afe54c..746d7f942ba 100644 --- a/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java +++ b/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; public class SlowClusterStateProcessing extends SingleNodeDisruption { @@ -99,11 +100,19 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { if (clusterService == null) { return false; } + final AtomicBoolean stopped = new AtomicBoolean(false); clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - Thread.sleep(duration.millis()); + long count = duration.millis() / 200; + // wait while checking for a stopped + for (; count > 0 && !stopped.get(); count--) { + Thread.sleep(200); + } + if (!stopped.get()) { + Thread.sleep(duration.millis() % 200); + } countDownLatch.countDown(); return currentState; } @@ -116,6 +125,7 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { try { countDownLatch.await(); } catch (InterruptedException e) { + stopped.set(true); // try to wait again, we really want the cluster state thread to be freed up when stopping disruption countDownLatch.await(); } @@ -137,10 +147,11 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { if (!interruptClusterStateProcessing(duration)) { continue; } - - duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin))); - if (disrupting && disruptedNode != null) { - Thread.sleep(duration.millis()); + if (intervalBetweenDelaysMax > 0) { + duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin))); + if (disrupting && disruptedNode != null) { + Thread.sleep(duration.millis()); + } } } catch (InterruptedException e) { } catch (Exception e) {