HDFS-16705. RBF supports healthMonitor timeout configurable and caching NN and client proxy in NamenodeHeartbeatService (#4662)

This commit is contained in:
xuzq 2022-08-16 04:55:16 +08:00 committed by GitHub
parent eff3b8c59a
commit 622ca0d51f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 119 additions and 47 deletions

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
@ -25,6 +27,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
@ -85,6 +88,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
private NNHAServiceTarget localTarget;
/** Cache HA protocol. */
private HAServiceProtocol localTargetHAProtocol;
/** Cache NN protocol. */
private NamenodeProtocol namenodeProtocol;
/** Cache Client protocol. */
private ClientProtocol clientProtocol;
/** RPC address for the namenode. */
private String rpcAddress;
/** Service RPC address for the namenode. */
@ -100,6 +107,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
private String resolvedHost;
private String originalNnId;
private int healthMonitorTimeoutMs = (int) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
/**
* Create a new Namenode status updater.
* @param resolver Namenode resolver service to handle NN registration.
@ -211,6 +221,15 @@ public class NamenodeHeartbeatService extends PeriodicService {
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
long timeoutMs = conf.getTimeDuration(DFS_ROUTER_HEALTH_MONITOR_TIMEOUT,
DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
if (timeoutMs < 0) {
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
"Using value of : 0ms instead.", timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT);
this.healthMonitorTimeoutMs = 0;
} else {
this.healthMonitorTimeoutMs = (int) timeoutMs;
}
super.serviceInit(configuration);
}
@ -309,66 +328,26 @@ public class NamenodeHeartbeatService extends PeriodicService {
LOG.debug("Probing NN at service address: {}", serviceAddress);
URI serviceURI = new URI("hdfs://" + serviceAddress);
// Read the filesystem info from RPC (required)
NamenodeProtocol nn = NameNodeProxies
.createProxy(this.conf, serviceURI, NamenodeProtocol.class)
.getProxy();
if (nn != null) {
NamespaceInfo info = nn.versionRequest();
if (info != null) {
report.setNamespaceInfo(info);
}
}
// Read the filesystem info from RPC (required)
updateNameSpaceInfoParameters(serviceURI, report);
if (!report.registrationValid()) {
return report;
}
// Check for safemode from the client protocol. Currently optional, but
// should be required at some point for QoS
try {
ClientProtocol client = NameNodeProxies
.createProxy(this.conf, serviceURI, ClientProtocol.class)
.getProxy();
if (client != null) {
boolean isSafeMode = client.setSafeMode(
SafeModeAction.SAFEMODE_GET, false);
report.setSafeMode(isSafeMode);
}
} catch (Exception e) {
LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
}
updateSafeModeParameters(serviceURI, report);
// Read the stats from JMX (optional)
updateJMXParameters(webAddress, report);
if (localTarget != null) {
// Try to get the HA status
try {
// Determine if NN is active
// TODO: dynamic timeout
if (localTargetHAProtocol == null) {
localTargetHAProtocol = localTarget.getHealthMonitorProxy(conf, 30*1000);
LOG.debug("Get HA status with address {}", lifelineAddress);
}
HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
report.setHAServiceState(status.getState());
} catch (Throwable e) {
if (e.getMessage().startsWith("HA for namenode is not enabled")) {
LOG.error("HA for {} is not enabled", getNamenodeDesc());
localTarget = null;
} else {
// Failed to fetch HA status, ignoring failure
LOG.error("Cannot fetch HA status for {}: {}",
getNamenodeDesc(), e.getMessage(), e);
}
localTargetHAProtocol = null;
}
}
} catch(IOException e) {
updateHAStatusParameters(report);
} catch (IOException e) {
LOG.error("Cannot communicate with {}: {}",
getNamenodeDesc(), e.getMessage());
} catch(Throwable e) {
} catch (Throwable e) {
// Generic error that we don't know about
LOG.error("Unexpected exception while communicating with {}: {}",
getNamenodeDesc(), e.getMessage(), e);
@ -399,6 +378,59 @@ public class NamenodeHeartbeatService extends PeriodicService {
(nnId == null ? "" : " " + nnId);
}
/**
* Get the namespace information for a Namenode via RPC and add them to the report.
* @param serviceURI Server address of the Namenode to monitor.
* @param report Namenode status report updating with namespace information data.
* @throws IOException This method will throw IOException up, because RBF need
* use Namespace Info to identify this NS. If there are some IOExceptions,
* RBF doesn't need to get other information from NameNode,
* so throw IOException up.
*/
private void updateNameSpaceInfoParameters(URI serviceURI,
NamenodeStatusReport report) throws IOException {
try {
if (this.namenodeProtocol == null) {
this.namenodeProtocol = NameNodeProxies.createProxy(this.conf, serviceURI,
NamenodeProtocol.class).getProxy();
}
if (namenodeProtocol != null) {
NamespaceInfo info = namenodeProtocol.versionRequest();
if (info != null) {
report.setNamespaceInfo(info);
}
}
} catch (IOException e) {
this.namenodeProtocol = null;
throw e;
}
}
/**
* Get the safemode information for a Namenode via RPC and add them to the report.
* Safemode is only one status of NameNode and is useless for RBF identify one NameNode.
* So If there are some IOExceptions, RBF can just ignore it and try to collect
* other information form namenode continue.
* @param serviceURI Server address of the Namenode to monitor.
* @param report Namenode status report updating with safemode information data.
*/
private void updateSafeModeParameters(URI serviceURI, NamenodeStatusReport report) {
try {
if (this.clientProtocol == null) {
this.clientProtocol = NameNodeProxies
.createProxy(this.conf, serviceURI, ClientProtocol.class)
.getProxy();
}
if (clientProtocol != null) {
boolean isSafeMode = clientProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
report.setSafeMode(isSafeMode);
}
} catch (Exception e) {
LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
this.clientProtocol = null;
}
}
/**
* Get the parameters for a Namenode from JMX and add them to the report.
* @param address Web interface of the Namenode to monitor.
@ -415,6 +447,34 @@ public class NamenodeHeartbeatService extends PeriodicService {
}
}
/**
* Get the HA status for a Namenode via RPC and add them to the report.
* @param report Namenode status report updating with HA status information data.
*/
private void updateHAStatusParameters(NamenodeStatusReport report) {
if (localTarget != null) {
try {
// Determine if NN is active
if (localTargetHAProtocol == null) {
localTargetHAProtocol = localTarget.getHealthMonitorProxy(
conf, this.healthMonitorTimeoutMs);
LOG.debug("Get HA status with address {}", lifelineAddress);
}
HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
report.setHAServiceState(status.getState());
} catch (Throwable e) {
if (e.getMessage().startsWith("HA for namenode is not enabled")) {
LOG.error("HA for {} is not enabled", getNamenodeDesc());
localTarget = null;
} else {
// Failed to fetch HA status, ignoring failure
LOG.error("Cannot fetch HA status for {}", getNamenodeDesc(), e);
}
localTargetHAProtocol = null;
}
}
}
/**
* Fetches NamenodeInfo metrics from namenode.
* @param address Web interface of the Namenode to monitor.

View File

@ -96,6 +96,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_ROUTER_HEALTH_MONITOR_TIMEOUT =
FEDERATION_ROUTER_PREFIX + "health.monitor.timeout";
public static final long DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT =
TimeUnit.SECONDS.toMillis(30);
public static final String DFS_ROUTER_MONITOR_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.namenode";
public static final String DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED =

View File

@ -422,6 +422,14 @@
</description>
</property>
<property>
<name>dfs.federation.router.health.monitor.timeout</name>
<value>30s</value>
<description>
Time out for Router to obtain HAServiceStatus from NameNode.
</description>
</property>
<property>
<name>dfs.federation.router.heartbeat-state.interval</name>
<value>5s</value>