HDFS-9311. Support optional offload of NameNode HA service health checks to a separate RPC server. Contributed by Chris Nauroth.

(cherry picked from commit bf8e452982)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
This commit is contained in:
cnauroth 2015-10-27 23:07:14 -07:00
parent 0377795e06
commit af0f2e27d1
15 changed files with 473 additions and 67 deletions

View File

@ -49,6 +49,23 @@ public abstract class HAServiceTarget {
*/
public abstract InetSocketAddress getAddress();
/**
* Returns an optional separate RPC server address for health checks at the
* target node. If defined, then this address is used by the health monitor
* for the {@link HAServiceProtocol#monitorHealth()} and
* {@link HAServiceProtocol#getServiceStatus()} calls. This can be useful for
* separating out these calls onto separate RPC handlers to protect against
* resource exhaustion in the main RPC handler pool. If null (which is the
* default implementation), then all RPC calls go to the address defined by
* {@link #getAddress()}.
*
* @return IPC address of the lifeline RPC server on the target node, or null
* if no lifeline RPC server is used
*/
public InetSocketAddress getHealthMonitorAddress() {
return null;
}
/**
* @return the IPC address of the ZKFC on the target node
*/
@ -73,15 +90,42 @@ public abstract void checkFencingConfigured()
*/
public HAServiceProtocol getProxy(Configuration conf, int timeoutMs)
throws IOException {
return getProxyForAddress(conf, timeoutMs, getAddress());
}
/**
* Returns a proxy to connect to the target HA service for health monitoring.
* If {@link #getHealthMonitorAddress()} is implemented to return a non-null
* address, then this proxy will connect to that address. Otherwise, the
* returned proxy defaults to using {@link #getAddress()}, which means this
* method's behavior is identical to {@link #getProxy(Configuration, int)}.
*
* @param conf Configuration
* @param timeoutMs timeout in milliseconds
* @return a proxy to connect to the target HA service for health monitoring
* @throws IOException if there is an error
*/
public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
int timeoutMs) throws IOException {
InetSocketAddress addr = getHealthMonitorAddress();
if (addr == null) {
addr = getAddress();
}
return getProxyForAddress(conf, timeoutMs, addr);
}
private HAServiceProtocol getProxyForAddress(Configuration conf,
int timeoutMs, InetSocketAddress addr) throws IOException {
Configuration confCopy = new Configuration(conf);
// Lower the timeout so we quickly fail to connect
confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
confCopy.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
return new HAServiceProtocolClientSideTranslatorPB(
getAddress(),
addr,
confCopy, factory, timeoutMs);
}
/**
* @return a proxy to the ZKFC which is associated with this HA service.
*/

View File

@ -191,7 +191,7 @@ private void tryConnect() {
* Connect to the service to be monitored. Stubbed out for easier testing.
*/
protected HAServiceProtocol createProxy() throws IOException {
return targetToMonitor.getProxy(conf, rpcTimeout);
return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout);
}
private void doHealthChecks() throws InterruptedException {

View File

@ -49,10 +49,10 @@ class DummyHAService extends HAServiceTarget {
public static final Log LOG = LogFactory.getLog(DummyHAService.class);
private static final String DUMMY_FENCE_KEY = "dummy.fence.key";
volatile HAServiceState state;
HAServiceProtocol proxy;
HAServiceProtocol proxy, healthMonitorProxy;
ZKFCProtocol zkfcProxy = null;
NodeFencer fencer;
InetSocketAddress address;
InetSocketAddress address, healthMonitorAddress;
boolean isHealthy = true;
boolean actUnreachable = false;
boolean failToBecomeActive, failToBecomeStandby, failToFence;
@ -80,6 +80,7 @@ class DummyHAService extends HAServiceTarget {
}
Configuration conf = new Configuration();
this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
this.healthMonitorProxy = makeHealthMonitorMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
try {
conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
this.fencer = Mockito.spy(
@ -92,7 +93,18 @@ class DummyHAService extends HAServiceTarget {
this.index = instances.size();
}
}
DummyHAService(HAServiceState state, InetSocketAddress address,
InetSocketAddress healthMonitorAddress, boolean testWithProtoBufRPC) {
this(state, address, testWithProtoBufRPC);
if (testWithProtoBufRPC) {
this.healthMonitorAddress = startAndGetRPCServerAddress(
healthMonitorAddress);
} else {
this.healthMonitorAddress = healthMonitorAddress;
}
}
public void setSharedResource(DummySharedResource rsrc) {
this.sharedResource = rsrc;
}
@ -134,11 +146,31 @@ private HAServiceProtocol makeMock(Configuration conf, int timeoutMs) {
return Mockito.spy(service);
}
private HAServiceProtocol makeHealthMonitorMock(Configuration conf,
int timeoutMs) {
HAServiceProtocol service;
if (!testWithProtoBufRPC) {
service = new MockHAProtocolImpl();
} else {
try {
service = super.getHealthMonitorProxy(conf, timeoutMs);
} catch (IOException e) {
return null;
}
}
return Mockito.spy(service);
}
@Override
public InetSocketAddress getAddress() {
return address;
}
@Override
public InetSocketAddress getHealthMonitorAddress() {
return healthMonitorAddress;
}
@Override
public InetSocketAddress getZKFCAddress() {
return null;
@ -152,6 +184,15 @@ public HAServiceProtocol getProxy(Configuration conf, int timeout)
}
return proxy;
}
@Override
public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
int timeout) throws IOException {
if (testWithProtoBufRPC) {
proxy = makeHealthMonitorMock(conf, timeout);
}
return proxy;
}
@Override
public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout)

