[Discovery] UnicastZenPing should also ping last known discoNodes

At the moment, when a node looses connection to the master (due to a partition or the master was stopped), we ping the unicast hosts in order to discover other nodes and elect a new master or get of another master than has been elected in the mean time. This can go wrong if all unicast targets are on the same side of a minority partition and therefore will never rejoin once the partition is healed.

Closes #7336
This commit is contained in:
Boaz Leskes 2014-08-19 14:09:14 +02:00
parent ff8b7409f7
commit d5552a980f
8 changed files with 201 additions and 18 deletions

View File

@ -141,7 +141,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, ElectMasterService electMasterService, Version version,
DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
@ -152,6 +152,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.discoverySettings = discoverySettings;
this.pingService = pingService;
this.version = version;
this.electMaster = electMasterService;
// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
@ -167,7 +169,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.debug("using ping.timeout [{}], join.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, joinTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
this.electMaster = new ElectMasterService(settings);
nodeSettingsService.addListener(new ApplySettings());
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName);

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
@ -44,6 +45,7 @@ public class ZenDiscoveryModule extends AbstractModule {
@Override
protected void configure() {
bind(ElectMasterService.class).asEagerSingleton();
bind(ZenPingService.class).asEagerSingleton();
Multibinder<UnicastHostsProvider> unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class);
for (Class<? extends UnicastHostsProvider> unicastHostProvider : unicastHostProviders) {

View File

@ -24,12 +24,10 @@ import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.*;
/**
*
@ -42,6 +40,7 @@ public class ElectMasterService extends AbstractComponent {
private volatile int minimumMasterNodes;
@Inject
public ElectMasterService(Settings settings) {
super(settings);
this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1);
@ -69,6 +68,18 @@ public class ElectMasterService extends AbstractComponent {
return count >= minimumMasterNodes;
}
/**
* Returns the given nodes sorted by likelyhood of being elected as master, most likely first.
* Non-master nodes are not removed but are rather put in the end
* @param nodes
* @return
*/
public List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
ArrayList<DiscoveryNode> sortedNodes = Lists.newArrayList(nodes);
CollectionUtil.introSort(sortedNodes, nodeComparator);
return sortedNodes;
}
/**
* Returns a list of the next possible masters.
*/
@ -120,6 +131,12 @@ public class ElectMasterService extends AbstractComponent {
@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
if (o1.masterNode() && !o2.masterNode()) {
return -1;
}
if (!o1.masterNode() && o2.masterNode()) {
return 1;
}
return o1.id().compareTo(o2.id());
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
@ -55,20 +56,20 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
// here for backward comp. with discovery plugins
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
@Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, unicastHostsProviders);
ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, electMasterService, unicastHostsProviders);
}
@Inject
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
Version version, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
Version version, ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, unicastHostsProviders));
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders));
this.zenPings = zenPingsBuilder.build();
}

View File

