HDFS-12450. Fixing TestNamenodeHeartbeat and support non-HA. Contributed by Inigo Goiri.

(cherry picked from commit 928d1e87f9)
This commit is contained in:
Inigo Goiri 2017-09-15 16:02:12 -07:00
parent 1f06b81ecb
commit 67785fe006
2 changed files with 50 additions and 20 deletions

View File

@ -94,8 +94,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
*/ */
public NamenodeHeartbeatService( public NamenodeHeartbeatService(
ActiveNamenodeResolver resolver, String nsId, String nnId) { ActiveNamenodeResolver resolver, String nsId, String nnId) {
super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " + super(NamenodeHeartbeatService.class.getSimpleName() +
nnId); (nsId == null ? "" : " " + nsId) +
(nnId == null ? "" : " " + nnId));
this.resolver = resolver; this.resolver = resolver;
@ -109,28 +110,28 @@ public class NamenodeHeartbeatService extends PeriodicService {
this.conf = configuration; this.conf = configuration;
String nnDesc = nameserviceId;
if (this.namenodeId != null && !this.namenodeId.isEmpty()) { if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
this.localTarget = new NNHAServiceTarget( this.localTarget = new NNHAServiceTarget(
conf, nameserviceId, namenodeId); conf, nameserviceId, namenodeId);
nnDesc += "-" + namenodeId;
} else { } else {
this.localTarget = null; this.localTarget = null;
} }
// Get the RPC address for the clients to connect // Get the RPC address for the clients to connect
this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId); this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
LOG.info("{}-{} RPC address: {}", LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
nameserviceId, namenodeId, rpcAddress);
// Get the Service RPC address for monitoring // Get the Service RPC address for monitoring
this.serviceAddress = this.serviceAddress =
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId); DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
if (this.serviceAddress == null) { if (this.serviceAddress == null) {
LOG.error("Cannot locate RPC service address for NN {}-{}, " + LOG.error("Cannot locate RPC service address for NN {}, " +
"using RPC address {}", nameserviceId, namenodeId, this.rpcAddress); "using RPC address {}", nnDesc, this.rpcAddress);
this.serviceAddress = this.rpcAddress; this.serviceAddress = this.rpcAddress;
} }
LOG.info("{}-{} Service RPC address: {}", LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
nameserviceId, namenodeId, serviceAddress);
// Get the Lifeline RPC address for faster monitoring // Get the Lifeline RPC address for faster monitoring
this.lifelineAddress = this.lifelineAddress =
@ -138,13 +139,12 @@ public class NamenodeHeartbeatService extends PeriodicService {
if (this.lifelineAddress == null) { if (this.lifelineAddress == null) {
this.lifelineAddress = this.serviceAddress; this.lifelineAddress = this.serviceAddress;
} }
LOG.info("{}-{} Lifeline RPC address: {}", LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
nameserviceId, namenodeId, lifelineAddress);
// Get the Web address for UI // Get the Web address for UI
this.webAddress = this.webAddress =
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId); DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress); LOG.info("{} Web address: {}", nnDesc, webAddress);
this.setIntervalMs(conf.getLong( this.setIntervalMs(conf.getLong(
DFS_ROUTER_HEARTBEAT_INTERVAL_MS, DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
@ -173,7 +173,7 @@ public class NamenodeHeartbeatService extends PeriodicService {
String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
String ret = conf.get(confKey); String ret = conf.get(confKey);
if (nsId != null && nnId != null) { if (nsId != null || nnId != null) {
// Get if for the proper nameservice and namenode // Get if for the proper nameservice and namenode
confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId); confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
ret = conf.get(confKey); ret = conf.get(confKey);
@ -182,10 +182,16 @@ public class NamenodeHeartbeatService extends PeriodicService {
if (ret == null) { if (ret == null) {
Map<String, InetSocketAddress> rpcAddresses = Map<String, InetSocketAddress> rpcAddresses =
DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
if (rpcAddresses.containsKey(nnId)) { InetSocketAddress sockAddr = null;
InetSocketAddress sockAddr = rpcAddresses.get(nnId); if (nnId != null) {
sockAddr = rpcAddresses.get(nnId);
} else if (rpcAddresses.size() == 1) {
// Get the only namenode in the namespace
sockAddr = rpcAddresses.values().iterator().next();
}
if (sockAddr != null) {
InetAddress addr = sockAddr.getAddress(); InetAddress addr = sockAddr.getAddress();
ret = addr.getHostAddress() + ":" + sockAddr.getPort(); ret = addr.getHostName() + ":" + sockAddr.getPort();
} }
} }
} }
@ -279,11 +285,16 @@ public class NamenodeHeartbeatService extends PeriodicService {
HAServiceStatus status = haProtocol.getServiceStatus(); HAServiceStatus status = haProtocol.getServiceStatus();
report.setHAServiceState(status.getState()); report.setHAServiceState(status.getState());
} catch (Throwable e) { } catch (Throwable e) {
if (e.getMessage().startsWith("HA for namenode is not enabled")) {
LOG.error("HA for {} is not enabled", getNamenodeDesc());
localTarget = null;
} else {
// Failed to fetch HA status, ignoring failure // Failed to fetch HA status, ignoring failure
LOG.error("Cannot fetch HA status for {}: {}", LOG.error("Cannot fetch HA status for {}: {}",
getNamenodeDesc(), e.getMessage(), e); getNamenodeDesc(), e.getMessage(), e);
} }
} }
}
} catch(IOException e) { } catch(IOException e) {
LOG.error("Cannot communicate with {}: {}", LOG.error("Cannot communicate with {}: {}",
getNamenodeDesc(), e.getMessage()); getNamenodeDesc(), e.getMessage());

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
@ -31,6 +33,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
@ -136,8 +139,7 @@ public class RouterDFSCluster {
private RouterClient adminClient; private RouterClient adminClient;
private URI fileSystemUri; private URI fileSystemUri;
public RouterContext(Configuration conf, String nsId, String nnId) public RouterContext(Configuration conf, String nsId, String nnId) {
throws URISyntaxException {
this.conf = conf; this.conf = conf;
this.nameserviceId = nsId; this.nameserviceId = nsId;
this.namenodeId = nnId; this.namenodeId = nnId;
@ -397,10 +399,14 @@ public class RouterDFSCluster {
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.rpcPort); "127.0.0.1:" + context.rpcPort);
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.servicePort);
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.httpPort); "127.0.0.1:" + context.httpPort);
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
"0.0.0.0"); "0.0.0.0");
conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY + "." + suffix,
"0.0.0.0");
} }
} }
@ -457,6 +463,19 @@ public class RouterDFSCluster {
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId); conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
} }
// Namenodes to monitor
StringBuilder sb = new StringBuilder();
for (String ns : this.nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
String suffix = context.getConfSuffix();
if (sb.length() != 0) {
sb.append(",");
}
sb.append(suffix);
}
}
conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
// Add custom overrides if available // Add custom overrides if available
if (this.routerOverrides != null) { if (this.routerOverrides != null) {
for (Entry<String, String> entry : this.routerOverrides) { for (Entry<String, String> entry : this.routerOverrides) {