From d086d058d87ecb94fc750ba6f3ccae522658ac80 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Thu, 1 Aug 2019 16:13:10 -0700 Subject: [PATCH] HDFS-14652. HealthMonitor connection retry times should be configurable. Contributed by Chen Zhang. --- .../hadoop/fs/CommonConfigurationKeys.java | 10 +++++++--- .../org/apache/hadoop/ha/HAServiceTarget.java | 17 ++++++++++++++--- .../org/apache/hadoop/ha/HealthMonitor.java | 5 ++++- .../org/apache/hadoop/ha/DummyHAService.java | 13 ++++++++----- 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index aba60301056..bd9997d3f18 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -259,8 +259,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** * HA health monitor and failover controller. */ - - /** How often to retry connecting to the service. */ public static final String HA_HM_CONNECT_RETRY_INTERVAL_KEY = "ha.health-monitor.connect-retry-interval.ms"; public static final long HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT = 1000; @@ -274,7 +272,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String HA_HM_SLEEP_AFTER_DISCONNECT_KEY = "ha.health-monitor.sleep-after-disconnect.ms"; public static final long HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT = 1000; - + + /** How many time to retry connecting to the service. */ + public static final String HA_HM_RPC_CONNECT_MAX_RETRIES_KEY = + "ha.health-monitor.rpc.connect.max.retries"; + public static final int HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT = 1; + + /** How often to retry connecting to the service. */ /* Timeout for the actual monitorHealth() calls. */ public static final String HA_HM_RPC_TIMEOUT_KEY = "ha.health-monitor.rpc-timeout.ms"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java index 4a2a21bafb0..9d5c8e7b7ea 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java @@ -107,19 +107,30 @@ public abstract class HAServiceTarget { */ public HAServiceProtocol getHealthMonitorProxy(Configuration conf, int timeoutMs) throws IOException { + return getHealthMonitorProxy(conf, timeoutMs, 1); + } + + public HAServiceProtocol getHealthMonitorProxy(Configuration conf, + int timeoutMs, int retries) throws IOException { InetSocketAddress addr = getHealthMonitorAddress(); if (addr == null) { addr = getAddress(); } - return getProxyForAddress(conf, timeoutMs, addr); + return getProxyForAddress(conf, timeoutMs, retries, addr); } private HAServiceProtocol getProxyForAddress(Configuration conf, int timeoutMs, InetSocketAddress addr) throws IOException { + // Lower the timeout by setting retries to 1, so we quickly fail to connect + return getProxyForAddress(conf, timeoutMs, 1, addr); + } + + private HAServiceProtocol getProxyForAddress(Configuration conf, + int timeoutMs, int retries, InetSocketAddress addr) throws IOException { Configuration confCopy = new Configuration(conf); - // Lower the timeout so we quickly fail to connect confCopy.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + retries); SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy); return new HAServiceProtocolClientSideTranslatorPB( addr, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index d1a858fd85f..7c386cfd874 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -55,6 +55,7 @@ public class HealthMonitor { private long checkIntervalMillis; private long sleepAfterDisconnectMillis; + private int rpcConnectRetries; private int rpcTimeout; private volatile boolean shouldRun = true; @@ -124,6 +125,8 @@ public class HealthMonitor { this.connectRetryInterval = conf.getLong( HA_HM_CONNECT_RETRY_INTERVAL_KEY, HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT); + this.rpcConnectRetries = conf.getInt(HA_HM_RPC_CONNECT_MAX_RETRIES_KEY, + HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT); this.rpcTimeout = conf.getInt( HA_HM_RPC_TIMEOUT_KEY, HA_HM_RPC_TIMEOUT_DEFAULT); @@ -191,7 +194,7 @@ public class HealthMonitor { * Connect to the service to be monitored. Stubbed out for easier testing. */ protected HAServiceProtocol createProxy() throws IOException { - return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout); + return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout, rpcConnectRetries); } private void doHealthChecks() throws InterruptedException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index 51112bedefa..064527c3fed 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -39,6 +39,7 @@ import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT; /** @@ -63,6 +64,7 @@ class DummyHAService extends HAServiceTarget { public int fenceCount = 0; public int activeTransitionCount = 0; boolean testWithProtoBufRPC = false; + int rpcTimeout; static ArrayList instances = Lists.newArrayList(); int index; @@ -82,7 +84,8 @@ class DummyHAService extends HAServiceTarget { } Configuration conf = new Configuration(); this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); - this.healthMonitorProxy = makeHealthMonitorMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); + this.healthMonitorProxy = makeHealthMonitorMock(conf, + HA_HM_RPC_TIMEOUT_DEFAULT, HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT); try { conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); this.fencer = Mockito.spy( @@ -149,13 +152,13 @@ class DummyHAService extends HAServiceTarget { } private HAServiceProtocol makeHealthMonitorMock(Configuration conf, - int timeoutMs) { + int timeoutMs, int retries) { HAServiceProtocol service; if (!testWithProtoBufRPC) { service = new MockHAProtocolImpl(); } else { try { - service = super.getHealthMonitorProxy(conf, timeoutMs); + service = super.getHealthMonitorProxy(conf, timeoutMs, retries); } catch (IOException e) { return null; } @@ -189,9 +192,9 @@ class DummyHAService extends HAServiceTarget { @Override public HAServiceProtocol getHealthMonitorProxy(Configuration conf, - int timeout) throws IOException { + int timeout, int retries) throws IOException { if (testWithProtoBufRPC) { - proxy = makeHealthMonitorMock(conf, timeout); + proxy = makeHealthMonitorMock(conf, timeout, retries); } return proxy; }