MAPREDUCE-2858. Added a WebApp Proxy for applications. Contributed by Robert Evans.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1189036 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-26 06:29:21 +00:00
parent 8335995630
commit 8aabd3d4e6
44 changed files with 1448 additions and 45 deletions

View File

@ -118,6 +118,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2708. Designed and implemented MR Application Master recovery to
make MR AMs resume their progress after restart. (Sharad Agarwal via vinodkv)
MAPREDUCE-2858. Added a WebApp Proxy for applications. (Robert Evans via
acmurthy)
IMPROVEMENTS
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

View File

@ -41,6 +41,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>

View File

@ -341,6 +341,8 @@ class JobSubmitter {
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");

View File

@ -77,7 +77,8 @@ public class NotRunningJob implements MRClientProtocol {
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never used
// for a non running job
return BuilderUtils.newApplicationReport(unknownAppId, "N/A", "N/A", "N/A", "N/A", 0, "",
YarnApplicationState.NEW, "N/A", "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null);
YarnApplicationState.NEW, "N/A", "N/A", 0, 0,
FinalApplicationStatus.UNDEFINED, null, "N/A");
}
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {

View File

@ -231,14 +231,14 @@ public class TestClientServiceDelegate {
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
1234, 5), "user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
FinalApplicationStatus.SUCCEEDED, null);
FinalApplicationStatus.SUCCEEDED, null, "N/A");
}
private ApplicationReport getRunningApplicationReport(String host, int port) {
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
1234, 5), "user", "queue", "appname", host, port, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
FinalApplicationStatus.UNDEFINED, null);
FinalApplicationStatus.UNDEFINED, null, "N/A");
}
private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {

View File

@ -52,6 +52,11 @@
<version>${project.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>

View File

@ -30,3 +30,5 @@ bin=`cd "$bin"; pwd`
"$bin"/yarn-daemons.sh --config $YARN_CONF_DIR start nodemanager
# start historyserver
#"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start historyserver
# start proxyserver
#"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start proxyserver

View File

@ -30,4 +30,5 @@ bin=`cd "$bin"; pwd`
"$bin"/yarn-daemons.sh --config $YARN_CONF_DIR stop nodemanager
# stop historyServer
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop historyserver
# stop proxy server
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop proxyserver

View File

@ -206,6 +206,9 @@ elif [ "$COMMAND" = "nodemanager" ] ; then
elif [ "$COMMAND" = "historyserver" ] ; then
CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
YARN_OPTS="$YARN_OPTS $YARN_JOB_HISTORYSERVER_OPTS"
elif [ "$COMMAND" = "proxyserver" ] ; then
CLASS='org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer'
YARN_OPTS="$YARN_OPTS $YARN_PROXYSERVER_OPTS"
elif [ "$COMMAND" = "version" ] ; then
CLASS=org.apache.hadoop.util.VersionInfo
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"

View File

@ -60,6 +60,14 @@ public interface ApplicationConstants {
public static final String LOCAL_DIR_ENV = "YARN_LOCAL_DIRS";
/**
* The environmental variable for APPLICATION_WEB_PROXY_BASE. Set in
* ApplicationMaster's environment only. This states that for all non-relative
* web URLs in the app masters web UI what base should they have.
*/
public static final String APPLICATION_WEB_PROXY_BASE_ENV =
"APPLICATION_WEB_PROXY_BASE";
public static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";
public static final String STDERR = "stderr";

View File

@ -174,6 +174,19 @@ public interface ApplicationReport {
@Private
@Unstable
void setTrackingUrl(String url);
/**
* Get the original not-proxied <em>tracking url</em> for the application.
* This is intended to only be used by the proxy itself.
* @return the original not-proxied <em>tracking url</em> for the application
*/
@Private
@Unstable
String getOriginalTrackingUrl();
@Private
@Unstable
void setOriginalTrackingUrl(String url);
/**
* Get the <em>start time</em> of the application.

View File

@ -90,6 +90,15 @@ implements ApplicationReport {
return p.getTrackingUrl();
}
@Override
public String getOriginalTrackingUrl() {
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasOriginalTrackingUrl()) {
return null;
}
return p.getOriginalTrackingUrl();
}
@Override
public String getName() {
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
@ -198,6 +207,16 @@ implements ApplicationReport {
}
builder.setTrackingUrl(url);
}
@Override
public void setOriginalTrackingUrl(String url) {
maybeInitBuilder();
if (url == null) {
builder.clearOriginalTrackingUrl();
return;
}
builder.setOriginalTrackingUrl(url);
}
@Override
public void setName(String name) {

View File

@ -162,6 +162,7 @@ message ApplicationReportProto {
optional int64 finishTime = 14;
optional FinalApplicationStatusProto final_application_status = 15;
optional ApplicationResourceUsageReportProto app_resource_Usage = 16;
optional string originalTrackingUrl = 17;
}
message NodeIdProto {

View File

@ -371,6 +371,22 @@ public class YarnConfiguration extends Configuration {
public static final int INVALID_CONTAINER_EXIT_STATUS = -1000;
public static final int ABORTED_CONTAINER_EXIT_STATUS = -100;
////////////////////////////////
// Web Proxy Configs
////////////////////////////////
public static final String PROXY_PREFIX = "yarn.web-proxy.";
/** The kerberos principal for the proxy.*/
public static final String PROXY_PRINCIPAL =
PROXY_PREFIX + "principal";
/** Keytab for Proxy.*/
public static final String PROXY_KEYTAB = PROXY_PREFIX + "keytab";
/** The address for the web proxy.*/
public static final String PROXY_ADDRESS =
PROXY_PREFIX + "address";
/**
* YARN Service Level Authorization
@ -406,15 +422,27 @@ public class YarnConfiguration extends Configuration {
}
}
public static String getRMWebAppURL(Configuration conf) {
public static String getProxyHostAndPort(Configuration conf) {
String addr = conf.get(PROXY_ADDRESS);
if(addr == null || addr.isEmpty()) {
addr = getRMWebAppHostAndPort(conf);
}
return addr;
}
public static String getRMWebAppHostAndPort(Configuration conf) {
String addr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
Iterator<String> it = ADDR_SPLITTER.split(addr).iterator();
it.next(); // ignore the bind host
String port = it.next();
// Use apps manager address to figure out the host for webapp
addr = conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS);
String host = ADDR_SPLITTER.split(addr).iterator().next();
return JOINER.join("http://", host, ":", port);
return JOINER.join(host, ":", port);
}
public static String getRMWebAppURL(Configuration conf) {
return JOINER.join("http://", getRMWebAppHostAndPort(conf));
}
}

View File

@ -281,7 +281,8 @@ public class BuilderUtils {
ApplicationId applicationId, String user, String queue, String name,
String host, int rpcPort, String clientToken, YarnApplicationState state,
String diagnostics, String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources) {
FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources,
String origTrackingUrl) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
@ -298,6 +299,7 @@ public class BuilderUtils {
report.setFinishTime(finishTime);
report.setFinalApplicationStatus(finalStatus);
report.setApplicationResourceUsageReport(appResources);
report.setOriginalTrackingUrl(origTrackingUrl);
return report;
}

View File

@ -154,15 +154,24 @@ public final class StringHelper {
if (part.startsWith("#") || isAbsUrl(part)) {
sb.append(part);
} else {
sb.append('/').append(pathPrefix).append('/').append(part);
uappend(sb, pathPrefix);
uappend(sb, part);
}
} else {
sb.append('/').append(part);
uappend(sb, part);
}
}
return sb.toString();
}
private static void uappend(StringBuilder sb, String part) {
if((sb.length() <= 0 || sb.charAt(sb.length() - 1) != '/')
&& !part.startsWith("/")) {
sb.append('/');
}
sb.append(part);
}
public static String percent(double value) {
return String.format("%.2f", value * 100);
}

View File

@ -32,6 +32,7 @@ import javax.servlet.http.HttpServletResponse;
import static org.apache.hadoop.yarn.util.StringHelper.*;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -175,8 +176,20 @@ public abstract class View implements Params {
moreParams().put(key, value);
}
public String root() {
String root = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV);
if(root == null || root.isEmpty()) {
root = "/";
}
return root;
}
public String prefix() {
return context().rc.prefix;
if(context().rc.prefix == null) {
return root();
} else {
return ujoin(root(), context().rc.prefix);
}
}
public void setTitle(String title) {
@ -188,6 +201,16 @@ public abstract class View implements Params {
set(TITLE_LINK, url);
}
/**
* Create an url from url components
* @param parts components to join
* @return an url string
*/
public String root_url(String... parts) {
return ujoin(root(), parts);
}
/**
* Create an url from url components
* @param parts components to join

View File

@ -18,23 +18,27 @@
package org.apache.hadoop.yarn.webapp;
import static com.google.common.base.Preconditions.*;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.ConnectException;
import java.net.URL;
import org.apache.commons.lang.StringUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
/**
* Helpers to create an embedded webapp.
*
@ -59,6 +63,12 @@ public class WebApps {
static final Logger LOG = LoggerFactory.getLogger(WebApps.class);
public static class Builder<T> {
static class ServletStruct {
public Class<? extends HttpServlet> clazz;
public String name;
public String spec;
}
final String name;
final Class<T> api;
final T application;
@ -67,6 +77,8 @@ public class WebApps {
boolean findPort = false;
Configuration conf;
boolean devMode = false;
private final HashSet<ServletStruct> servlets = new HashSet<ServletStruct>();
private final HashMap<String, Object> attributes = new HashMap<String, Object>();
Builder(String name, Class<T> api, T application) {
this.name = name;
@ -93,6 +105,21 @@ public class WebApps {
return this;
}
public Builder<T> withAttribute(String key, Object value) {
attributes.put(key, value);
return this;
}
public Builder<T> withServlet(String name, String pathSpec,
Class<? extends HttpServlet> servlet) {
ServletStruct struct = new ServletStruct();
struct.clazz = servlet;
struct.name = name;
struct.spec = pathSpec;
servlets.add(struct);
return this;
}
public Builder<T> with(Configuration conf) {
this.conf = conf;
return this;
@ -152,6 +179,12 @@ public class WebApps {
HttpServer server =
new HttpServer(name, bindAddress, port, findPort, conf,
webapp.getServePathSpecs());
for(ServletStruct struct: servlets) {
server.addServlet(struct.name, struct.spec, struct.clazz);
}
for(Map.Entry<String, Object> entry : attributes.entrySet()) {
server.setAttribute(entry.getKey(), entry.getValue());
}
server.addGlobalFilter("guice", GuiceFilter.class.getName(), null);
webapp.setConf(conf);
webapp.setHttpServer(server);

View File

@ -34,7 +34,7 @@ public class ErrorPage extends HtmlPage {
String title = "Sorry, got error "+ status();
html.
title(title).
link("/static/yarn.css").
link(root_url("static","yarn.css")).
_(JQueryUI.class). // an embedded sub-view
style("#msg { margin: 1em auto; width: 88%; }",
"#msg h1 { padding: 0.2em 1.5em; font: bold 1.3em serif; }").

View File

@ -81,12 +81,12 @@ public class JQueryUI extends HtmlBlock {
html.
link(join("https://ajax.googleapis.com/ajax/libs/jqueryui/1.8.9/themes/",
getTheme(), "/jquery-ui.css")).
link("/static/dt-1.7.5/css/jui-dt.css").
link(root_url("static/dt-1.7.5/css/jui-dt.css")).
script("https://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min.js").
script("https://ajax.googleapis.com/ajax/libs/jqueryui/1.8.9/jquery-ui.min.js").
script("/static/dt-1.7.5/js/jquery.dataTables.min.js").
script("/static/yarn.dt.plugins.js").
script("/static/themeswitcher.js").
script(root_url("static/dt-1.7.5/js/jquery.dataTables.min.js")).
script(root_url("static/yarn.dt.plugins.js")).
script(root_url("static/themeswitcher.js")).
style("#jsnotice { padding: 0.2em; text-align: center; }",
".ui-progressbar { height: 1em; min-width: 5em }"); // required

View File

@ -35,7 +35,7 @@ public class TwoColumnCssLayout extends HtmlPage {
preHead(html);
html.
title($("title")).
link("/static/yarn.css").
link(root_url("static","yarn.css")).
style(".main { min-height: 100%; height: auto !important; height: 100%;",
" margin: 0 auto -4em; border: 0; }",
".footer, .push { height: 4em; clear: both; border: 0 }",

View File

@ -41,7 +41,7 @@ public class TwoColumnLayout extends HtmlPage {
preHead(html);
html.
title($(TITLE)).
link("/static/yarn.css").
link(root_url("static","yarn.css")).
style("#layout { height: 100%; }",
"#layout thead td { height: 3em; }",
"#layout #navcell { width: 11em; padding: 0 1em; }",

View File

@ -89,6 +89,7 @@ public class MockApps {
@Override public YarnApplicationState getYarnApplicationState() { return state; }
@Override public String getQueue() { return queue; }
@Override public String getTrackingUrl() { return ""; }
@Override public String getOriginalTrackingUrl() { return ""; }
@Override public FinalApplicationStatus getFinalApplicationStatus() { return finishState; }
@Override
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
@ -103,6 +104,7 @@ public class MockApps {
// TODO Auto-generated method stub
}
@Override public void setOriginalTrackingUrl(String url) { }
@Override
public void setApplicationResourceUsageReport(ApplicationResourceUsageReport appResources) {
this.appUsageReport = appResources;

View File

@ -376,4 +376,27 @@
<name>mapreduce.job.hdfs-servers</name>
<value>${fs.default.name}</value>
</property>
<!-- WebAppProxy Configuration-->
<property>
<description>The kerberos principal for the proxy, if the proxy is not
running as part of the RM.</description>
<name>yarn.web-proxy.principal</name>
<value/>
</property>
<property>
<description>Keytab for WebAppProxy, if the proxy is not running as part of
the RM.</description>
<name>yarn.web-proxy.keytab</name>
</property>
<property>
<description>The address for the web proxy as HOST:PORT, if this is not
given or if it matches yarn.resourcemanager.address then the proxy will
run as part of the RM</description>
<name>yarn.web-proxy.address</name>
<value/>
</property>
</configuration>

View File

@ -34,6 +34,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -66,11 +66,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
/**
* The ResourceManager is the main class that is a set of components.
@ -406,11 +411,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected void startWepApp() {
webApp = WebApps.$for("cluster", masterService).at(
this.conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS)).
start(new RMWebApp(this));
Builder<ApplicationMasterService> builder =
WebApps.$for("cluster", masterService).at(
this.conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS));
if(YarnConfiguration.getRMWebAppHostAndPort(conf).
equals(YarnConfiguration.getProxyHostAndPort(conf))) {
AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService());
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
}
webApp = builder.start(new RMWebApp(this));
}
@Override

View File

@ -187,6 +187,8 @@ public class AMLauncher implements Runnable {
throws IOException {
Map<String, String> environment = container.getEnvironment();
environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV,
application.getWebProxyBase());
// Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be
// consumable by the AM.
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV, container

View File

@ -331,11 +331,13 @@ public class RMAppImpl implements RMApp {
String clientToken = "N/A";
String trackingUrl = "N/A";
String host = "N/A";
String origTrackingUrl = "N/A";
int rpcPort = -1;
ApplicationResourceUsageReport appUsageReport = null;
FinalApplicationStatus finishState = getFinalApplicationStatus();
if (this.currentAttempt != null) {
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
@ -345,7 +347,8 @@ public class RMAppImpl implements RMApp {
this.queue, this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()),
this.diagnostics.toString(), trackingUrl,
this.startTime, this.finishTime, finishState, appUsageReport);
this.startTime, this.finishTime, finishState, appUsageReport,
origTrackingUrl);
} finally {
this.readLock.unlock();
}
@ -381,7 +384,7 @@ public class RMAppImpl implements RMApp {
@Override
public String getTrackingUrl() {
this.readLock.lock();
try {
if (this.currentAttempt != null) {
return this.currentAttempt.getTrackingUrl();
@ -439,7 +442,7 @@ public class RMAppImpl implements RMApp {
RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
clientTokenStr, rmContext, scheduler, masterService,
submissionContext);
submissionContext, YarnConfiguration.getProxyHostAndPort(conf));
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
handler.handle(

View File

@ -71,6 +71,22 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/
String getTrackingUrl();
/**
* The original url at which the status of the application attempt can be
* accessed. This url is not fronted by a proxy. This is only intended to be
* used by the proxy.
* @return the url at which the status of the attempt can be accessed and is
* not fronted by a proxy.
*/
String getOriginalTrackingUrl();
/**
* The base to be prepended to web URLs that are not relative, and the user
* has been checked.
* @return the base URL to be prepended to web URLs that are not relative.
*/
String getWebProxyBase();
/**
* The token required by the clients to talk to the application attempt
* @return the token required by the clients to talk to the application attempt

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
@ -114,12 +117,16 @@ public class RMAppAttemptImpl implements RMAppAttempt {
private float progress = 0;
private String host = "N/A";
private int rpcPort;
private String trackingUrl = "N/A";
private String origTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
private FinalApplicationStatus finalStatus = null;
private final StringBuilder diagnostics = new StringBuilder();
private final String proxy;
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
@ -250,8 +257,10 @@ public class RMAppAttemptImpl implements RMAppAttempt {
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
String clientToken, RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext) {
ApplicationSubmissionContext submissionContext,
String proxy) {
this.proxy = proxy;
this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext;
this.eventHandler = rmContext.getDispatcher().getEventHandler();
@ -322,9 +331,46 @@ public class RMAppAttemptImpl implements RMAppAttempt {
@Override
public String getTrackingUrl() {
this.readLock.lock();
try {
return this.trackingUrl;
return this.proxiedTrackingUrl;
} finally {
this.readLock.unlock();
}
}
@Override
public String getOriginalTrackingUrl() {
this.readLock.lock();
try {
return this.origTrackingUrl;
} finally {
this.readLock.unlock();
}
}
@Override
public String getWebProxyBase() {
this.readLock.lock();
try {
return ProxyUriUtils.getPath(applicationAttemptId.getApplicationId());
} finally {
this.readLock.unlock();
}
}
private String generateProxyUriWithoutScheme(
final String trackingUriWithoutScheme) {
this.readLock.lock();
try {
URI trackingUri = ProxyUriUtils.getUriFromAMUrl(trackingUriWithoutScheme);
URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy);
URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri,
applicationAttemptId.getApplicationId());
//We need to strip off the scheme to have it match what was there before
return result.toASCIIString().substring(7);
} catch (URISyntaxException e) {
LOG.warn("Could not proxify "+trackingUriWithoutScheme,e);
return trackingUriWithoutScheme;
} finally {
this.readLock.unlock();
}
@ -691,7 +737,9 @@ public class RMAppAttemptImpl implements RMAppAttempt {
= (RMAppAttemptRegistrationEvent) event;
appAttempt.host = registrationEvent.getHost();
appAttempt.rpcPort = registrationEvent.getRpcport();
appAttempt.trackingUrl = registrationEvent.getTrackingurl();
appAttempt.origTrackingUrl = registrationEvent.getTrackingurl();
appAttempt.proxiedTrackingUrl =
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
// Let the app know
appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
@ -787,7 +835,9 @@ public class RMAppAttemptImpl implements RMAppAttempt {
RMAppAttemptUnregistrationEvent unregisterEvent
= (RMAppAttemptUnregistrationEvent) event;
appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
appAttempt.trackingUrl = unregisterEvent.getTrackingUrl();
appAttempt.origTrackingUrl = unregisterEvent.getTrackingUrl();
appAttempt.proxiedTrackingUrl =
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
// Tell the app and the scheduler
@ -837,7 +887,8 @@ public class RMAppAttemptImpl implements RMAppAttempt {
* NOTE: don't set trackingUrl to 'null'. That will cause null-pointer exceptions
* in the generated proto code.
*/
appAttempt.trackingUrl = "";
appAttempt.origTrackingUrl = "";
appAttempt.proxiedTrackingUrl = "";
new FinalTransition(RMAppAttemptState.FAILED).transition(
appAttempt, containerFinishedEvent);

View File

@ -178,7 +178,7 @@ public class TestRMAppAttemptTransitions {
application = mock(RMApp.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler,
masterService, submissionContext);
masterService, submissionContext, null);
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);
@ -328,7 +328,9 @@ public class TestRMAppAttemptTransitions {
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(host, applicationAttempt.getHost());
assertEquals(rpcPort, applicationAttempt.getRpcPort());
assertEquals(trackingUrl, applicationAttempt.getTrackingUrl());
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId().
getApplicationId()+"/", applicationAttempt.getTrackingUrl());
// TODO - need to add more checks relevant to this state
}
@ -343,7 +345,9 @@ public class TestRMAppAttemptTransitions {
assertEquals(RMAppAttemptState.FINISHED,
applicationAttempt.getAppAttemptState());
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
assertEquals(trackingUrl, applicationAttempt.getTrackingUrl());
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId().
getApplicationId()+"/", applicationAttempt.getTrackingUrl());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());

View File

@ -0,0 +1,67 @@
<?xml version="1.0"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>0.24.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<version>0.24.0-SNAPSHOT</version>
<name>hadoop-yarn-server-web-proxy</name>
<properties>
<install.file>${project.artifact.file}</install.file>
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
</properties>
<dependencies>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>compile</scope>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Publish tests jar -->
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
/**
* This class abstracts away how ApplicationReports are fetched.
*/
public class AppReportFetcher {
private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
private final Configuration conf;
private final ClientRMProtocol applicationsManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
/**
* Create a new Connection to the RM to fetch Application reports.
* @param conf the conf to use to know where the RM is.
*/
public AppReportFetcher(Configuration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf);
InetSocketAddress rmAddress =
NetUtils.createSocketAddr(this.conf.get(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
applicationsManager =
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
rmAddress, this.conf);
LOG.info("Connected to ResourceManager at " + rmAddress);
}
/**
* Just call directly into the applicationsManager given instead of creating
* a remote connection to it. This is mostly for when the Proxy is running
* as part of the RM already.
* @param conf the configuration to use
* @param applicationsManager what to use to get the RM reports.
*/
public AppReportFetcher(Configuration conf, ClientRMProtocol applicationsManager) {
this.conf = conf;
this.applicationsManager = applicationsManager;
}
/**
* Get a report for the specified app.
* @param appId the id of the application to get.
* @return the ApplicationReport for that app.
* @throws YarnRemoteException on any error.
*/
public ApplicationReport getApplicationReport(ApplicationId appId)
throws YarnRemoteException {
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId);
GetApplicationReportResponse response = applicationsManager
.getApplicationReport(request);
return response.getApplicationReport();
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class ProxyUriUtils {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(ProxyUriUtils.class);
/**Name of the servlet to use when registering the proxy servlet. */
public static final String PROXY_SERVLET_NAME = "proxy";
/**Base path where the proxy servlet will handle requests.*/
public static final String PROXY_BASE = "/proxy/";
/**Path Specification for the proxy servlet.*/
public static final String PROXY_PATH_SPEC = PROXY_BASE+"*";
/**Query Parameter indicating that the URI was approved.*/
public static final String PROXY_APPROVAL_PARAM = "proxyapproved";
private static String uriEncode(Object o) {
try {
assert (o != null) : "o canot be null";
return URLEncoder.encode(o.toString(), "UTF-8");
} catch (UnsupportedEncodingException e) {
//This should never happen
throw new RuntimeException("UTF-8 is not supported by this system?", e);
}
}
/**
* Get the proxied path for an application.
* @param id the application id to use.
* @return the base path to that application through the proxy.
*/
public static String getPath(ApplicationId id) {
if(id == null) {
throw new IllegalArgumentException("Application id cannot be null ");
}
return ujoin(PROXY_BASE, uriEncode(id));
}
/**
* Get the proxied path for an application.
* @param id the application id to use.
* @param path the rest of the path to the application.
* @return the base path to that application through the proxy.
*/
public static String getPath(ApplicationId id, String path) {
if(path == null) {
return getPath(id);
} else {
return ujoin(getPath(id), path);
}
}
/**
* Get the proxied path for an application
* @param id the id of the application
* @param path the path of the application.
* @param query the query parameters
* @param approved true if the user has approved accessing this app.
* @return the proxied path for this app.
*/
public static String getPathAndQuery(ApplicationId id, String path,
String query, boolean approved) {
StringBuilder newp = new StringBuilder();
newp.append(getPath(id, path));
boolean first = appendQuery(newp, query, true);
if(approved) {
first = appendQuery(newp, PROXY_APPROVAL_PARAM+"=true", first);
}
return newp.toString();
}
private static boolean appendQuery(StringBuilder builder, String query,
boolean first) {
if(query != null && !query.isEmpty()) {
if(first && !query.startsWith("?")) {
builder.append('?');
}
if(!first && !query.startsWith("&")) {
builder.append('&');
}
builder.append(query);
return false;
}
return first;
}
/**
* Get a proxied URI for the original URI.
* @param originalUri the original URI to go through the proxy
* @param proxyUri the URI of the proxy itself, scheme, host and port are used.
* @param id the id of the application
* @return the proxied URI
*/
public static URI getProxyUri(URI originalUri, URI proxyUri,
ApplicationId id) {
try {
String path = getPath(id, originalUri.getPath());
return new URI(proxyUri.getScheme(), proxyUri.getAuthority(), path,
originalUri.getQuery(), originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException("Could not proxify "+originalUri,e);
}
}
/**
* Create a URI form a no scheme Url, such as is returned by the AM.
* @param noSchemeUrl the URL formate returned by an AM
* @return a URI with an http scheme
* @throws URISyntaxException if the url is not formatted correctly.
*/
public static URI getUriFromAMUrl(String noSchemeUrl)
throws URISyntaxException {
return new URI("http://"+noSchemeUrl);
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.fs.CommonConfigurationKeys;
public class WebAppProxy extends AbstractService {
public static final String FETCHER_ATTRIBUTE= "AppUrlFetcher";
public static final String IS_SECURITY_ENABLED_ATTRIBUTE = "IsSecurityEnabled";
private static final Log LOG = LogFactory.getLog(WebAppProxy.class);
private HttpServer proxyServer = null;
private String bindAddress = null;
private int port = 0;
private AccessControlList acl = null;
private AppReportFetcher fetcher = null;
private boolean isSecurityEnabled = false;
public WebAppProxy() {
super(WebAppProxy.class.getName());
}
@Override
public void init(Configuration conf) {
String auth = conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
if (auth == null || "simple".equals(auth)) {
isSecurityEnabled = false;
} else if ("kerberos".equals(auth)) {
isSecurityEnabled = true;
} else {
LOG.warn("Unrecongized attribute value for " +
CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION +
" of " + auth);
}
fetcher = new AppReportFetcher(conf);
bindAddress = conf.get(YarnConfiguration.PROXY_ADDRESS);
if(bindAddress == null || bindAddress.isEmpty()) {
throw new YarnException(YarnConfiguration.PROXY_ADDRESS +
" is not set so the proxy will not run.");
}
LOG.info("Instantiating Proxy at " + bindAddress);
String[] parts = StringUtils.split(bindAddress, ':');
port = 0;
if (parts.length == 2) {
bindAddress = parts[0];
port = Integer.parseInt(parts[1]);
}
acl = new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
super.init(conf);
}
@Override
public void start() {
try {
proxyServer = new HttpServer("proxy", bindAddress, port,
port == 0, getConfig(), acl);
proxyServer.addServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
proxyServer.setAttribute(FETCHER_ATTRIBUTE, fetcher);
proxyServer.setAttribute(IS_SECURITY_ENABLED_ATTRIBUTE, isSecurityEnabled);
proxyServer.start();
} catch (IOException e) {
LOG.fatal("Could not start proxy web server",e);
throw new YarnException("Could not start proxy web server",e);
}
super.start();
}
@Override
public void stop() {
if(proxyServer != null) {
try {
proxyServer.stop();
} catch (Exception e) {
LOG.fatal("Error stopping proxy web server", e);
throw new YarnException("Error stopping proxy web server",e);
}
}
super.stop();
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.CompositeService;
/**
* ProxyServer will sit in between the end user and AppMaster
* web interfaces.
*/
public class WebAppProxyServer extends CompositeService {
private static final Log LOG = LogFactory.getLog(WebAppProxyServer.class);
private WebAppProxy proxy = null;
public WebAppProxyServer() {
super(WebAppProxyServer.class.getName());
}
@Override
public synchronized void init(Configuration conf) {
Configuration config = new YarnConfiguration(conf);
try {
doSecureLogin(conf);
} catch(IOException ie) {
throw new YarnException("Proxy Server Failed to login", ie);
}
proxy = new WebAppProxy();
addService(proxy);
super.init(config);
}
/**
* Log in as the Kerberose principal designated for the proxy
* @param conf the configuration holding this information in it.
* @throws IOException on any error.
*/
protected void doSecureLogin(Configuration conf) throws IOException {
SecurityUtil.login(conf, YarnConfiguration.PROXY_KEYTAB,
YarnConfiguration.PROXY_PRINCIPAL);
}
public static void main(String[] args) {
StringUtils.startupShutdownMessage(WebAppProxyServer.class, args, LOG);
try {
WebAppProxyServer proxy = new WebAppProxyServer();
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(proxy));
YarnConfiguration conf = new YarnConfiguration();
proxy.init(conf);
proxy.start();
} catch (Throwable t) {
LOG.fatal("Error starting Proxy server", t);
System.exit(-1);
}
}
}

View File

@ -0,0 +1,275 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Enumeration;
import java.util.HashSet;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
public class WebAppProxyServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(WebAppProxyServlet.class);
private static final HashSet<String> passThroughHeaders =
new HashSet<String>(Arrays.asList("User-Agent", "Accept", "Accept-Encoding",
"Accept-Language", "Accept-Charset"));
public static final String PROXY_USER_COOKIE_NAME = "proxy-user";
private static class _ implements Hamlet._ {
//Empty
}
private static class Page extends Hamlet {
Page(PrintWriter out) {
super(out, 0, false);
}
public HTML<WebAppProxyServlet._> html() {
return new HTML<WebAppProxyServlet._>("html", null, EnumSet.of(EOpt.ENDTAG));
}
}
/**
* Output 404 with appropriate message.
* @param resp the http response.
* @param message the message to include on the page.
* @throws IOException on any error.
*/
private static void notFound(HttpServletResponse resp, String message)
throws IOException {
resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
resp.setContentType(MimeType.HTML);
Page p = new Page(resp.getWriter());
p.html().
h1(message).
_();
}
/**
* Warn the user that the link may not be safe!
* @param resp the http response
* @param link the link to point to
* @param user the user that owns the link.
* @throws IOException on any error.
*/
private static void warnUserPage(HttpServletResponse resp, String link,
String user, ApplicationId id) throws IOException {
//Set the cookie when we warn which overrides the query parameter
//This is so that if a user passes in the approved query parameter without
//having first visited this page then this page will still be displayed
resp.addCookie(makeCheckCookie(id, false));
resp.setContentType(MimeType.HTML);
Page p = new Page(resp.getWriter());
p.html().
h1("WARNING: The following page may not be safe!").h3().
_("click ").a(link, "here").
_(" to continue to an Application Master web interface owned by ", user).
_().
_();
}
/**
* Download link and have it be the response.
* @param req the http request
* @param resp the http response
* @param link the link to download
* @param c the cookie to set if any
* @throws IOException on any error.
*/
private static void proxyLink(HttpServletRequest req,
HttpServletResponse resp, URI link,Cookie c) throws IOException {
org.apache.commons.httpclient.URI uri =
new org.apache.commons.httpclient.URI(link.toString(), false);
HttpClient client = new HttpClient();
HttpMethod method = new GetMethod(uri.getEscapedURI());
@SuppressWarnings("unchecked")
Enumeration<String> names = req.getHeaderNames();
while(names.hasMoreElements()) {
String name = names.nextElement();
if(passThroughHeaders.contains(name)) {
String value = req.getHeader(name);
LOG.debug("REQ HEADER: "+name+" : "+value);
method.setRequestHeader(name, value);
}
}
String user = req.getRemoteUser();
if(user != null && !user.isEmpty()) {
method.setRequestHeader("Cookie",PROXY_USER_COOKIE_NAME+"="+
URLEncoder.encode(user, "ASCII"));
}
OutputStream out = resp.getOutputStream();
try {
resp.setStatus(client.executeMethod(method));
for(Header header : method.getResponseHeaders()) {
resp.setHeader(header.getName(), header.getValue());
}
if(c != null) {
resp.addCookie(c);
}
InputStream in = method.getResponseBodyAsStream();
if(in != null) {
IOUtils.copyBytes(in, out, 4096, true);
}
} finally {
method.releaseConnection();
}
}
private static String getCheckCookieName(ApplicationId id){
return "checked_"+id;
}
private static Cookie makeCheckCookie(ApplicationId id, boolean isSet) {
Cookie c = new Cookie(getCheckCookieName(id),String.valueOf(isSet));
c.setPath(ProxyUriUtils.getPath(id));
c.setMaxAge(60 * 60 * 2); //2 hours in seconds
return c;
}
private boolean isSecurityEnabled() {
Boolean b = (Boolean) getServletContext()
.getAttribute(WebAppProxy.IS_SECURITY_ENABLED_ATTRIBUTE);
if(b != null) return b;
return false;
}
private ApplicationReport getApplicationReport(ApplicationId id) throws IOException {
return ((AppReportFetcher) getServletContext()
.getAttribute(WebAppProxy.FETCHER_ATTRIBUTE)).getApplicationReport(id);
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws IOException{
try {
String userApprovedParamS =
req.getParameter(ProxyUriUtils.PROXY_APPROVAL_PARAM);
boolean userWasWarned = false;
boolean userApproved =
(userApprovedParamS != null && Boolean.valueOf(userApprovedParamS));
boolean securityEnabled = isSecurityEnabled();
final String remoteUser = req.getRemoteUser();
final String pathInfo = req.getPathInfo();
String parts[] = pathInfo.split("/", 3);
if(parts.length < 2) {
LOG.warn(remoteUser+" Gave an invalid proxy path "+pathInfo);
notFound(resp, "Your path appears to be formatted incorrectly.");
return;
}
//parts[0] is empty because path info always starts with a /
String appId = parts[1];
String rest = parts.length > 2 ? parts[2] : "";
ApplicationId id = Apps.toAppID(appId);
if(id == null) {
LOG.warn(req.getRemoteUser()+" Attempting to access "+appId+
" that is invalid");
notFound(resp, appId+" appears to be formatted incorrectly.");
return;
}
if(securityEnabled) {
String cookieName = getCheckCookieName(id);
for(Cookie c: req.getCookies()) {
if(cookieName.equals(c.getName())) {
userWasWarned = true;
userApproved = userApproved || Boolean.valueOf(c.getValue());
break;
}
}
}
boolean checkUser = securityEnabled && (!userWasWarned || !userApproved);
ApplicationReport applicationReport = getApplicationReport(id);
if(applicationReport == null) {
LOG.warn(req.getRemoteUser()+" Attempting to access "+id+
" that was not found");
notFound(resp, "Application "+appId+" could not be found, " +
"please try the history server");
return;
}
URI trackingUri = ProxyUriUtils.getUriFromAMUrl(
applicationReport.getOriginalTrackingUrl());
String runningUser = applicationReport.getUser();
if(checkUser && !runningUser.equals(remoteUser)) {
LOG.info("Asking "+remoteUser+" if they want to connect to the " +
"app master GUI of "+appId+" owned by "+runningUser);
warnUserPage(resp, ProxyUriUtils.getPathAndQuery(id, rest,
req.getQueryString(), true), runningUser, id);
return;
}
URI toFetch = new URI(req.getScheme(),
trackingUri.getAuthority(),
StringHelper.ujoin(trackingUri.getPath(), rest), req.getQueryString(),
null);
LOG.info(req.getRemoteUser()+" is accessing unchecked "+toFetch+
" which is the app master GUI of "+appId+" owned by "+runningUser);
switch(applicationReport.getYarnApplicationState()) {
case KILLED:
case FINISHED:
case FAILED:
resp.sendRedirect(resp.encodeRedirectURL(toFetch.toString()));
return;
}
Cookie c = null;
if(userWasWarned && userApproved) {
c = makeCheckCookie(id, true);
}
proxyLink(req, resp, toFetch, c);
} catch(URISyntaxException e) {
throw new IOException(e);
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy.amfilter;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.FilterContainer;
import org.apache.hadoop.http.FilterInitializer;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
public class AmFilterInitializer extends FilterInitializer {
private static final String FILTER_NAME = "AM_PROXY_FILTER";
private static final String FILTER_CLASS = AmIpFilter.class.getCanonicalName();
@Override
public void initFilter(FilterContainer container, Configuration conf) {
Map<String, String> params = new HashMap<String, String>();
String proxy = YarnConfiguration.getProxyHostAndPort(conf);
String[] parts = proxy.split(":");
params.put(AmIpFilter.PROXY_HOST, parts[0]);
params.put(AmIpFilter.PROXY_URI_BASE, "http://"+proxy+
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV));
container.addFilter(FILTER_NAME, FILTER_CLASS, params);
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy.amfilter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
public class AmIpFilter implements Filter {
private static final Log LOG = LogFactory.getLog(AmIpFilter.class);
public static final String PROXY_HOST = "PROXY_HOST";
public static final String PROXY_URI_BASE = "PROXY_URI_BASE";
//update the proxy IP list about every 5 min
private static final long updateInterval = 5 * 60 * 1000;
private String proxyHost;
private Set<String> proxyAddresses = null;
private long lastUpdate;
private String proxyUriBase;
@Override
public void init(FilterConfig conf) throws ServletException {
proxyHost = conf.getInitParameter(PROXY_HOST);
proxyUriBase = conf.getInitParameter(PROXY_URI_BASE);
}
private Set<String> getProxyAddresses() throws ServletException {
long now = System.currentTimeMillis();
if(proxyAddresses == null || (lastUpdate + updateInterval) >= now) {
synchronized(this) {
try {
proxyAddresses = new HashSet<String>();
for(InetAddress add : InetAddress.getAllByName(proxyHost)) {
proxyAddresses.add(add.getHostAddress());
}
lastUpdate = now;
} catch (UnknownHostException e) {
throw new ServletException("Could not locate "+proxyHost, e);
}
}
}
return proxyAddresses;
}
@Override
public void destroy() {
//Empty
}
@Override
public void doFilter(ServletRequest req, ServletResponse resp,
FilterChain chain) throws IOException, ServletException {
if(!(req instanceof HttpServletRequest)) {
throw new ServletException("This filter only works for HTTP/HTTPS");
}
HttpServletRequest httpReq = (HttpServletRequest)req;
HttpServletResponse httpResp = (HttpServletResponse)resp;
if(!getProxyAddresses().contains(httpReq.getRemoteAddr())) {
String redirectUrl = httpResp.encodeRedirectURL(proxyUriBase +
httpReq.getRequestURI());
httpResp.sendRedirect(redirectUrl);
return;
}
String user = null;
for(Cookie c: httpReq.getCookies()) {
if(WebAppProxyServlet.PROXY_USER_COOKIE_NAME.equals(c.getName())){
user = c.getValue();
break;
}
}
if(user == null) {
LOG.warn("Could not find "+WebAppProxyServlet.PROXY_USER_COOKIE_NAME
+" cookie, so user will not be set");
chain.doFilter(req, resp);
} else {
final AmIpPrincipal principal = new AmIpPrincipal(user);
ServletRequest requestWrapper = new AmIpServletRequestWrapper(httpReq,
principal);
chain.doFilter(requestWrapper, resp);
}
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy.amfilter;
import java.security.Principal;
public class AmIpPrincipal implements Principal {
private final String name;
public AmIpPrincipal(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy.amfilter;
import java.security.Principal;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
public class AmIpServletRequestWrapper extends HttpServletRequestWrapper {
private final AmIpPrincipal principal;
public AmIpServletRequestWrapper(HttpServletRequest request,
AmIpPrincipal principal) {
super(request);
this.principal = principal;
}
@Override
public Principal getUserPrincipal() {
return principal;
}
@Override
public String getRemoteUser() {
return principal.getName();
}
@Override
public boolean isUserInRole(String role) {
//No role info so far
return false;
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import static org.junit.Assert.*;
import java.net.URI;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Test;
public class TestProxyUriUtils {
public static class TestAppId extends ApplicationId {
private long timestamp;
private int id;
public TestAppId(int id, long timestamp) {
setId(id);
setClusterTimestamp(timestamp);
}
@Override
public int getId() {
return id;
}
@Override
public void setId(int id) {
this.id = id;
}
@Override
public long getClusterTimestamp() {
return timestamp;
}
@Override
public void setClusterTimestamp(long clusterTimestamp) {
this.timestamp = clusterTimestamp;
}
}
@Test
public void testGetPathApplicationId() {
assertEquals("/proxy/application_100_0001",
ProxyUriUtils.getPath(new TestAppId(1, 100l)));
assertEquals("/proxy/application_6384623_0005",
ProxyUriUtils.getPath(new TestAppId(5, 6384623l)));
}
@Test(expected = IllegalArgumentException.class)
public void testGetPathApplicationIdBad() {
ProxyUriUtils.getPath(null);
}
@Test
public void testGetPathApplicationIdString() {
assertEquals("/proxy/application_6384623_0005",
ProxyUriUtils.getPath(new TestAppId(5, 6384623l), null));
assertEquals("/proxy/application_6384623_0005/static/app",
ProxyUriUtils.getPath(new TestAppId(5, 6384623l), "/static/app"));
assertEquals("/proxy/application_6384623_0005/",
ProxyUriUtils.getPath(new TestAppId(5, 6384623l), "/"));
assertEquals("/proxy/application_6384623_0005/some/path",
ProxyUriUtils.getPath(new TestAppId(5, 6384623l), "some/path"));
}
@Test
public void testGetPathAndQuery() {
assertEquals("/proxy/application_6384623_0005/static/app?foo=bar",
ProxyUriUtils.getPathAndQuery(new TestAppId(5, 6384623l), "/static/app",
"?foo=bar", false));
assertEquals("/proxy/application_6384623_0005/static/app?foo=bar&bad=good&proxyapproved=true",
ProxyUriUtils.getPathAndQuery(new TestAppId(5, 6384623l), "/static/app",
"foo=bar&bad=good", true));
}
@Test
public void testGetProxyUri() throws Exception {
URI originalUri = new URI("http://host.com/static/foo?bar=bar");
URI proxyUri = new URI("http://proxy.net:8080/");
TestAppId id = new TestAppId(5, 6384623l);
URI expected = new URI("http://proxy.net:8080/proxy/application_6384623_0005/static/foo?bar=bar");
URI result = ProxyUriUtils.getProxyUri(originalUri, proxyUri, id);
assertEquals(expected, result);
}
}

View File

@ -37,6 +37,7 @@
<modules>
<module>hadoop-yarn-server-common</module>
<module>hadoop-yarn-server-nodemanager</module>
<module>hadoop-yarn-server-web-proxy</module>
<module>hadoop-yarn-server-resourcemanager</module>
<module>hadoop-yarn-server-tests</module>
</modules>

View File

@ -334,6 +334,11 @@
<artifactId>hadoop-yarn-server-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>