HDFS-15281. Make sure ZKFC uses dfs.namenode.rpc-address to bind to host address (#1964)
Contributed by Dhiraj Hegde. Signed-off-by: Mingliang Liu <liuml07@apache.org> Signed-off-by: Inigo Goiri <inigoiri@apache.org>
This commit is contained in:
parent
4ebe78496f
commit
23b15089ad
|
@ -317,9 +317,10 @@ public abstract class ZKFailoverController {
|
||||||
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
|
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
|
||||||
healthMonitor.start();
|
healthMonitor.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void initRPC() throws IOException {
|
protected void initRPC() throws IOException {
|
||||||
InetSocketAddress bindAddr = getRpcAddressToBindTo();
|
InetSocketAddress bindAddr = getRpcAddressToBindTo();
|
||||||
|
LOG.info("ZKFC RpcServer binding to {}", bindAddr);
|
||||||
rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
|
rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,21 +111,39 @@ public class DFSZKFailoverController extends ZKFailoverController {
|
||||||
@Override
|
@Override
|
||||||
protected InetSocketAddress getRpcAddressToBindTo() {
|
protected InetSocketAddress getRpcAddressToBindTo() {
|
||||||
int zkfcPort = getZkfcPort(conf);
|
int zkfcPort = getZkfcPort(conf);
|
||||||
return new InetSocketAddress(localTarget.getAddress().getAddress(),
|
String zkfcBindAddr = getZkfcServerBindHost(conf);
|
||||||
zkfcPort);
|
if (zkfcBindAddr == null || zkfcBindAddr.isEmpty()) {
|
||||||
|
zkfcBindAddr = localTarget.getAddress().getAddress().getHostAddress();
|
||||||
|
}
|
||||||
|
return new InetSocketAddress(zkfcBindAddr, zkfcPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected PolicyProvider getPolicyProvider() {
|
protected PolicyProvider getPolicyProvider() {
|
||||||
return new HDFSPolicyProvider();
|
return new HDFSPolicyProvider();
|
||||||
}
|
}
|
||||||
|
|
||||||
static int getZkfcPort(Configuration conf) {
|
static int getZkfcPort(Configuration conf) {
|
||||||
return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY,
|
return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY,
|
||||||
DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT);
|
DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a configuration get the bind host that could be used by ZKFC.
|
||||||
|
* We derive it from NN service rpc bind host or NN rpc bind host.
|
||||||
|
*
|
||||||
|
* @param conf input configuration
|
||||||
|
* @return the bind host address found in conf
|
||||||
|
*/
|
||||||
|
private static String getZkfcServerBindHost(Configuration conf) {
|
||||||
|
String addr = conf.getTrimmed(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
|
||||||
|
if (addr == null || addr.isEmpty()) {
|
||||||
|
addr = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY);
|
||||||
|
}
|
||||||
|
return addr;
|
||||||
|
}
|
||||||
|
|
||||||
public static DFSZKFailoverController create(Configuration conf) {
|
public static DFSZKFailoverController create(Configuration conf) {
|
||||||
Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
|
Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
|
||||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.tools;
|
package org.apache.hadoop.hdfs.tools;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@ -50,6 +51,8 @@ import org.junit.Test;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
public class TestDFSZKFailoverController extends ClientBaseWithFixes {
|
public class TestDFSZKFailoverController extends ClientBaseWithFixes {
|
||||||
|
private static final String LOCALHOST_SERVER_ADDRESS = "127.0.0.1";
|
||||||
|
private static final String WILDCARD_ADDRESS = "0.0.0.0";
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private TestContext ctx;
|
private TestContext ctx;
|
||||||
|
@ -199,6 +202,27 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes {
|
||||||
waitForHAState(0, HAServiceState.ACTIVE);
|
waitForHAState(0, HAServiceState.ACTIVE);
|
||||||
waitForHAState(1, HAServiceState.STANDBY);
|
waitForHAState(1, HAServiceState.STANDBY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testWithoutBindAddressSet() throws Exception {
|
||||||
|
DFSZKFailoverController zkfc = DFSZKFailoverController.create(
|
||||||
|
conf);
|
||||||
|
|
||||||
|
assertEquals("Bind address not expected to be wildcard by default.",
|
||||||
|
zkfc.getRpcAddressToBindTo().getHostString(),
|
||||||
|
LOCALHOST_SERVER_ADDRESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testWithBindAddressSet() throws Exception {
|
||||||
|
conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS);
|
||||||
|
DFSZKFailoverController zkfc = DFSZKFailoverController.create(
|
||||||
|
conf);
|
||||||
|
String addr = zkfc.getRpcAddressToBindTo().getHostString();
|
||||||
|
|
||||||
|
assertEquals("Bind address " + addr + " is not wildcard.",
|
||||||
|
addr, WILDCARD_ADDRESS);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testManualFailoverWithDFSHAAdmin() throws Exception {
|
public void testManualFailoverWithDFSHAAdmin() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue