HDFS-5545. Allow specifying endpoints for listeners in HttpServer. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1546151 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-11-27 18:20:14 +00:00
parent 13edb391d0
commit 2214871d91
23 changed files with 550 additions and 412 deletions

View File

@ -19,12 +19,13 @@ package org.apache.hadoop.http;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.net.BindException; import java.net.BindException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.security.GeneralSecurityException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
@ -32,7 +33,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.net.ssl.SSLServerSocketFactory;
import javax.servlet.Filter; import javax.servlet.Filter;
import javax.servlet.FilterChain; import javax.servlet.FilterChain;
import javax.servlet.FilterConfig; import javax.servlet.FilterConfig;
@ -60,7 +60,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.mortbay.io.Buffer; import org.mortbay.io.Buffer;
@ -71,8 +70,8 @@ import org.mortbay.jetty.RequestLog;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.handler.ContextHandler; import org.mortbay.jetty.handler.ContextHandler;
import org.mortbay.jetty.handler.ContextHandlerCollection; import org.mortbay.jetty.handler.ContextHandlerCollection;
import org.mortbay.jetty.handler.RequestLogHandler;
import org.mortbay.jetty.handler.HandlerCollection; import org.mortbay.jetty.handler.HandlerCollection;
import org.mortbay.jetty.handler.RequestLogHandler;
import org.mortbay.jetty.nio.SelectChannelConnector; import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.security.SslSocketConnector; import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
@ -86,6 +85,7 @@ import org.mortbay.thread.QueuedThreadPool;
import org.mortbay.util.MultiException; import org.mortbay.util.MultiException;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.sun.jersey.spi.container.servlet.ServletContainer; import com.sun.jersey.spi.container.servlet.ServletContainer;
/** /**
@ -114,11 +114,25 @@ public class HttpServer implements FilterContainer {
public static final String BIND_ADDRESS = "bind.address"; public static final String BIND_ADDRESS = "bind.address";
private AccessControlList adminsAcl; private final AccessControlList adminsAcl;
private SSLFactory sslFactory;
protected final Server webServer; protected final Server webServer;
protected final Connector listener;
private static class ListenerInfo {
/**
* Boolean flag to determine whether the HTTP server should clean up the
* listener in stop().
*/
private final boolean isManaged;
private final Connector listener;
private ListenerInfo(boolean isManaged, Connector listener) {
this.isManaged = isManaged;
this.listener = listener;
}
}
private final List<ListenerInfo> listeners = Lists.newArrayList();
protected final WebAppContext webAppContext; protected final WebAppContext webAppContext;
protected final boolean findPort; protected final boolean findPort;
protected final Map<Context, Boolean> defaultContexts = protected final Map<Context, Boolean> defaultContexts =
@ -127,34 +141,111 @@ public class HttpServer implements FilterContainer {
static final String STATE_DESCRIPTION_ALIVE = " - alive"; static final String STATE_DESCRIPTION_ALIVE = " - alive";
static final String STATE_DESCRIPTION_NOT_LIVE = " - not live"; static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
private final boolean listenerStartedExternally;
/** /**
* Class to construct instances of HTTP server with specific options. * Class to construct instances of HTTP server with specific options.
*/ */
public static class Builder { public static class Builder {
String name; private ArrayList<URI> endpoints = Lists.newArrayList();
String bindAddress; private Connector connector;
Integer port; private String name;
Boolean findPort; private Configuration conf;
Configuration conf; private String[] pathSpecs;
Connector connector; private AccessControlList adminsAcl;
String[] pathSpecs; private boolean securityEnabled = false;
AccessControlList adminsAcl; private String usernameConfKey;
boolean securityEnabled = false; private String keytabConfKey;
String usernameConfKey = null; private boolean needsClientAuth;
String keytabConfKey = null; private String trustStore;
private String trustStorePassword;
private String trustStoreType;
private String keyStore;
private String keyStorePassword;
private String keyStoreType;
// The -keypass option in keytool
private String keyPassword;
@Deprecated
private String bindAddress;
@Deprecated
private int port = -1;
private boolean findPort;
private String hostName;
public Builder setName(String name){ public Builder setName(String name){
this.name = name; this.name = name;
return this; return this;
} }
/**
* Add an endpoint that the HTTP server should listen to.
*
* @param endpoint
* the endpoint of that the HTTP server should listen to. The
* scheme specifies the protocol (i.e. HTTP / HTTPS), the host
* specifies the binding address, and the port specifies the
* listening port. Unspecified or zero port means that the server
* can listen to any port.
*/
public Builder addEndpoint(URI endpoint) {
endpoints.add(endpoint);
return this;
}
/**
* Set the hostname of the http server. The host name is used to resolve the
* _HOST field in Kerberos principals. The hostname of the first listener
* will be used if the name is unspecified.
*/
public Builder hostName(String hostName) {
this.hostName = hostName;
return this;
}
public Builder trustStore(String location, String password, String type) {
this.trustStore = location;
this.trustStorePassword = password;
this.trustStoreType = type;
return this;
}
public Builder keyStore(String location, String password, String type) {
this.keyStore = location;
this.keyStorePassword = password;
this.keyStoreType = type;
return this;
}
public Builder keyPassword(String password) {
this.keyPassword = password;
return this;
}
/**
* Specify whether the server should authorize the client in SSL
* connections.
*/
public Builder needsClientAuth(boolean value) {
this.needsClientAuth = value;
return this;
}
/**
* Use addEndpoint() instead.
*/
@Deprecated
public Builder setBindAddress(String bindAddress){ public Builder setBindAddress(String bindAddress){
this.bindAddress = bindAddress; this.bindAddress = bindAddress;
return this; return this;
} }
/**
* Use addEndpoint() instead.
*/
@Deprecated
public Builder setPort(int port) { public Builder setPort(int port) {
this.port = port; this.port = port;
return this; return this;
@ -204,25 +295,70 @@ public class HttpServer implements FilterContainer {
if (this.name == null) { if (this.name == null) {
throw new HadoopIllegalArgumentException("name is not set"); throw new HadoopIllegalArgumentException("name is not set");
} }
if (this.bindAddress == null) {
throw new HadoopIllegalArgumentException("bindAddress is not set"); // Make the behavior compatible with deprecated interfaces
if (bindAddress != null && port != -1) {
try {
endpoints.add(0, new URI("http", "", bindAddress, port, "", "", ""));
} catch (URISyntaxException e) {
throw new HadoopIllegalArgumentException("Invalid endpoint: "+ e);
} }
if (this.port == null) {
throw new HadoopIllegalArgumentException("port is not set");
} }
if (this.findPort == null) {
throw new HadoopIllegalArgumentException("findPort is not set"); if (endpoints.size() == 0) {
throw new HadoopIllegalArgumentException("No endpoints specified");
}
if (hostName == null) {
hostName = endpoints.get(0).getHost();
} }
if (this.conf == null) { if (this.conf == null) {
conf = new Configuration(); conf = new Configuration();
} }
HttpServer server = new HttpServer(this.name, this.bindAddress, this.port, HttpServer server = new HttpServer(this);
this.findPort, this.conf, this.adminsAcl, this.connector, this.pathSpecs);
if (this.securityEnabled) { if (this.securityEnabled) {
server.initSpnego(this.conf, this.usernameConfKey, this.keytabConfKey); server.initSpnego(conf, hostName, usernameConfKey, keytabConfKey);
} }
if (connector != null) {
server.addUnmanagedListener(connector);
}
for (URI ep : endpoints) {
Connector listener = null;
String scheme = ep.getScheme();
if ("http".equals(scheme)) {
listener = HttpServer.createDefaultChannelConnector();
} else if ("https".equals(scheme)) {
SslSocketConnector c = new SslSocketConnector();
c.setNeedClientAuth(needsClientAuth);
c.setKeyPassword(keyPassword);
if (keyStore != null) {
c.setKeystore(keyStore);
c.setKeystoreType(keyStoreType);
c.setPassword(keyStorePassword);
}
if (trustStore != null) {
c.setTruststore(trustStore);
c.setTruststoreType(trustStoreType);
c.setTrustPassword(trustStorePassword);
}
listener = c;
} else {
throw new HadoopIllegalArgumentException(
"unknown scheme for endpoint:" + ep);
}
listener.setHost(ep.getHost());
listener.setPort(ep.getPort() == -1 ? 0 : ep.getPort());
server.addManagedListener(listener);
}
server.loadListeners();
return server; return server;
} }
} }
@ -314,51 +450,39 @@ public class HttpServer implements FilterContainer {
* @param pathSpecs Path specifications that this httpserver will be serving. * @param pathSpecs Path specifications that this httpserver will be serving.
* These will be added to any filters. * These will be added to any filters.
*/ */
@Deprecated
public HttpServer(String name, String bindAddress, int port, public HttpServer(String name, String bindAddress, int port,
boolean findPort, Configuration conf, AccessControlList adminsAcl, boolean findPort, Configuration conf, AccessControlList adminsAcl,
Connector connector, String[] pathSpecs) throws IOException { Connector connector, String[] pathSpecs) throws IOException {
webServer = new Server(); this(new Builder().setName(name)
this.findPort = findPort; .addEndpoint(URI.create("http://" + bindAddress + ":" + port))
this.adminsAcl = adminsAcl; .setFindPort(findPort).setConf(conf).setACL(adminsAcl)
.setConnector(connector).setPathSpec(pathSpecs));
if(connector == null) {
listenerStartedExternally = false;
if (HttpConfig.isSecure()) {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
try {
sslFactory.init();
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
SslSocketConnector sslListener = new SslSocketConnector() {
@Override
protected SSLServerSocketFactory createFactory() throws Exception {
return sslFactory.createSSLServerSocketFactory();
}
};
listener = sslListener;
} else {
listener = createBaseListener(conf);
}
listener.setHost(bindAddress);
listener.setPort(port);
LOG.info("SSL is enabled on " + toString());
} else {
listenerStartedExternally = true;
listener = connector;
} }
webServer.addConnector(listener); private HttpServer(final Builder b) throws IOException {
final String appDir = getWebAppsPath(b.name);
this.webServer = new Server();
this.adminsAcl = b.adminsAcl;
this.webAppContext = createWebAppContext(b.name, b.conf, adminsAcl, appDir);
this.findPort = b.findPort;
initializeWebServer(b.name, b.hostName, b.conf, b.pathSpecs);
}
private void initializeWebServer(String name, String hostName,
Configuration conf, String[] pathSpecs)
throws FileNotFoundException, IOException {
Preconditions.checkNotNull(webAppContext);
int maxThreads = conf.getInt(HTTP_MAX_THREADS, -1); int maxThreads = conf.getInt(HTTP_MAX_THREADS, -1);
// If HTTP_MAX_THREADS is not configured, QueueThreadPool() will use the // If HTTP_MAX_THREADS is not configured, QueueThreadPool() will use the
// default value (currently 250). // default value (currently 250).
QueuedThreadPool threadPool = maxThreads == -1 ? QueuedThreadPool threadPool = maxThreads == -1 ? new QueuedThreadPool()
new QueuedThreadPool() : new QueuedThreadPool(maxThreads); : new QueuedThreadPool(maxThreads);
threadPool.setDaemon(true); threadPool.setDaemon(true);
webServer.setThreadPool(threadPool); webServer.setThreadPool(threadPool);
final String appDir = getWebAppsPath(name);
ContextHandlerCollection contexts = new ContextHandlerCollection(); ContextHandlerCollection contexts = new ContextHandlerCollection();
RequestLog requestLog = HttpRequestLog.getRequestLog(name); RequestLog requestLog = HttpRequestLog.getRequestLog(name);
@ -366,20 +490,14 @@ public class HttpServer implements FilterContainer {
RequestLogHandler requestLogHandler = new RequestLogHandler(); RequestLogHandler requestLogHandler = new RequestLogHandler();
requestLogHandler.setRequestLog(requestLog); requestLogHandler.setRequestLog(requestLog);
HandlerCollection handlers = new HandlerCollection(); HandlerCollection handlers = new HandlerCollection();
handlers.setHandlers(new Handler[] {requestLogHandler, contexts}); handlers.setHandlers(new Handler[] { requestLogHandler, contexts });
webServer.setHandler(handlers); webServer.setHandler(handlers);
} } else {
else {
webServer.setHandler(contexts); webServer.setHandler(contexts);
} }
webAppContext = new WebAppContext(); final String appDir = getWebAppsPath(name);
webAppContext.setDisplayName(name);
webAppContext.setContextPath("/");
webAppContext.setWar(appDir + "/" + name);
webAppContext.getServletContext().setAttribute(CONF_CONTEXT_ATTRIBUTE, conf);
webAppContext.getServletContext().setAttribute(ADMINS_ACL, adminsAcl);
addNoCacheFilter(webAppContext);
webServer.addHandler(webAppContext); webServer.addHandler(webAppContext);
addDefaultApps(contexts, appDir, conf); addDefaultApps(contexts, appDir, conf);
@ -388,8 +506,8 @@ public class HttpServer implements FilterContainer {
final FilterInitializer[] initializers = getFilterInitializers(conf); final FilterInitializer[] initializers = getFilterInitializers(conf);
if (initializers != null) { if (initializers != null) {
conf = new Configuration(conf); conf = new Configuration(conf);
conf.set(BIND_ADDRESS, bindAddress); conf.set(BIND_ADDRESS, hostName);
for(FilterInitializer c : initializers) { for (FilterInitializer c : initializers) {
c.initFilter(this, conf); c.initFilter(this, conf);
} }
} }
@ -404,10 +522,29 @@ public class HttpServer implements FilterContainer {
} }
} }
@SuppressWarnings("unchecked") private void addUnmanagedListener(Connector connector) {
private void addNoCacheFilter(WebAppContext ctxt) { listeners.add(new ListenerInfo(false, connector));
defineFilter(ctxt, NO_CACHE_FILTER, }
NoCacheFilter.class.getName(), Collections.EMPTY_MAP, new String[] { "/*"});
private void addManagedListener(Connector connector) {
listeners.add(new ListenerInfo(true, connector));
}
private static WebAppContext createWebAppContext(String name,
Configuration conf, AccessControlList adminsAcl, final String appDir) {
WebAppContext ctx = new WebAppContext();
ctx.setDisplayName(name);
ctx.setContextPath("/");
ctx.setWar(appDir + "/" + name);
ctx.getServletContext().setAttribute(CONF_CONTEXT_ATTRIBUTE, conf);
ctx.getServletContext().setAttribute(ADMINS_ACL, adminsAcl);
addNoCacheFilter(ctx);
return ctx;
}
private static void addNoCacheFilter(WebAppContext ctxt) {
defineFilter(ctxt, NO_CACHE_FILTER, NoCacheFilter.class.getName(),
Collections.<String, String> emptyMap(), new String[] { "/*" });
} }
/** /**
@ -651,7 +788,7 @@ public class HttpServer implements FilterContainer {
/** /**
* Define a filter for a context and set up default url mappings. * Define a filter for a context and set up default url mappings.
*/ */
public void defineFilter(Context ctx, String name, public static void defineFilter(Context ctx, String name,
String classname, Map<String,String> parameters, String[] urls) { String classname, Map<String,String> parameters, String[] urls) {
FilterHolder holder = new FilterHolder(); FilterHolder holder = new FilterHolder();
@ -715,93 +852,47 @@ public class HttpServer implements FilterContainer {
* Get the port that the server is on * Get the port that the server is on
* @return the port * @return the port
*/ */
@Deprecated
public int getPort() { public int getPort() {
return webServer.getConnectors()[0].getLocalPort(); return webServer.getConnectors()[0].getLocalPort();
} }
/** /**
* Get the port that corresponds to a particular connector. In the case of * Get the address that corresponds to a particular connector.
* HDFS, the second connector corresponds to the HTTPS connector.
* *
* @return the corresponding port for the connector, or -1 if there's no such * @return the corresponding address for the connector, or null if there's no
* connector. * such connector or the connector is not bounded.
*/ */
public int getConnectorPort(int index) { public InetSocketAddress getConnectorAddress(int index) {
Preconditions.checkArgument(index >= 0); Preconditions.checkArgument(index >= 0);
return index < webServer.getConnectors().length ? if (index > webServer.getConnectors().length)
webServer.getConnectors()[index].getLocalPort() : -1; return null;
Connector c = webServer.getConnectors()[index];
if (c.getLocalPort() == -1) {
// The connector is not bounded
return null;
}
return new InetSocketAddress(c.getHost(), c.getLocalPort());
} }
/** /**
* Set the min, max number of worker threads (simultaneous connections). * Set the min, max number of worker threads (simultaneous connections).
*/ */
public void setThreads(int min, int max) { public void setThreads(int min, int max) {
QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool() ; QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool();
pool.setMinThreads(min); pool.setMinThreads(min);
pool.setMaxThreads(max); pool.setMaxThreads(max);
} }
/** private void initSpnego(Configuration conf, String hostName,
* Configure an ssl listener on the server.
* @param addr address to listen on
* @param keystore location of the keystore
* @param storPass password for the keystore
* @param keyPass password for the key
* @deprecated Use {@link #addSslListener(InetSocketAddress, Configuration, boolean)}
*/
@Deprecated
public void addSslListener(InetSocketAddress addr, String keystore,
String storPass, String keyPass) throws IOException {
if (webServer.isStarted()) {
throw new IOException("Failed to add ssl listener");
}
SslSocketConnector sslListener = new SslSocketConnector();
sslListener.setHost(addr.getHostName());
sslListener.setPort(addr.getPort());
sslListener.setKeystore(keystore);
sslListener.setPassword(storPass);
sslListener.setKeyPassword(keyPass);
webServer.addConnector(sslListener);
}
/**
* Configure an ssl listener on the server.
* @param addr address to listen on
* @param sslConf conf to retrieve ssl options
* @param needCertsAuth whether x509 certificate authentication is required
*/
public void addSslListener(InetSocketAddress addr, Configuration sslConf,
boolean needCertsAuth) throws IOException {
if (webServer.isStarted()) {
throw new IOException("Failed to add ssl listener");
}
if (needCertsAuth) {
// setting up SSL truststore for authenticating clients
System.setProperty("javax.net.ssl.trustStore", sslConf.get(
"ssl.server.truststore.location", ""));
System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
"ssl.server.truststore.password", ""));
System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
"ssl.server.truststore.type", "jks"));
}
SslSocketConnector sslListener = new SslSocketConnector();
sslListener.setHost(addr.getHostName());
sslListener.setPort(addr.getPort());
sslListener.setKeystore(sslConf.get("ssl.server.keystore.location"));
sslListener.setPassword(sslConf.get("ssl.server.keystore.password", ""));
sslListener.setKeyPassword(sslConf.get("ssl.server.keystore.keypassword", ""));
sslListener.setKeystoreType(sslConf.get("ssl.server.keystore.type", "jks"));
sslListener.setNeedClientAuth(needCertsAuth);
webServer.addConnector(sslListener);
}
protected void initSpnego(Configuration conf,
String usernameConfKey, String keytabConfKey) throws IOException { String usernameConfKey, String keytabConfKey) throws IOException {
Map<String, String> params = new HashMap<String, String>(); Map<String, String> params = new HashMap<String, String>();
String principalInConf = conf.get(usernameConfKey); String principalInConf = conf.get(usernameConfKey);
if (principalInConf != null && !principalInConf.isEmpty()) { if (principalInConf != null && !principalInConf.isEmpty()) {
params.put("kerberos.principal", params.put("kerberos.principal", SecurityUtil.getServerPrincipal(
SecurityUtil.getServerPrincipal(principalInConf, listener.getHost())); principalInConf, hostName));
} }
String httpKeytab = conf.get(keytabConfKey); String httpKeytab = conf.get(keytabConfKey);
if (httpKeytab != null && !httpKeytab.isEmpty()) { if (httpKeytab != null && !httpKeytab.isEmpty()) {
@ -819,8 +910,7 @@ public class HttpServer implements FilterContainer {
public void start() throws IOException { public void start() throws IOException {
try { try {
try { try {
openListener(); openListeners();
LOG.info("Jetty bound to port " + listener.getLocalPort());
webServer.start(); webServer.start();
} catch (IOException ex) { } catch (IOException ex) {
LOG.info("HttpServer.start() threw a non Bind IOException", ex); LOG.info("HttpServer.start() threw a non Bind IOException", ex);
@ -856,17 +946,22 @@ public class HttpServer implements FilterContainer {
} }
} }
private void loadListeners() {
for (ListenerInfo li : listeners) {
webServer.addConnector(li.listener);
}
}
/** /**
* Open the main listener for the server * Open the main listener for the server
* @throws Exception * @throws Exception
*/ */
void openListener() throws Exception { void openListeners() throws Exception {
if (listener.getLocalPort() != -1) { // it's already bound for (ListenerInfo li : listeners) {
return; Connector listener = li.listener;
} if (!li.isManaged || li.listener.getLocalPort() != -1) {
if (listenerStartedExternally) { // Expect that listener was started securely // This listener is either started externally or has been bound
throw new Exception("Expected webserver's listener to be started " + continue;
"previously but wasn't");
} }
int port = listener.getPort(); int port = listener.getPort();
while (true) { while (true) {
@ -875,11 +970,12 @@ public class HttpServer implements FilterContainer {
try { try {
listener.close(); listener.close();
listener.open(); listener.open();
LOG.info("Jetty bound to port " + listener.getLocalPort());
break; break;
} catch (BindException ex) { } catch (BindException ex) {
if (port == 0 || !findPort) { if (port == 0 || !findPort) {
BindException be = new BindException( BindException be = new BindException("Port in use: "
"Port in use: " + listener.getHost() + ":" + listener.getPort()); + listener.getHost() + ":" + listener.getPort());
be.initCause(ex); be.initCause(ex);
throw be; throw be;
} }
@ -889,17 +985,6 @@ public class HttpServer implements FilterContainer {
Thread.sleep(100); Thread.sleep(100);
} }
} }
/**
* Return the bind address of the listener.
* @return InetSocketAddress of the listener
*/
public InetSocketAddress getListenerAddress() {
int port = listener.getLocalPort();
if (port == -1) { // not bound, return requested port
port = listener.getPort();
}
return new InetSocketAddress(listener.getHost(), port);
} }
/** /**
@ -907,23 +992,20 @@ public class HttpServer implements FilterContainer {
*/ */
public void stop() throws Exception { public void stop() throws Exception {
MultiException exception = null; MultiException exception = null;
try { for (ListenerInfo li : listeners) {
listener.close(); if (!li.isManaged) {
} catch (Exception e) { continue;
LOG.error("Error while stopping listener for webapp"
+ webAppContext.getDisplayName(), e);
exception = addMultiException(exception, e);
} }
try { try {
if (sslFactory != null) { li.listener.close();
sslFactory.destroy();
}
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error while destroying the SSLFactory" LOG.error(
"Error while stopping listener for webapp"
+ webAppContext.getDisplayName(), e); + webAppContext.getDisplayName(), e);
exception = addMultiException(exception, e); exception = addMultiException(exception, e);
} }
}
try { try {
// clear & stop webAppContext attributes to avoid memory leaks. // clear & stop webAppContext attributes to avoid memory leaks.
@ -934,6 +1016,7 @@ public class HttpServer implements FilterContainer {
+ webAppContext.getDisplayName(), e); + webAppContext.getDisplayName(), e);
exception = addMultiException(exception, e); exception = addMultiException(exception, e);
} }
try { try {
webServer.stop(); webServer.stop();
} catch (Exception e) { } catch (Exception e) {
@ -974,10 +1057,17 @@ public class HttpServer implements FilterContainer {
*/ */
@Override @Override
public String toString() { public String toString() {
return listener != null ? if (listeners.size() == 0) {
("HttpServer at http://" + listener.getHost() + ":" + listener.getLocalPort() + "/" return "Inactive HttpServer";
+ (isAlive() ? STATE_DESCRIPTION_ALIVE : STATE_DESCRIPTION_NOT_LIVE)) } else {
: "Inactive HttpServer"; StringBuilder sb = new StringBuilder("HttpServer (")
.append(isAlive() ? STATE_DESCRIPTION_ALIVE : STATE_DESCRIPTION_NOT_LIVE).append("), listening at:");
for (ListenerInfo li : listeners) {
Connector l = li.listener;
sb.append(l.getHost()).append(":").append(l.getPort()).append("/,");
}
return sb.toString();
}
} }
/** /**

View File

@ -19,13 +19,16 @@
package org.apache.hadoop.http; package org.apache.hadoop.http;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer.Builder;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -120,8 +123,9 @@ public class HttpServerFunctionalTest extends Assert {
public static HttpServer createServer(String host, int port) public static HttpServer createServer(String host, int port)
throws IOException { throws IOException {
prepareTestWebapp(); prepareTestWebapp();
return new HttpServer.Builder().setName(TEST).setBindAddress(host) return new HttpServer.Builder().setName(TEST)
.setPort(port).setFindPort(true).build(); .addEndpoint(URI.create("http://" + host + ":" + port))
.setFindPort(true).build();
} }
/** /**
@ -131,8 +135,7 @@ public class HttpServerFunctionalTest extends Assert {
* @throws IOException if it could not be created * @throws IOException if it could not be created
*/ */
public static HttpServer createServer(String webapp) throws IOException { public static HttpServer createServer(String webapp) throws IOException {
return new HttpServer.Builder().setName(webapp).setBindAddress("0.0.0.0") return localServerBuilder(webapp).setFindPort(true).build();
.setPort(0).setFindPort(true).build();
} }
/** /**
* Create an HttpServer instance for the given webapp * Create an HttpServer instance for the given webapp
@ -143,14 +146,17 @@ public class HttpServerFunctionalTest extends Assert {
*/ */
public static HttpServer createServer(String webapp, Configuration conf) public static HttpServer createServer(String webapp, Configuration conf)
throws IOException { throws IOException {
return new HttpServer.Builder().setName(webapp).setBindAddress("0.0.0.0") return localServerBuilder(webapp).setFindPort(true).setConf(conf).build();
.setPort(0).setFindPort(true).setConf(conf).build();
} }
public static HttpServer createServer(String webapp, Configuration conf, AccessControlList adminsAcl) public static HttpServer createServer(String webapp, Configuration conf, AccessControlList adminsAcl)
throws IOException { throws IOException {
return new HttpServer.Builder().setName(webapp).setBindAddress("0.0.0.0") return localServerBuilder(webapp).setFindPort(true).setConf(conf).setACL(adminsAcl).build();
.setPort(0).setFindPort(true).setConf(conf).setACL(adminsAcl).build(); }
private static Builder localServerBuilder(String webapp) {
return new HttpServer.Builder().setName(webapp).addEndpoint(
URI.create("http://localhost:0"));
} }
/** /**
@ -163,8 +169,7 @@ public class HttpServerFunctionalTest extends Assert {
*/ */
public static HttpServer createServer(String webapp, Configuration conf, public static HttpServer createServer(String webapp, Configuration conf,
String[] pathSpecs) throws IOException { String[] pathSpecs) throws IOException {
return new HttpServer.Builder().setName(webapp).setBindAddress("0.0.0.0") return localServerBuilder(webapp).setFindPort(true).setConf(conf).setPathSpec(pathSpecs).build();
.setPort(0).setFindPort(true).setConf(conf).setPathSpec(pathSpecs).build();
} }
/** /**
@ -201,8 +206,8 @@ public class HttpServerFunctionalTest extends Assert {
public static URL getServerURL(HttpServer server) public static URL getServerURL(HttpServer server)
throws MalformedURLException { throws MalformedURLException {
assertNotNull("No server", server); assertNotNull("No server", server);
int port = server.getPort(); return new URL("http://"
return new URL("http://localhost:" + port + "/"); + NetUtils.getHostPortString(server.getConnectorAddress(0)));
} }
/** /**

View File

@ -36,6 +36,7 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.junit.Test; import org.junit.Test;
public class TestGlobalFilter extends HttpServerFunctionalTest { public class TestGlobalFilter extends HttpServerFunctionalTest {
@ -125,7 +126,8 @@ public class TestGlobalFilter extends HttpServerFunctionalTest {
dataURL, streamFile, rootURL, allURL, outURL, logURL}; dataURL, streamFile, rootURL, allURL, outURL, logURL};
//access the urls //access the urls
final String prefix = "http://localhost:" + http.getPort(); final String prefix = "http://"
+ NetUtils.getHostPortString(http.getConnectorAddress(0));
try { try {
for(int i = 0; i < urls.length; i++) { for(int i = 0; i < urls.length; i++) {
access(prefix + urls[i]); access(prefix + urls[i]);

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.http;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.InetSocketAddress; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
import java.util.Enumeration; import java.util.Enumeration;
@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.http.HttpServer.QuotingInputFilter.RequestQuoter; import org.apache.hadoop.http.HttpServer.QuotingInputFilter.RequestQuoter;
import org.apache.hadoop.http.resource.JerseyResource; import org.apache.hadoop.http.resource.JerseyResource;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -61,6 +62,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.mortbay.jetty.Connector;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
public class TestHttpServer extends HttpServerFunctionalTest { public class TestHttpServer extends HttpServerFunctionalTest {
@ -362,11 +365,10 @@ public class TestHttpServer extends HttpServerFunctionalTest {
MyGroupsProvider.mapping.put("userB", Arrays.asList("groupB")); MyGroupsProvider.mapping.put("userB", Arrays.asList("groupB"));
HttpServer myServer = new HttpServer.Builder().setName("test") HttpServer myServer = new HttpServer.Builder().setName("test")
.setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build(); .addEndpoint(new URI("http://localhost:0")).setFindPort(true).build();
myServer.setAttribute(HttpServer.CONF_CONTEXT_ATTRIBUTE, conf); myServer.setAttribute(HttpServer.CONF_CONTEXT_ATTRIBUTE, conf);
myServer.start(); myServer.start();
int port = myServer.getPort(); String serverURL = "http://" + NetUtils.getHostPortString(myServer.getConnectorAddress(0)) + "/";
String serverURL = "http://localhost:" + port + "/";
for (String servlet : new String[] { "conf", "logs", "stacks", for (String servlet : new String[] { "conf", "logs", "stacks",
"logLevel", "metrics" }) { "logLevel", "metrics" }) {
for (String user : new String[] { "userA", "userB" }) { for (String user : new String[] { "userA", "userB" }) {
@ -404,12 +406,13 @@ public class TestHttpServer extends HttpServerFunctionalTest {
MyGroupsProvider.mapping.put("userE", Arrays.asList("groupE")); MyGroupsProvider.mapping.put("userE", Arrays.asList("groupE"));
HttpServer myServer = new HttpServer.Builder().setName("test") HttpServer myServer = new HttpServer.Builder().setName("test")
.setBindAddress("0.0.0.0").setPort(0).setFindPort(true).setConf(conf) .addEndpoint(new URI("http://localhost:0")).setFindPort(true).setConf(conf)
.setACL(new AccessControlList("userA,userB groupC,groupD")).build(); .setACL(new AccessControlList("userA,userB groupC,groupD")).build();
myServer.setAttribute(HttpServer.CONF_CONTEXT_ATTRIBUTE, conf); myServer.setAttribute(HttpServer.CONF_CONTEXT_ATTRIBUTE, conf);
myServer.start(); myServer.start();
int port = myServer.getPort();
String serverURL = "http://localhost:" + port + "/"; String serverURL = "http://"
+ NetUtils.getHostPortString(myServer.getConnectorAddress(0)) + "/";
for (String servlet : new String[] { "conf", "logs", "stacks", for (String servlet : new String[] { "conf", "logs", "stacks",
"logLevel", "metrics" }) { "logLevel", "metrics" }) {
for (String user : new String[] { "userA", "userB", "userC", "userD" }) { for (String user : new String[] { "userA", "userB", "userC", "userD" }) {
@ -520,20 +523,20 @@ public class TestHttpServer extends HttpServerFunctionalTest {
} }
@Test public void testBindAddress() throws Exception { @Test public void testBindAddress() throws Exception {
checkBindAddress("0.0.0.0", 0, false).stop(); checkBindAddress("localhost", 0, false).stop();
// hang onto this one for a bit more testing // hang onto this one for a bit more testing
HttpServer myServer = checkBindAddress("localhost", 0, false); HttpServer myServer = checkBindAddress("localhost", 0, false);
HttpServer myServer2 = null; HttpServer myServer2 = null;
try { try {
int port = myServer.getListenerAddress().getPort(); int port = myServer.getConnectorAddress(0).getPort();
// it's already in use, true = expect a higher port // it's already in use, true = expect a higher port
myServer2 = checkBindAddress("localhost", port, true); myServer2 = checkBindAddress("localhost", port, true);
// try to reuse the port // try to reuse the port
port = myServer2.getListenerAddress().getPort(); port = myServer2.getConnectorAddress(0).getPort();
myServer2.stop(); myServer2.stop();
assertEquals(-1, myServer2.getPort()); // not bound assertNull(myServer2.getConnectorAddress(0)); // not bound
myServer2.openListener(); myServer2.openListeners();
assertEquals(port, myServer2.getPort()); // expect same port assertEquals(port, myServer2.getConnectorAddress(0).getPort()); // expect same port
} finally { } finally {
myServer.stop(); myServer.stop();
if (myServer2 != null) { if (myServer2 != null) {
@ -547,21 +550,24 @@ public class TestHttpServer extends HttpServerFunctionalTest {
HttpServer server = createServer(host, port); HttpServer server = createServer(host, port);
try { try {
// not bound, ephemeral should return requested port (0 for ephemeral) // not bound, ephemeral should return requested port (0 for ephemeral)
InetSocketAddress addr = server.getListenerAddress(); List<?> listeners = (List<?>) Whitebox.getInternalState(server,
assertEquals(port, addr.getPort()); "listeners");
// verify hostname is what was given Connector listener = (Connector) Whitebox.getInternalState(
server.openListener(); listeners.get(0), "listener");
addr = server.getListenerAddress();
assertEquals(host, addr.getHostName());
int boundPort = addr.getPort(); assertEquals(port, listener.getPort());
// verify hostname is what was given
server.openListeners();
assertEquals(host, server.getConnectorAddress(0).getHostName());
int boundPort = server.getConnectorAddress(0).getPort();
if (port == 0) { if (port == 0) {
assertTrue(boundPort != 0); // ephemeral should now return bound port assertTrue(boundPort != 0); // ephemeral should now return bound port
} else if (findPort) { } else if (findPort) {
assertTrue(boundPort > port); assertTrue(boundPort > port);
// allow a little wiggle room to prevent random test failures if // allow a little wiggle room to prevent random test failures if
// some consecutive ports are already in use // some consecutive ports are already in use
assertTrue(addr.getPort() - port < 8); assertTrue(boundPort - port < 8);
} }
} catch (Exception e) { } catch (Exception e) {
server.stop(); server.stop();

View File

@ -36,6 +36,7 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.junit.Test; import org.junit.Test;
public class TestPathFilter extends HttpServerFunctionalTest { public class TestPathFilter extends HttpServerFunctionalTest {
@ -126,7 +127,8 @@ public class TestPathFilter extends HttpServerFunctionalTest {
// access the urls and verify our paths specs got added to the // access the urls and verify our paths specs got added to the
// filters // filters
final String prefix = "http://localhost:" + http.getPort(); final String prefix = "http://"
+ NetUtils.getHostPortString(http.getConnectorAddress(0));
try { try {
for(int i = 0; i < filteredUrls.length; i++) { for(int i = 0; i < filteredUrls.length; i++) {
access(prefix + filteredUrls[i]); access(prefix + filteredUrls[i]);

View File

@ -17,105 +17,101 @@
*/ */
package org.apache.hadoop.http; package org.apache.hadoop.http;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.ssl.SSLFactory;
import org.junit.After; import org.junit.AfterClass;
import org.junit.Before; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import javax.net.ssl.HttpsURLConnection;
import java.io.File;
import java.io.FileWriter;
import java.io.InputStream;
import java.io.Writer;
import java.net.URL;
/** /**
* This testcase issues SSL certificates configures the HttpServer to serve * This testcase issues SSL certificates configures the HttpServer to serve
* HTTPS using the created certficates and calls an echo servlet using the * HTTPS using the created certficates and calls an echo servlet using the
* corresponding HTTPS URL. * corresponding HTTPS URL.
*/ */
public class TestSSLHttpServer extends HttpServerFunctionalTest { public class TestSSLHttpServer extends HttpServerFunctionalTest {
private static final String CONFIG_SITE_XML = "sslhttpserver-site.xml"; private static final String BASEDIR = System.getProperty("test.build.dir",
"target/test-dir") + "/" + TestSSLHttpServer.class.getSimpleName();
private static final String BASEDIR = private static final Log LOG = LogFactory.getLog(TestSSLHttpServer.class);
System.getProperty("test.build.dir", "target/test-dir") + "/" + private static Configuration conf;
TestSSLHttpServer.class.getSimpleName();
static final Log LOG = LogFactory.getLog(TestSSLHttpServer.class);
private static HttpServer server; private static HttpServer server;
private static URL baseUrl; private static URL baseUrl;
private static String keystoresDir;
private static String sslConfDir;
private static SSLFactory clientSslFactory;
@BeforeClass
public static void setup() throws Exception {
conf = new Configuration();
conf.setInt(HttpServer.HTTP_MAX_THREADS, 10);
@Before
public void setup() throws Exception {
HttpConfig.setPolicy(HttpConfig.Policy.HTTPS_ONLY);
File base = new File(BASEDIR); File base = new File(BASEDIR);
FileUtil.fullyDelete(base); FileUtil.fullyDelete(base);
base.mkdirs(); base.mkdirs();
String classpathDir = keystoresDir = new File(BASEDIR).getAbsolutePath();
KeyStoreTestUtil.getClasspathDir(TestSSLHttpServer.class); sslConfDir = KeyStoreTestUtil.getClasspathDir(TestSSLHttpServer.class);
Configuration conf = new Configuration();
String keystoresDir = new File(BASEDIR).getAbsolutePath();
String sslConfsDir =
KeyStoreTestUtil.getClasspathDir(TestSSLHttpServer.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf, false);
conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, true);
//we do this trick because the MR AppMaster is started in another VM and KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
//the HttpServer configuration is not loaded from the job.xml but from the Configuration sslConf = new Configuration(false);
//site.xml files in the classpath sslConf.addResource("ssl-server.xml");
Writer writer = new FileWriter(new File(classpathDir, CONFIG_SITE_XML)); sslConf.addResource("ssl-client.xml");
conf.writeXml(writer);
writer.close();
conf.setInt(HttpServer.HTTP_MAX_THREADS, 10); clientSslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, sslConf);
conf.addResource(CONFIG_SITE_XML); clientSslFactory.init();
server = createServer("test", conf);
server = new HttpServer.Builder()
.setName("test")
.addEndpoint(new URI("https://localhost"))
.setConf(conf)
.keyPassword(sslConf.get("ssl.server.keystore.keypassword"))
.keyStore(sslConf.get("ssl.server.keystore.location"),
sslConf.get("ssl.server.keystore.password"),
sslConf.get("ssl.server.keystore.type", "jks"))
.trustStore(sslConf.get("ssl.server.truststore.location"),
sslConf.get("ssl.server.truststore.password"),
sslConf.get("ssl.server.truststore.type", "jks")).build();
server.addServlet("echo", "/echo", TestHttpServer.EchoServlet.class); server.addServlet("echo", "/echo", TestHttpServer.EchoServlet.class);
server.start(); server.start();
baseUrl = new URL("https://localhost:" + server.getPort() + "/"); baseUrl = new URL("https://"
LOG.info("HTTP server started: "+ baseUrl); + NetUtils.getHostPortString(server.getConnectorAddress(0)));
LOG.info("HTTP server started: " + baseUrl);
} }
@After @AfterClass
public void cleanup() throws Exception { public static void cleanup() throws Exception {
server.stop(); server.stop();
String classpathDir = FileUtil.fullyDelete(new File(BASEDIR));
KeyStoreTestUtil.getClasspathDir(TestSSLHttpServer.class); KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
new File(classpathDir, CONFIG_SITE_XML).delete(); clientSslFactory.destroy();
HttpConfig.setPolicy(HttpConfig.Policy.HTTP_ONLY);
} }
@Test @Test
public void testEcho() throws Exception { public void testEcho() throws Exception {
assertEquals("a:b\nc:d\n", assertEquals("a:b\nc:d\n", readOut(new URL(baseUrl, "/echo?a=b&c=d")));
readOut(new URL(baseUrl, "/echo?a=b&c=d"))); assertEquals("a:b\nc&lt;:d\ne:&gt;\n", readOut(new URL(baseUrl,
assertEquals("a:b\nc&lt;:d\ne:&gt;\n", "/echo?a=b&c<=d&e=>")));
readOut(new URL(baseUrl, "/echo?a=b&c<=d&e=>")));
} }
private static String readOut(URL url) throws Exception { private static String readOut(URL url) throws Exception {
StringBuilder out = new StringBuilder();
HttpsURLConnection conn = (HttpsURLConnection) url.openConnection(); HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
Configuration conf = new Configuration(); conn.setSSLSocketFactory(clientSslFactory.createSSLSocketFactory());
conf.addResource(CONFIG_SITE_XML);
SSLFactory sslf = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
sslf.init();
conn.setSSLSocketFactory(sslf.createSSLSocketFactory());
InputStream in = conn.getInputStream(); InputStream in = conn.getInputStream();
byte[] buffer = new byte[64 * 1024]; ByteArrayOutputStream out = new ByteArrayOutputStream();
int len = in.read(buffer); IOUtils.copyBytes(in, out, 1024);
while (len > 0) {
out.append(new String(buffer, 0, len));
len = in.read(buffer);
}
return out.toString(); return out.toString();
} }

View File

@ -35,6 +35,7 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test; import org.junit.Test;
@ -125,7 +126,8 @@ public class TestServletFilter extends HttpServerFunctionalTest {
} }
//access the urls as the sequence //access the urls as the sequence
final String prefix = "http://localhost:" + http.getPort(); final String prefix = "http://"
+ NetUtils.getHostPortString(http.getConnectorAddress(0));
try { try {
for(int i = 0; i < sequence.length; i++) { for(int i = 0; i < sequence.length; i++) {
access(prefix + urls[sequence[i]]); access(prefix + urls[sequence[i]]);
@ -185,7 +187,7 @@ public class TestServletFilter extends HttpServerFunctionalTest {
throws Exception { throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
HttpServer http = createTestServer(conf); HttpServer http = createTestServer(conf);
http.defineFilter(http.webAppContext, HttpServer.defineFilter(http.webAppContext,
"ErrorFilter", ErrorFilter.class.getName(), "ErrorFilter", ErrorFilter.class.getName(),
null, null); null, null);
try { try {

View File

@ -21,8 +21,10 @@ import java.io.*;
import java.net.*; import java.net.*;
import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.net.NetUtils;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.commons.logging.*; import org.apache.commons.logging.*;
import org.apache.commons.logging.impl.*; import org.apache.commons.logging.impl.*;
import org.apache.log4j.*; import org.apache.log4j.*;
@ -43,15 +45,16 @@ public class TestLogLevel extends TestCase {
assertTrue(!Level.ERROR.equals(log.getEffectiveLevel())); assertTrue(!Level.ERROR.equals(log.getEffectiveLevel()));
HttpServer server = new HttpServer.Builder().setName("..") HttpServer server = new HttpServer.Builder().setName("..")
.setBindAddress("localhost").setPort(22222).setFindPort(true) .addEndpoint(new URI("http://localhost:0")).setFindPort(true)
.build(); .build();
server.start(); server.start();
int port = server.getPort(); String authority = NetUtils.getHostPortString(server
.getConnectorAddress(0));
//servlet //servlet
URL url = new URL("http://localhost:" + port URL url = new URL("http://" + authority + "/logLevel?log=" + logName
+ "/logLevel?log=" + logName + "&level=" + Level.ERROR); + "&level=" + Level.ERROR);
out.println("*** Connecting to " + url); out.println("*** Connecting to " + url);
URLConnection connection = url.openConnection(); URLConnection connection = url.openConnection();
connection.connect(); connection.connect();
@ -67,7 +70,7 @@ public class TestLogLevel extends TestCase {
assertTrue(Level.ERROR.equals(log.getEffectiveLevel())); assertTrue(Level.ERROR.equals(log.getEffectiveLevel()));
//command line //command line
String[] args = {"-setlevel", "localhost:"+port, logName,""+Level.DEBUG}; String[] args = {"-setlevel", authority, logName, Level.DEBUG.toString()};
LogLevel.main(args); LogLevel.main(args);
log.debug("log.debug3"); log.debug("log.debug3");
log.info("log.info3"); log.info("log.info3");

View File

@ -221,6 +221,9 @@ Trunk (Unreleased)
HDFS-5556. Add some more NameNode cache statistics, cache pool stats HDFS-5556. Add some more NameNode cache statistics, cache pool stats
(cmccabe) (cmccabe)
HDFS-5545. Allow specifying endpoints for listeners in HttpServer. (Haohui
Mai via jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem; import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -1410,4 +1411,19 @@ public class DFSUtil {
return (value == null || value.isEmpty()) ? return (value == null || value.isEmpty()) ?
defaultKey : DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY; defaultKey : DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY;
} }
public static HttpServer.Builder loadSslConfToHttpServerBuilder(
HttpServer.Builder builder, Configuration sslConf) {
return builder
.needsClientAuth(
sslConf.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT))
.keyPassword(sslConf.get("ssl.server.keystore.keypassword"))
.keyStore(sslConf.get("ssl.server.keystore.location"),
sslConf.get("ssl.server.keystore.password"),
sslConf.get("ssl.server.keystore.type", "jks"))
.trustStore(sslConf.get("ssl.server.truststore.location"),
sslConf.get("ssl.server.truststore.password"),
sslConf.get("ssl.server.truststore.type", "jks"));
}
} }

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_INTERNAL_SPNE
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
@ -69,8 +71,15 @@ public class JournalNodeHttpServer {
bindAddr.getHostName())); bindAddr.getHostName()));
int tmpInfoPort = bindAddr.getPort(); int tmpInfoPort = bindAddr.getPort();
URI httpEndpoint;
try {
httpEndpoint = new URI("http://" + NetUtils.getHostPortString(bindAddr));
} catch (URISyntaxException e) {
throw new IOException(e);
}
httpServer = new HttpServer.Builder().setName("journal") httpServer = new HttpServer.Builder().setName("journal")
.setBindAddress(bindAddr.getHostName()).setPort(tmpInfoPort) .addEndpoint(httpEndpoint)
.setFindPort(tmpInfoPort == 0).setConf(conf).setACL( .setFindPort(tmpInfoPort == 0).setConf(conf).setACL(
new AccessControlList(conf.get(DFS_ADMIN, " "))) new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled()) .setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
@ -85,7 +94,7 @@ public class JournalNodeHttpServer {
httpServer.start(); httpServer.start();
// The web-server port can be ephemeral... ensure we have the correct info // The web-server port can be ephemeral... ensure we have the correct info
infoPort = httpServer.getPort(); infoPort = httpServer.getConnectorAddress(0).getPort();
LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort); LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort);
} }
@ -104,7 +113,7 @@ public class JournalNodeHttpServer {
* Return the actual address bound to by the running server. * Return the actual address bound to by the running server.
*/ */
public InetSocketAddress getAddress() { public InetSocketAddress getAddress() {
InetSocketAddress addr = httpServer.getListenerAddress(); InetSocketAddress addr = httpServer.getConnectorAddress(0);
assert addr.getPort() != 0; assert addr.getPort() != 0;
return addr; return addr;
} }

View File

@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -181,6 +182,7 @@ public class DataNode extends Configured
private volatile boolean heartbeatsDisabledForTests = false; private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null; private DataStorage storage = null;
private HttpServer infoServer = null; private HttpServer infoServer = null;
private int infoPort;
private int infoSecurePort; private int infoSecurePort;
DataNodeMetrics metrics; DataNodeMetrics metrics;
private InetSocketAddress streamingAddr; private InetSocketAddress streamingAddr;
@ -310,27 +312,33 @@ public class DataNode extends Configured
String infoHost = infoSocAddr.getHostName(); String infoHost = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort(); int tmpInfoPort = infoSocAddr.getPort();
HttpServer.Builder builder = new HttpServer.Builder().setName("datanode") HttpServer.Builder builder = new HttpServer.Builder().setName("datanode")
.setBindAddress(infoHost).setPort(tmpInfoPort) .addEndpoint(URI.create("http://" + NetUtils.getHostPortString(infoSocAddr)))
.setFindPort(tmpInfoPort == 0).setConf(conf) .setFindPort(tmpInfoPort == 0).setConf(conf)
.setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))); .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")));
this.infoServer = (secureResources == null) ? builder.build() :
builder.setConnector(secureResources.getListener()).build();
LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort); LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort);
if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) { if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get( InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0)); DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
Configuration sslConf = new HdfsConfiguration(false); builder.addEndpoint(URI.create("https://"
sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, + NetUtils.getHostPortString(secInfoSocAddr)));
"ssl-server.xml")); Configuration sslConf = new Configuration(false);
this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth); sslConf.setBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf
.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT));
sslConf.addResource(conf.get(
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Datanode listening for SSL on " + secInfoSocAddr); LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
} }
infoSecurePort = secInfoSocAddr.getPort(); infoSecurePort = secInfoSocAddr.getPort();
} }
this.infoServer = (secureResources == null) ? builder.build() :
builder.setConnector(secureResources.getListener()).build();
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class); this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.addInternalServlet(null, "/getFileChecksum/*", this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
FileChecksumServlets.GetServlet.class); FileChecksumServlets.GetServlet.class);
@ -346,6 +354,7 @@ public class DataNode extends Configured
WebHdfsFileSystem.PATH_PREFIX + "/*"); WebHdfsFileSystem.PATH_PREFIX + "/*");
} }
this.infoServer.start(); this.infoServer.start();
this.infoPort = infoServer.getConnectorAddress(0).getPort();
} }
private void startPlugins(Configuration conf) { private void startPlugins(Configuration conf) {
@ -2276,7 +2285,7 @@ public class DataNode extends Configured
* @return the datanode's http port * @return the datanode's http port
*/ */
public int getInfoPort() { public int getInfoPort() {
return infoServer.getPort(); return infoPort;
} }
/** /**

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.Trash;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt; import static org.apache.hadoop.util.ToolRunner.confirmPrompt;

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -69,25 +70,45 @@ public class NameNodeHttpServer {
this.bindAddress = bindAddress; this.bindAddress = bindAddress;
} }
public void start() throws IOException { void start() throws IOException {
final String infoHost = bindAddress.getHostName(); final String infoHost = bindAddress.getHostName();
int infoPort = bindAddress.getPort(); int infoPort = bindAddress.getPort();
httpServer = new HttpServer.Builder().setName("hdfs") HttpServer.Builder builder = new HttpServer.Builder().setName("hdfs")
.setBindAddress(infoHost).setPort(infoPort) .addEndpoint(URI.create(("http://" + NetUtils.getHostPortString(bindAddress))))
.setFindPort(infoPort == 0).setConf(conf).setACL( .setFindPort(infoPort == 0).setConf(conf).setACL(
new AccessControlList(conf.get(DFS_ADMIN, " "))) new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled()) .setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
.setUsernameConfKey( .setUsernameConfKey(
DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY) DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY)
.setKeytabConfKey(DFSUtil.getSpnegoKeytabKey(conf, .setKeytabConfKey(DFSUtil.getSpnegoKeytabKey(conf,
DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY)).build(); DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
boolean certSSL = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, false);
if (certSSL) {
httpsAddress = NetUtils.createSocketAddr(conf.get(
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
builder.addEndpoint(URI.create("https://"
+ NetUtils.getHostPortString(httpsAddress)));
Configuration sslConf = new Configuration(false);
sslConf.setBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf
.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT));
sslConf.addResource(conf.get(
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
}
httpServer = builder.build();
if (WebHdfsFileSystem.isEnabled(conf, HttpServer.LOG)) { if (WebHdfsFileSystem.isEnabled(conf, HttpServer.LOG)) {
//add SPNEGO authentication filter for webhdfs //add SPNEGO authentication filter for webhdfs
final String name = "SPNEGO"; final String name = "SPNEGO";
final String classname = AuthFilter.class.getName(); final String classname = AuthFilter.class.getName();
final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*"; final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
Map<String, String> params = getAuthFilterParams(conf); Map<String, String> params = getAuthFilterParams(conf);
httpServer.defineFilter(httpServer.getWebAppContext(), name, classname, params, HttpServer.defineFilter(httpServer.getWebAppContext(), name, classname, params,
new String[]{pathSpec}); new String[]{pathSpec});
HttpServer.LOG.info("Added filter '" + name + "' (class=" + classname + ")"); HttpServer.LOG.info("Added filter '" + name + "' (class=" + classname + ")");
@ -97,34 +118,19 @@ public class NameNodeHttpServer {
+ ";" + Param.class.getPackage().getName(), pathSpec); + ";" + Param.class.getPackage().getName(), pathSpec);
} }
boolean certSSL = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, false); httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
setupServlets(httpServer, conf);
httpServer.start();
httpAddress = httpServer.getConnectorAddress(0);
if (certSSL) { if (certSSL) {
boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false); httpsAddress = httpServer.getConnectorAddress(1);
httpsAddress = NetUtils.createSocketAddr(conf.get(
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
Configuration sslConf = new Configuration(false);
sslConf.addResource(conf.get(
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
httpServer.addSslListener(httpsAddress, sslConf, needClientAuth);
// assume same ssl port for all datanodes // assume same ssl port for all datanodes
InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get( InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 50475)); DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 50475));
httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY, datanodeSslPort httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY, datanodeSslPort
.getPort()); .getPort());
} }
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
setupServlets(httpServer, conf);
httpServer.start();
httpAddress = new InetSocketAddress(bindAddress.getAddress(),
httpServer.getPort());
if (certSSL) {
httpsAddress = new InetSocketAddress(bindAddress.getAddress(),
httpServer.getConnectorPort(1));
}
} }
private Map<String, String> getAuthFilterParams(Configuration conf) private Map<String, String> getAuthFilterParams(Configuration conf)

View File

@ -30,6 +30,7 @@ import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Collection; import java.util.Collection;
@ -256,8 +257,15 @@ public class SecondaryNameNode implements Runnable {
// initialize the webserver for uploading files. // initialize the webserver for uploading files.
int tmpInfoPort = infoSocAddr.getPort(); int tmpInfoPort = infoSocAddr.getPort();
URI httpEndpoint;
try {
httpEndpoint = new URI("http://" + NetUtils.getHostPortString(infoSocAddr));
} catch (URISyntaxException e) {
throw new IOException(e);
}
infoServer = new HttpServer.Builder().setName("secondary") infoServer = new HttpServer.Builder().setName("secondary")
.setBindAddress(infoBindAddress).setPort(tmpInfoPort) .addEndpoint(httpEndpoint)
.setFindPort(tmpInfoPort == 0).setConf(conf).setACL( .setFindPort(tmpInfoPort == 0).setConf(conf).setACL(
new AccessControlList(conf.get(DFS_ADMIN, " "))) new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled()) .setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
@ -275,7 +283,7 @@ public class SecondaryNameNode implements Runnable {
LOG.info("Web server init done"); LOG.info("Web server init done");
// The web-server port can be ephemeral... ensure we have the correct info // The web-server port can be ephemeral... ensure we have the correct info
infoPort = infoServer.getPort(); infoPort = infoServer.getConnectorAddress(0).getPort();
conf.set(DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" + infoPort); conf.set(DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" + infoPort);
LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" + infoPort); LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" + infoPort);

View File

@ -278,13 +278,15 @@ public class TestJobEndNotifier extends JobEndNotifier {
new File(System.getProperty( new File(System.getProperty(
"build.webapps", "build/webapps") + "/test").mkdirs(); "build.webapps", "build/webapps") + "/test").mkdirs();
HttpServer server = new HttpServer.Builder().setName("test") HttpServer server = new HttpServer.Builder().setName("test")
.setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build(); .addEndpoint(URI.create("http://localhost:0"))
.setFindPort(true).build();
server.addServlet("jobend", "/jobend", JobEndServlet.class); server.addServlet("jobend", "/jobend", JobEndServlet.class);
server.start(); server.start();
JobEndServlet.calledTimes = 0; JobEndServlet.calledTimes = 0;
JobEndServlet.requestUri = null; JobEndServlet.requestUri = null;
JobEndServlet.baseUrl = "http://localhost:" + server.getPort() + "/"; JobEndServlet.baseUrl = "http://localhost:"
+ server.getConnectorAddress(0).getPort() + "/";
JobEndServlet.foundJobState = null; JobEndServlet.foundJobState = null;
return server; return server;
} }

View File

@ -103,12 +103,13 @@ public class TestJobEndNotifier extends TestCase {
new File(System.getProperty("build.webapps", "build/webapps") + "/test" new File(System.getProperty("build.webapps", "build/webapps") + "/test"
).mkdirs(); ).mkdirs();
server = new HttpServer.Builder().setName("test") server = new HttpServer.Builder().setName("test")
.setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build(); .addEndpoint(URI.create("http://localhost:0"))
.setFindPort(true).build();
server.addServlet("delay", "/delay", DelayServlet.class); server.addServlet("delay", "/delay", DelayServlet.class);
server.addServlet("jobend", "/jobend", JobEndServlet.class); server.addServlet("jobend", "/jobend", JobEndServlet.class);
server.addServlet("fail", "/fail", FailServlet.class); server.addServlet("fail", "/fail", FailServlet.class);
server.start(); server.start();
int port = server.getPort(); int port = server.getConnectorAddress(0).getPort();
baseUrl = new URL("http://localhost:" + port + "/"); baseUrl = new URL("http://localhost:" + port + "/");
JobEndServlet.calledTimes = 0; JobEndServlet.calledTimes = 0;

View File

@ -83,11 +83,13 @@ public abstract class WebApp extends ServletModule {
* @return InetSocketAddress * @return InetSocketAddress
*/ */
public InetSocketAddress getListenerAddress() { public InetSocketAddress getListenerAddress() {
return checkNotNull(httpServer, "httpServer").getListenerAddress(); return checkNotNull(httpServer, "httpServer").getConnectorAddress(0);
} }
public int port() { public int port() {
return checkNotNull(httpServer, "httpServer").getPort(); InetSocketAddress addr = checkNotNull(httpServer, "httpServer")
.getConnectorAddress(0);
return addr == null ? -1 : addr.getPort();
} }
public void stop() { public void stop() {

View File

@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -36,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AdminACLsManager; import org.apache.hadoop.yarn.security.AdminACLsManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -216,46 +216,34 @@ public class WebApps {
System.exit(1); System.exit(1);
} }
} }
HttpServer server = HttpServer.Builder builder = new HttpServer.Builder().setName(name)
new HttpServer(name, bindAddress, port, findPort, conf, .addEndpoint(URI.create("http://" + bindAddress + ":" + port))
new AdminACLsManager(conf).getAdminAcl(), null, .setConf(conf).setFindPort(findPort)
pathList.toArray(new String[0])) { .setACL(new AdminACLsManager(conf).getAdminAcl())
.setPathSpec(pathList.toArray(new String[0]));
{ boolean hasSpnegoConf = spnegoPrincipalKey != null
if (UserGroupInformation.isSecurityEnabled()) { && spnegoKeytabKey != null;
boolean initSpnego = true; if (hasSpnegoConf) {
if (spnegoPrincipalKey == null builder.setUsernameConfKey(conf.get(spnegoPrincipalKey))
|| conf.get(spnegoPrincipalKey, "").isEmpty()) { .setKeytabConfKey(conf.get(spnegoKeytabKey))
LOG.warn("Principal for spnego filter is not set"); .setSecurityEnabled(UserGroupInformation.isSecurityEnabled());
initSpnego = false;
} }
if (spnegoKeytabKey == null HttpServer server = builder.build();
|| conf.get(spnegoKeytabKey, "").isEmpty()) {
LOG.warn("Keytab for spnego filter is not set");
initSpnego = false;
}
if (initSpnego) {
LOG.info("Initializing spnego filter with principal key : "
+ spnegoPrincipalKey + " keytab key : "
+ spnegoKeytabKey);
initSpnego(conf, spnegoPrincipalKey, spnegoKeytabKey);
}
}
}
};
for(ServletStruct struct: servlets) { for(ServletStruct struct: servlets) {
server.addServlet(struct.name, struct.spec, struct.clazz); server.addServlet(struct.name, struct.spec, struct.clazz);
} }
for(Map.Entry<String, Object> entry : attributes.entrySet()) { for(Map.Entry<String, Object> entry : attributes.entrySet()) {
server.setAttribute(entry.getKey(), entry.getValue()); server.setAttribute(entry.getKey(), entry.getValue());
} }
server.defineFilter(server.getWebAppContext(), "guice", HttpServer.defineFilter(server.getWebAppContext(), "guice",
GuiceFilter.class.getName(), null, new String[] { "/*" }); GuiceFilter.class.getName(), null, new String[] { "/*" });
webapp.setConf(conf); webapp.setConf(conf);
webapp.setHttpServer(server); webapp.setHttpServer(server);
server.start(); server.start();
LOG.info("Web app /"+ name +" started at "+ server.getPort()); LOG.info("Web app /"+ name +" started at "+ server.getConnectorAddress(0).getPort());
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
throw new WebAppException("Error starting http server", e); throw new WebAppException("Error starting http server", e);
} catch (IOException e) { } catch (IOException e) {

View File

@ -33,17 +33,6 @@ import static org.junit.Assert.assertTrue;
import java.io.InputStream; import java.io.InputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
import javax.xml.bind.JAXBContext;
import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
@ -55,9 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
public class TestWebApp { public class TestWebApp {
static final Logger LOG = LoggerFactory.getLogger(TestWebApp.class); static final Logger LOG = LoggerFactory.getLogger(TestWebApp.class);

View File

@ -69,7 +69,7 @@ public class WebServer extends AbstractService {
.withHttpSpnegoKeytabKey( .withHttpSpnegoKeytabKey(
YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
.start(this.nmWebApp); .start(this.nmWebApp);
this.port = this.webApp.httpServer().getPort(); this.port = this.webApp.httpServer().getConnectorAddress(0).getPort();
} catch (Exception e) { } catch (Exception e) {
String msg = "NMWebapps failed to start."; String msg = "NMWebapps failed to start.";
LOG.error(msg, e); LOG.error(msg, e);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.webproxy; package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -30,7 +31,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
public class WebAppProxy extends AbstractService { public class WebAppProxy extends AbstractService {
@ -89,7 +89,8 @@ public class WebAppProxy extends AbstractService {
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
try { try {
proxyServer = new HttpServer.Builder().setName("proxy") proxyServer = new HttpServer.Builder().setName("proxy")
.setBindAddress(bindAddress).setPort(port).setFindPort(port == 0) .addEndpoint(URI.create("http://" + bindAddress + ":" + port))
.setFindPort(port == 0)
.setConf(getConfig()).setACL(acl).build(); .setConf(getConfig()).setACL(acl).build();
proxyServer.addServlet(ProxyUriUtils.PROXY_SERVLET_NAME, proxyServer.addServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);

View File

@ -29,6 +29,7 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.HttpCookie; import java.net.HttpCookie;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -126,7 +127,7 @@ public class TestWebAppProxyServlet {
proxy.init(configuration); proxy.init(configuration);
proxy.start(); proxy.start();
int proxyPort = proxy.proxy.proxyServer.getPort(); int proxyPort = proxy.proxy.proxyServer.getConnectorAddress(0).getPort();
AppReportFetcherForTest appReportFetcher = proxy.proxy.appReportFetcher; AppReportFetcherForTest appReportFetcher = proxy.proxy.appReportFetcher;
// wrong url // wrong url
@ -285,8 +286,7 @@ public class TestWebAppProxyServlet {
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
proxyServer = new HttpServer.Builder() proxyServer = new HttpServer.Builder()
.setName("proxy") .setName("proxy")
.setBindAddress(bindAddress) .addEndpoint(URI.create("http://" + bindAddress + ":0"))
.setPort(0)
.setFindPort(true) .setFindPort(true)
.setConf(conf) .setConf(conf)
.setACL(acl) .setACL(acl)
@ -306,7 +306,7 @@ public class TestWebAppProxyServlet {
proxyServer.setAttribute(PROXY_HOST_ATTRIBUTE, proxyHost); proxyServer.setAttribute(PROXY_HOST_ATTRIBUTE, proxyHost);
proxyServer.start(); proxyServer.start();
System.out.println("Proxy server is started at port " + System.out.println("Proxy server is started at port " +
proxyServer.getPort()); proxyServer.getConnectorAddress(0).getPort());
} catch (Exception e) { } catch (Exception e) {
LOG.fatal("Could not start proxy web server", e); LOG.fatal("Could not start proxy web server", e);
throw new YarnRuntimeException("Could not start proxy web server", e); throw new YarnRuntimeException("Could not start proxy web server", e);