HDFS-16440. RBF: Support router get HAServiceStatus with Lifeline RPC address (#3971)

This commit is contained in:
YulongZ 2022-02-16 00:44:17 +08:00 committed by GitHub
parent 0c194f2157
commit 48bef285a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 5 deletions

View File

@ -48,6 +48,8 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
/** /**
* The {@link Router} periodically checks the state of a Namenode (usually on * The {@link Router} periodically checks the state of a Namenode (usually on
* the same server) and reports their high availability (HA) state and * the same server) and reports their high availability (HA) state and
@ -346,7 +348,8 @@ public class NamenodeHeartbeatService extends PeriodicService {
// Determine if NN is active // Determine if NN is active
// TODO: dynamic timeout // TODO: dynamic timeout
if (localTargetHAProtocol == null) { if (localTargetHAProtocol == null) {
localTargetHAProtocol = localTarget.getProxy(conf, 30*1000); localTargetHAProtocol = localTarget.getHealthMonitorProxy(conf, 30*1000);
LOG.debug("Get HA status with address {}", lifelineAddress);
} }
HAServiceStatus status = localTargetHAProtocol.getServiceStatus(); HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
report.setHAServiceState(status.getState()); report.setHAServiceState(status.getState());
@ -373,6 +376,11 @@ public class NamenodeHeartbeatService extends PeriodicService {
return report; return report;
} }
@VisibleForTesting
NNHAServiceTarget getLocalTarget(){
return this.localTarget;
}
/** /**
* Get the description of the Namenode to monitor. * Get the description of the Namenode to monitor.
* @return Description of the Namenode to monitor. * @return Description of the Namenode to monitor.

View File

@ -211,6 +211,50 @@ public class TestRouterNamenodeHeartbeat {
assertEquals(NAMENODES[1], standby.getNamenodeId()); assertEquals(NAMENODES[1], standby.getNamenodeId());
} }
@Test
public void testNamenodeHeartbeatServiceHAServiceProtocolProxy(){
testNamenodeHeartbeatServiceHAServiceProtocol(
"test-ns", "nn", 1000, -1, -1, 1003,
"host01.test:1000", "host02.test:1000");
testNamenodeHeartbeatServiceHAServiceProtocol(
"test-ns", "nn", 1000, 1001, -1, 1003,
"host01.test:1001", "host02.test:1001");
testNamenodeHeartbeatServiceHAServiceProtocol(
"test-ns", "nn", 1000, -1, 1002, 1003,
"host01.test:1002", "host02.test:1002");
testNamenodeHeartbeatServiceHAServiceProtocol(
"test-ns", "nn", 1000, 1001, 1002, 1003,
"host01.test:1002", "host02.test:1002");
}
private void testNamenodeHeartbeatServiceHAServiceProtocol(
String nsId, String nnId,
int rpcPort, int servicePort,
int lifelinePort, int webAddressPort,
String expected0, String expected1) {
Configuration conf = generateNamenodeConfiguration(nsId, nnId,
rpcPort, servicePort, lifelinePort, webAddressPort);
Router testRouter = new Router();
testRouter.setConf(conf);
Collection<NamenodeHeartbeatService> heartbeatServices =
testRouter.createNamenodeHeartbeatServices();
assertEquals(2, heartbeatServices.size());
Iterator<NamenodeHeartbeatService> iterator = heartbeatServices.iterator();
NamenodeHeartbeatService service0 = iterator.next();
service0.init(conf);
assertNotNull(service0.getLocalTarget());
assertEquals(expected0, service0.getLocalTarget().getHealthMonitorAddress().toString());
NamenodeHeartbeatService service1 = iterator.next();
service1.init(conf);
assertNotNull(service1.getLocalTarget());
assertEquals(expected1, service1.getLocalTarget().getHealthMonitorAddress().toString());
}
@Test @Test
public void testNamenodeHeartbeatServiceNNResolution() { public void testNamenodeHeartbeatServiceNNResolution() {
String nsId = "test-ns"; String nsId = "test-ns";
@ -261,10 +305,14 @@ public class TestRouterNamenodeHeartbeat {
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
MockDomainNameResolver.DOMAIN + ":" + rpcPort); MockDomainNameResolver.DOMAIN + ":" + rpcPort);
if (servicePort >= 0){
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix, conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix,
MockDomainNameResolver.DOMAIN + ":" + servicePort); MockDomainNameResolver.DOMAIN + ":" + servicePort);
}
if (lifelinePort >= 0){
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY + "." + suffix, conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY + "." + suffix,
MockDomainNameResolver.DOMAIN + ":" + lifelinePort); MockDomainNameResolver.DOMAIN + ":" + lifelinePort);
}
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
MockDomainNameResolver.DOMAIN + ":" + webAddressPort); MockDomainNameResolver.DOMAIN + ":" + webAddressPort);