HDFS-15300. RBF: updateActiveNamenode() is invalid when RPC address is IP. Contributed by xuzq.

(cherry picked from commit 936bf09c37)
This commit is contained in:
Ayush Saxena 2020-05-12 21:48:44 +05:30 committed by Wei-Chiu Chuang
parent 5bfc3a4c3a
commit a549b4a82e
4 changed files with 45 additions and 3 deletions

View File

@ -693,6 +693,22 @@ public class NetUtils {
}
}
/**
* Attempt to normalize the given string to "host:port"
* if it like "ip:port".
*
* @param ipPort maybe lik ip:port or host:port.
* @return host:port
*/
public static String normalizeIP2HostName(String ipPort) {
if (null == ipPort || !ipPortPattern.matcher(ipPort).matches()) {
return ipPort;
}
InetSocketAddress address = createSocketAddr(ipPort);
return getHostPortString(address);
}
/**
* Return hostname without throwing exception.
* The returned hostname String format is "hostname".

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeat
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -263,7 +264,8 @@ public class MembershipNamenodeResolver
MembershipState record = MembershipState.newInstance(
routerId, report.getNameserviceId(), report.getNamenodeId(),
report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
report.getClusterId(), report.getBlockPoolId(),
NetUtils.normalizeIP2HostName(report.getRpcAddress()),
report.getServiceAddress(), report.getLifelineAddress(),
report.getWebScheme(), report.getWebAddress(), report.getState(),
report.getSafemode());

View File

@ -138,8 +138,15 @@ public final class FederationTestUtils {
public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
HAServiceState state) {
Random rand = new Random();
NamenodeStatusReport report = new NamenodeStatusReport(ns, nn,
"localhost:" + rand.nextInt(10000), "localhost:" + rand.nextInt(10000),
return createNamenodeReport(ns, nn, "localhost:"
+ rand.nextInt(10000), state);
}
public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
String rpcAddress, HAServiceState state) {
Random rand = new Random();
NamenodeStatusReport report = new NamenodeStatusReport(ns, nn, rpcAddress,
"localhost:" + rand.nextInt(10000),
"localhost:" + rand.nextInt(10000), "http",
"testwebaddress-" + ns + nn);
if (state == null) {

View File

@ -307,6 +307,23 @@ public class TestNamenodeResolver {
FederationNamenodeServiceState.ACTIVE, namenode1.getState());
}
@Test
public void testCacheUpdateOnNamenodeStateUpdateWithIp()
throws IOException {
final String rpcAddress = "127.0.0.1:10000";
assertTrue(namenodeResolver.registerNamenode(
createNamenodeReport(NAMESERVICES[0], NAMENODES[0], rpcAddress,
HAServiceState.STANDBY)));
stateStore.refreshCaches(true);
InetSocketAddress inetAddr = getInetSocketAddress(rpcAddress);
namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
FederationNamenodeContext namenode =
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0);
assertEquals("The namenode state should be ACTIVE post update.",
FederationNamenodeServiceState.ACTIVE, namenode.getState());
}
/**
* Creates InetSocketAddress from the given RPC address.
* @param rpcAddr RPC address (host:port).