YARN-3975. WebAppProxyServlet should not redirect to RM page if AHS is enabled. Contributed by Mit Desai

This commit is contained in:
Jason Lowe 2015-09-23 16:38:55 +00:00
parent c890c51a91
commit 692d51c09d
5 changed files with 174 additions and 44 deletions

View File

@ -963,6 +963,9 @@ Release 2.7.2 - UNRELEASED
YARN-3433. Jersey tests failing with Port in Use -again. YARN-3433. Jersey tests failing with Port in Use -again.
(Brahma Reddy Battula) (Brahma Reddy Battula)
YARN-3975. WebAppProxyServlet should not redirect to RM page if AHS is
enabled (Mit Desai via jlowe)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -432,16 +432,11 @@ public class YarnClientImpl extends YarnClient {
.newRecord(GetApplicationReportRequest.class); .newRecord(GetApplicationReportRequest.class);
request.setApplicationId(appId); request.setApplicationId(appId);
response = rmClient.getApplicationReport(request); response = rmClient.getApplicationReport(request);
} catch (YarnException e) { } catch (ApplicationNotFoundException e) {
if (!historyServiceEnabled) { if (!historyServiceEnabled) {
// Just throw it as usual if historyService is not enabled. // Just throw it as usual if historyService is not enabled.
throw e; throw e;
} }
// Even if history-service is enabled, treat all exceptions still the same
// except the following
if (!(e.getClass() == ApplicationNotFoundException.class)) {
throw e;
}
return historyClient.getApplicationReport(appId); return historyClient.getApplicationReport(appId);
} }
return response.getApplicationReport(); return response.getApplicationReport();

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.AHSProxy; import org.apache.hadoop.yarn.client.AHSProxy;
@ -42,6 +41,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
* This class abstracts away how ApplicationReports are fetched. * This class abstracts away how ApplicationReports are fetched.
*/ */
public class AppReportFetcher { public class AppReportFetcher {
enum AppReportSource { RM, AHS }
private static final Log LOG = LogFactory.getLog(AppReportFetcher.class); private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
private final Configuration conf; private final Configuration conf;
private final ApplicationClientProtocol applicationsManager; private final ApplicationClientProtocol applicationsManager;
@ -115,28 +115,29 @@ public class AppReportFetcher {
* @throws YarnException on any error. * @throws YarnException on any error.
* @throws IOException * @throws IOException
*/ */
public ApplicationReport getApplicationReport(ApplicationId appId) public FetchedAppReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException { throws YarnException, IOException {
GetApplicationReportRequest request = recordFactory GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class); .newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId); request.setApplicationId(appId);
GetApplicationReportResponse response; ApplicationReport appReport;
FetchedAppReport fetchedAppReport;
try { try {
response = applicationsManager.getApplicationReport(request); appReport = applicationsManager.
} catch (YarnException e) { getApplicationReport(request).getApplicationReport();
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM);
} catch (ApplicationNotFoundException e) {
if (!isAHSEnabled) { if (!isAHSEnabled) {
// Just throw it as usual if historyService is not enabled. // Just throw it as usual if historyService is not enabled.
throw e; throw e;
} }
// Even if history-service is enabled, treat all exceptions still the same //Fetch the application report from AHS
// except the following appReport = historyManager.
if (!(e.getClass() == ApplicationNotFoundException.class)) { getApplicationReport(request).getApplicationReport();
throw e; fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS);
} }
response = historyManager.getApplicationReport(request); return fetchedAppReport;
}
return response.getApplicationReport();
} }
public void stop() { public void stop() {
@ -147,4 +148,28 @@ public class AppReportFetcher {
RPC.stopProxy(this.historyManager); RPC.stopProxy(this.historyManager);
} }
} }
/*
* This class creates a bundle of the application report and the source from
* where the the report was fetched. This allows the WebAppProxyServlet
* to make decisions for the application report based on the source.
*/
static class FetchedAppReport {
private ApplicationReport appReport;
private AppReportSource appReportSource;
public FetchedAppReport(ApplicationReport appReport,
AppReportSource appReportSource) {
this.appReport = appReport;
this.appReportSource = appReportSource;
}
public AppReportSource getAppReportSource() {
return this.appReportSource;
}
public ApplicationReport getApplicationReport() {
return this.appReport;
}
}
} }

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher.AppReportSource;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher.FetchedAppReport;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.StringHelper; import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.util.TrackingUriPlugin; import org.apache.hadoop.yarn.util.TrackingUriPlugin;
@ -90,6 +92,7 @@ public class WebAppProxyServlet extends HttpServlet {
private transient List<TrackingUriPlugin> trackingUriPlugins; private transient List<TrackingUriPlugin> trackingUriPlugins;
private final String rmAppPageUrlBase; private final String rmAppPageUrlBase;
private final String ahsAppPageUrlBase;
private transient YarnConfiguration conf; private transient YarnConfiguration conf;
/** /**
@ -125,6 +128,9 @@ public class WebAppProxyServlet extends HttpServlet {
TrackingUriPlugin.class); TrackingUriPlugin.class);
this.rmAppPageUrlBase = StringHelper.pjoin( this.rmAppPageUrlBase = StringHelper.pjoin(
WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app"); WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app");
this.ahsAppPageUrlBase = StringHelper.pjoin(
WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils
.getAHSWebAppURLWithoutScheme(conf), "applicationhistory", "apps");
} }
/** /**
@ -266,7 +272,7 @@ public class WebAppProxyServlet extends HttpServlet {
return b != null ? b : false; return b != null ? b : false;
} }
private ApplicationReport getApplicationReport(ApplicationId id) private FetchedAppReport getApplicationReport(ApplicationId id)
throws IOException, YarnException { throws IOException, YarnException {
return ((AppReportFetcher) getServletContext() return ((AppReportFetcher) getServletContext()
.getAttribute(WebAppProxy.FETCHER_ATTRIBUTE)).getApplicationReport(id); .getAttribute(WebAppProxy.FETCHER_ATTRIBUTE)).getApplicationReport(id);
@ -345,9 +351,18 @@ public class WebAppProxyServlet extends HttpServlet {
boolean checkUser = securityEnabled && (!userWasWarned || !userApproved); boolean checkUser = securityEnabled && (!userWasWarned || !userApproved);
ApplicationReport applicationReport; FetchedAppReport fetchedAppReport = null;
ApplicationReport applicationReport = null;
try { try {
applicationReport = getApplicationReport(id); fetchedAppReport = getApplicationReport(id);
if (fetchedAppReport != null) {
if (fetchedAppReport.getAppReportSource() != AppReportSource.RM &&
fetchedAppReport.getAppReportSource() != AppReportSource.AHS) {
throw new UnsupportedOperationException("Application report not "
+ "fetched from RM or history server.");
}
applicationReport = fetchedAppReport.getApplicationReport();
}
} catch (ApplicationNotFoundException e) { } catch (ApplicationNotFoundException e) {
applicationReport = null; applicationReport = null;
} }
@ -363,16 +378,29 @@ public class WebAppProxyServlet extends HttpServlet {
return; return;
} }
notFound(resp, "Application " + appId + " could not be found, " + notFound(resp, "Application " + appId + " could not be found " +
"please try the history server"); "in RM or history server");
return; return;
} }
String original = applicationReport.getOriginalTrackingUrl(); String original = applicationReport.getOriginalTrackingUrl();
URI trackingUri; URI trackingUri;
if (original == null || original.equals("N/A") || original.equals("")) {
if (fetchedAppReport.getAppReportSource() == AppReportSource.RM) {
// fallback to ResourceManager's app page if no tracking URI provided // fallback to ResourceManager's app page if no tracking URI provided
if(original == null || original.equals("N/A")) { // and Application Report was fetched from RM
LOG.debug("Original tracking url is '{}'. Redirecting to RM app page",
original == null? "NULL" : original);
ProxyUtils.sendRedirect(req, resp, ProxyUtils.sendRedirect(req, resp,
StringHelper.pjoin(rmAppPageUrlBase, id.toString())); StringHelper.pjoin(rmAppPageUrlBase, id.toString()));
} else if (fetchedAppReport.getAppReportSource()
== AppReportSource.AHS) {
// fallback to Application History Server app page if the application
// report was fetched from AHS
LOG.debug("Original tracking url is '{}'. Redirecting to AHS app page"
, original == null? "NULL" : original);
ProxyUtils.sendRedirect(req, resp,
StringHelper.pjoin(ahsAppPageUrlBase, id.toString()));
}
return; return;
} else { } else {
if (ProxyUriUtils.getSchemeFromUrl(original).isEmpty()) { if (ProxyUriUtils.getSchemeFromUrl(original).isEmpty()) {

View File

@ -27,6 +27,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpCookie; import java.net.HttpCookie;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URI; import java.net.URI;
@ -76,6 +77,7 @@ public class TestWebAppProxyServlet {
private static int numberOfHeaders = 0; private static int numberOfHeaders = 0;
private static final String UNKNOWN_HEADER = "Unknown-Header"; private static final String UNKNOWN_HEADER = "Unknown-Header";
private static boolean hasUnknownHeader = false; private static boolean hasUnknownHeader = false;
Configuration configuration = new Configuration();
/** /**
@ -137,8 +139,6 @@ public class TestWebAppProxyServlet {
@Test(timeout=5000) @Test(timeout=5000)
public void testWebAppProxyServlet() throws Exception { public void testWebAppProxyServlet() throws Exception {
Configuration configuration = new Configuration();
configuration.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090"); configuration.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090");
// overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS // overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
configuration.setInt("hadoop.http.max.threads", 5); configuration.setInt("hadoop.http.max.threads", 5);
@ -166,6 +166,7 @@ public class TestWebAppProxyServlet {
proxyConn.connect(); proxyConn.connect();
assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR,
proxyConn.getResponseCode()); proxyConn.getResponseCode());
// set true Application ID in url // set true Application ID in url
URL url = new URL("http://localhost:" + proxyPort + "/proxy/application_00_0"); URL url = new URL("http://localhost:" + proxyPort + "/proxy/application_00_0");
proxyConn = (HttpURLConnection) url.openConnection(); proxyConn = (HttpURLConnection) url.openConnection();
@ -220,12 +221,69 @@ public class TestWebAppProxyServlet {
LOG.info("ProxyConn.getHeaderField(): " + proxyConn.getHeaderField(ProxyUtils.LOCATION)); LOG.info("ProxyConn.getHeaderField(): " + proxyConn.getHeaderField(ProxyUtils.LOCATION));
assertEquals("http://localhost:" + originalPort assertEquals("http://localhost:" + originalPort
+ "/foo/bar/test/tez?a=b&x=y&h=p#main", proxyConn.getURL().toString()); + "/foo/bar/test/tez?a=b&x=y&h=p#main", proxyConn.getURL().toString());
} finally { } finally {
proxy.close(); proxy.close();
} }
} }
@Test(timeout=5000)
public void testAppReportForEmptyTrackingUrl() throws Exception {
configuration.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090");
// overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
configuration.setInt("hadoop.http.max.threads", 5);
WebAppProxyServerForTest proxy = new WebAppProxyServerForTest();
proxy.init(configuration);
proxy.start();
int proxyPort = proxy.proxy.proxyServer.getConnectorAddress(0).getPort();
AppReportFetcherForTest appReportFetcher = proxy.proxy.appReportFetcher;
try {
//set AHS_ENBALED = false to simulate getting the app report from RM
configuration.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
false);
ApplicationId app = ApplicationId.newInstance(0, 0);
appReportFetcher.answer = 6;
URL url = new URL("http://localhost:" + proxyPort +
"/proxy/" + app.toString());
HttpURLConnection proxyConn = (HttpURLConnection) url.openConnection();
proxyConn.connect();
try {
proxyConn.getResponseCode();
} catch (ConnectException e) {
// Connection Exception is expected as we have set
// appReportFetcher.answer = 6, which does not set anything for
// original tracking url field in the app report.
}
String appAddressInRm =
WebAppUtils.getResolvedRMWebAppURLWithScheme(configuration) +
"/cluster" + "/app/" + app.toString();
assertTrue("Webapp proxy servlet should have redirected to RM",
proxyConn.getURL().toString().equals(appAddressInRm));
//set AHS_ENBALED = true to simulate getting the app report from AHS
configuration.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
true);
proxyConn = (HttpURLConnection) url.openConnection();
proxyConn.connect();
try {
proxyConn.getResponseCode();
} catch (ConnectException e) {
// Connection Exception is expected as we have set
// appReportFetcher.answer = 6, which does not set anything for
// original tracking url field in the app report.
}
String appAddressInAhs = WebAppUtils.getHttpSchemePrefix(configuration) +
WebAppUtils.getAHSWebAppURLWithoutScheme(configuration) +
"/applicationhistory" + "/apps/" + app.toString();
assertTrue("Webapp proxy servlet should have redirected to AHS",
proxyConn.getURL().toString().equals(appAddressInAhs));
}
finally {
proxy.close();
}
}
@Test(timeout=5000) @Test(timeout=5000)
public void testWebAppProxyPassThroughHeaders() throws Exception { public void testWebAppProxyPassThroughHeaders() throws Exception {
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
@ -398,49 +456,70 @@ public class TestWebAppProxyServlet {
} }
private class AppReportFetcherForTest extends AppReportFetcher { private class AppReportFetcherForTest extends AppReportFetcher {
int answer = 0; int answer = 0;
public AppReportFetcherForTest(Configuration conf) { public AppReportFetcherForTest(Configuration conf) {
super(conf); super(conf);
} }
public ApplicationReport getApplicationReport(ApplicationId appId) public FetchedAppReport getApplicationReport(ApplicationId appId)
throws YarnException { throws YarnException {
if (answer == 0) { if (answer == 0) {
return getDefaultApplicationReport(appId); return getDefaultApplicationReport(appId);
} else if (answer == 1) { } else if (answer == 1) {
return null; return null;
} else if (answer == 2) { } else if (answer == 2) {
ApplicationReport result = getDefaultApplicationReport(appId); FetchedAppReport result = getDefaultApplicationReport(appId);
result.setUser("user"); result.getApplicationReport().setUser("user");
return result; return result;
} else if (answer == 3) { } else if (answer == 3) {
ApplicationReport result = getDefaultApplicationReport(appId); FetchedAppReport result = getDefaultApplicationReport(appId);
result.setYarnApplicationState(YarnApplicationState.KILLED); result.getApplicationReport().
setYarnApplicationState(YarnApplicationState.KILLED);
return result; return result;
} else if (answer == 4) { } else if (answer == 4) {
throw new ApplicationNotFoundException("Application is not found"); throw new ApplicationNotFoundException("Application is not found");
} else if (answer == 5) { } else if (answer == 5) {
// test user-provided path and query parameter can be appended to the // test user-provided path and query parameter can be appended to the
// original tracking url // original tracking url
ApplicationReport result = getDefaultApplicationReport(appId); FetchedAppReport result = getDefaultApplicationReport(appId);
result.setOriginalTrackingUrl("localhost:" + originalPort result.getApplicationReport().setOriginalTrackingUrl("localhost:"
+ "/foo/bar?a=b#main"); + originalPort + "/foo/bar?a=b#main");
result.setYarnApplicationState(YarnApplicationState.FINISHED); result.getApplicationReport().
setYarnApplicationState(YarnApplicationState.FINISHED);
return result; return result;
} else if (answer == 6) {
return getDefaultApplicationReport(appId, false);
} }
return null; return null;
} }
private ApplicationReport getDefaultApplicationReport(ApplicationId appId) { /*
* If this method is called with isTrackingUrl=false, no tracking url
* will set in the app report. Hence, there will be a connection exception
* when the prxyCon tries to connect.
*/
private FetchedAppReport getDefaultApplicationReport(ApplicationId appId,
boolean isTrackingUrl) {
FetchedAppReport fetchedReport;
ApplicationReport result = new ApplicationReportPBImpl(); ApplicationReport result = new ApplicationReportPBImpl();
result.setApplicationId(appId); result.setApplicationId(appId);
result.setOriginalTrackingUrl("localhost:" + originalPort + "/foo/bar");
result.setYarnApplicationState(YarnApplicationState.RUNNING); result.setYarnApplicationState(YarnApplicationState.RUNNING);
result.setUser(CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER); result.setUser(CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER);
return result; if (isTrackingUrl) {
result.setOriginalTrackingUrl("localhost:" + originalPort + "/foo/bar");
}
if(configuration.getBoolean(YarnConfiguration.
APPLICATION_HISTORY_ENABLED, false)) {
fetchedReport = new FetchedAppReport(result, AppReportSource.AHS);
} else {
fetchedReport = new FetchedAppReport(result, AppReportSource.RM);
}
return fetchedReport;
} }
private FetchedAppReport getDefaultApplicationReport(ApplicationId appId) {
return getDefaultApplicationReport(appId, true);
}
} }
} }