HDFS-12450. Fixing TestNamenodeHeartbeat and support non-HA. Contributed by Inigo Goiri.
(cherry picked from commit928d1e87f9
) (cherry picked from commit67785fe006
)
This commit is contained in:
parent
ec84369538
commit
9ad1f90b9d
|
@ -94,8 +94,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
||||||
*/
|
*/
|
||||||
public NamenodeHeartbeatService(
|
public NamenodeHeartbeatService(
|
||||||
ActiveNamenodeResolver resolver, String nsId, String nnId) {
|
ActiveNamenodeResolver resolver, String nsId, String nnId) {
|
||||||
super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " +
|
super(NamenodeHeartbeatService.class.getSimpleName() +
|
||||||
nnId);
|
(nsId == null ? "" : " " + nsId) +
|
||||||
|
(nnId == null ? "" : " " + nnId));
|
||||||
|
|
||||||
this.resolver = resolver;
|
this.resolver = resolver;
|
||||||
|
|
||||||
|
@ -109,28 +110,28 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
||||||
|
|
||||||
this.conf = configuration;
|
this.conf = configuration;
|
||||||
|
|
||||||
|
String nnDesc = nameserviceId;
|
||||||
if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
|
if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
|
||||||
this.localTarget = new NNHAServiceTarget(
|
this.localTarget = new NNHAServiceTarget(
|
||||||
conf, nameserviceId, namenodeId);
|
conf, nameserviceId, namenodeId);
|
||||||
|
nnDesc += "-" + namenodeId;
|
||||||
} else {
|
} else {
|
||||||
this.localTarget = null;
|
this.localTarget = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the RPC address for the clients to connect
|
// Get the RPC address for the clients to connect
|
||||||
this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
|
this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
|
||||||
LOG.info("{}-{} RPC address: {}",
|
LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
|
||||||
nameserviceId, namenodeId, rpcAddress);
|
|
||||||
|
|
||||||
// Get the Service RPC address for monitoring
|
// Get the Service RPC address for monitoring
|
||||||
this.serviceAddress =
|
this.serviceAddress =
|
||||||
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
|
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
|
||||||
if (this.serviceAddress == null) {
|
if (this.serviceAddress == null) {
|
||||||
LOG.error("Cannot locate RPC service address for NN {}-{}, " +
|
LOG.error("Cannot locate RPC service address for NN {}, " +
|
||||||
"using RPC address {}", nameserviceId, namenodeId, this.rpcAddress);
|
"using RPC address {}", nnDesc, this.rpcAddress);
|
||||||
this.serviceAddress = this.rpcAddress;
|
this.serviceAddress = this.rpcAddress;
|
||||||
}
|
}
|
||||||
LOG.info("{}-{} Service RPC address: {}",
|
LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
|
||||||
nameserviceId, namenodeId, serviceAddress);
|
|
||||||
|
|
||||||
// Get the Lifeline RPC address for faster monitoring
|
// Get the Lifeline RPC address for faster monitoring
|
||||||
this.lifelineAddress =
|
this.lifelineAddress =
|
||||||
|
@ -138,13 +139,12 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
||||||
if (this.lifelineAddress == null) {
|
if (this.lifelineAddress == null) {
|
||||||
this.lifelineAddress = this.serviceAddress;
|
this.lifelineAddress = this.serviceAddress;
|
||||||
}
|
}
|
||||||
LOG.info("{}-{} Lifeline RPC address: {}",
|
LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
|
||||||
nameserviceId, namenodeId, lifelineAddress);
|
|
||||||
|
|
||||||
// Get the Web address for UI
|
// Get the Web address for UI
|
||||||
this.webAddress =
|
this.webAddress =
|
||||||
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
|
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
|
||||||
LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress);
|
LOG.info("{} Web address: {}", nnDesc, webAddress);
|
||||||
|
|
||||||
this.setIntervalMs(conf.getLong(
|
this.setIntervalMs(conf.getLong(
|
||||||
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
|
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
|
||||||
|
@ -173,7 +173,7 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
||||||
String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
String ret = conf.get(confKey);
|
String ret = conf.get(confKey);
|
||||||
|
|
||||||
if (nsId != null && nnId != null) {
|
if (nsId != null || nnId != null) {
|
||||||
// Get if for the proper nameservice and namenode
|
// Get if for the proper nameservice and namenode
|
||||||
confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
|
confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
|
||||||
ret = conf.get(confKey);
|
ret = conf.get(confKey);
|
||||||
|
@ -182,10 +182,16 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
||||||
if (ret == null) {
|
if (ret == null) {
|
||||||
Map<String, InetSocketAddress> rpcAddresses =
|
Map<String, InetSocketAddress> rpcAddresses =
|
||||||
DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
|
DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
|
||||||
if (rpcAddresses.containsKey(nnId)) {
|
InetSocketAddress sockAddr = null;
|
||||||
InetSocketAddress sockAddr = rpcAddresses.get(nnId);
|
if (nnId != null) {
|
||||||
|
sockAddr = rpcAddresses.get(nnId);
|
||||||
|
} else if (rpcAddresses.size() == 1) {
|
||||||
|
// Get the only namenode in the namespace
|
||||||
|
sockAddr = rpcAddresses.values().iterator().next();
|
||||||
|
}
|
||||||
|
if (sockAddr != null) {
|
||||||
InetAddress addr = sockAddr.getAddress();
|
InetAddress addr = sockAddr.getAddress();
|
||||||
ret = addr.getHostAddress() + ":" + sockAddr.getPort();
|
ret = addr.getHostName() + ":" + sockAddr.getPort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -279,9 +285,14 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
||||||
HAServiceStatus status = haProtocol.getServiceStatus();
|
HAServiceStatus status = haProtocol.getServiceStatus();
|
||||||
report.setHAServiceState(status.getState());
|
report.setHAServiceState(status.getState());
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// Failed to fetch HA status, ignoring failure
|
if (e.getMessage().startsWith("HA for namenode is not enabled")) {
|
||||||
LOG.error("Cannot fetch HA status for {}: {}",
|
LOG.error("HA for {} is not enabled", getNamenodeDesc());
|
||||||
getNamenodeDesc(), e.getMessage(), e);
|
localTarget = null;
|
||||||
|
} else {
|
||||||
|
// Failed to fetch HA status, ignoring failure
|
||||||
|
LOG.error("Cannot fetch HA status for {}: {}",
|
||||||
|
getNamenodeDesc(), e.getMessage(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
|
|
|
@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
|
||||||
|
@ -31,6 +33,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
|
||||||
|
@ -136,8 +139,7 @@ public class RouterDFSCluster {
|
||||||
private RouterClient adminClient;
|
private RouterClient adminClient;
|
||||||
private URI fileSystemUri;
|
private URI fileSystemUri;
|
||||||
|
|
||||||
public RouterContext(Configuration conf, String nsId, String nnId)
|
public RouterContext(Configuration conf, String nsId, String nnId) {
|
||||||
throws URISyntaxException {
|
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.nameserviceId = nsId;
|
this.nameserviceId = nsId;
|
||||||
this.namenodeId = nnId;
|
this.namenodeId = nnId;
|
||||||
|
@ -397,10 +399,14 @@ public class RouterDFSCluster {
|
||||||
|
|
||||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
|
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
|
||||||
"127.0.0.1:" + context.rpcPort);
|
"127.0.0.1:" + context.rpcPort);
|
||||||
|
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix,
|
||||||
|
"127.0.0.1:" + context.servicePort);
|
||||||
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
|
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
|
||||||
"127.0.0.1:" + context.httpPort);
|
"127.0.0.1:" + context.httpPort);
|
||||||
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
|
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
|
||||||
"0.0.0.0");
|
"0.0.0.0");
|
||||||
|
conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY + "." + suffix,
|
||||||
|
"0.0.0.0");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,6 +463,19 @@ public class RouterDFSCluster {
|
||||||
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
|
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Namenodes to monitor
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (String ns : this.nameservices) {
|
||||||
|
for (NamenodeContext context : getNamenodes(ns)) {
|
||||||
|
String suffix = context.getConfSuffix();
|
||||||
|
if (sb.length() != 0) {
|
||||||
|
sb.append(",");
|
||||||
|
}
|
||||||
|
sb.append(suffix);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
|
||||||
|
|
||||||
// Add custom overrides if available
|
// Add custom overrides if available
|
||||||
if (this.routerOverrides != null) {
|
if (this.routerOverrides != null) {
|
||||||
for (Entry<String, String> entry : this.routerOverrides) {
|
for (Entry<String, String> entry : this.routerOverrides) {
|
||||||
|
|
Loading…
Reference in New Issue