HDFS-14652. HealthMonitor connection retry times should be configurable. Contributed by Chen Zhang.

This commit is contained in:
Wei-Chiu Chuang 2019-08-01 16:13:10 -07:00
parent b94eba9f11
commit d086d058d8
4 changed files with 33 additions and 12 deletions

View File

@ -259,8 +259,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** /**
* HA health monitor and failover controller. * HA health monitor and failover controller.
*/ */
/** How often to retry connecting to the service. */
public static final String HA_HM_CONNECT_RETRY_INTERVAL_KEY = public static final String HA_HM_CONNECT_RETRY_INTERVAL_KEY =
"ha.health-monitor.connect-retry-interval.ms"; "ha.health-monitor.connect-retry-interval.ms";
public static final long HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT = 1000; public static final long HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT = 1000;
@ -275,6 +273,12 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"ha.health-monitor.sleep-after-disconnect.ms"; "ha.health-monitor.sleep-after-disconnect.ms";
public static final long HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT = 1000; public static final long HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT = 1000;
/** How many times to retry connecting to the service. */
public static final String HA_HM_RPC_CONNECT_MAX_RETRIES_KEY =
"ha.health-monitor.rpc.connect.max.retries";
public static final int HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT = 1;
/** How often to retry connecting to the service. */
/* Timeout for the actual monitorHealth() calls. */ /* Timeout for the actual monitorHealth() calls. */
public static final String HA_HM_RPC_TIMEOUT_KEY = public static final String HA_HM_RPC_TIMEOUT_KEY =
"ha.health-monitor.rpc-timeout.ms"; "ha.health-monitor.rpc-timeout.ms";

View File

@ -107,19 +107,30 @@ public abstract class HAServiceTarget {
*/ */
public HAServiceProtocol getHealthMonitorProxy(Configuration conf, public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
int timeoutMs) throws IOException { int timeoutMs) throws IOException {
return getHealthMonitorProxy(conf, timeoutMs, 1);
}
public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
int timeoutMs, int retries) throws IOException {
InetSocketAddress addr = getHealthMonitorAddress(); InetSocketAddress addr = getHealthMonitorAddress();
if (addr == null) { if (addr == null) {
addr = getAddress(); addr = getAddress();
} }
return getProxyForAddress(conf, timeoutMs, addr); return getProxyForAddress(conf, timeoutMs, retries, addr);
} }
private HAServiceProtocol getProxyForAddress(Configuration conf, private HAServiceProtocol getProxyForAddress(Configuration conf,
int timeoutMs, InetSocketAddress addr) throws IOException { int timeoutMs, InetSocketAddress addr) throws IOException {
// Lower the timeout by setting retries to 1, so we quickly fail to connect
return getProxyForAddress(conf, timeoutMs, 1, addr);
}
private HAServiceProtocol getProxyForAddress(Configuration conf,
int timeoutMs, int retries, InetSocketAddress addr) throws IOException {
Configuration confCopy = new Configuration(conf); Configuration confCopy = new Configuration(conf);
// Lower the timeout so we quickly fail to connect
confCopy.setInt( confCopy.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
retries);
SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy); SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
return new HAServiceProtocolClientSideTranslatorPB( return new HAServiceProtocolClientSideTranslatorPB(
addr, addr,

View File

@ -55,6 +55,7 @@ public class HealthMonitor {
private long checkIntervalMillis; private long checkIntervalMillis;
private long sleepAfterDisconnectMillis; private long sleepAfterDisconnectMillis;
private int rpcConnectRetries;
private int rpcTimeout; private int rpcTimeout;
private volatile boolean shouldRun = true; private volatile boolean shouldRun = true;
@ -124,6 +125,8 @@ public class HealthMonitor {
this.connectRetryInterval = conf.getLong( this.connectRetryInterval = conf.getLong(
HA_HM_CONNECT_RETRY_INTERVAL_KEY, HA_HM_CONNECT_RETRY_INTERVAL_KEY,
HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT); HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
this.rpcConnectRetries = conf.getInt(HA_HM_RPC_CONNECT_MAX_RETRIES_KEY,
HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT);
this.rpcTimeout = conf.getInt( this.rpcTimeout = conf.getInt(
HA_HM_RPC_TIMEOUT_KEY, HA_HM_RPC_TIMEOUT_KEY,
HA_HM_RPC_TIMEOUT_DEFAULT); HA_HM_RPC_TIMEOUT_DEFAULT);
@ -191,7 +194,7 @@ public class HealthMonitor {
* Connect to the service to be monitored. Stubbed out for easier testing. * Connect to the service to be monitored. Stubbed out for easier testing.
*/ */
protected HAServiceProtocol createProxy() throws IOException { protected HAServiceProtocol createProxy() throws IOException {
return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout); return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout, rpcConnectRetries);
} }
private void doHealthChecks() throws InterruptedException { private void doHealthChecks() throws InterruptedException {

View File

@ -39,6 +39,7 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
/** /**
@ -63,6 +64,7 @@ class DummyHAService extends HAServiceTarget {
public int fenceCount = 0; public int fenceCount = 0;
public int activeTransitionCount = 0; public int activeTransitionCount = 0;
boolean testWithProtoBufRPC = false; boolean testWithProtoBufRPC = false;
int rpcTimeout;
static ArrayList<DummyHAService> instances = Lists.newArrayList(); static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index; int index;
@ -82,7 +84,8 @@ class DummyHAService extends HAServiceTarget {
} }
Configuration conf = new Configuration(); Configuration conf = new Configuration();
this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
this.healthMonitorProxy = makeHealthMonitorMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); this.healthMonitorProxy = makeHealthMonitorMock(conf,
HA_HM_RPC_TIMEOUT_DEFAULT, HA_HM_RPC_CONNECT_MAX_RETRIES_DEFAULT);
try { try {
conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
this.fencer = Mockito.spy( this.fencer = Mockito.spy(
@ -149,13 +152,13 @@ class DummyHAService extends HAServiceTarget {
} }
private HAServiceProtocol makeHealthMonitorMock(Configuration conf, private HAServiceProtocol makeHealthMonitorMock(Configuration conf,
int timeoutMs) { int timeoutMs, int retries) {
HAServiceProtocol service; HAServiceProtocol service;
if (!testWithProtoBufRPC) { if (!testWithProtoBufRPC) {
service = new MockHAProtocolImpl(); service = new MockHAProtocolImpl();
} else { } else {
try { try {
service = super.getHealthMonitorProxy(conf, timeoutMs); service = super.getHealthMonitorProxy(conf, timeoutMs, retries);
} catch (IOException e) { } catch (IOException e) {
return null; return null;
} }
@ -189,9 +192,9 @@ class DummyHAService extends HAServiceTarget {
@Override @Override
public HAServiceProtocol getHealthMonitorProxy(Configuration conf, public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
int timeout) throws IOException { int timeout, int retries) throws IOException {
if (testWithProtoBufRPC) { if (testWithProtoBufRPC) {
proxy = makeHealthMonitorMock(conf, timeout); proxy = makeHealthMonitorMock(conf, timeout, retries);
} }
return proxy; return proxy;
} }