[Internal] Extract a common base class for (Master|Nodes)FaultDetection

They share a lot of settings and some logic.

Closes #7512
This commit is contained in:
Boaz Leskes 2014-08-29 10:10:58 +02:00
parent d8a5ff0047
commit 596a4a0735
7 changed files with 119 additions and 112 deletions

View File

@ -0,0 +1,95 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
* A base class for {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection} & {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection},
* making sure both use the same setting.
*/
public abstract class FaultDetection extends AbstractComponent {
public static final String SETTING_CONNECT_ON_NETWORK_DISCONNECT = "discovery.zen.fd.connect_on_network_disconnect";
public static final String SETTING_PING_INTERVAL = "discovery.zen.fd.ping_interval";
public static final String SETTING_PING_TIMEOUT = "discovery.zen.fd.ping_timeout";
public static final String SETTING_PING_RETRIES = "discovery.zen.fd.ping_retries";
public static final String SETTING_REGISTER_CONNECTION_LISTENER = "discovery.zen.fd.register_connection_listener";
protected final ThreadPool threadPool;
protected final ClusterName clusterName;
protected final TransportService transportService;
// used mainly for testing, should always be true
protected final boolean registerConnectionListener;
protected final FDConnectionListener connectionListener;
protected final boolean connectOnNetworkDisconnect;
protected final TimeValue pingInterval;
protected final TimeValue pingRetryTimeout;
protected final int pingRetryCount;
public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.connectOnNetworkDisconnect = settings.getAsBoolean(SETTING_CONNECT_ON_NETWORK_DISCONNECT, false);
this.pingInterval = settings.getAsTime(SETTING_PING_INTERVAL, timeValueSeconds(1));
this.pingRetryTimeout = settings.getAsTime(SETTING_PING_TIMEOUT, timeValueSeconds(30));
this.pingRetryCount = settings.getAsInt(SETTING_PING_RETRIES, 3);
this.registerConnectionListener = settings.getAsBoolean(SETTING_REGISTER_CONNECTION_LISTENER, true);
this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}
}
public void close() {
transportService.removeConnectionListener(connectionListener);
}
/**
* This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
*/
abstract void handleTransportDisconnect(DiscoveryNode node);
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -37,13 +36,12 @@ import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.transport.TransportRequestOptions.options;
/**
* A fault detection that pings the master periodically to see if its alive.
*/
public class MasterFaultDetection extends AbstractComponent {
public class MasterFaultDetection extends FaultDetection {
public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";
@ -54,31 +52,10 @@ public class MasterFaultDetection extends AbstractComponent {
void onDisconnectedFromMaster();
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final ClusterName clusterName;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
private final boolean connectOnNetworkDisconnect;
private final TimeValue pingInterval;
private final TimeValue pingRetryTimeout;
private final int pingRetryCount;
// used mainly for testing, should always be true
private final boolean registerConnectionListener;
private final FDConnectionListener connectionListener;
private volatile MasterPinger masterPinger;
private final Object masterNodeMutex = new Object();
@ -91,25 +68,11 @@ public class MasterFaultDetection extends AbstractComponent {
public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
DiscoveryNodesProvider nodesProvider, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
super(settings, threadPool, transportService, clusterName);
this.nodesProvider = nodesProvider;
this.clusterName = clusterName;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}
transportService.registerHandler(MASTER_PING_ACTION_NAME, new MasterPingRequestHandler());
}
@ -188,13 +151,14 @@ public class MasterFaultDetection extends AbstractComponent {
}
public void close() {
super.close();
stop("closing");
this.listeners.clear();
transportService.removeConnectionListener(connectionListener);
transportService.removeHandler(MASTER_PING_ACTION_NAME);
}
private void handleTransportDisconnect(DiscoveryNode node) {
@Override
protected void handleTransportDisconnect(DiscoveryNode node) {
synchronized (masterNodeMutex) {
if (!node.equals(this.masterNode)) {
return;
@ -245,17 +209,6 @@ public class MasterFaultDetection extends AbstractComponent {
}
}
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
private class MasterPinger implements Runnable {
private volatile boolean running = true;

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -38,14 +37,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.node.DiscoveryNodes.EMPTY_NODES;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.transport.TransportRequestOptions.options;
/**
* A fault detection of multiple nodes.
*/
public class NodesFaultDetection extends AbstractComponent {
public class NodesFaultDetection extends FaultDetection {
public static final String PING_ACTION_NAME = "internal:discovery/zen/fd/ping";
@ -57,30 +55,10 @@ public class NodesFaultDetection extends AbstractComponent {
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private final boolean connectOnNetworkDisconnect;
private final TimeValue pingInterval;
private final TimeValue pingRetryTimeout;
private final int pingRetryCount;
// used mainly for testing, should always be true
private final boolean registerConnectionListener;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
private final FDConnectionListener connectionListener;
private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
@ -88,25 +66,11 @@ public class NodesFaultDetection extends AbstractComponent {
private volatile boolean running = false;
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
super(settings, threadPool, transportService, clusterName);
logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
transportService.registerHandler(PING_ACTION_NAME, new PingRequestHandler());
this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}
}
public void addListener(Listener listener) {
@ -158,12 +122,13 @@ public class NodesFaultDetection extends AbstractComponent {
}
public void close() {
super.close();
stop();
transportService.removeHandler(PING_ACTION_NAME);
transportService.removeConnectionListener(connectionListener);
}
private void handleTransportDisconnect(DiscoveryNode node) {
@Override
protected void handleTransportDisconnect(DiscoveryNode node) {
if (!latestNodes.nodeExists(node.id())) {
return;
}
@ -296,18 +261,6 @@ public class NodesFaultDetection extends AbstractComponent {
volatile boolean running = true;
}
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
@Override

View File

@ -43,6 +43,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
@ -114,8 +115,8 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
}
final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder()
.put("discovery.zen.fd.ping_timeout", "1s") // for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // for hitting simulated network failures quickly
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // for hitting simulated network failures quickly
.put(FaultDetection.SETTING_PING_RETRIES, "1") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.node.service.NodeService;
@ -131,7 +132,8 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
ImmutableSettings.Builder settings = ImmutableSettings.builder();
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry)
.put(FaultDetection.SETTING_PING_INTERVAL, "5m");
NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA, new ClusterName("test"));
nodesFD.start();
nodesFD.updateNodes(buildNodesForA(true), -1);
@ -165,7 +167,8 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
ImmutableSettings.Builder settings = ImmutableSettings.builder();
boolean shouldRetry = randomBoolean();
// make sure we don't ping
settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m");
settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry)
.put(FaultDetection.SETTING_PING_INTERVAL, "5m");
ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20));
final DiscoveryNodes nodes = buildNodesForA(false);
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA,

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -54,8 +55,8 @@ public class ZenDiscoveryRejoinOnMaster extends ElasticsearchIntegrationTest {
@Test
public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Exception {
Settings defaultSettings = ImmutableSettings.builder()
.put("discovery.zen.fd.ping_timeout", "1s")
.put("discovery.zen.fd.ping_retries", "1")
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s")
.put(FaultDetection.SETTING_PING_RETRIES, "1")
.put("discovery.type", "zen")
.build();

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
@ -54,8 +55,8 @@ public class TransportIndexFailuresTest extends ElasticsearchIntegrationTest {
private static final Settings nodeSettings = ImmutableSettings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put(FaultDetection.SETTING_PING_RETRIES, "1") // <-- for hitting simulated network failures quickly
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.minimum_master_nodes", 1)
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())