diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index d7eadc28e6f..63e49747ff8 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -342,7 +342,17 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen if (responses == null) { logger.warn("received ping response {} with no matching id [{}]", pingResponse, response.id); } else { - responses.put(pingResponse.target(), pingResponse); + PingResponse existingResponse = responses.get(pingResponse.target()); + if (existingResponse == null) { + responses.put(pingResponse.target(), pingResponse); + } else { + // try and merge the best ping response for it, i.e. if the new one + // doesn't have the master node set, and the existing one does, then + // the existing one is better, so we keep it + if (pingResponse.master() != null) { + responses.put(pingResponse.target(), pingResponse); + } + } } } } finally { diff --git a/src/test/java/org/elasticsearch/cluster/ZenUnicastDiscoveryTests.java b/src/test/java/org/elasticsearch/cluster/ZenUnicastDiscoveryTests.java new file mode 100644 index 00000000000..b05e4101751 --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/ZenUnicastDiscoveryTests.java @@ -0,0 +1,112 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.Matchers.equalTo; + +/** + */ +@LuceneTestCase.Slow +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0) +public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest { + + @Test + @TestLogging("discovery.zen:TRACE") + // The bug zen unicast ping override bug, may rarely manifest itself, it is very timing dependant. + // Without the fix in UnicastZenPing, this test fails roughly 1 out of 10 runs from the command line. + public void testMasterElectionNotMissed() throws Exception { + final Settings settings = settingsBuilder() + // Failure only manifests if multicast ping is disabled! + .put("discovery.zen.ping.multicast.ping.enabled", false) + .put("discovery.zen.minimum_master_nodes", 2) + // Can't use this, b/c at the moment all node will only ping localhost:9300 +// .put("discovery.zen.ping.unicast.hosts", "localhost") + .put("discovery.zen.ping.unicast.hosts", "localhost:15300,localhost:15301,localhost:15302") + .put("transport.tcp.port", "15300-15400") + .build(); + + final CountDownLatch latch = new CountDownLatch(3); + final AtomicArray nodes = new AtomicArray(3); + Runnable r1 = new Runnable() { + + @Override + public void run() { + logger.info("--> start first node"); + nodes.set(0, cluster().startNode(settings)); + latch.countDown(); + } + }; + new Thread(r1).start(); + + sleep(between(500, 3000)); + Runnable r2 = new Runnable() { + + @Override + public void run() { + logger.info("--> start second node"); + nodes.set(1, cluster().startNode(settings)); + latch.countDown(); + } + }; + new Thread(r2).start(); + + + sleep(between(500, 3000)); + Runnable r3 = new Runnable() { + + @Override + public void run() { + logger.info("--> start third node"); + nodes.set(2, cluster().startNode(settings)); + latch.countDown(); + } + }; + new Thread(r3).start(); + latch.await(); + + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").execute().actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + DiscoveryNode masterDiscoNode = null; + for (String node : nodes.toArray(new String[3])) { + ClusterState state = cluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + assertThat(state.nodes().size(), equalTo(3)); + if (masterDiscoNode == null) { + masterDiscoNode = state.nodes().masterNode(); + } else { + assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true)); + } + } + } + +}