@ -19,8 +19,12 @@
package org.elasticsearch.discovery.zen.ping.unicast;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import org.elasticsearch.*;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -35,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -62,6 +67,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private final ElectMasterService electMasterService;
private final int concurrentConnects;
@ -78,11 +84,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final CopyOnWriteArrayList<UnicastHostsProvider> hostsProviders = new CopyOnWriteArrayList<>();
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
Version version, ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.electMasterService = electMasterService;
if (unicastHostsProviders != null) {
for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) {
@ -244,18 +252,30 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
DiscoveryNodes discoNodes = nodesProvider.nodes();
pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);
HashSet<DiscoveryNode> nodesToPing = new HashSet<>(Arrays.asList(nodes));
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>();
for (PingResponse temporalResponse : temporalResponses) {
// Only send pings to nodes that have the same cluster name.
if (clusterName.equals(temporalResponse.clusterName())) {
nodesToPing.add(temporalResponse.target());
nodesToPingSet.add(temporalResponse.target());
}
}
for (UnicastHostsProvider provider : hostsProviders) {
nodesToPing.addAll(provider.buildDynamicNodes());
nodesToPingSet.addAll(provider.buildDynamicNodes());
}
// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : discoNodes.getMasterNodes().values()) {
nodesToPingSet.add(masterNode.value);
}
// sort the nodes by likelihood of being an active master
List<DiscoveryNode> sortedNodesToPing = electMasterService.sortByMasterLikelihood(nodesToPingSet);
// new add the the unicast targets first
ArrayList<DiscoveryNode> nodesToPing = Lists.newArrayList(nodes);
nodesToPing.addAll(sortedNodesToPing);
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) {
// make sure we are connected

View File

@ -649,6 +649,42 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
assertMaster(masterNode, nodes);
}
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void isolatedUnicastNodes() throws Exception {
List<String> nodes = startUnicastCluster(3, new int[]{0}, -1);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);
Set<String> unicastTargetSide = new HashSet<>();
unicastTargetSide.add(unicastTarget);
Set<String> restOfClusterSide = new HashSet<>();
restOfClusterSide.addAll(nodes);
restOfClusterSide.remove(unicastTarget);
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
((UnicastZenPing) zenPing).clearTemporalReponses();
}
}
// Simulate a network issue between the unicast target node and the rest of the cluster
NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(unicastTargetSide, restOfClusterSide, getRandom());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until elected master has removed that the unlucky node...
ensureStableCluster(2, nodes.get(1));
// The isolate master node must report no master, so it starts with pinging
assertNoMaster(unicastTarget);
networkDisconnect.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3);
}
/** Test cluster join with issues in cluster state publishing * */
@Test
@ -695,7 +731,6 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
nonMasterTransportService.clearRule(discoveryNodes.masterNode());
ensureStableCluster(2);
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.*;
public class ElectMasterServiceTest extends ElasticsearchTestCase {
ElectMasterService electMasterService() {
return new ElectMasterService(ImmutableSettings.EMPTY);
}
List<DiscoveryNode> generateRandomNodes() {
int count = scaledRandomIntBetween(1, 100);
ArrayList<DiscoveryNode> nodes = new ArrayList<>(count);
Map<String, String> master = new HashMap<>();
master.put("master", "true");
Map<String, String> nonMaster = new HashMap<>();
nonMaster.put("master", "false");
for (int i = 0; i < count; i++) {
Map<String, String> attributes = randomBoolean() ? master : nonMaster;
DiscoveryNode node = new DiscoveryNode("n_" + i, "n_" + i, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
nodes.add(node);
}
Collections.shuffle(nodes, getRandom());
return nodes;
}
@Test
public void sortByMasterLikelihood() {
List<DiscoveryNode> nodes = generateRandomNodes();
List<DiscoveryNode> sortedNodes = electMasterService().sortByMasterLikelihood(nodes);
assertEquals(nodes.size(), sortedNodes.size());
DiscoveryNode prevNode = sortedNodes.get(0);
for (int i = 1; i < sortedNodes.size(); i++) {
DiscoveryNode node = sortedNodes.get(i);
if (!prevNode.masterNode()) {
assertFalse(node.masterNode());
} else if (node.masterNode()) {
assertTrue(prevNode.id().compareTo(node.id()) < 0);
}
prevNode = node;
}
}
@Test
public void electMaster() {
List<DiscoveryNode> nodes = generateRandomNodes();
ElectMasterService service = electMasterService();
int min_master_nodes = randomIntBetween(0, nodes.size());
service.minimumMasterNodes(min_master_nodes);
int master_nodes = 0;
for (DiscoveryNode node : nodes) {
if (node.masterNode()) {
master_nodes++;
}
}
DiscoveryNode master = null;
if (service.hasEnoughMasterNodes(nodes)) {
master = service.electMaster(nodes);
}
if (master_nodes == 0) {
assertNull(master);
} else if (min_master_nodes > 0 && master_nodes < min_master_nodes) {
assertNull(master);
} else {
for (DiscoveryNode node : nodes) {
if (node.masterNode()) {
assertTrue(master.id().compareTo(node.id()) <= 0);
}
}
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -55,6 +56,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
ThreadPool threadPool = new ThreadPool(getClass().getName());
ClusterName clusterName = new ClusterName("test");
NetworkService networkService = new NetworkService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings);
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
@ -73,7 +75,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort())
.build();
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, null);
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
@ -87,7 +89,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
});
zenPingA.start();
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, null);
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {