From d5552a980fd5b5e08644dc0621b2a7d12068c5df Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 19 Aug 2014 14:09:14 +0200 Subject: [PATCH] [Discovery] UnicastZenPing should also ping last known discoNodes At the moment, when a node looses connection to the master (due to a partition or the master was stopped), we ping the unicast hosts in order to discover other nodes and elect a new master or get of another master than has been elected in the mean time. This can go wrong if all unicast targets are on the same side of a minority partition and therefore will never rejoin once the partition is healed. Closes #7336 --- .../discovery/zen/ZenDiscovery.java | 5 +- .../discovery/zen/ZenDiscoveryModule.java | 2 + .../zen/elect/ElectMasterService.java | 25 ++++- .../discovery/zen/ping/ZenPingService.java | 9 +- .../zen/ping/unicast/UnicastZenPing.java | 30 ++++- .../DiscoveryWithServiceDisruptions.java | 37 +++++- .../discovery/zen/ElectMasterServiceTest.java | 105 ++++++++++++++++++ .../zen/ping/unicast/UnicastZenPingTests.java | 6 +- 8 files changed, 201 insertions(+), 18 deletions(-) create mode 100644 src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7c76afd1640..8bfcb5bdb14 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -141,7 +141,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService, - DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, + DiscoveryNodeService discoveryNodeService, ZenPingService pingService, ElectMasterService electMasterService, Version version, DiscoverySettings discoverySettings) { super(settings); this.clusterName = clusterName; @@ -152,6 +152,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.discoverySettings = discoverySettings; this.pingService = pingService; this.version = version; + this.electMaster = electMasterService; + // also support direct discovery.zen settings, for cases when it gets extended this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3))))); @@ -167,7 +169,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen logger.debug("using ping.timeout [{}], join.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, joinTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes); - this.electMaster = new ElectMasterService(settings); nodeSettingsService.addListener(new ApplySettings()); this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java index e67c4e2af39..33987662bfa 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; @@ -44,6 +45,7 @@ public class ZenDiscoveryModule extends AbstractModule { @Override protected void configure() { + bind(ElectMasterService.class).asEagerSingleton(); bind(ZenPingService.class).asEagerSingleton(); Multibinder unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class); for (Class unicastHostProvider : unicastHostProviders) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java index bcfa1dc2f02..9ba26387ec5 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java @@ -24,12 +24,10 @@ import com.google.common.collect.Lists; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import java.util.Arrays; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; +import java.util.*; /** * @@ -42,6 +40,7 @@ public class ElectMasterService extends AbstractComponent { private volatile int minimumMasterNodes; + @Inject public ElectMasterService(Settings settings) { super(settings); this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1); @@ -69,6 +68,18 @@ public class ElectMasterService extends AbstractComponent { return count >= minimumMasterNodes; } + /** + * Returns the given nodes sorted by likelyhood of being elected as master, most likely first. + * Non-master nodes are not removed but are rather put in the end + * @param nodes + * @return + */ + public List sortByMasterLikelihood(Iterable nodes) { + ArrayList sortedNodes = Lists.newArrayList(nodes); + CollectionUtil.introSort(sortedNodes, nodeComparator); + return sortedNodes; + } + /** * Returns a list of the next possible masters. */ @@ -120,6 +131,12 @@ public class ElectMasterService extends AbstractComponent { @Override public int compare(DiscoveryNode o1, DiscoveryNode o2) { + if (o1.masterNode() && !o2.masterNode()) { + return -1; + } + if (!o1.masterNode() && o2.masterNode()) { + return 1; + } return o1.id().compareTo(o2.id()); } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index 53ee9248eac..39f710f7acd 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; @@ -55,20 +56,20 @@ public class ZenPingService extends AbstractLifecycleComponent implemen // here for backward comp. with discovery plugins public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, - @Nullable Set unicastHostsProviders) { - this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, unicastHostsProviders); + ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { + this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, electMasterService, unicastHostsProviders); } @Inject public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, - Version version, @Nullable Set unicastHostsProviders) { + Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { super(settings); ImmutableList.Builder zenPingsBuilder = ImmutableList.builder(); if (componentSettings.getAsBoolean("multicast.enabled", true)) { zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version)); } // always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast - zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, unicastHostsProviders)); + zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders)); this.zenPings = zenPingsBuilder.build(); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 5b7cf033467..ee9526f6d5f 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -19,8 +19,12 @@ package org.elasticsearch.discovery.zen.ping.unicast; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.Lists; -import org.elasticsearch.*; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -35,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -62,6 +67,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final ThreadPool threadPool; private final TransportService transportService; private final ClusterName clusterName; + private final ElectMasterService electMasterService; private final int concurrentConnects; @@ -78,11 +84,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final CopyOnWriteArrayList hostsProviders = new CopyOnWriteArrayList<>(); - public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, @Nullable Set unicastHostsProviders) { + public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, + Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.clusterName = clusterName; + this.electMasterService = electMasterService; if (unicastHostsProviders != null) { for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) { @@ -244,18 +252,30 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen DiscoveryNodes discoNodes = nodesProvider.nodes(); pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName); - HashSet nodesToPing = new HashSet<>(Arrays.asList(nodes)); + HashSet nodesToPingSet = new HashSet<>(); for (PingResponse temporalResponse : temporalResponses) { // Only send pings to nodes that have the same cluster name. if (clusterName.equals(temporalResponse.clusterName())) { - nodesToPing.add(temporalResponse.target()); + nodesToPingSet.add(temporalResponse.target()); } } for (UnicastHostsProvider provider : hostsProviders) { - nodesToPing.addAll(provider.buildDynamicNodes()); + nodesToPingSet.addAll(provider.buildDynamicNodes()); } + // add all possible master nodes that were active in the last known cluster configuration + for (ObjectCursor masterNode : discoNodes.getMasterNodes().values()) { + nodesToPingSet.add(masterNode.value); + } + + // sort the nodes by likelihood of being an active master + List sortedNodesToPing = electMasterService.sortByMasterLikelihood(nodesToPingSet); + + // new add the the unicast targets first + ArrayList nodesToPing = Lists.newArrayList(nodes); + nodesToPing.addAll(sortedNodesToPing); + final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); for (final DiscoveryNode node : nodesToPing) { // make sure we are connected diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java index 82bf85716c7..ff52ca351c6 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java @@ -649,6 +649,42 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes assertMaster(masterNode, nodes); } + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE") + public void isolatedUnicastNodes() throws Exception { + List nodes = startUnicastCluster(3, new int[]{0}, -1); + // Figure out what is the elected master node + final String unicastTarget = nodes.get(0); + + Set unicastTargetSide = new HashSet<>(); + unicastTargetSide.add(unicastTarget); + + Set restOfClusterSide = new HashSet<>(); + restOfClusterSide.addAll(nodes); + restOfClusterSide.remove(unicastTarget); + + // Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list + // includes all the other nodes that have pinged it and the issue doesn't manifest + for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { + for (ZenPing zenPing : pingService.zenPings()) { + ((UnicastZenPing) zenPing).clearTemporalReponses(); + } + } + + // Simulate a network issue between the unicast target node and the rest of the cluster + NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(unicastTargetSide, restOfClusterSide, getRandom()); + setDisruptionScheme(networkDisconnect); + networkDisconnect.startDisrupting(); + // Wait until elected master has removed that the unlucky node... + ensureStableCluster(2, nodes.get(1)); + + // The isolate master node must report no master, so it starts with pinging + assertNoMaster(unicastTarget); + networkDisconnect.stopDisrupting(); + // Wait until the master node sees all 3 nodes again. + ensureStableCluster(3); + } + /** Test cluster join with issues in cluster state publishing * */ @Test @@ -695,7 +731,6 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes nonMasterTransportService.clearRule(discoveryNodes.masterNode()); ensureStableCluster(2); - } diff --git a/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java b/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java new file mode 100644 index 00000000000..df8f67c536f --- /dev/null +++ b/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.*; + +public class ElectMasterServiceTest extends ElasticsearchTestCase { + + ElectMasterService electMasterService() { + return new ElectMasterService(ImmutableSettings.EMPTY); + } + + List generateRandomNodes() { + int count = scaledRandomIntBetween(1, 100); + ArrayList nodes = new ArrayList<>(count); + + Map master = new HashMap<>(); + master.put("master", "true"); + Map nonMaster = new HashMap<>(); + nonMaster.put("master", "false"); + + for (int i = 0; i < count; i++) { + Map attributes = randomBoolean() ? master : nonMaster; + DiscoveryNode node = new DiscoveryNode("n_" + i, "n_" + i, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); + nodes.add(node); + } + + Collections.shuffle(nodes, getRandom()); + return nodes; + } + + @Test + public void sortByMasterLikelihood() { + List nodes = generateRandomNodes(); + List sortedNodes = electMasterService().sortByMasterLikelihood(nodes); + assertEquals(nodes.size(), sortedNodes.size()); + DiscoveryNode prevNode = sortedNodes.get(0); + for (int i = 1; i < sortedNodes.size(); i++) { + DiscoveryNode node = sortedNodes.get(i); + if (!prevNode.masterNode()) { + assertFalse(node.masterNode()); + } else if (node.masterNode()) { + assertTrue(prevNode.id().compareTo(node.id()) < 0); + } + prevNode = node; + } + + } + + @Test + public void electMaster() { + List nodes = generateRandomNodes(); + ElectMasterService service = electMasterService(); + int min_master_nodes = randomIntBetween(0, nodes.size()); + service.minimumMasterNodes(min_master_nodes); + + int master_nodes = 0; + for (DiscoveryNode node : nodes) { + if (node.masterNode()) { + master_nodes++; + } + } + DiscoveryNode master = null; + if (service.hasEnoughMasterNodes(nodes)) { + master = service.electMaster(nodes); + } + + if (master_nodes == 0) { + assertNull(master); + } else if (min_master_nodes > 0 && master_nodes < min_master_nodes) { + assertNull(master); + } else { + for (DiscoveryNode node : nodes) { + if (node.masterNode()) { + assertTrue(master.id().compareTo(node.id()) <= 0); + } + } + } + } +} diff --git a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index 8f18cb11d38..7ecc23b68ef 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ElasticsearchTestCase; @@ -55,6 +56,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase { ThreadPool threadPool = new ThreadPool(getClass().getName()); ClusterName clusterName = new ClusterName("test"); NetworkService networkService = new NetworkService(settings); + ElectMasterService electMasterService = new ElectMasterService(settings); NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); @@ -73,7 +75,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase { addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort()) .build(); - UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, null); + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null); zenPingA.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() { @@ -87,7 +89,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase { }); zenPingA.start(); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, null); + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null); zenPingB.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() {