MAPREDUCE-5770. Fixed MapReduce ApplicationMaster to correctly redirect to the YARN's web-app proxy with the correct scheme prefix. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1572711 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-27 19:23:55 +00:00
parent 0d03b29f91
commit 94b29b3348
7 changed files with 163 additions and 16 deletions

View File

@ -195,6 +195,10 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5757. ConcurrentModificationException in JobControl.toList MAPREDUCE-5757. ConcurrentModificationException in JobControl.toList
(jlowe) (jlowe)
MAPREDUCE-5770. Fixed MapReduce ApplicationMaster to correctly redirect
to the YARN's web-app proxy with the correct scheme prefix. (Jian He via
vinodkv)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1382,13 +1382,7 @@ public class MRAppMaster extends CompositeService {
JobConf conf = new JobConf(new YarnConfiguration()); JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE)); conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
// Explicitly disabling SSL for map reduce task as we can't allow MR users
// to gain access to keystore file for opening SSL listener. We can trust
// RM/NM to issue SSL certificates but definitely not MR-AM as it is
// running in user-land.
MRWebAppUtil.initialize(conf); MRWebAppUtil.initialize(conf);
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTP_ONLY.name());
// log the system properties // log the system properties
String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf); String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
if (systemPropsToLog != null) { if (systemPropsToLog != null) {
@ -1490,4 +1484,7 @@ public class MRAppMaster extends CompositeService {
LogManager.shutdown(); LogManager.shutdown();
} }
public ClientService getClientService() {
return clientService;
}
} }

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
@ -133,8 +134,13 @@ public class MRClientService extends AbstractService implements ClientService {
this.bindAddress = NetUtils.getConnectAddress(server); this.bindAddress = NetUtils.getConnectAddress(server);
LOG.info("Instantiated MRClientService at " + this.bindAddress); LOG.info("Instantiated MRClientService at " + this.bindAddress);
try { try {
webApp = WebApps.$for("mapreduce", AppContext.class, appContext, "ws").with(conf). // Explicitly disabling SSL for map reduce task as we can't allow MR users
start(new AMWebApp()); // to gain access to keystore file for opening SSL listener. We can trust
// RM/NM to issue SSL certificates but definitely not MR-AM as it is
// running in user-land.
webApp =
WebApps.$for("mapreduce", AppContext.class, appContext, "ws")
.withHttpPolicy(conf, Policy.HTTP_ONLY).start(new AMWebApp());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Webapps failed to start. Ignoring for now:", e); LOG.error("Webapps failed to start. Ignoring for now:", e);
} }
@ -412,4 +418,8 @@ public class MRClientService extends AbstractService implements ClientService {
" token"); " token");
} }
} }
public WebApp getWebApp() {
return webApp;
}
} }

View File

@ -21,23 +21,45 @@ package org.apache.hadoop.mapreduce.v2.app.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID; import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.net.ssl.SSLException;
import junit.framework.Assert;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.MockAppContext; import org.apache.hadoop.mapreduce.v2.app.MockAppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer;
import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.test.WebAppTests; import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.Test; import org.junit.Test;
import com.google.common.net.HttpHeaders;
import com.google.inject.Injector; import com.google.inject.Injector;
public class TestAMWebApp { public class TestAMWebApp {
@ -148,6 +170,95 @@ public class TestAMWebApp {
appContext, params); appContext, params);
} }
@Test
public void testMRWebAppSSLDisabled() throws Exception {
MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true) {
@Override
protected ClientService createClientService(AppContext context) {
return new MRClientService(context);
}
};
Configuration conf = new Configuration();
// MR is explicitly disabling SSL, even though setting as HTTPS_ONLY
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, Policy.HTTPS_ONLY.name());
Job job = app.submit(conf);
String hostPort =
NetUtils.getHostPortString(((MRClientService) app.getClientService())
.getWebApp().getListenerAddress());
// http:// should be accessible
URL httpUrl = new URL("http://" + hostPort);
HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
InputStream in = conn.getInputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copyBytes(in, out, 1024);
Assert.assertTrue(out.toString().contains("MapReduce Application"));
// https:// is not accessible.
URL httpsUrl = new URL("https://" + hostPort);
try {
HttpURLConnection httpsConn =
(HttpURLConnection) httpsUrl.openConnection();
httpsConn.getInputStream();
Assert.fail("https:// is not accessible, expected to fail");
} catch (Exception e) {
Assert.assertTrue(e instanceof SSLException);
}
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
static String webProxyBase = null;
public static class TestAMFilterInitializer extends AmFilterInitializer {
@Override
protected String getApplicationWebProxyBase() {
return webProxyBase;
}
}
@Test
public void testMRWebAppRedirection() throws Exception {
String[] schemePrefix =
{ WebAppUtils.HTTP_PREFIX, WebAppUtils.HTTPS_PREFIX };
for (String scheme : schemePrefix) {
MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true) {
@Override
protected ClientService createClientService(AppContext context) {
return new MRClientService(context);
}
};
Configuration conf = new Configuration();
conf.set(YarnConfiguration.PROXY_ADDRESS, "9.9.9.9");
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, scheme
.equals(WebAppUtils.HTTPS_PREFIX) ? Policy.HTTPS_ONLY.name()
: Policy.HTTP_ONLY.name());
webProxyBase = "/proxy/" + app.getAppID();
conf.set("hadoop.http.filter.initializers",
TestAMFilterInitializer.class.getName());
Job job = app.submit(conf);
String hostPort =
NetUtils.getHostPortString(((MRClientService) app.getClientService())
.getWebApp().getListenerAddress());
URL httpUrl = new URL("http://" + hostPort + "/mapreduce");
HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
conn.setInstanceFollowRedirects(false);
conn.connect();
String expectedURL =
scheme + conf.get(YarnConfiguration.PROXY_ADDRESS)
+ ProxyUriUtils.getPath(app.getAppID(), "/mapreduce");
Assert.assertEquals(expectedURL,
conn.getHeaderField(HttpHeaders.LOCATION));
Assert.assertEquals(HttpStatus.SC_MOVED_TEMPORARILY,
conn.getResponseCode());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
}
public static void main(String[] args) { public static void main(String[] args) {
WebApps.$for("yarn", AppContext.class, new MockAppContext(0, 8, 88, 4)). WebApps.$for("yarn", AppContext.class, new MockAppContext(0, 8, 88, 4)).
at(58888).inDevMode().start(new AMWebApp()).joinThread(); at(58888).inDevMode().start(new AMWebApp()).joinThread();

View File

@ -35,6 +35,8 @@ import javax.servlet.http.HttpServlet;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -86,6 +88,7 @@ public class WebApps {
int port = 0; int port = 0;
boolean findPort = false; boolean findPort = false;
Configuration conf; Configuration conf;
Policy httpPolicy = null;
boolean devMode = false; boolean devMode = false;
private String spnegoPrincipalKey; private String spnegoPrincipalKey;
private String spnegoKeytabKey; private String spnegoKeytabKey;
@ -143,6 +146,12 @@ public class WebApps {
return this; return this;
} }
public Builder<T> withHttpPolicy(Configuration conf, Policy httpPolicy) {
this.conf = conf;
this.httpPolicy = httpPolicy;
return this;
}
public Builder<T> withHttpSpnegoPrincipalKey(String spnegoPrincipalKey) { public Builder<T> withHttpSpnegoPrincipalKey(String spnegoPrincipalKey) {
this.spnegoPrincipalKey = spnegoPrincipalKey; this.spnegoPrincipalKey = spnegoPrincipalKey;
return this; return this;
@ -218,10 +227,18 @@ public class WebApps {
System.exit(1); System.exit(1);
} }
} }
String httpScheme;
if (this.httpPolicy == null) {
httpScheme = WebAppUtils.getHttpSchemePrefix(conf);
} else {
httpScheme =
(httpPolicy == Policy.HTTPS_ONLY) ? WebAppUtils.HTTPS_PREFIX
: WebAppUtils.HTTP_PREFIX;
}
HttpServer2.Builder builder = new HttpServer2.Builder() HttpServer2.Builder builder = new HttpServer2.Builder()
.setName(name) .setName(name)
.addEndpoint( .addEndpoint(
URI.create(WebAppUtils.getHttpSchemePrefix(conf) + bindAddress URI.create(httpScheme + bindAddress
+ ":" + port)).setConf(conf).setFindPort(findPort) + ":" + port)).setConf(conf).setFindPort(findPort)
.setACL(new AdminACLsManager(conf).getAdminAcl()) .setACL(new AdminACLsManager(conf).getAdminAcl())
.setPathSpec(pathList.toArray(new String[0])); .setPathSpec(pathList.toArray(new String[0]));
@ -236,7 +253,7 @@ public class WebApps {
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled()); .setSecurityEnabled(UserGroupInformation.isSecurityEnabled());
} }
if (YarnConfiguration.useHttps(conf)) { if (httpScheme.equals(WebAppUtils.HTTPS_PREFIX)) {
WebAppUtils.loadSslConfiguration(builder); WebAppUtils.loadSslConfiguration(builder);
} }

View File

@ -36,6 +36,9 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
@Private @Private
@Evolving @Evolving
public class WebAppUtils { public class WebAppUtils {
public static final String HTTPS_PREFIX = "https://";
public static final String HTTP_PREFIX = "http://";
public static void setRMWebAppPort(Configuration conf, int port) { public static void setRMWebAppPort(Configuration conf, int port) {
String hostname = getRMWebAppURLWithoutScheme(conf); String hostname = getRMWebAppURLWithoutScheme(conf);
hostname = hostname =
@ -180,7 +183,7 @@ public class WebAppUtils {
* @return the schmeme (HTTP / HTTPS) * @return the schmeme (HTTP / HTTPS)
*/ */
public static String getHttpSchemePrefix(Configuration conf) { public static String getHttpSchemePrefix(Configuration conf) {
return YarnConfiguration.useHttps(conf) ? "https://" : "http://"; return YarnConfiguration.useHttps(conf) ? HTTPS_PREFIX : HTTP_PREFIX;
} }
/** /**

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.http.FilterInitializer;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
public class AmFilterInitializer extends FilterInitializer { public class AmFilterInitializer extends FilterInitializer {
private static final String FILTER_NAME = "AM_PROXY_FILTER"; private static final String FILTER_NAME = "AM_PROXY_FILTER";
private static final String FILTER_CLASS = AmIpFilter.class.getCanonicalName(); private static final String FILTER_CLASS = AmIpFilter.class.getCanonicalName();
@ -37,10 +39,13 @@ public class AmFilterInitializer extends FilterInitializer {
String proxy = WebAppUtils.getProxyHostAndPort(conf); String proxy = WebAppUtils.getProxyHostAndPort(conf);
String[] parts = proxy.split(":"); String[] parts = proxy.split(":");
params.put(AmIpFilter.PROXY_HOST, parts[0]); params.put(AmIpFilter.PROXY_HOST, parts[0]);
params.put(AmIpFilter.PROXY_URI_BASE, params.put(AmIpFilter.PROXY_URI_BASE, WebAppUtils.getHttpSchemePrefix(conf)
WebAppUtils.getHttpSchemePrefix(conf) + proxy + + proxy + getApplicationWebProxyBase());
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV));
container.addFilter(FILTER_NAME, FILTER_CLASS, params); container.addFilter(FILTER_NAME, FILTER_CLASS, params);
} }
@VisibleForTesting
protected String getApplicationWebProxyBase() {
return System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV);
}
} }