diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
index ad9d5e2c2a7..b2f60d93149 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
@@ -25,6 +27,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
@@ -85,6 +88,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
private NNHAServiceTarget localTarget;
/** Cache HA protocol. */
private HAServiceProtocol localTargetHAProtocol;
+ /** Cached NamenodeProtocol proxy; created on first use and cleared on IOException. */
+ private NamenodeProtocol namenodeProtocol;
+ /** Cached ClientProtocol proxy; created on first use and cleared on any exception. */
+ private ClientProtocol clientProtocol;
/** RPC address for the namenode. */
private String rpcAddress;
/** Service RPC address for the namenode. */
@@ -100,6 +107,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
private String resolvedHost;
private String originalNnId;
+
+ private int healthMonitorTimeoutMs = (int) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
+
/**
* Create a new Namenode status updater.
* @param resolver Namenode resolver service to handle NN registration.
@@ -211,6 +221,15 @@ public class NamenodeHeartbeatService extends PeriodicService {
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
+ long timeoutMs = conf.getTimeDuration(DFS_ROUTER_HEALTH_MONITOR_TIMEOUT,
+ DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+ if (timeoutMs < 0) {
+ LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
+ "Using value of : 0ms instead.", timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT);
+ this.healthMonitorTimeoutMs = 0;
+ } else {
+ this.healthMonitorTimeoutMs = (int) timeoutMs;
+ }
super.serviceInit(configuration);
}
@@ -309,66 +328,26 @@ public class NamenodeHeartbeatService extends PeriodicService {
LOG.debug("Probing NN at service address: {}", serviceAddress);
URI serviceURI = new URI("hdfs://" + serviceAddress);
- // Read the filesystem info from RPC (required)
- NamenodeProtocol nn = NameNodeProxies
- .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
- .getProxy();
- if (nn != null) {
- NamespaceInfo info = nn.versionRequest();
- if (info != null) {
- report.setNamespaceInfo(info);
- }
- }
+ // Read the filesystem info from RPC (required)
+ updateNameSpaceInfoParameters(serviceURI, report);
if (!report.registrationValid()) {
return report;
}
// Check for safemode from the client protocol. Currently optional, but
// should be required at some point for QoS
- try {
- ClientProtocol client = NameNodeProxies
- .createProxy(this.conf, serviceURI, ClientProtocol.class)
- .getProxy();
- if (client != null) {
- boolean isSafeMode = client.setSafeMode(
- SafeModeAction.SAFEMODE_GET, false);
- report.setSafeMode(isSafeMode);
- }
- } catch (Exception e) {
- LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
- }
+ updateSafeModeParameters(serviceURI, report);
// Read the stats from JMX (optional)
updateJMXParameters(webAddress, report);
- if (localTarget != null) {
- // Try to get the HA status
- try {
- // Determine if NN is active
- // TODO: dynamic timeout
- if (localTargetHAProtocol == null) {
- localTargetHAProtocol = localTarget.getHealthMonitorProxy(conf, 30*1000);
- LOG.debug("Get HA status with address {}", lifelineAddress);
- }
- HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
- report.setHAServiceState(status.getState());
- } catch (Throwable e) {
- if (e.getMessage().startsWith("HA for namenode is not enabled")) {
- LOG.error("HA for {} is not enabled", getNamenodeDesc());
- localTarget = null;
- } else {
- // Failed to fetch HA status, ignoring failure
- LOG.error("Cannot fetch HA status for {}: {}",
- getNamenodeDesc(), e.getMessage(), e);
- }
- localTargetHAProtocol = null;
- }
- }
- } catch(IOException e) {
+ // Try to get the HA status
+ updateHAStatusParameters(report);
+ } catch (IOException e) {
LOG.error("Cannot communicate with {}: {}",
getNamenodeDesc(), e.getMessage());
- } catch(Throwable e) {
+ } catch (Throwable e) {
// Generic error that we don't know about
LOG.error("Unexpected exception while communicating with {}: {}",
getNamenodeDesc(), e.getMessage(), e);
@@ -399,6 +378,59 @@ public class NamenodeHeartbeatService extends PeriodicService {
(nnId == null ? "" : " " + nnId);
}
+ /**
+  * Fetch the namespace information of a Namenode over RPC and store it in the report.
+  * The NamenodeProtocol proxy is created on first use and kept for later calls.
+  * @param serviceURI Server address of the Namenode to monitor.
+  * @param report Namenode status report to update with the namespace information.
+  * @throws IOException Propagated to the caller on purpose: RBF needs the
+  *           namespace info to identify this NS, so when it cannot be read
+  *           there is no point in collecting anything else from the Namenode.
+  *           The cached proxy is cleared before the exception is rethrown.
+  */
+ private void updateNameSpaceInfoParameters(URI serviceURI,
+     NamenodeStatusReport report) throws IOException {
+   try {
+     // Lazily build the proxy; it stays cached until an IOException occurs
+     if (this.namenodeProtocol == null) {
+       this.namenodeProtocol = NameNodeProxies
+           .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
+           .getProxy();
+     }
+     if (this.namenodeProtocol == null) {
+       // No proxy available, leave the report untouched
+       return;
+     }
+     NamespaceInfo nsInfo = this.namenodeProtocol.versionRequest();
+     if (nsInfo != null) {
+       report.setNamespaceInfo(nsInfo);
+     }
+   } catch (IOException ioe) {
+     // Forget the proxy so the next call creates a new one
+     this.namenodeProtocol = null;
+     throw ioe;
+   }
+ }
+
+ /**
+  * Fetch the safemode state of a Namenode over RPC and store it in the report.
+  * Safemode is just one status of the Namenode and is not needed for RBF to
+  * identify it, so any exception is logged and swallowed here; the caller can
+  * continue collecting the remaining information from the Namenode.
+  * The ClientProtocol proxy is created on first use, kept for later calls and
+  * cleared after a failure.
+  * @param serviceURI Server address of the Namenode to monitor.
+  * @param report Namenode status report to update with the safemode state.
+  */
+ private void updateSafeModeParameters(URI serviceURI, NamenodeStatusReport report) {
+   try {
+     // Lazily build the proxy; it stays cached until a failure occurs
+     if (this.clientProtocol == null) {
+       this.clientProtocol = NameNodeProxies.createProxy(
+           this.conf, serviceURI, ClientProtocol.class).getProxy();
+     }
+     if (this.clientProtocol == null) {
+       // No proxy available, leave the report untouched
+       return;
+     }
+     boolean inSafeMode =
+         this.clientProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
+     report.setSafeMode(inSafeMode);
+   } catch (Exception ex) {
+     LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), ex);
+     // Forget the proxy so the next call creates a new one
+     this.clientProtocol = null;
+   }
+ }
+
/**
* Get the parameters for a Namenode from JMX and add them to the report.
* @param address Web interface of the Namenode to monitor.
@@ -415,6 +447,34 @@ public class NamenodeHeartbeatService extends PeriodicService {
}
}
+ /**
+  * Get the HA status for a Namenode via RPC and add it to the report.
+  * Does nothing when no local HA target is set. The HA protocol proxy is
+  * created on first use with the configured health monitor timeout and kept
+  * for later calls. After any failure the cached proxy is cleared; a later
+  * call creates a new one as long as the HA target is still set.
+  * Failures are logged and never propagated to the caller.
+  * @param report Namenode status report to update with the HA service state.
+  */
+ private void updateHAStatusParameters(NamenodeStatusReport report) {
+   if (localTarget == null) {
+     return;
+   }
+   try {
+     // Determine if NN is active
+     if (localTargetHAProtocol == null) {
+       localTargetHAProtocol = localTarget.getHealthMonitorProxy(
+           conf, this.healthMonitorTimeoutMs);
+       LOG.debug("Get HA status with address {}", lifelineAddress);
+     }
+     HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
+     report.setHAServiceState(status.getState());
+   } catch (Throwable e) {
+     // getMessage() may be null (e.g. for a bare NullPointerException). Guard it
+     // so this handler cannot throw itself and skip the proxy reset below.
+     String message = e.getMessage();
+     if (message != null && message.startsWith("HA for namenode is not enabled")) {
+       LOG.error("HA for {} is not enabled", getNamenodeDesc());
+       // Clearing the target makes later calls return early without probing
+       localTarget = null;
+     } else {
+       // Failed to fetch HA status, ignoring failure
+       LOG.error("Cannot fetch HA status for {}", getNamenodeDesc(), e);
+     }
+     localTargetHAProtocol = null;
+   }
+ }
+
/**
* Fetches NamenodeInfo metrics from namenode.
* @param address Web interface of the Namenode to monitor.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 3a317717ed2..d727ab09f38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -96,6 +96,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
+ public static final String DFS_ROUTER_HEALTH_MONITOR_TIMEOUT =
+ FEDERATION_ROUTER_PREFIX + "health.monitor.timeout";
+ public static final long DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT =
+ TimeUnit.SECONDS.toMillis(30);
public static final String DFS_ROUTER_MONITOR_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.namenode";
public static final String DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 58e4b27ac82..cc5dbd2e05b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -422,6 +422,14 @@
+
+ dfs.federation.router.health.monitor.timeout
+ 30s
+
+ Timeout for the Router to obtain the HAServiceStatus from a NameNode. Negative values are treated as 0.
+
+
+
dfs.federation.router.heartbeat-state.interval
5s