diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java new file mode 100644 index 00000000000..d3e644f2166 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery.zen.fd; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportConnectionListener; +import org.elasticsearch.transport.TransportService; + +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; + +/** + * A base class for {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection} & {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection}, + * making sure both use the same setting. + */ +public abstract class FaultDetection extends AbstractComponent { + + public static final String SETTING_CONNECT_ON_NETWORK_DISCONNECT = "discovery.zen.fd.connect_on_network_disconnect"; + public static final String SETTING_PING_INTERVAL = "discovery.zen.fd.ping_interval"; + public static final String SETTING_PING_TIMEOUT = "discovery.zen.fd.ping_timeout"; + public static final String SETTING_PING_RETRIES = "discovery.zen.fd.ping_retries"; + public static final String SETTING_REGISTER_CONNECTION_LISTENER = "discovery.zen.fd.register_connection_listener"; + + protected final ThreadPool threadPool; + protected final ClusterName clusterName; + protected final TransportService transportService; + + // used mainly for testing, should always be true + protected final boolean registerConnectionListener; + protected final FDConnectionListener connectionListener; + protected final boolean connectOnNetworkDisconnect; + + protected final TimeValue pingInterval; + protected final TimeValue pingRetryTimeout; + protected final int pingRetryCount; + + public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.clusterName = clusterName; + + this.connectOnNetworkDisconnect = settings.getAsBoolean(SETTING_CONNECT_ON_NETWORK_DISCONNECT, false); + this.pingInterval = settings.getAsTime(SETTING_PING_INTERVAL, timeValueSeconds(1)); + this.pingRetryTimeout = settings.getAsTime(SETTING_PING_TIMEOUT, timeValueSeconds(30)); + this.pingRetryCount = settings.getAsInt(SETTING_PING_RETRIES, 3); + this.registerConnectionListener = settings.getAsBoolean(SETTING_REGISTER_CONNECTION_LISTENER, true); + + this.connectionListener = new FDConnectionListener(); + if (registerConnectionListener) { + transportService.addConnectionListener(connectionListener); + } + } + + public void close() { + transportService.removeConnectionListener(connectionListener); + } + + /** + * This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event + */ + abstract void handleTransportDisconnect(DiscoveryNode node); + + private class FDConnectionListener implements TransportConnectionListener { + @Override + public void onNodeConnected(DiscoveryNode node) { + } + + @Override + public void onNodeDisconnected(DiscoveryNode node) { + handleTransportDisconnect(node); + } + } + +} diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index c4b385572a2..49709b7905b 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -24,7 +24,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -37,13 +36,12 @@ import java.io.IOException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.transport.TransportRequestOptions.options; /** * A fault detection that pings the master periodically to see if its alive. */ -public class MasterFaultDetection extends AbstractComponent { +public class MasterFaultDetection extends FaultDetection { public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping"; @@ -54,31 +52,10 @@ public class MasterFaultDetection extends AbstractComponent { void onDisconnectedFromMaster(); } - private final ThreadPool threadPool; - - private final TransportService transportService; - private final DiscoveryNodesProvider nodesProvider; - private final ClusterName clusterName; - private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); - - private final boolean connectOnNetworkDisconnect; - - private final TimeValue pingInterval; - - private final TimeValue pingRetryTimeout; - - private final int pingRetryCount; - - // used mainly for testing, should always be true - private final boolean registerConnectionListener; - - - private final FDConnectionListener connectionListener; - private volatile MasterPinger masterPinger; private final Object masterNodeMutex = new Object(); @@ -91,25 +68,11 @@ public class MasterFaultDetection extends AbstractComponent { public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, DiscoveryNodesProvider nodesProvider, ClusterName clusterName) { - super(settings); - this.threadPool = threadPool; - this.transportService = transportService; + super(settings, threadPool, transportService, clusterName); this.nodesProvider = nodesProvider; - this.clusterName = clusterName; - - this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false); - this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1)); - this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30)); - this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3); - this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true); logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); - this.connectionListener = new FDConnectionListener(); - if (registerConnectionListener) { - transportService.addConnectionListener(connectionListener); - } - transportService.registerHandler(MASTER_PING_ACTION_NAME, new MasterPingRequestHandler()); } @@ -188,13 +151,14 @@ public class MasterFaultDetection extends AbstractComponent { } public void close() { + super.close(); stop("closing"); this.listeners.clear(); - transportService.removeConnectionListener(connectionListener); transportService.removeHandler(MASTER_PING_ACTION_NAME); } - private void handleTransportDisconnect(DiscoveryNode node) { + @Override + protected void handleTransportDisconnect(DiscoveryNode node) { synchronized (masterNodeMutex) { if (!node.equals(this.masterNode)) { return; @@ -245,17 +209,6 @@ public class MasterFaultDetection extends AbstractComponent { } } - private class FDConnectionListener implements TransportConnectionListener { - @Override - public void onNodeConnected(DiscoveryNode node) { - } - - @Override - public void onNodeDisconnected(DiscoveryNode node) { - handleTransportDisconnect(node); - } - } - private class MasterPinger implements Runnable { private volatile boolean running = true; diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 1788d3a257c..90012099116 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -38,14 +37,13 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import static org.elasticsearch.cluster.node.DiscoveryNodes.EMPTY_NODES; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.transport.TransportRequestOptions.options; /** * A fault detection of multiple nodes. */ -public class NodesFaultDetection extends AbstractComponent { +public class NodesFaultDetection extends FaultDetection { public static final String PING_ACTION_NAME = "internal:discovery/zen/fd/ping"; @@ -57,30 +55,10 @@ public class NodesFaultDetection extends AbstractComponent { } - private final ThreadPool threadPool; - - private final TransportService transportService; - private final ClusterName clusterName; - - - private final boolean connectOnNetworkDisconnect; - - private final TimeValue pingInterval; - - private final TimeValue pingRetryTimeout; - - private final int pingRetryCount; - - // used mainly for testing, should always be true - private final boolean registerConnectionListener; - - private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); private final ConcurrentMap nodesFD = newConcurrentMap(); - private final FDConnectionListener connectionListener; - private volatile DiscoveryNodes latestNodes = EMPTY_NODES; private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION; @@ -88,25 +66,11 @@ public class NodesFaultDetection extends AbstractComponent { private volatile boolean running = false; public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { - super(settings); - this.threadPool = threadPool; - this.transportService = transportService; - this.clusterName = clusterName; - - this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false); - this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1)); - this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30)); - this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3); - this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true); + super(settings, threadPool, transportService, clusterName); logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); transportService.registerHandler(PING_ACTION_NAME, new PingRequestHandler()); - - this.connectionListener = new FDConnectionListener(); - if (registerConnectionListener) { - transportService.addConnectionListener(connectionListener); - } } public void addListener(Listener listener) { @@ -158,12 +122,13 @@ public class NodesFaultDetection extends AbstractComponent { } public void close() { + super.close(); stop(); transportService.removeHandler(PING_ACTION_NAME); - transportService.removeConnectionListener(connectionListener); } - private void handleTransportDisconnect(DiscoveryNode node) { + @Override + protected void handleTransportDisconnect(DiscoveryNode node) { if (!latestNodes.nodeExists(node.id())) { return; } @@ -296,18 +261,6 @@ public class NodesFaultDetection extends AbstractComponent { volatile boolean running = true; } - private class FDConnectionListener implements TransportConnectionListener { - @Override - public void onNodeConnected(DiscoveryNode node) { - } - - @Override - public void onNodeDisconnected(DiscoveryNode node) { - handleTransportDisconnect(node); - } - } - - class PingRequestHandler extends BaseTransportRequestHandler { @Override diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java index ff52ca351c6..82abe2eccb1 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; @@ -114,8 +115,8 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes } final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder() - .put("discovery.zen.fd.ping_timeout", "1s") // for hitting simulated network failures quickly - .put("discovery.zen.fd.ping_retries", "1") // for hitting simulated network failures quickly + .put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // for hitting simulated network failures quickly + .put(FaultDetection.SETTING_PING_RETRIES, "1") // for hitting simulated network failures quickly .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put("http.enabled", false) // just to make test quicker diff --git a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index 553267971e5..082148921e6 100644 --- a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; import org.elasticsearch.node.service.NodeService; @@ -131,7 +132,8 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase { ImmutableSettings.Builder settings = ImmutableSettings.builder(); boolean shouldRetry = randomBoolean(); // make sure we don't ping - settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m"); + settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry) + .put(FaultDetection.SETTING_PING_INTERVAL, "5m"); NodesFaultDetection nodesFD = new NodesFaultDetection(settings.build(), threadPool, serviceA, new ClusterName("test")); nodesFD.start(); nodesFD.updateNodes(buildNodesForA(true), -1); @@ -165,7 +167,8 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase { ImmutableSettings.Builder settings = ImmutableSettings.builder(); boolean shouldRetry = randomBoolean(); // make sure we don't ping - settings.put("discovery.zen.fd.connect_on_network_disconnect", shouldRetry).put("discovery.zen.fd.ping_interval", "5m"); + settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry) + .put(FaultDetection.SETTING_PING_INTERVAL, "5m"); ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); final DiscoveryNodes nodes = buildNodesForA(false); MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, diff --git a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryRejoinOnMaster.java b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryRejoinOnMaster.java index f5b0067ea18..1ee31505d5e 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryRejoinOnMaster.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryRejoinOnMaster.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; @@ -54,8 +55,8 @@ public class ZenDiscoveryRejoinOnMaster extends ElasticsearchIntegrationTest { @Test public void testNoShardRelocationsOccurWhenElectedMasterNodeFails() throws Exception { Settings defaultSettings = ImmutableSettings.builder() - .put("discovery.zen.fd.ping_timeout", "1s") - .put("discovery.zen.fd.ping_retries", "1") + .put(FaultDetection.SETTING_PING_TIMEOUT, "1s") + .put(FaultDetection.SETTING_PING_RETRIES, "1") .put("discovery.type", "zen") .build(); diff --git a/src/test/java/org/elasticsearch/index/TransportIndexFailuresTest.java b/src/test/java/org/elasticsearch/index/TransportIndexFailuresTest.java index f8fe46cae1f..c7c20b790dd 100644 --- a/src/test/java/org/elasticsearch/index/TransportIndexFailuresTest.java +++ b/src/test/java/org/elasticsearch/index/TransportIndexFailuresTest.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; @@ -54,8 +55,8 @@ public class TransportIndexFailuresTest extends ElasticsearchIntegrationTest { private static final Settings nodeSettings = ImmutableSettings.settingsBuilder() .put("discovery.type", "zen") // <-- To override the local setting if set externally - .put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly - .put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly + .put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly + .put(FaultDetection.SETTING_PING_RETRIES, "1") // <-- for hitting simulated network failures quickly .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put("discovery.zen.minimum_master_nodes", 1) .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())