[Discovery] Change (Master|Nodes)FaultDetection's connect_on_network_disconnect default to false

The previous default was true, which means that after a node disconnected event we try to connect to it as an extra validation. This can result in slow detection of network partitions if the extra reconnect times out before failure.

Also added tests to verify the settings' behaviour
This commit is contained in:
Boaz Leskes 2014-06-14 20:08:16 +02:00
parent e39ac7eef4
commit 7db9e98ee7
3 changed files with 215 additions and 2 deletions

View File

@ -91,7 +91,7 @@ public class MasterFaultDetection extends AbstractComponent {
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);

View File

@ -83,7 +83,7 @@ public class NodesFaultDetection extends AbstractComponent {
this.threadPool = threadPool;
this.transportService = transportService;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);

View File

@ -0,0 +1,213 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.local.LocalTransport;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
public class ZenFaultDetectionTests extends ElasticsearchTestCase {
protected ThreadPool threadPool;
protected static final Version version0 = Version.fromId(/*0*/99);
protected DiscoveryNode nodeA;
protected MockTransportService serviceA;
protected static final Version version1 = Version.fromId(199);
protected DiscoveryNode nodeB;
protected MockTransportService serviceB;
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new ThreadPool();
serviceA = build(ImmutableSettings.builder().put("name", "TS_A").build(), version0);
nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version0);
serviceB = build(ImmutableSettings.builder().put("name", "TS_B").build(), version1);
nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version1);
// wait till all nodes are properly connected and the event has been sent, so tests in this class
// will not get this callback called on the connections done in this setup
final CountDownLatch latch = new CountDownLatch(4);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
latch.countDown();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
fail("disconnect should not be called " + node);
}
};
serviceA.addConnectionListener(waitForConnection);
serviceB.addConnectionListener(waitForConnection);
serviceA.connectToNode(nodeB);
serviceA.connectToNode(nodeA);
serviceB.connectToNode(nodeA);
serviceB.connectToNode(nodeB);
assertThat("failed to wait for all nodes to connect", latch.await(5, TimeUnit.SECONDS), equalTo(true));
serviceA.removeConnectionListener(waitForConnection);
serviceB.removeConnectionListener(waitForConnection);
}
@After
public void tearDown() throws Exception {
super.tearDown();
serviceA.close();
serviceB.close();
threadPool.shutdown();
}
protected MockTransportService build(Settings settings, Version version) {
MockTransportService transportService = new MockTransportService(ImmutableSettings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool);
transportService.start();
return transportService;
}
private DiscoveryNodes buildNodesForA(boolean master) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
builder.put(nodeA);
builder.put(nodeB);
builder.localNodeId(nodeA.id());
builder.masterNodeId(master ? nodeA.id() : nodeB.id());
return builder.build();
}
private DiscoveryNodes buildNodesForB(boolean master) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
builder.put(nodeA);
builder.put(nodeB);
builder.localNodeId(nodeB.id());
builder.masterNodeId(master ? nodeB.id() : nodeA.id());
return builder.build();
}
@Test
public void testNodesFaultDetectionConnectOnDisconnect() throws InterruptedException {
ImmutableSettings.Builder settings = ImmutableSettings.builder();
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA);
nodesFD.start();
nodesFD.updateNodes(buildNodesForA(true));
final String[] failureReason = new String[1];
final DiscoveryNode[] failureNode = new DiscoveryNode[1];
final CountDownLatch notified = new CountDownLatch(1);
nodesFD.addListener(new NodesFaultDetection.Listener() {
@Override
public void onNodeFailure(DiscoveryNode node, String reason) {
failureNode[0] = node;
failureReason[0] = reason;
notified.countDown();
}
});
// will raise a disconnect on A
serviceB.stop();
notified.await(30, TimeUnit.SECONDS);
assertEquals(nodeB, failureNode[0]);
Matcher<String> matcher = Matchers.containsString("verified");
if (!shouldRetry) {
matcher = Matchers.not(matcher);
}
assertThat(failureReason[0], matcher);
}
@Test
public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedException {
ImmutableSettings.Builder settings = ImmutableSettings.builder();
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
final DiscoveryNodes nodes = buildNodesForA(false);
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA,
new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
return nodes;
}
@Override
public NodeService nodeService() {
return null;
}
}
);
masterFD.start(nodeB, "test");
final String[] failureReason = new String[1];
final DiscoveryNode[] failureNode = new DiscoveryNode[1];
final CountDownLatch notified = new CountDownLatch(1);
masterFD.addListener(new MasterFaultDetection.Listener() {
@Override
public void onMasterFailure(DiscoveryNode masterNode, String reason) {
failureNode[0] = masterNode;
failureReason[0] = reason;
notified.countDown();
}
@Override
public void onDisconnectedFromMaster() {
}
});
// will raise a disconnect on A
serviceB.stop();
notified.await(30, TimeUnit.SECONDS);
assertEquals(nodeB, failureNode[0]);
Matcher<String> matcher = Matchers.containsString("verified");
if (!shouldRetry) {
matcher = Matchers.not(matcher);
}
assertThat(failureReason[0], matcher);
}
}