From af0f2e27d189ceefa71fbb5178cc69006a62a257 Mon Sep 17 00:00:00 2001 From: cnauroth Date: Tue, 27 Oct 2015 23:07:14 -0700 Subject: [PATCH] HDFS-9311. Support optional offload of NameNode HA service health checks to a separate RPC server. Contributed by Chris Nauroth. (cherry picked from commit bf8e45298218f70e38838152f69c7705d8606bd6) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java --- .../org/apache/hadoop/ha/HAServiceTarget.java | 50 +++++++++- .../org/apache/hadoop/ha/HealthMonitor.java | 2 +- .../org/apache/hadoop/ha/DummyHAService.java | 47 +++++++++- .../apache/hadoop/ha/TestHealthMonitor.java | 10 +- ...althMonitorWithDedicatedHealthAddress.java | 37 ++++++++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 ++ .../java/org/apache/hadoop/hdfs/DFSUtil.java | 23 +++++ .../hadoop/hdfs/server/namenode/NameNode.java | 64 +++++++++++-- .../server/namenode/NameNodeRpcServer.java | 75 ++++++++++++++- .../hadoop/hdfs/tools/NNHAServiceTarget.java | 11 +++ .../src/main/resources/hdfs-default.xml | 39 ++++++++ .../apache/hadoop/hdfs/MiniDFSCluster.java | 30 +++--- .../TestNameNodeRespectsBindHostKeys.java | 50 +++++++++- .../server/namenode/ha/TestNNHealthCheck.java | 93 +++++++++++++------ 15 files changed, 473 insertions(+), 67 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java index 56678b427e9..98aab99854c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java @@ -49,6 +49,23 @@ public abstract class HAServiceTarget { */ public abstract InetSocketAddress getAddress(); + /** + * Returns an optional separate RPC server address for health checks at the + * target node. If defined, then this address is used by the health monitor + * for the {@link HAServiceProtocol#monitorHealth()} and + * {@link HAServiceProtocol#getServiceStatus()} calls. This can be useful for + * separating out these calls onto separate RPC handlers to protect against + * resource exhaustion in the main RPC handler pool. If null (which is the + * default implementation), then all RPC calls go to the address defined by + * {@link #getAddress()}. + * + * @return IPC address of the lifeline RPC server on the target node, or null + * if no lifeline RPC server is used + */ + public InetSocketAddress getHealthMonitorAddress() { + return null; + } + /** * @return the IPC address of the ZKFC on the target node */ @@ -73,15 +90,42 @@ public abstract class HAServiceTarget { */ public HAServiceProtocol getProxy(Configuration conf, int timeoutMs) throws IOException { + return getProxyForAddress(conf, timeoutMs, getAddress()); + } + + /** + * Returns a proxy to connect to the target HA service for health monitoring. + * If {@link #getHealthMonitorAddress()} is implemented to return a non-null + * address, then this proxy will connect to that address. Otherwise, the + * returned proxy defaults to using {@link #getAddress()}, which means this + * method's behavior is identical to {@link #getProxy(Configuration, int)}. + * + * @param conf Configuration + * @param timeoutMs timeout in milliseconds + * @return a proxy to connect to the target HA service for health monitoring + * @throws IOException if there is an error + */ + public HAServiceProtocol getHealthMonitorProxy(Configuration conf, + int timeoutMs) throws IOException { + InetSocketAddress addr = getHealthMonitorAddress(); + if (addr == null) { + addr = getAddress(); + } + return getProxyForAddress(conf, timeoutMs, addr); + } + + private HAServiceProtocol getProxyForAddress(Configuration conf, + int timeoutMs, InetSocketAddress addr) throws IOException { Configuration confCopy = new Configuration(conf); // Lower the timeout so we quickly fail to connect - confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + confCopy.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy); return new HAServiceProtocolClientSideTranslatorPB( - getAddress(), + addr, confCopy, factory, timeoutMs); } - + /** * @return a proxy to the ZKFC which is associated with this HA service. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index 8c8762938e4..24c149c4583 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -191,7 +191,7 @@ public class HealthMonitor { * Connect to the service to be monitored. Stubbed out for easier testing. */ protected HAServiceProtocol createProxy() throws IOException { - return targetToMonitor.getProxy(conf, rpcTimeout); + return targetToMonitor.getHealthMonitorProxy(conf, rpcTimeout); } private void doHealthChecks() throws InterruptedException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index aef6c4da282..551da56007e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -49,10 +49,10 @@ class DummyHAService extends HAServiceTarget { public static final Log LOG = LogFactory.getLog(DummyHAService.class); private static final String DUMMY_FENCE_KEY = "dummy.fence.key"; volatile HAServiceState state; - HAServiceProtocol proxy; + HAServiceProtocol proxy, healthMonitorProxy; ZKFCProtocol zkfcProxy = null; NodeFencer fencer; - InetSocketAddress address; + InetSocketAddress address, healthMonitorAddress; boolean isHealthy = true; boolean actUnreachable = false; boolean failToBecomeActive, failToBecomeStandby, failToFence; @@ -80,6 +80,7 @@ class DummyHAService extends HAServiceTarget { } Configuration conf = new Configuration(); this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); + this.healthMonitorProxy = makeHealthMonitorMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); try { conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); this.fencer = Mockito.spy( @@ -92,7 +93,18 @@ class DummyHAService extends HAServiceTarget { this.index = instances.size(); } } - + + DummyHAService(HAServiceState state, InetSocketAddress address, + InetSocketAddress healthMonitorAddress, boolean testWithProtoBufRPC) { + this(state, address, testWithProtoBufRPC); + if (testWithProtoBufRPC) { + this.healthMonitorAddress = startAndGetRPCServerAddress( + healthMonitorAddress); + } else { + this.healthMonitorAddress = healthMonitorAddress; + } + } + public void setSharedResource(DummySharedResource rsrc) { this.sharedResource = rsrc; } @@ -134,11 +146,31 @@ class DummyHAService extends HAServiceTarget { return Mockito.spy(service); } + private HAServiceProtocol makeHealthMonitorMock(Configuration conf, + int timeoutMs) { + HAServiceProtocol service; + if (!testWithProtoBufRPC) { + service = new MockHAProtocolImpl(); + } else { + try { + service = super.getHealthMonitorProxy(conf, timeoutMs); + } catch (IOException e) { + return null; + } + } + return Mockito.spy(service); + } + @Override public InetSocketAddress getAddress() { return address; } + @Override + public InetSocketAddress getHealthMonitorAddress() { + return healthMonitorAddress; + } + @Override public InetSocketAddress getZKFCAddress() { return null; @@ -152,6 +184,15 @@ class DummyHAService extends HAServiceTarget { } return proxy; } + + @Override + public HAServiceProtocol getHealthMonitorProxy(Configuration conf, + int timeout) throws IOException { + if (testWithProtoBufRPC) { + proxy = makeHealthMonitorMock(conf, timeout); + } + return proxy; + } @Override public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java index b58793feae4..6c465437796 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java @@ -55,8 +55,7 @@ public class TestHealthMonitor { conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); - svc = new DummyHAService(HAServiceState.ACTIVE, - new InetSocketAddress("0.0.0.0", 0), true); + svc = createDummyHAService(); hm = new HealthMonitor(conf, svc) { @Override protected HAServiceProtocol createProxy() throws IOException { @@ -73,7 +72,12 @@ public class TestHealthMonitor { LOG.info("Waiting for HEALTHY signal"); waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY); } - + + protected DummyHAService createDummyHAService() { + return new DummyHAService(HAServiceState.ACTIVE, + new InetSocketAddress("0.0.0.0", 0), true); + } + @Test(timeout=15000) public void testMonitor() throws Exception { LOG.info("Mocking bad health check, waiting for UNHEALTHY"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java new file mode 100644 index 00000000000..3212c109f71 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitorWithDedicatedHealthAddress.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; + +/** + * Repeats all tests of {@link TestHealthMonitor}, but using a separate + * dedicated health check RPC address. + */ +public class TestHealthMonitorWithDedicatedHealthAddress + extends TestHealthMonitor { + + @Override + protected DummyHAService createDummyHAService() { + return new DummyHAService(HAServiceState.ACTIVE, + new InetSocketAddress("0.0.0.0", 0), + new InetSocketAddress("0.0.0.0", 0), true); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3b725244d18..932706b906b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -757,6 +757,9 @@ Release 2.8.0 - UNRELEASED HDFS-9307. fuseConnect should be private to fuse_connect.c (Mingliang Liu via Colin P. McCabe) + HDFS-9311. Support optional offload of NameNode HA service health checks to + a separate RPC server. (cnauroth) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index c80edb60080..98341084ac8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -138,6 +138,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host"; public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address"; public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host"; + public static final String DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY = + "dfs.namenode.lifeline.rpc-address"; + public static final String DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY = + "dfs.namenode.lifeline.rpc-bind-host"; public static final String DFS_NAMENODE_MAX_OBJECTS_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_MAX_OBJECTS_KEY; public static final long DFS_NAMENODE_MAX_OBJECTS_DEFAULT = 0; @@ -466,6 +470,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100; public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count"; public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10; + public static final int DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1; + public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = "dfs.namenode.lifeline.handler.count"; public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count"; public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10; public static final String DFS_SUPPORT_APPEND_KEY = "dfs.support.append"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index b1a2d9e3f40..5178daa8104 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; @@ -573,6 +574,28 @@ public class DFSUtil { return addressList; } + /** + * Map a logical namenode ID to its lifeline address. Use the given + * nameservice if specified, or the configured one if none is given. + * + * @param conf Configuration + * @param nsId which nameservice nnId is a part of, optional + * @param nnId the namenode ID to get the service addr for + * @return the lifeline addr, null if it could not be determined + */ + public static String getNamenodeLifelineAddr(final Configuration conf, + String nsId, String nnId) { + + if (nsId == null) { + nsId = getOnlyNameServiceIdOrNull(conf); + } + + String lifelineAddrKey = DFSUtilClient.concatSuffixes( + DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, nsId, nnId); + + return conf.get(lifelineAddrKey); + } + /** * Flatten the given map, as returned by other functions in this class, * into a flat list of {@link ConfiguredNNAddress} instances. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 730c15a8f7b..6102bdc3d6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -121,6 +121,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_BIND_HOST_K import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; @@ -227,6 +229,8 @@ public class NameNode implements NameNodeStatusMXBean { DFS_NAMENODE_SHARED_EDITS_DIR_KEY, DFS_NAMENODE_CHECKPOINT_DIR_KEY, DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, + DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, DFS_NAMENODE_HTTP_ADDRESS_KEY, @@ -487,6 +491,21 @@ public class NameNode implements NameNodeStatusMXBean { return role.equals(that); } + /** + * Given a configuration get the address of the lifeline RPC server. + * If the lifeline RPC is not configured returns null. + * + * @param conf configuration + * @return address or null + */ + InetSocketAddress getLifelineRpcServerAddress(Configuration conf) { + String addr = getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY); + if (addr == null) { + return null; + } + return NetUtils.createSocketAddr(addr); + } + /** * Given a configuration get the address of the service rpc server * If the service rpc is not configured returns null @@ -498,29 +517,60 @@ public class NameNode implements NameNodeStatusMXBean { protected InetSocketAddress getRpcServerAddress(Configuration conf) { return DFSUtilClient.getNNAddress(conf); } - + + /** + * Given a configuration get the bind host of the lifeline RPC server. + * If the bind host is not configured returns null. + * + * @param conf configuration + * @return bind host or null + */ + String getLifelineRpcServerBindHost(Configuration conf) { + return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY); + } + /** Given a configuration get the bind host of the service rpc server * If the bind host is not configured returns null. */ protected String getServiceRpcServerBindHost(Configuration conf) { - String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY); - if (addr == null || addr.isEmpty()) { - return null; - } - return addr; + return getTrimmedOrNull(conf, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY); } /** Given a configuration get the bind host of the client rpc server * If the bind host is not configured returns null. */ protected String getRpcServerBindHost(Configuration conf) { - String addr = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY); + return getTrimmedOrNull(conf, DFS_NAMENODE_RPC_BIND_HOST_KEY); + } + + /** + * Gets a trimmed value from configuration, or null if no value is defined. + * + * @param conf configuration + * @param key configuration key to get + * @return trimmed value, or null if no value is defined + */ + private static String getTrimmedOrNull(Configuration conf, String key) { + String addr = conf.getTrimmed(key); if (addr == null || addr.isEmpty()) { return null; } return addr; } + /** + * Modifies the configuration to contain the lifeline RPC address setting. + * + * @param conf configuration to modify + * @param lifelineRPCAddress lifeline RPC address + */ + void setRpcLifelineServerAddress(Configuration conf, + InetSocketAddress lifelineRPCAddress) { + LOG.info("Setting lifeline RPC address {}", lifelineRPCAddress); + conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, + NetUtils.getHostPortString(lifelineRPCAddress)); + } + /** * Modifies the configuration passed to contain the service rpc address setting */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index a19fdd0bf4d..78349016794 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH; @@ -210,6 +212,10 @@ class NameNodeRpcServer implements NamenodeProtocols { /** The RPC server that listens to requests from DataNodes */ private final RPC.Server serviceRpcServer; private final InetSocketAddress serviceRPCAddress; + + /** The RPC server that listens to lifeline requests */ + private final RPC.Server lifelineRpcServer; + private final InetSocketAddress lifelineRPCAddress; /** The RPC server that listens to requests from clients */ protected final RPC.Server clientRpcServer; @@ -336,6 +342,42 @@ class NameNodeRpcServer implements NamenodeProtocols { serviceRpcServer = null; serviceRPCAddress = null; } + + InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf); + if (lifelineRpcAddr != null) { + RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, + ProtobufRpcEngine.class); + String bindHost = nn.getLifelineRpcServerBindHost(conf); + if (bindHost == null) { + bindHost = lifelineRpcAddr.getHostName(); + } + LOG.info("Lifeline RPC server is binding to {}:{}", bindHost, + lifelineRpcAddr.getPort()); + + int lifelineHandlerCount = conf.getInt( + DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, + DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT); + + lifelineRpcServer = new RPC.Builder(conf) + .setProtocol(HAServiceProtocolPB.class) + .setInstance(haPbService) + .setBindAddress(bindHost) + .setPort(lifelineRpcAddr.getPort()) + .setNumHandlers(lifelineHandlerCount) + .setVerbose(false) + .setSecretManager(namesystem.getDelegationTokenSecretManager()) + .build(); + + // Update the address with the correct port + InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress(); + lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(), + listenAddr.getPort()); + nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress); + } else { + lifelineRpcServer = null; + lifelineRPCAddress = null; + } + InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); String bindHost = nn.getRpcServerBindHost(conf); if (bindHost == null) { @@ -379,12 +421,15 @@ class NameNodeRpcServer implements NamenodeProtocols { if (serviceRpcServer != null) { serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); } + if (lifelineRpcServer != null) { + lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); + } } // The rpc-server port can be ephemeral... ensure we have the correct info InetSocketAddress listenAddr = clientRpcServer.getListenerAddress(); - clientRpcAddress = new InetSocketAddress( - rpcAddr.getHostName(), listenAddr.getPort()); + clientRpcAddress = new InetSocketAddress( + rpcAddr.getHostName(), listenAddr.getPort()); nn.setRpcServerAddress(conf, clientRpcAddress); minimumDataNodeVersion = conf.get( @@ -416,7 +461,16 @@ class NameNodeRpcServer implements NamenodeProtocols { if (serviceRpcServer != null) { serviceRpcServer.setTracer(nn.tracer); } - } + if (lifelineRpcServer != null) { + lifelineRpcServer.setTracer(nn.tracer); + } + } + + /** Allow access to the lifeline RPC server for testing */ + @VisibleForTesting + RPC.Server getLifelineRpcServer() { + return lifelineRpcServer; + } /** Allow access to the client RPC server for testing */ @VisibleForTesting @@ -438,6 +492,9 @@ class NameNodeRpcServer implements NamenodeProtocols { if (serviceRpcServer != null) { serviceRpcServer.start(); } + if (lifelineRpcServer != null) { + lifelineRpcServer.start(); + } } /** @@ -448,6 +505,9 @@ class NameNodeRpcServer implements NamenodeProtocols { if (serviceRpcServer != null) { serviceRpcServer.join(); } + if (lifelineRpcServer != null) { + lifelineRpcServer.join(); + } } /** @@ -460,8 +520,15 @@ class NameNodeRpcServer implements NamenodeProtocols { if (serviceRpcServer != null) { serviceRpcServer.stop(); } + if (lifelineRpcServer != null) { + lifelineRpcServer.stop(); + } } - + + InetSocketAddress getLifelineRpcAddress() { + return lifelineRPCAddress; + } + InetSocketAddress getServiceRpcAddress() { return serviceRPCAddress; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java index 247ac0274f3..d579a4dd335 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java @@ -49,6 +49,7 @@ public class NNHAServiceTarget extends HAServiceTarget { private static final String NAMENODE_ID_KEY = "namenodeid"; private final InetSocketAddress addr; + private final InetSocketAddress lifelineAddr; private InetSocketAddress zkfcAddr; private NodeFencer fencer; private BadFencingConfigurationException fenceConfigError; @@ -90,6 +91,11 @@ public class NNHAServiceTarget extends HAServiceTarget { this.addr = NetUtils.createSocketAddr(serviceAddr, HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); + String lifelineAddrStr = + DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId); + this.lifelineAddr = (lifelineAddrStr != null) ? + NetUtils.createSocketAddr(lifelineAddrStr) : null; + this.autoFailoverEnabled = targetConf.getBoolean( DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT); @@ -119,6 +125,11 @@ public class NNHAServiceTarget extends HAServiceTarget { return addr; } + @Override + public InetSocketAddress getHealthMonitorAddress() { + return lifelineAddr; + } + @Override public InetSocketAddress getZKFCAddress() { Preconditions.checkState(autoFailoverEnabled, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1389bc92bfd..5759df5518f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -78,6 +78,33 @@ + + dfs.namenode.lifeline.rpc-address + + + NameNode RPC lifeline address. This is an optional separate RPC address + that can be used to isolate health checks and liveness to protect against + resource exhaustion in the main RPC handler pool. In the case of + HA/Federation where multiple NameNodes exist, the name service ID is added + to the name e.g. dfs.namenode.lifeline.rpc-address.ns1. The value of this + property will take the form of nn-host1:rpc-port. If this property is not + defined, then the NameNode will not start a lifeline RPC server. By + default, the property is not defined. + + + + + dfs.namenode.lifeline.rpc-bind-host + + + The actual address the lifeline RPC server will bind to. If this optional + address is set, it overrides only the hostname portion of + dfs.namenode.lifeline.rpc-address. It can also be specified per name node + or name service for HA/Federation. This is useful for making the name node + listen on all interfaces by setting it to 0.0.0.0. + + + dfs.namenode.secondary.http-address 0.0.0.0:50090 @@ -681,6 +708,18 @@ + + dfs.namenode.lifeline.handler.count + 1 + + Sets number of RPC server threads the NameNode runs for handling the + lifeline RPC server. The default value is 1, because this RPC server + handles only HA health check requests from ZKFC. These are lightweight + requests that run single-threaded from the ZKFC client side. This property + has no effect if dfs.namenode.lifeline.rpc-address is not defined. + + + dfs.namenode.safemode.threshold-pct 0.999f diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 3d87bbff8a4..e73b59b4e54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -37,6 +37,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_K import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; @@ -869,26 +870,23 @@ public class MiniDFSCluster { nameserviceId, nnId); destConf.set(key, srcConf.get(key)); - key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, - nameserviceId, nnId); + copyKey(srcConf, destConf, nameserviceId, nnId, + DFS_NAMENODE_HTTP_ADDRESS_KEY); + copyKey(srcConf, destConf, nameserviceId, nnId, + DFS_NAMENODE_HTTPS_ADDRESS_KEY); + copyKey(srcConf, destConf, nameserviceId, nnId, + DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY); + copyKey(srcConf, destConf, nameserviceId, nnId, + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY); + } + + private static void copyKey(Configuration srcConf, Configuration destConf, + String nameserviceId, String nnId, String baseKey) { + String key = DFSUtil.addKeySuffixes(baseKey, nameserviceId, nnId); String val = srcConf.get(key); if (val != null) { destConf.set(key, srcConf.get(key)); } - - key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY, - nameserviceId, nnId); - val = srcConf.get(key); - if (val != null) { - destConf.set(key, srcConf.get(key)); - } - - key = DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, - nameserviceId, nnId); - val = srcConf.get(key); - if (val != null) { - destConf.set(key, srcConf.get(key)); - } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java index ed00e37eef6..b0fa93d837c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRespectsBindHostKeys.java @@ -47,9 +47,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; * * - DFS_NAMENODE_RPC_BIND_HOST_KEY * - DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY + * - DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY * - DFS_NAMENODE_HTTP_BIND_HOST_KEY * - DFS_NAMENODE_HTTPS_BIND_HOST_KEY - */ public class TestNameNodeRespectsBindHostKeys { public static final Log LOG = LogFactory.getLog(TestNameNodeRespectsBindHostKeys.class); @@ -66,6 +66,12 @@ public class TestNameNodeRespectsBindHostKeys { return rpcServer.getServiceRpcServer().getListenerAddress().getAddress().toString(); } + private static String getLifelineRpcServerAddress(MiniDFSCluster cluster) { + NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc(); + return rpcServer.getLifelineRpcServer().getListenerAddress().getAddress() + .toString(); + } + @Test (timeout=300000) public void testRpcBindHostKey() throws IOException { Configuration conf = new HdfsConfiguration(); @@ -148,6 +154,48 @@ public class TestNameNodeRespectsBindHostKeys { } } + @Test (timeout=300000) + public void testLifelineRpcBindHostKey() throws IOException { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + + LOG.info("Testing without " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY); + + conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS); + + // NN should not bind the wildcard address by default. + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); + cluster.waitActive(); + String address = getLifelineRpcServerAddress(cluster); + assertThat("Bind address not expected to be wildcard by default.", + address, not("/" + WILDCARD_ADDRESS)); + } finally { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + LOG.info("Testing with " + DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY); + + // Tell NN to bind the wildcard address. + conf.set(DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS); + + // Verify that NN binds wildcard address now. + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); + cluster.waitActive(); + String address = getLifelineRpcServerAddress(cluster); + assertThat("Bind address " + address + " is not wildcard.", + address, is("/" + WILDCARD_ADDRESS)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + @Test(timeout=300000) public void testHttpBindHostKey() throws IOException { Configuration conf = new HdfsConfiguration(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java index ab2a8dd0614..6519588ce51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java @@ -17,57 +17,92 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; import static org.junit.Assert.fail; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HealthCheckFailedException; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.tools.NNHAServiceTarget; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; public class TestNNHealthCheck { + private MiniDFSCluster cluster; + private Configuration conf; + + @Before + public void setup() { + conf = new Configuration(); + } + + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + @Test public void testNNHealthCheck() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf) + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .build(); + doNNHealthCheckTest(); + } + + @Test + public void testNNHealthCheckWithLifelineAddress() throws IOException { + conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0"); + cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(0) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .build(); + doNNHealthCheckTest(); + } - NameNodeResourceChecker mockResourceChecker = Mockito.mock( - NameNodeResourceChecker.class); - Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace(); - cluster.getNameNode(0).getNamesystem() - .setNNResourceChecker(mockResourceChecker); - - NamenodeProtocols rpc = cluster.getNameNodeRpc(0); - - // Should not throw error, which indicates healthy. + private void doNNHealthCheckTest() throws IOException { + NameNodeResourceChecker mockResourceChecker = Mockito.mock( + NameNodeResourceChecker.class); + Mockito.doReturn(true).when(mockResourceChecker).hasAvailableDiskSpace(); + cluster.getNameNode(0).getNamesystem() + .setNNResourceChecker(mockResourceChecker); + + NNHAServiceTarget haTarget = new NNHAServiceTarget(conf, + DFSUtil.getNamenodeNameServiceId(conf), "nn1"); + HAServiceProtocol rpc = haTarget.getHealthMonitorProxy(conf, conf.getInt( + HA_HM_RPC_TIMEOUT_KEY, HA_HM_RPC_TIMEOUT_DEFAULT)); + + // Should not throw error, which indicates healthy. + rpc.monitorHealth(); + + Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace(); + + try { + // Should throw error - NN is unhealthy. rpc.monitorHealth(); - - Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace(); - - try { - // Should throw error - NN is unhealthy. - rpc.monitorHealth(); - fail("Should not have succeeded in calling monitorHealth"); - } catch (HealthCheckFailedException hcfe) { - GenericTestUtils.assertExceptionContains( - "The NameNode has no resources available", hcfe); - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } + fail("Should not have succeeded in calling monitorHealth"); + } catch (HealthCheckFailedException hcfe) { + GenericTestUtils.assertExceptionContains( + "The NameNode has no resources available", hcfe); + } catch (RemoteException re) { + GenericTestUtils.assertExceptionContains( + "The NameNode has no resources available", + re.unwrapRemoteException(HealthCheckFailedException.class)); } } }