HDFS-13462. Add BIND_HOST configuration for JournalNode's HTTP and RPC Servers. Contributed by Lukas Majercak.

(cherry picked from commit c9b33514b8)
This commit is contained in:
Inigo Goiri 2018-04-17 14:19:23 -07:00
parent fc11555caf
commit 4be0fa28da
6 changed files with 366 additions and 30 deletions

View File

@ -977,15 +977,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/";
public static final String DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address";
public static final int DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
public static final String DFS_JOURNALNODE_RPC_BIND_HOST_KEY = "dfs.journalnode.rpc-bind-host";
public static final String DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT;
public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
public static final String DFS_JOURNALNODE_HTTP_BIND_HOST_KEY = "dfs.journalnode.http-bind-host";
public static final String DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
public static final String DFS_JOURNALNODE_HTTPS_ADDRESS_KEY = "dfs.journalnode.https-address";
public static final int DFS_JOURNALNODE_HTTPS_PORT_DEFAULT = 8481;
public static final String DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY = "dfs.journalnode.https-bind-host";
public static final String DFS_JOURNALNODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTPS_PORT_DEFAULT;
public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
public static final String DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY = "dfs.journalnode.kerberos.principal";
public static final String DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";

View File

@ -36,9 +36,12 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.tracing.TraceUtils;
import org.apache.hadoop.util.DiskChecker;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@ -226,7 +229,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
registerJNMXBean();
httpServer = new JournalNodeHttpServer(conf, this);
httpServer = new JournalNodeHttpServer(conf, this,
getHttpServerBindAddress(conf));
httpServer.start();
httpServerURI = httpServer.getServerURI().toString();
@ -251,11 +255,6 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
public InetSocketAddress getBoundIpcAddress() {
return rpcServer.getAddress();
}
@Deprecated
public InetSocketAddress getBoundHttpAddress() {
return httpServer.getAddress();
}
public String getHttpServerURI() {
return httpServerURI;
@ -400,7 +399,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
private void registerJNMXBean() {
journalNodeInfoBeanName = MBeans.register("JournalNode", "JournalNodeInfo", this);
}
private class ErrorReporter implements StorageErrorReporter {
@Override
public void reportErrorOnFile(File f) {
@ -464,4 +463,53 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
return journalsById.get(jid);
}
public static InetSocketAddress getHttpAddress(Configuration conf) {
String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT);
return NetUtils.createSocketAddr(addr,
DFSConfigKeys.DFS_JOURNALNODE_HTTP_PORT_DEFAULT,
DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY);
}
protected InetSocketAddress getHttpServerBindAddress(
Configuration configuration) {
InetSocketAddress bindAddress = getHttpAddress(configuration);
// If DFS_JOURNALNODE_HTTP_BIND_HOST_KEY exists then it overrides the
// host name portion of DFS_JOURNALNODE_HTTP_ADDRESS_KEY.
final String bindHost = configuration.getTrimmed(
DFS_JOURNALNODE_HTTP_BIND_HOST_KEY);
if (bindHost != null && !bindHost.isEmpty()) {
bindAddress = new InetSocketAddress(bindHost, bindAddress.getPort());
}
return bindAddress;
}
@VisibleForTesting
public JournalNodeRpcServer getRpcServer() {
return rpcServer;
}
/**
* @return the actual JournalNode HTTP/HTTPS address.
*/
public InetSocketAddress getBoundHttpAddress() {
return httpServer.getAddress();
}
/**
* @return JournalNode HTTP address
*/
public InetSocketAddress getHttpAddress() {
return httpServer.getHttpAddress();
}
/**
* @return JournalNode HTTPS address
*/
public InetSocketAddress getHttpsAddress() {
return httpServer.getHttpsAddress();
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.net.NetUtils;
@ -41,21 +42,37 @@ public class JournalNodeHttpServer {
private HttpServer2 httpServer;
private final JournalNode localJournalNode;
private InetSocketAddress httpAddress;
private InetSocketAddress httpsAddress;
private final InetSocketAddress bindAddress;
private final Configuration conf;
JournalNodeHttpServer(Configuration conf, JournalNode jn) {
JournalNodeHttpServer(Configuration conf, JournalNode jn,
InetSocketAddress bindAddress) {
this.conf = conf;
this.localJournalNode = jn;
this.bindAddress = bindAddress;
}
void start() throws IOException {
final InetSocketAddress httpAddr = getAddress(conf);
final InetSocketAddress httpAddr = bindAddress;
final String httpsAddrString = conf.get(
DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_DEFAULT);
InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);
if (httpsAddr != null) {
// If DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY exists then it overrides the
// host name portion of DFS_NAMENODE_HTTPS_ADDRESS_KEY.
final String bindHost =
conf.getTrimmed(DFSConfigKeys.DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY);
if (bindHost != null && !bindHost.isEmpty()) {
httpsAddr = new InetSocketAddress(bindHost, httpsAddr.getPort());
}
}
HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
httpAddr, httpsAddr, "journal",
DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
@ -67,6 +84,20 @@ public class JournalNodeHttpServer {
httpServer.addInternalServlet("getJournal", "/getJournal",
GetJournalEditServlet.class, true);
httpServer.start();
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
int connIdx = 0;
if (policy.isHttpEnabled()) {
httpAddress = httpServer.getConnectorAddress(connIdx++);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
NetUtils.getHostPortString(httpAddress));
}
if (policy.isHttpsEnabled()) {
httpsAddress = httpServer.getConnectorAddress(connIdx);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY,
NetUtils.getHostPortString(httpsAddress));
}
}
void stop() throws IOException {
@ -78,15 +109,27 @@ public class JournalNodeHttpServer {
}
}
}
/**
* Return the actual HTTP/HTTPS address bound to by the running server.
*/
public InetSocketAddress getAddress() {
assert httpAddress != null || httpsAddress != null;
return httpAddress != null ? httpAddress : httpsAddress;
}
/**
* Return the actual address bound to by the running server.
*/
@Deprecated
public InetSocketAddress getAddress() {
InetSocketAddress addr = httpServer.getConnectorAddress(0);
assert addr.getPort() != 0;
return addr;
public InetSocketAddress getHttpAddress() {
return httpAddress;
}
/**
* Return the actual address bound to by the running server.
*/
public InetSocketAddress getHttpsAddress() {
return httpsAddress;
}
/**
@ -101,14 +144,6 @@ public class JournalNodeHttpServer {
+ NetUtils.getHostPortString(addr));
}
private static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT);
return NetUtils.createSocketAddr(addr,
DFSConfigKeys.DFS_JOURNALNODE_HTTP_PORT_DEFAULT,
DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY);
}
public static Journal getJournalFromContext(ServletContext context, String jid)
throws IOException {
JournalNode jn = (JournalNode)context.getAttribute(JN_ATTRIBUTE_KEY);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.qjournal.server;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -53,11 +54,14 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_RPC_BIND_HOST_KEY;
@InterfaceAudience.Private
@VisibleForTesting
public class JournalNodeRpcServer implements QJournalProtocol,
InterQJournalProtocol {
private static final Log LOG = JournalNode.LOG;
private static final int HANDLER_COUNT = 5;
private final JournalNode jn;
private Server server;
@ -73,6 +77,12 @@ public class JournalNodeRpcServer implements QJournalProtocol,
true);
InetSocketAddress addr = getAddress(confCopy);
String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
if (bindHost == null) {
bindHost = addr.getHostName();
}
LOG.info("RPC server is binding to " + bindHost + ":" + addr.getPort());
RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class,
ProtobufRpcEngine.class);
QJournalProtocolServerSideTranslatorPB translator =
@ -81,13 +91,13 @@ public class JournalNodeRpcServer implements QJournalProtocol,
.newReflectiveBlockingService(translator);
this.server = new RPC.Builder(confCopy)
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
.setBindAddress(addr.getHostName())
.setPort(addr.getPort())
.setNumHandlers(HANDLER_COUNT)
.setVerbose(false)
.build();
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
.setBindAddress(bindHost)
.setPort(addr.getPort())
.setNumHandlers(HANDLER_COUNT)
.setVerbose(false)
.build();
//Adding InterQJournalProtocolPB to server
@ -298,4 +308,10 @@ public class JournalNodeRpcServer implements QJournalProtocol,
.setFromURL(jn.getHttpServerURI())
.build();
}
/** Allow access to the RPC server for testing. */
@VisibleForTesting
Server getRpcServer() {
return server;
}
}

View File

@ -2167,6 +2167,17 @@
</description>
</property>
<property>
<name>dfs.journalnode.rpc-bind-host</name>
<value></value>
<description>
The actual address the RPC server will bind to. If this optional address is
set, it overrides only the hostname portion of dfs.journalnode.rpc-address.
This is useful for making the JournalNode listen on all interfaces by
setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.journalnode.http-address</name>
<value>0.0.0.0:8480</value>
@ -2176,6 +2187,17 @@
</description>
</property>
<property>
<name>dfs.journalnode.http-bind-host</name>
<value></value>
<description>
The actual address the HTTP server will bind to. If this optional address
is set, it overrides only the hostname portion of
dfs.journalnode.http-address. This is useful for making the JournalNode
HTTP server listen on allinterfaces by setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.journalnode.https-address</name>
<value>0.0.0.0:8481</value>
@ -2185,6 +2207,17 @@
</description>
</property>
<property>
<name>dfs.journalnode.https-bind-host</name>
<value></value>
<description>
The actual address the HTTP server will bind to. If this optional address
is set, it overrides only the hostname portion of
dfs.journalnode.https-address. This is useful for making the JournalNode
HTTP server listen on all interfaces by setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.namenode.audit.loggers</name>
<value>default</value>

View File

@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.server;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_RPC_BIND_HOST_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.HdfsConfiguration;
/**
* This test checks that the JournalNode respects the following keys.
*
* - DFS_JOURNALNODE_RPC_BIND_HOST_KEY
* - DFS_JOURNALNODE_HTTP_BIND_HOST_KEY
* - DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY
*/
public class TestJournalNodeRespectsBindHostKeys {
public static final Log LOG = LogFactory.getLog(
TestJournalNodeRespectsBindHostKeys.class);
private static final String WILDCARD_ADDRESS = "0.0.0.0";
private static final String LOCALHOST_SERVER_ADDRESS = "127.0.0.1:0";
private static final int NUM_JN = 1;
private HdfsConfiguration conf;
private MiniJournalCluster jCluster;
private JournalNode jn;
@Before
public void setUp() {
conf = new HdfsConfiguration();
}
@After
public void tearDown() throws IOException {
if (jCluster != null) {
jCluster.shutdown();
jCluster = null;
}
}
private static String getRpcServerAddress(JournalNode jn) {
JournalNodeRpcServer rpcServer = jn.getRpcServer();
return rpcServer.getRpcServer().getListenerAddress().getAddress().
toString();
}
@Test (timeout=300000)
public void testRpcBindHostKey() throws IOException {
LOG.info("Testing without " + DFS_JOURNALNODE_RPC_BIND_HOST_KEY);
// NN should not bind the wildcard address by default.
jCluster = new MiniJournalCluster.Builder(conf).format(true)
.numJournalNodes(NUM_JN).build();
jn = jCluster.getJournalNode(0);
String address = getRpcServerAddress(jn);
assertThat("Bind address not expected to be wildcard by default.",
address, not("/" + WILDCARD_ADDRESS));
LOG.info("Testing with " + DFS_JOURNALNODE_RPC_BIND_HOST_KEY);
// Tell NN to bind the wildcard address.
conf.set(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS);
// Verify that NN binds wildcard address now.
jCluster = new MiniJournalCluster.Builder(conf).format(true)
.numJournalNodes(NUM_JN).build();
jn = jCluster.getJournalNode(0);
address = getRpcServerAddress(jn);
assertThat("Bind address " + address + " is not wildcard.",
address, is("/" + WILDCARD_ADDRESS));
}
@Test(timeout=300000)
public void testHttpBindHostKey() throws IOException {
LOG.info("Testing without " + DFS_JOURNALNODE_HTTP_BIND_HOST_KEY);
// NN should not bind the wildcard address by default.
conf.set(DFS_JOURNALNODE_HTTP_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS);
jCluster = new MiniJournalCluster.Builder(conf).format(true)
.numJournalNodes(NUM_JN).build();
jn = jCluster.getJournalNode(0);
String address = jn.getHttpAddress().toString();
assertFalse("HTTP Bind address not expected to be wildcard by default.",
address.startsWith(WILDCARD_ADDRESS));
LOG.info("Testing with " + DFS_JOURNALNODE_HTTP_BIND_HOST_KEY);
// Tell NN to bind the wildcard address.
conf.set(DFS_JOURNALNODE_HTTP_BIND_HOST_KEY, WILDCARD_ADDRESS);
// Verify that NN binds wildcard address now.
conf.set(DFS_JOURNALNODE_HTTP_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS);
jCluster = new MiniJournalCluster.Builder(conf).format(true)
.numJournalNodes(NUM_JN).build();
jn = jCluster.getJournalNode(0);
address = jn.getHttpAddress().toString();
assertTrue("HTTP Bind address " + address + " is not wildcard.",
address.startsWith(WILDCARD_ADDRESS));
}
private static final String BASEDIR = System.getProperty("test.build.dir",
"target/test-dir") + "/" +
TestJournalNodeRespectsBindHostKeys.class.getSimpleName();
private static void setupSsl() throws Exception {
Configuration conf = new Configuration();
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
conf.set(DFS_JOURNALNODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
assertTrue(base.mkdirs());
final String keystoresDir = new File(BASEDIR).getAbsolutePath();
final String sslConfDir = KeyStoreTestUtil.getClasspathDir(
TestJournalNodeRespectsBindHostKeys.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
}
/**
* HTTPS test is different since we need to setup SSL configuration.
* NN also binds the wildcard address for HTTPS port by default so we must
* pick a different host/port combination.
* @throws Exception
*/
@Test (timeout=300000)
public void testHttpsBindHostKey() throws Exception {
LOG.info("Testing behavior without " + DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY);
setupSsl();
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
// NN should not bind the wildcard address by default.
conf.set(DFS_JOURNALNODE_HTTPS_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS);
jCluster = new MiniJournalCluster.Builder(conf).format(true)
.numJournalNodes(NUM_JN).build();
jn = jCluster.getJournalNode(0);
String address = jn.getHttpsAddress().toString();
assertFalse("HTTP Bind address not expected to be wildcard by default.",
address.startsWith(WILDCARD_ADDRESS));
LOG.info("Testing behavior with " + DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY);
// Tell NN to bind the wildcard address.
conf.set(DFS_JOURNALNODE_HTTPS_BIND_HOST_KEY, WILDCARD_ADDRESS);
// Verify that NN binds wildcard address now.
conf.set(DFS_JOURNALNODE_HTTPS_ADDRESS_KEY, LOCALHOST_SERVER_ADDRESS);
jCluster = new MiniJournalCluster.Builder(conf).format(true)
.numJournalNodes(NUM_JN).build();
jn = jCluster.getJournalNode(0);
address = jn.getHttpsAddress().toString();
assertTrue("HTTP Bind address " + address + " is not wildcard.",
address.startsWith(WILDCARD_ADDRESS));
}
}