View File

@ -55,8 +55,7 @@ public void setupHM() throws InterruptedException, IOException {
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
svc = new DummyHAService(HAServiceState.ACTIVE,
new InetSocketAddress("0.0.0.0", 0), true);
svc = createDummyHAService();
hm = new HealthMonitor(conf, svc) {
@Override
protected HAServiceProtocol createProxy() throws IOException {
@ -73,7 +72,12 @@ protected HAServiceProtocol createProxy() throws IOException {
LOG.info("Waiting for HEALTHY signal");
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
}
protected DummyHAService createDummyHAService() {
return new DummyHAService(HAServiceState.ACTIVE,
new InetSocketAddress("0.0.0.0", 0), true);
}
@Test(timeout=15000)
public void testMonitor() throws Exception {
LOG.info("Mocking bad health check, waiting for UNHEALTHY");

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import java.net.InetSocketAddress;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
/**
* Repeats all tests of {@link TestHealthMonitor}, but using a separate
* dedicated health check RPC address.
*/
public class TestHealthMonitorWithDedicatedHealthAddress
extends TestHealthMonitor {
@Override
protected DummyHAService createDummyHAService() {
return new DummyHAService(HAServiceState.ACTIVE,
new InetSocketAddress("0.0.0.0", 0),
new InetSocketAddress("0.0.0.0", 0), true);
}
}

View File

@ -757,6 +757,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9307. fuseConnect should be private to fuse_connect.c (Mingliang Liu
via Colin P. McCabe)
HDFS-9311. Support optional offload of NameNode HA service health checks to
a separate RPC server. (cnauroth)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -138,6 +138,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host";
public static final String DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY =
"dfs.namenode.lifeline.rpc-address";
public static final String DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY =
"dfs.namenode.lifeline.rpc-bind-host";
public static final String DFS_NAMENODE_MAX_OBJECTS_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
public static final long DFS_NAMENODE_MAX_OBJECTS_DEFAULT = 0;
@ -466,6 +470,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
public static final int DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1;
public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = "dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_SUPPORT_APPEND_KEY = "dfs.support.append";

View File

@ -27,6 +27,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
@ -573,6 +574,28 @@ public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddress
return addressList;
}
/**
* Map a logical namenode ID to its lifeline address. Use the given
* nameservice if specified, or the configured one if none is given.
*
* @param conf Configuration
* @param nsId which nameservice nnId is a part of, optional
* @param nnId the namenode ID to get the service addr for
* @return the lifeline addr, null if it could not be determined
*/
public static String getNamenodeLifelineAddr(final Configuration conf,
String nsId, String nnId) {
if (nsId == null) {
nsId = getOnlyNameServiceIdOrNull(conf);
}
String lifelineAddrKey = DFSUtilClient.concatSuffixes(
DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, nsId, nnId);
return conf.get(lifelineAddrKey);
}
/**
* Flatten the given map, as returned by other functions in this class,
* into a flat list of {@link ConfiguredNNAddress} instances.

View File

@ -121,6 +121,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@ -227,6 +229,8 @@ public static enum OperationCategory {
DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_DIR_KEY,
DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_HTTP_ADDRESS_KEY,
@ -487,6 +491,21 @@ boolean isRole(NamenodeRole that) {
return role.equals(that);
}
/**
* Given a configuration get the address of the lifeline RPC server.
* If the lifeline RPC is not configured returns null.
*
* @param conf configuration
* @return address or null
*/
InetSocketAddress getLifelineRpcServerAddress(Configuration conf) {
String addr = getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
if (addr == null) {
return null;
}
return NetUtils.createSocketAddr(addr);
}
/**
* Given a configuration get the address of the service rpc server
* If the service rpc is not configured returns null
@ -498,29 +517,60 @@ protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
protected InetSocketAddress getRpcServerAddress(Configuration conf) {
return DFSUtilClient.getNNAddress(conf);
}
/**
* Given a configuration get the bind host of the lifeline RPC server.
* If the bind host is not configured returns null.
*
* @param conf configuration
* @return bind host or null
*/
String getLifelineRpcServerBindHost(Configuration conf) {
return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
}
/** Given a configuration get the bind host of the service rpc server
* If the bind host is not configured returns null.
*/
protected String getServiceRpcServerBindHost(Configuration conf) {
String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
if (addr == null || addr.isEmpty()) {
return null;
}
return addr;
return getTrimmedOrNull(conf, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
}
/** Given a configuration get the bind host of the client rpc server
* If the bind host is not configured returns null.
*/
protected String getRpcServerBindHost(Configuration conf) {
String addr = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY);
return getTrimmedOrNull(conf, DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
/**
* Gets a trimmed value from configuration, or null if no value is defined.
*
* @param conf configuration
* @param key configuration key to get
* @return trimmed value, or null if no value is defined
*/
private static String getTrimmedOrNull(Configuration conf, String key) {
String addr = conf.getTrimmed(key);
if (addr == null || addr.isEmpty()) {
return null;
}
return addr;
}
/**
* Modifies the configuration to contain the lifeline RPC address setting.
*
* @param conf configuration to modify
* @param lifelineRPCAddress lifeline RPC address
*/
void setRpcLifelineServerAddress(Configuration conf,
InetSocketAddress lifelineRPCAddress) {
LOG.info("Setting lifeline RPC address {}", lifelineRPCAddress);
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
NetUtils.getHostPortString(lifelineRPCAddress));
}
/**
* Modifies the configuration passed to contain the service rpc address setting
*/

View File

@ -19,6 +19,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
@ -210,6 +212,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
/** The RPC server that listens to requests from DataNodes */
private final RPC.Server serviceRpcServer;
private final InetSocketAddress serviceRPCAddress;
/** The RPC server that listens to lifeline requests */
private final RPC.Server lifelineRpcServer;
private final InetSocketAddress lifelineRPCAddress;
/** The RPC server that listens to requests from clients */
protected final RPC.Server clientRpcServer;
@ -336,6 +342,42 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
serviceRpcServer = null;
serviceRPCAddress = null;
}
InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
if (lifelineRpcAddr != null) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
String bindHost = nn.getLifelineRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = lifelineRpcAddr.getHostName();
}
LOG.info("Lifeline RPC server is binding to {}:{}", bindHost,
lifelineRpcAddr.getPort());
int lifelineHandlerCount = conf.getInt(
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY,
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT);
lifelineRpcServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
.setBindAddress(bindHost)
.setPort(lifelineRpcAddr.getPort())
.setNumHandlers(lifelineHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
// Update the address with the correct port
InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
listenAddr.getPort());
nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
} else {
lifelineRpcServer = null;
lifelineRPCAddress = null;
}
InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
String bindHost = nn.getRpcServerBindHost(conf);
if (bindHost == null) {
@ -379,12 +421,15 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
if (serviceRpcServer != null) {
serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
if (lifelineRpcServer != null) {
lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
}
// The rpc-server port can be ephemeral... ensure we have the correct info
InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
clientRpcAddress = new InetSocketAddress(
rpcAddr.getHostName(), listenAddr.getPort());
clientRpcAddress = new InetSocketAddress(
rpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServerAddress(conf, clientRpcAddress);
minimumDataNodeVersion = conf.get(
@ -416,7 +461,16 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
if (serviceRpcServer != null) {
serviceRpcServer.setTracer(nn.tracer);
}
}
if (lifelineRpcServer != null) {
lifelineRpcServer.setTracer(nn.tracer);
}
}
/** Allow access to the lifeline RPC server for testing */
@VisibleForTesting
RPC.Server getLifelineRpcServer() {
return lifelineRpcServer;
}
/** Allow access to the client RPC server for testing */
@VisibleForTesting
@ -438,6 +492,9 @@ void start() {
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
if (lifelineRpcServer != null) {
lifelineRpcServer.start();
}
}
/**
@ -448,6 +505,9 @@ void join() throws InterruptedException {
if (serviceRpcServer != null) {
serviceRpcServer.join();
}
if (lifelineRpcServer != null) {
lifelineRpcServer.join();
}
}
/**
@ -460,8 +520,15 @@ void stop() {
if (serviceRpcServer != null) {
serviceRpcServer.stop();
}
if (lifelineRpcServer != null) {
lifelineRpcServer.stop();
}
}
InetSocketAddress getLifelineRpcAddress() {
return lifelineRPCAddress;
}
InetSocketAddress getServiceRpcAddress() {
return serviceRPCAddress;
}

View File

@ -49,6 +49,7 @@ public class NNHAServiceTarget extends HAServiceTarget {
private static final String NAMENODE_ID_KEY = "namenodeid";
private final InetSocketAddress addr;
private final InetSocketAddress lifelineAddr;
private InetSocketAddress zkfcAddr;
private NodeFencer fencer;
private BadFencingConfigurationException fenceConfigError;
@ -90,6 +91,11 @@ public NNHAServiceTarget(Configuration conf,
this.addr = NetUtils.createSocketAddr(serviceAddr,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
String lifelineAddrStr =
DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId);
this.lifelineAddr = (lifelineAddrStr != null) ?
NetUtils.createSocketAddr(lifelineAddrStr) : null;
this.autoFailoverEnabled = targetConf.getBoolean(
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
@ -119,6 +125,11 @@ public InetSocketAddress getAddress() {
return addr;
}
@Override
public InetSocketAddress getHealthMonitorAddress() {
return lifelineAddr;
}
@Override
public InetSocketAddress getZKFCAddress() {
Preconditions.checkState(autoFailoverEnabled,

View File

@ -78,6 +78,33 @@
</description>
</property>
<property>
<name>dfs.namenode.lifeline.rpc-address</name>
<value></value>
<description>
NameNode RPC lifeline address. This is an optional separate RPC address
that can be used to isolate health checks and liveness to protect against
resource exhaustion in the main RPC handler pool. In the case of
HA/Federation where multiple NameNodes exist, the name service ID is added
to the name e.g. dfs.namenode.lifeline.rpc-address.ns1. The value of this
property will take the form of nn-host1:rpc-port. If this property is not
defined, then the NameNode will not start a lifeline RPC server. By
default, the property is not defined.
</description>
</property>
<property>
<name>dfs.namenode.lifeline.rpc-bind-host</name>
<value></value>
<description>
The actual address the lifeline RPC server will bind to. If this optional
address is set, it overrides only the hostname portion of
dfs.namenode.lifeline.rpc-address. It can also be specified per name node
or name service for HA/Federation. This is useful for making the name node
listen on all interfaces by setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>0.0.0.0:50090</value>
@ -681,6 +708,18 @@
</description>
</property>
<property>
<name>dfs.namenode.lifeline.handler.count</name>
<value>1</value>
<description>
Sets number of RPC server threads the NameNode runs for handling the
lifeline RPC server. The default value is 1, because this RPC server
handles only HA health check requests from ZKFC. These are lightweight
requests that run single-threaded from the ZKFC client side. This property
has no effect if dfs.namenode.lifeline.rpc-address is not defined.
</description>
</property>
<property>
<name>dfs.namenode.safemode.threshold-pct</name>
<value>0.999f</value>

View File

@ -37,6 +37,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
@ -869,26 +870,23 @@ private static void copyKeys(Configuration srcConf, Configuration destConf,
nameserviceId, nnId);
destConf.set(key, srcConf.get(key));
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
nameserviceId, nnId);
copyKey(srcConf, destConf, nameserviceId, nnId,
DFS_NAMENODE_HTTP_ADDRESS_KEY);
copyKey(srcConf, destConf, nameserviceId, nnId,
DFS_NAMENODE_HTTPS_ADDRESS_KEY);
copyKey(srcConf, destConf, nameserviceId, nnId,
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
copyKey(srcConf, destConf, nameserviceId, nnId,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
}
private static void copyKey(Configuration srcConf, Configuration destConf,
String nameserviceId, String nnId, String baseKey) {
String key = DFSUtil.addKeySuffixes(baseKey, nameserviceId, nnId);
String val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
nameserviceId, nnId);
val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
nameserviceId, nnId);
val = srcConf.get(key);
if (val != null) {
destConf.set(key, srcConf.get(key));
}
}
/**

View File

@ -47,9 +47,9 @@
*
* - DFS_NAMENODE_RPC_BIND_HOST_KEY
* - DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY
* - DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY
* - DFS_NAMENODE_HTTP_BIND_HOST_KEY
* - DFS_NAMENODE_HTTPS_BIND_HOST_KEY
*/
public class TestNameNodeRespectsBindHostKeys {
public static final Log LOG = LogFactory.getLog(TestNameNodeRespectsBindHostKeys.class);
@ -66,6 +66,12 @@ private static String getServiceRpcServerAddress(MiniDFSCluster cluster) {
return rpcServer.getServiceRpcServer().getListenerAddress().getAddress().toString();
}
private static String getLifelineRpcServerAddress(MiniDFSCluster cluster) {
NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
return rpcServer.getLifelineRpcServer().getListenerAddress().getAddress()
.toString();
}
@Test (timeout=300000)
public void testRpcBindHostKey() throws IOException {
Configuration conf = new HdfsConfiguration();
@ -148,6 +154,48 @@ public void testServiceRpcBindHostKey() throws IOException {
}
}
@Test (timeout=300000)
public void testLifelineRpcBindHostKey() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
LOG.info("Testing without " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS);
// NN should not bind the wildcard address by default.
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
String address = getLifelineRpcServerAddress(cluster);
assertThat("Bind address not expected to be wildcard by default.",
address, not("/" + WILDCARD_ADDRESS));
} finally {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
LOG.info("Testing with " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
// Tell NN to bind the wildcard address.
conf.set(DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS);
// Verify that NN binds wildcard address now.
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
String address = getLifelineRpcServerAddress(cluster);
assertThat("Bind address " + address + " is not wildcard.",
address, is("/" + WILDCARD_ADDRESS));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(timeout=300000)
public void testHttpBindHostKey() throws IOException {
Configuration conf = new HdfsConfiguration();

View File

@ -17,57 +17,92 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestNNHealthCheck {
private MiniDFSCluster cluster;
private Configuration conf;
@Before
public void setup() {
conf = new Configuration();
}
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testNNHealthCheck() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf)
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.build();
doNNHealthCheckTest();
}
@Test
public void testNNHealthCheckWithLifelineAddress() throws IOException {
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.build();
doNNHealthCheckTest();
}
NameNodeResourceChecker mockResourceChecker = Mockito.mock(
NameNodeResourceChecker.class);
Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace();
cluster.getNameNode(0).getNamesystem()
.setNNResourceChecker(mockResourceChecker);
NamenodeProtocols rpc = cluster.getNameNodeRpc(0);
// Should not throw error, which indicates healthy.
private void doNNHealthCheckTest() throws IOException {
NameNodeResourceChecker mockResourceChecker = Mockito.mock(
NameNodeResourceChecker.class);
Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace();
cluster.getNameNode(0).getNamesystem()
.setNNResourceChecker(mockResourceChecker);
NNHAServiceTarget haTarget = new NNHAServiceTarget(conf,
DFSUtil.getNamenodeNameServiceId(conf), "nn1");
HAServiceProtocol rpc = haTarget.getHealthMonitorProxy(conf, conf.getInt(
HA_HM_RPC_TIMEOUT_KEY, HA_HM_RPC_TIMEOUT_DEFAULT));
// Should not throw error, which indicates healthy.
rpc.monitorHealth();
Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace();
try {
// Should throw error - NN is unhealthy.
rpc.monitorHealth();
Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace();
try {
// Should throw error - NN is unhealthy.
rpc.monitorHealth();
fail("Should not have succeeded in calling monitorHealth");
} catch (HealthCheckFailedException hcfe) {
GenericTestUtils.assertExceptionContains(
"The NameNode has no resources available", hcfe);
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
fail("Should not have succeeded in calling monitorHealth");
} catch (HealthCheckFailedException hcfe) {
GenericTestUtils.assertExceptionContains(
"The NameNode has no resources available", hcfe);
} catch (RemoteException re) {
GenericTestUtils.assertExceptionContains(
"The NameNode has no resources available",
re.unwrapRemoteException(HealthCheckFailedException.class));
}
}
}