HDFS-9311. Support optional offload of NameNode HA service health checks to a separate RPC server. Contributed by Chris Nauroth.
This commit is contained in:
parent
1f7ecb0c84
commit
bf8e452982
|
@ -49,6 +49,23 @@ public abstract class HAServiceTarget {
|
|||
*/
|
||||
public abstract InetSocketAddress getAddress();
|
||||
|
||||
/**
|
||||
* Returns an optional separate RPC server address for health checks at the
|
||||
* target node. If defined, then this address is used by the health monitor
|
||||
* for the {@link HAServiceProtocol#monitorHealth()} and
|
||||
* {@link HAServiceProtocol#getServiceStatus()} calls. This can be useful for
|
||||
* separating out these calls onto separate RPC handlers to protect against
|
||||
* resource exhaustion in the main RPC handler pool. If null (which is the
|
||||
* default implementation), then all RPC calls go to the address defined by
|
||||
* {@link #getAddress()}.
|
||||
*
|
||||
* @return IPC address of the health check (lifeline) RPC server on the target node, or null
|
||||
* if no lifeline RPC server is used
|
||||
*/
|
||||
public InetSocketAddress getHealthMonitorAddress() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the IPC address of the ZKFC on the target node
|
||||
*/
|
||||
|
@ -73,15 +90,42 @@ public abstract class HAServiceTarget {
|
|||
*/
|
||||
public HAServiceProtocol getProxy(Configuration conf, int timeoutMs)
|
||||
throws IOException {
|
||||
return getProxyForAddress(conf, timeoutMs, getAddress());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a proxy to connect to the target HA service for health monitoring.
|
||||
* If {@link #getHealthMonitorAddress()} is implemented to return a non-null
|
||||
* address, then this proxy will connect to that address. Otherwise, the
|
||||
* returned proxy defaults to using {@link #getAddress()}, which means this
|
||||
* method's behavior is identical to {@link #getProxy(Configuration, int)}.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param timeoutMs timeout in milliseconds
|
||||
* @return a proxy to connect to the target HA service for health monitoring
|
||||
* @throws IOException if there is an error
|
||||
*/
|
||||
public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
|
||||
int timeoutMs) throws IOException {
|
||||
InetSocketAddress addr = getHealthMonitorAddress();
|
||||
if (addr == null) {
|
||||
addr = getAddress();
|
||||
}
|
||||
return getProxyForAddress(conf, timeoutMs, addr);
|
||||
}
|
||||
|
||||
private HAServiceProtocol getProxyForAddress(Configuration conf,
|
||||
int timeoutMs, InetSocketAddress addr) throws IOException {
|
||||
Configuration confCopy = new Configuration(conf);
|
||||
// Limit connection attempts to a single retry so we quickly fail to connect
|
||||
confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
|
||||
confCopy.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
|
||||
SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
|
||||
return new HAServiceProtocolClientSideTranslatorPB(
|
||||
getAddress(),
|
||||
addr,
|
||||
confCopy, factory, timeoutMs);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return a proxy to the ZKFC which is associated with this HA service.
|
||||
*/
|
||||
|
|
|
@ -191,7 +191,7 @@ public class HealthMonitor {
|
|||
* Connect to the service to be monitored. Stubbed out for easier testing.
|
||||
*/
|
||||
protected HAServiceProtocol createProxy() throws IOException {
|
||||
return targetToMonitor.getProxy(conf, rpcTimeout);
|
||||
return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout);
|
||||
}
|
||||
|
||||
private void doHealthChecks() throws InterruptedException {
|
||||
|
|
|
@ -49,10 +49,10 @@ class DummyHAService extends HAServiceTarget {
|
|||
public static final Log LOG = LogFactory.getLog(DummyHAService.class);
|
||||
private static final String DUMMY_FENCE_KEY = "dummy.fence.key";
|
||||
volatile HAServiceState state;
|
||||
HAServiceProtocol proxy;
|
||||
HAServiceProtocol proxy, healthMonitorProxy;
|
||||
ZKFCProtocol zkfcProxy = null;
|
||||
NodeFencer fencer;
|
||||
InetSocketAddress address;
|
||||
InetSocketAddress address, healthMonitorAddress;
|
||||
boolean isHealthy = true;
|
||||
boolean actUnreachable = false;
|
||||
boolean failToBecomeActive, failToBecomeStandby, failToFence;
|
||||
|
@ -80,6 +80,7 @@ class DummyHAService extends HAServiceTarget {
|
|||
}
|
||||
Configuration conf = new Configuration();
|
||||
this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
|
||||
this.healthMonitorProxy = makeHealthMonitorMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
|
||||
try {
|
||||
conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
|
||||
this.fencer = Mockito.spy(
|
||||
|
@ -92,7 +93,18 @@ class DummyHAService extends HAServiceTarget {
|
|||
this.index = instances.size();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
DummyHAService(HAServiceState state, InetSocketAddress address,
|
||||
InetSocketAddress healthMonitorAddress, boolean testWithProtoBufRPC) {
|
||||
this(state, address, testWithProtoBufRPC);
|
||||
if (testWithProtoBufRPC) {
|
||||
this.healthMonitorAddress = startAndGetRPCServerAddress(
|
||||
healthMonitorAddress);
|
||||
} else {
|
||||
this.healthMonitorAddress = healthMonitorAddress;
|
||||
}
|
||||
}
|
||||
|
||||
public void setSharedResource(DummySharedResource rsrc) {
|
||||
this.sharedResource = rsrc;
|
||||
}
|
||||
|
@ -134,11 +146,31 @@ class DummyHAService extends HAServiceTarget {
|
|||
return Mockito.spy(service);
|
||||
}
|
||||
|
||||
private HAServiceProtocol makeHealthMonitorMock(Configuration conf,
|
||||
int timeoutMs) {
|
||||
HAServiceProtocol service;
|
||||
if (!testWithProtoBufRPC) {
|
||||
service = new MockHAProtocolImpl();
|
||||
} else {
|
||||
try {
|
||||
service = super.getHealthMonitorProxy(conf, timeoutMs);
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return Mockito.spy(service);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getAddress() {
|
||||
return address;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getHealthMonitorAddress() {
|
||||
return healthMonitorAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getZKFCAddress() {
|
||||
return null;
|
||||
|
@ -152,6 +184,15 @@ class DummyHAService extends HAServiceTarget {
|
|||
}
|
||||
return proxy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HAServiceProtocol getHealthMonitorProxy(Configuration conf,
|
||||
int timeout) throws IOException {
|
||||
if (testWithProtoBufRPC) {
|
||||
proxy = makeHealthMonitorMock(conf, timeout);
|
||||
}
|
||||
return proxy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout)
|
||||
|
|
|
@ -55,8 +55,7 @@ public class TestHealthMonitor {
|
|||
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
|
||||
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
|
||||
|
||||
svc = new DummyHAService(HAServiceState.ACTIVE,
|
||||
new InetSocketAddress("0.0.0.0", 0), true);
|
||||
svc = createDummyHAService();
|
||||
hm = new HealthMonitor(conf, svc) {
|
||||
@Override
|
||||
protected HAServiceProtocol createProxy() throws IOException {
|
||||
|
@ -73,7 +72,12 @@ public class TestHealthMonitor {
|
|||
LOG.info("Waiting for HEALTHY signal");
|
||||
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
|
||||
}
|
||||
|
||||
|
||||
protected DummyHAService createDummyHAService() {
|
||||
return new DummyHAService(HAServiceState.ACTIVE,
|
||||
new InetSocketAddress("0.0.0.0", 0), true);
|
||||
}
|
||||
|
||||
@Test(timeout=15000)
|
||||
public void testMonitor() throws Exception {
|
||||
LOG.info("Mocking bad health check, waiting for UNHEALTHY");
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ha;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
|
||||
/**
|
||||
* Repeats all tests of {@link TestHealthMonitor}, but using a separate
|
||||
* dedicated health check RPC address.
|
||||
*/
|
||||
public class TestHealthMonitorWithDedicatedHealthAddress
|
||||
extends TestHealthMonitor {
|
||||
|
||||
@Override
|
||||
protected DummyHAService createDummyHAService() {
|
||||
return new DummyHAService(HAServiceState.ACTIVE,
|
||||
new InetSocketAddress("0.0.0.0", 0),
|
||||
new InetSocketAddress("0.0.0.0", 0), true);
|
||||
}
|
||||
}
|
|
@ -1593,6 +1593,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-9307. fuseConnect should be private to fuse_connect.c (Mingliang Liu
|
||||
via Colin P. McCabe)
|
||||
|
||||
HDFS-9311. Support optional offload of NameNode HA service health checks to
|
||||
a separate RPC server. (cnauroth)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -139,6 +139,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
|
||||
public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
|
||||
public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host";
|
||||
public static final String DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY =
|
||||
"dfs.namenode.lifeline.rpc-address";
|
||||
public static final String DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY =
|
||||
"dfs.namenode.lifeline.rpc-bind-host";
|
||||
public static final String DFS_NAMENODE_MAX_OBJECTS_KEY =
|
||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
|
||||
public static final long DFS_NAMENODE_MAX_OBJECTS_DEFAULT = 0;
|
||||
|
@ -481,6 +485,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
|
||||
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
|
||||
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
|
||||
public static final int DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1;
|
||||
public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = "dfs.namenode.lifeline.handler.count";
|
||||
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
|
||||
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
|
||||
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";
|
||||
|
|
|
@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DE
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||
|
@ -571,6 +572,28 @@ public class DFSUtil {
|
|||
return addressList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map a logical namenode ID to its lifeline address. Use the given
|
||||
* nameservice if specified, or the configured one if none is given.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nsId which nameservice nnId is a part of, optional
|
||||
* @param nnId the namenode ID to get the lifeline addr for
|
||||
* @return the lifeline addr, null if it could not be determined
|
||||
*/
|
||||
public static String getNamenodeLifelineAddr(final Configuration conf,
|
||||
String nsId, String nnId) {
|
||||
|
||||
if (nsId == null) {
|
||||
nsId = getOnlyNameServiceIdOrNull(conf);
|
||||
}
|
||||
|
||||
String lifelineAddrKey = DFSUtilClient.concatSuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, nsId, nnId);
|
||||
|
||||
return conf.get(lifelineAddrKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Flatten the given map, as returned by other functions in this class,
|
||||
* into a flat list of {@link ConfiguredNNAddress} instances.
|
||||
|
|
|
@ -122,6 +122,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_BIND_HOST_K
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||
|
@ -228,6 +230,8 @@ public class NameNode implements NameNodeStatusMXBean {
|
|||
DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
DFS_NAMENODE_CHECKPOINT_DIR_KEY,
|
||||
DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
|
||||
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
|
||||
DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY,
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
|
||||
DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
||||
|
@ -487,6 +491,21 @@ public class NameNode implements NameNodeStatusMXBean {
|
|||
return role.equals(that);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a configuration get the address of the lifeline RPC server.
|
||||
* If the lifeline RPC is not configured returns null.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @return address or null
|
||||
*/
|
||||
InetSocketAddress getLifelineRpcServerAddress(Configuration conf) {
|
||||
String addr = getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
|
||||
if (addr == null) {
|
||||
return null;
|
||||
}
|
||||
return NetUtils.createSocketAddr(addr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a configuration get the address of the service rpc server.
|
||||
* If the service rpc is not configured returns null.
|
||||
|
@ -498,29 +517,60 @@ public class NameNode implements NameNodeStatusMXBean {
|
|||
protected InetSocketAddress getRpcServerAddress(Configuration conf) {
|
||||
return DFSUtilClient.getNNAddress(conf);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Given a configuration get the bind host of the lifeline RPC server.
|
||||
* If the bind host is not configured returns null.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @return bind host or null
|
||||
*/
|
||||
String getLifelineRpcServerBindHost(Configuration conf) {
|
||||
return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
|
||||
}
|
||||
|
||||
/** Given a configuration get the bind host of the service rpc server.
|
||||
* If the bind host is not configured returns null.
|
||||
*/
|
||||
protected String getServiceRpcServerBindHost(Configuration conf) {
|
||||
String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
|
||||
if (addr == null || addr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return addr;
|
||||
return getTrimmedOrNull(conf, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
|
||||
}
|
||||
|
||||
/** Given a configuration get the bind host of the client rpc server.
|
||||
* If the bind host is not configured returns null.
|
||||
*/
|
||||
protected String getRpcServerBindHost(Configuration conf) {
|
||||
String addr = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY);
|
||||
return getTrimmedOrNull(conf, DFS_NAMENODE_RPC_BIND_HOST_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a trimmed value from configuration, or null if no value is defined.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @param key configuration key to get
|
||||
* @return trimmed value, or null if no value is defined
|
||||
*/
|
||||
private static String getTrimmedOrNull(Configuration conf, String key) {
|
||||
String addr = conf.getTrimmed(key);
|
||||
if (addr == null || addr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return addr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Modifies the configuration to contain the lifeline RPC address setting.
|
||||
*
|
||||
* @param conf configuration to modify
|
||||
* @param lifelineRPCAddress lifeline RPC address
|
||||
*/
|
||||
void setRpcLifelineServerAddress(Configuration conf,
|
||||
InetSocketAddress lifelineRPCAddress) {
|
||||
LOG.info("Setting lifeline RPC address {}", lifelineRPCAddress);
|
||||
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
|
||||
NetUtils.getHostPortString(lifelineRPCAddress));
|
||||
}
|
||||
|
||||
/**
|
||||
* Modifies the configuration passed to contain the service rpc address setting
|
||||
*/
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
|
||||
|
@ -211,6 +213,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
/** The RPC server that listens to requests from DataNodes */
|
||||
private final RPC.Server serviceRpcServer;
|
||||
private final InetSocketAddress serviceRPCAddress;
|
||||
|
||||
/** The RPC server that listens to lifeline requests */
|
||||
private final RPC.Server lifelineRpcServer;
|
||||
private final InetSocketAddress lifelineRPCAddress;
|
||||
|
||||
/** The RPC server that listens to requests from clients */
|
||||
protected final RPC.Server clientRpcServer;
|
||||
|
@ -339,6 +345,41 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
serviceRPCAddress = null;
|
||||
}
|
||||
|
||||
InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
|
||||
if (lifelineRpcAddr != null) {
|
||||
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
String bindHost = nn.getLifelineRpcServerBindHost(conf);
|
||||
if (bindHost == null) {
|
||||
bindHost = lifelineRpcAddr.getHostName();
|
||||
}
|
||||
LOG.info("Lifeline RPC server is binding to {}:{}", bindHost,
|
||||
lifelineRpcAddr.getPort());
|
||||
|
||||
int lifelineHandlerCount = conf.getInt(
|
||||
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY,
|
||||
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT);
|
||||
|
||||
lifelineRpcServer = new RPC.Builder(conf)
|
||||
.setProtocol(HAServiceProtocolPB.class)
|
||||
.setInstance(haPbService)
|
||||
.setBindAddress(bindHost)
|
||||
.setPort(lifelineRpcAddr.getPort())
|
||||
.setNumHandlers(lifelineHandlerCount)
|
||||
.setVerbose(false)
|
||||
.setSecretManager(namesystem.getDelegationTokenSecretManager())
|
||||
.build();
|
||||
|
||||
// Update the address with the correct port
|
||||
InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
|
||||
lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
|
||||
listenAddr.getPort());
|
||||
nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
|
||||
} else {
|
||||
lifelineRpcServer = null;
|
||||
lifelineRPCAddress = null;
|
||||
}
|
||||
|
||||
InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
|
||||
String bindHost = nn.getRpcServerBindHost(conf);
|
||||
if (bindHost == null) {
|
||||
|
@ -385,12 +426,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
}
|
||||
if (lifelineRpcServer != null) {
|
||||
lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
}
|
||||
}
|
||||
|
||||
// The rpc-server port can be ephemeral... ensure we have the correct info
|
||||
InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
|
||||
clientRpcAddress = new InetSocketAddress(
|
||||
rpcAddr.getHostName(), listenAddr.getPort());
|
||||
clientRpcAddress = new InetSocketAddress(
|
||||
rpcAddr.getHostName(), listenAddr.getPort());
|
||||
nn.setRpcServerAddress(conf, clientRpcAddress);
|
||||
|
||||
minimumDataNodeVersion = conf.get(
|
||||
|
@ -422,7 +466,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.setTracer(nn.tracer);
|
||||
}
|
||||
}
|
||||
if (lifelineRpcServer != null) {
|
||||
lifelineRpcServer.setTracer(nn.tracer);
|
||||
}
|
||||
}
|
||||
|
||||
/** Allow access to the lifeline RPC server for testing */
|
||||
@VisibleForTesting
|
||||
RPC.Server getLifelineRpcServer() {
|
||||
return lifelineRpcServer;
|
||||
}
|
||||
|
||||
/** Allow access to the client RPC server for testing */
|
||||
@VisibleForTesting
|
||||
|
@ -444,6 +497,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.start();
|
||||
}
|
||||
if (lifelineRpcServer != null) {
|
||||
lifelineRpcServer.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -454,6 +510,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.join();
|
||||
}
|
||||
if (lifelineRpcServer != null) {
|
||||
lifelineRpcServer.join();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -466,8 +525,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.stop();
|
||||
}
|
||||
if (lifelineRpcServer != null) {
|
||||
lifelineRpcServer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
InetSocketAddress getLifelineRpcAddress() {
|
||||
return lifelineRPCAddress;
|
||||
}
|
||||
|
||||
InetSocketAddress getServiceRpcAddress() {
|
||||
return serviceRPCAddress;
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ public class NNHAServiceTarget extends HAServiceTarget {
|
|||
private static final String NAMENODE_ID_KEY = "namenodeid";
|
||||
|
||||
private final InetSocketAddress addr;
|
||||
private final InetSocketAddress lifelineAddr;
|
||||
private InetSocketAddress zkfcAddr;
|
||||
private NodeFencer fencer;
|
||||
private BadFencingConfigurationException fenceConfigError;
|
||||
|
@ -90,6 +91,11 @@ public class NNHAServiceTarget extends HAServiceTarget {
|
|||
this.addr = NetUtils.createSocketAddr(serviceAddr,
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
|
||||
|
||||
String lifelineAddrStr =
|
||||
DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId);
|
||||
this.lifelineAddr = (lifelineAddrStr != null) ?
|
||||
NetUtils.createSocketAddr(lifelineAddrStr) : null;
|
||||
|
||||
this.autoFailoverEnabled = targetConf.getBoolean(
|
||||
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
|
||||
|
@ -119,6 +125,11 @@ public class NNHAServiceTarget extends HAServiceTarget {
|
|||
return addr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getHealthMonitorAddress() {
|
||||
return lifelineAddr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getZKFCAddress() {
|
||||
Preconditions.checkState(autoFailoverEnabled,
|
||||
|
|
|
@ -78,6 +78,33 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.lifeline.rpc-address</name>
|
||||
<value></value>
|
||||
<description>
|
||||
NameNode RPC lifeline address. This is an optional separate RPC address
|
||||
that can be used to isolate health checks and liveness to protect against
|
||||
resource exhaustion in the main RPC handler pool. In the case of
|
||||
HA/Federation where multiple NameNodes exist, the name service ID is added
|
||||
to the name e.g. dfs.namenode.lifeline.rpc-address.ns1. The value of this
|
||||
property will take the form of nn-host1:rpc-port. If this property is not
|
||||
defined, then the NameNode will not start a lifeline RPC server. By
|
||||
default, the property is not defined.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.lifeline.rpc-bind-host</name>
|
||||
<value></value>
|
||||
<description>
|
||||
The actual address the lifeline RPC server will bind to. If this optional
|
||||
address is set, it overrides only the hostname portion of
|
||||
dfs.namenode.lifeline.rpc-address. It can also be specified per name node
|
||||
or name service for HA/Federation. This is useful for making the name node
|
||||
listen on all interfaces by setting it to 0.0.0.0.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.secondary.http-address</name>
|
||||
<value>0.0.0.0:50090</value>
|
||||
|
@ -681,6 +708,18 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.lifeline.handler.count</name>
|
||||
<value>1</value>
|
||||
<description>
|
||||
Sets the number of RPC server threads the NameNode runs for handling the
|
||||
lifeline RPC server. The default value is 1, because this RPC server
|
||||
handles only HA health check requests from ZKFC. These are lightweight
|
||||
requests that run single-threaded from the ZKFC client side. This property
|
||||
has no effect if dfs.namenode.lifeline.rpc-address is not defined.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.safemode.threshold-pct</name>
|
||||
<value>0.999f</value>
|
||||
|
|
|
@ -37,6 +37,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_K
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
|
||||
|
@ -908,26 +909,23 @@ public class MiniDFSCluster {
|
|||
nameserviceId, nnId);
|
||||
destConf.set(key, srcConf.get(key));
|
||||
|
||||
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
||||
nameserviceId, nnId);
|
||||
copyKey(srcConf, destConf, nameserviceId, nnId,
|
||||
DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
copyKey(srcConf, destConf, nameserviceId, nnId,
|
||||
DFS_NAMENODE_HTTPS_ADDRESS_KEY);
|
||||
copyKey(srcConf, destConf, nameserviceId, nnId,
|
||||
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
|
||||
copyKey(srcConf, destConf, nameserviceId, nnId,
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
private static void copyKey(Configuration srcConf, Configuration destConf,
|
||||
String nameserviceId, String nnId, String baseKey) {
|
||||
String key = DFSUtil.addKeySuffixes(baseKey, nameserviceId, nnId);
|
||||
String val = srcConf.get(key);
|
||||
if (val != null) {
|
||||
destConf.set(key, srcConf.get(key));
|
||||
}
|
||||
|
||||
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
|
||||
nameserviceId, nnId);
|
||||
val = srcConf.get(key);
|
||||
if (val != null) {
|
||||
destConf.set(key, srcConf.get(key));
|
||||
}
|
||||
|
||||
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||
nameserviceId, nnId);
|
||||
val = srcConf.get(key);
|
||||
if (val != null) {
|
||||
destConf.set(key, srcConf.get(key));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -46,9 +46,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|||
*
|
||||
* - DFS_NAMENODE_RPC_BIND_HOST_KEY
|
||||
* - DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY
|
||||
* - DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY
|
||||
* - DFS_NAMENODE_HTTP_BIND_HOST_KEY
|
||||
* - DFS_NAMENODE_HTTPS_BIND_HOST_KEY
|
||||
|
||||
*/
|
||||
public class TestNameNodeRespectsBindHostKeys {
|
||||
public static final Log LOG = LogFactory.getLog(TestNameNodeRespectsBindHostKeys.class);
|
||||
|
@ -65,6 +65,12 @@ public class TestNameNodeRespectsBindHostKeys {
|
|||
return rpcServer.getServiceRpcServer().getListenerAddress().getAddress().toString();
|
||||
}
|
||||
|
||||
private static String getLifelineRpcServerAddress(MiniDFSCluster cluster) {
|
||||
NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
|
||||
return rpcServer.getLifelineRpcServer().getListenerAddress().getAddress()
|
||||
.toString();
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testRpcBindHostKey() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
@ -147,6 +153,48 @@ public class TestNameNodeRespectsBindHostKeys {
|
|||
}
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testLifelineRpcBindHostKey() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
LOG.info("Testing without " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
|
||||
|
||||
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS);
|
||||
|
||||
// NN should not bind the wildcard address by default.
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
String address = getLifelineRpcServerAddress(cluster);
|
||||
assertThat("Bind address not expected to be wildcard by default.",
|
||||
address, not("/" + WILDCARD_ADDRESS));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("Testing with " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
|
||||
|
||||
// Tell NN to bind the wildcard address.
|
||||
conf.set(DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS);
|
||||
|
||||
// Verify that NN binds wildcard address now.
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
String address = getLifelineRpcServerAddress(cluster);
|
||||
assertThat("Bind address " + address + " is not wildcard.",
|
||||
address, is("/" + WILDCARD_ADDRESS));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300000)
|
||||
public void testHttpBindHostKey() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
|
|
@ -17,57 +17,92 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestNNHealthCheck {
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private Configuration conf;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
conf = new Configuration();
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdown() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNNHealthCheck() throws IOException {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(0)
|
||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||
.build();
|
||||
doNNHealthCheckTest();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNNHealthCheckWithLifelineAddress() throws IOException {
|
||||
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(0)
|
||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||
.build();
|
||||
doNNHealthCheckTest();
|
||||
}
|
||||
|
||||
NameNodeResourceChecker mockResourceChecker = Mockito.mock(
|
||||
NameNodeResourceChecker.class);
|
||||
Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace();
|
||||
cluster.getNameNode(0).getNamesystem()
|
||||
.setNNResourceChecker(mockResourceChecker);
|
||||
|
||||
NamenodeProtocols rpc = cluster.getNameNodeRpc(0);
|
||||
|
||||
// Should not throw error, which indicates healthy.
|
||||
private void doNNHealthCheckTest() throws IOException {
|
||||
NameNodeResourceChecker mockResourceChecker = Mockito.mock(
|
||||
NameNodeResourceChecker.class);
|
||||
Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace();
|
||||
cluster.getNameNode(0).getNamesystem()
|
||||
.setNNResourceChecker(mockResourceChecker);
|
||||
|
||||
NNHAServiceTarget haTarget = new NNHAServiceTarget(conf,
|
||||
DFSUtil.getNamenodeNameServiceId(conf), "nn1");
|
||||
HAServiceProtocol rpc = haTarget.getHealthMonitorProxy(conf, conf.getInt(
|
||||
HA_HM_RPC_TIMEOUT_KEY, HA_HM_RPC_TIMEOUT_DEFAULT));
|
||||
|
||||
// Should not throw error, which indicates healthy.
|
||||
rpc.monitorHealth();
|
||||
|
||||
Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace();
|
||||
|
||||
try {
|
||||
// Should throw error - NN is unhealthy.
|
||||
rpc.monitorHealth();
|
||||
|
||||
Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace();
|
||||
|
||||
try {
|
||||
// Should throw error - NN is unhealthy.
|
||||
rpc.monitorHealth();
|
||||
fail("Should not have succeeded in calling monitorHealth");
|
||||
} catch (HealthCheckFailedException hcfe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"The NameNode has no resources available", hcfe);
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
fail("Should not have succeeded in calling monitorHealth");
|
||||
} catch (HealthCheckFailedException hcfe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"The NameNode has no resources available", hcfe);
|
||||
} catch (RemoteException re) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"The NameNode has no resources available",
|
||||
re.unwrapRemoteException(HealthCheckFailedException.class));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue