YARN-3975. WebAppProxyServlet should not redirect to RM page if AHS is enabled. Contributed by Mit Desai
(cherry picked from commit 692d51c09d
)
This commit is contained in:
parent
c829be5133
commit
b7111449a1
|
@ -911,6 +911,9 @@ Release 2.7.2 - UNRELEASED
|
|||
YARN-3433. Jersey tests failing with Port in Use -again.
|
||||
(Brahma Reddy Battula)
|
||||
|
||||
YARN-3975. WebAppProxyServlet should not redirect to RM page if AHS is
|
||||
enabled (Mit Desai via jlowe)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -432,16 +432,11 @@ public class YarnClientImpl extends YarnClient {
|
|||
.newRecord(GetApplicationReportRequest.class);
|
||||
request.setApplicationId(appId);
|
||||
response = rmClient.getApplicationReport(request);
|
||||
} catch (YarnException e) {
|
||||
} catch (ApplicationNotFoundException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (!(e.getClass() == ApplicationNotFoundException.class)) {
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getApplicationReport(appId);
|
||||
}
|
||||
return response.getApplicationReport();
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.RPC;
|
|||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.client.AHSProxy;
|
||||
|
@ -42,6 +41,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|||
* This class abstracts away how ApplicationReports are fetched.
|
||||
*/
|
||||
public class AppReportFetcher {
|
||||
enum AppReportSource { RM, AHS }
|
||||
private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
|
||||
private final Configuration conf;
|
||||
private final ApplicationClientProtocol applicationsManager;
|
||||
|
@ -115,28 +115,29 @@ public class AppReportFetcher {
|
|||
* @throws YarnException on any error.
|
||||
* @throws IOException
|
||||
*/
|
||||
public ApplicationReport getApplicationReport(ApplicationId appId)
|
||||
public FetchedAppReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
GetApplicationReportRequest request = recordFactory
|
||||
.newRecordInstance(GetApplicationReportRequest.class);
|
||||
request.setApplicationId(appId);
|
||||
|
||||
GetApplicationReportResponse response;
|
||||
ApplicationReport appReport;
|
||||
FetchedAppReport fetchedAppReport;
|
||||
try {
|
||||
response = applicationsManager.getApplicationReport(request);
|
||||
} catch (YarnException e) {
|
||||
appReport = applicationsManager.
|
||||
getApplicationReport(request).getApplicationReport();
|
||||
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM);
|
||||
} catch (ApplicationNotFoundException e) {
|
||||
if (!isAHSEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (!(e.getClass() == ApplicationNotFoundException.class)) {
|
||||
throw e;
|
||||
}
|
||||
response = historyManager.getApplicationReport(request);
|
||||
//Fetch the application report from AHS
|
||||
appReport = historyManager.
|
||||
getApplicationReport(request).getApplicationReport();
|
||||
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS);
|
||||
}
|
||||
return response.getApplicationReport();
|
||||
return fetchedAppReport;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
@ -147,4 +148,28 @@ public class AppReportFetcher {
|
|||
RPC.stopProxy(this.historyManager);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This class creates a bundle of the application report and the source from
|
||||
* where the the report was fetched. This allows the WebAppProxyServlet
|
||||
* to make decisions for the application report based on the source.
|
||||
*/
|
||||
static class FetchedAppReport {
|
||||
private ApplicationReport appReport;
|
||||
private AppReportSource appReportSource;
|
||||
|
||||
public FetchedAppReport(ApplicationReport appReport,
|
||||
AppReportSource appReportSource) {
|
||||
this.appReport = appReport;
|
||||
this.appReportSource = appReportSource;
|
||||
}
|
||||
|
||||
public AppReportSource getAppReportSource() {
|
||||
return this.appReportSource;
|
||||
}
|
||||
|
||||
public ApplicationReport getApplicationReport() {
|
||||
return this.appReport;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher.AppReportSource;
|
||||
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher.FetchedAppReport;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
import org.apache.hadoop.yarn.util.StringHelper;
|
||||
import org.apache.hadoop.yarn.util.TrackingUriPlugin;
|
||||
|
@ -90,6 +92,7 @@ public class WebAppProxyServlet extends HttpServlet {
|
|||
|
||||
private transient List<TrackingUriPlugin> trackingUriPlugins;
|
||||
private final String rmAppPageUrlBase;
|
||||
private final String ahsAppPageUrlBase;
|
||||
private transient YarnConfiguration conf;
|
||||
|
||||
/**
|
||||
|
@ -125,6 +128,9 @@ public class WebAppProxyServlet extends HttpServlet {
|
|||
TrackingUriPlugin.class);
|
||||
this.rmAppPageUrlBase = StringHelper.pjoin(
|
||||
WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app");
|
||||
this.ahsAppPageUrlBase = StringHelper.pjoin(
|
||||
WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils
|
||||
.getAHSWebAppURLWithoutScheme(conf), "applicationhistory", "apps");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -266,7 +272,7 @@ public class WebAppProxyServlet extends HttpServlet {
|
|||
return b != null ? b : false;
|
||||
}
|
||||
|
||||
private ApplicationReport getApplicationReport(ApplicationId id)
|
||||
private FetchedAppReport getApplicationReport(ApplicationId id)
|
||||
throws IOException, YarnException {
|
||||
return ((AppReportFetcher) getServletContext()
|
||||
.getAttribute(WebAppProxy.FETCHER_ATTRIBUTE)).getApplicationReport(id);
|
||||
|
@ -345,9 +351,18 @@ public class WebAppProxyServlet extends HttpServlet {
|
|||
|
||||
boolean checkUser = securityEnabled && (!userWasWarned || !userApproved);
|
||||
|
||||
ApplicationReport applicationReport;
|
||||
FetchedAppReport fetchedAppReport = null;
|
||||
ApplicationReport applicationReport = null;
|
||||
try {
|
||||
applicationReport = getApplicationReport(id);
|
||||
fetchedAppReport = getApplicationReport(id);
|
||||
if (fetchedAppReport != null) {
|
||||
if (fetchedAppReport.getAppReportSource() != AppReportSource.RM &&
|
||||
fetchedAppReport.getAppReportSource() != AppReportSource.AHS) {
|
||||
throw new UnsupportedOperationException("Application report not "
|
||||
+ "fetched from RM or history server.");
|
||||
}
|
||||
applicationReport = fetchedAppReport.getApplicationReport();
|
||||
}
|
||||
} catch (ApplicationNotFoundException e) {
|
||||
applicationReport = null;
|
||||
}
|
||||
|
@ -363,16 +378,29 @@ public class WebAppProxyServlet extends HttpServlet {
|
|||
return;
|
||||
}
|
||||
|
||||
notFound(resp, "Application " + appId + " could not be found, " +
|
||||
"please try the history server");
|
||||
notFound(resp, "Application " + appId + " could not be found " +
|
||||
"in RM or history server");
|
||||
return;
|
||||
}
|
||||
String original = applicationReport.getOriginalTrackingUrl();
|
||||
URI trackingUri;
|
||||
// fallback to ResourceManager's app page if no tracking URI provided
|
||||
if(original == null || original.equals("N/A")) {
|
||||
ProxyUtils.sendRedirect(req, resp,
|
||||
StringHelper.pjoin(rmAppPageUrlBase, id.toString()));
|
||||
if (original == null || original.equals("N/A") || original.equals("")) {
|
||||
if (fetchedAppReport.getAppReportSource() == AppReportSource.RM) {
|
||||
// fallback to ResourceManager's app page if no tracking URI provided
|
||||
// and Application Report was fetched from RM
|
||||
LOG.debug("Original tracking url is '{}'. Redirecting to RM app page",
|
||||
original == null? "NULL" : original);
|
||||
ProxyUtils.sendRedirect(req, resp,
|
||||
StringHelper.pjoin(rmAppPageUrlBase, id.toString()));
|
||||
} else if (fetchedAppReport.getAppReportSource()
|
||||
== AppReportSource.AHS) {
|
||||
// fallback to Application History Server app page if the application
|
||||
// report was fetched from AHS
|
||||
LOG.debug("Original tracking url is '{}'. Redirecting to AHS app page"
|
||||
, original == null? "NULL" : original);
|
||||
ProxyUtils.sendRedirect(req, resp,
|
||||
StringHelper.pjoin(ahsAppPageUrlBase, id.toString()));
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
if (ProxyUriUtils.getSchemeFromUrl(original).isEmpty()) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.ConnectException;
|
||||
import java.net.HttpCookie;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
|
@ -76,6 +77,7 @@ public class TestWebAppProxyServlet {
|
|||
private static int numberOfHeaders = 0;
|
||||
private static final String UNKNOWN_HEADER = "Unknown-Header";
|
||||
private static boolean hasUnknownHeader = false;
|
||||
Configuration configuration = new Configuration();
|
||||
|
||||
|
||||
/**
|
||||
|
@ -137,8 +139,6 @@ public class TestWebAppProxyServlet {
|
|||
|
||||
@Test(timeout=5000)
|
||||
public void testWebAppProxyServlet() throws Exception {
|
||||
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090");
|
||||
// overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
|
||||
configuration.setInt("hadoop.http.max.threads", 5);
|
||||
|
@ -166,6 +166,7 @@ public class TestWebAppProxyServlet {
|
|||
proxyConn.connect();
|
||||
assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR,
|
||||
proxyConn.getResponseCode());
|
||||
|
||||
// set true Application ID in url
|
||||
URL url = new URL("http://localhost:" + proxyPort + "/proxy/application_00_0");
|
||||
proxyConn = (HttpURLConnection) url.openConnection();
|
||||
|
@ -220,12 +221,69 @@ public class TestWebAppProxyServlet {
|
|||
LOG.info("ProxyConn.getHeaderField(): " + proxyConn.getHeaderField(ProxyUtils.LOCATION));
|
||||
assertEquals("http://localhost:" + originalPort
|
||||
+ "/foo/bar/test/tez?a=b&x=y&h=p#main", proxyConn.getURL().toString());
|
||||
|
||||
} finally {
|
||||
proxy.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=5000)
|
||||
public void testAppReportForEmptyTrackingUrl() throws Exception {
|
||||
configuration.set(YarnConfiguration.PROXY_ADDRESS, "localhost:9090");
|
||||
// overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
|
||||
configuration.setInt("hadoop.http.max.threads", 5);
|
||||
WebAppProxyServerForTest proxy = new WebAppProxyServerForTest();
|
||||
proxy.init(configuration);
|
||||
proxy.start();
|
||||
|
||||
int proxyPort = proxy.proxy.proxyServer.getConnectorAddress(0).getPort();
|
||||
AppReportFetcherForTest appReportFetcher = proxy.proxy.appReportFetcher;
|
||||
|
||||
try {
|
||||
//set AHS_ENBALED = false to simulate getting the app report from RM
|
||||
configuration.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
false);
|
||||
ApplicationId app = ApplicationId.newInstance(0, 0);
|
||||
appReportFetcher.answer = 6;
|
||||
URL url = new URL("http://localhost:" + proxyPort +
|
||||
"/proxy/" + app.toString());
|
||||
HttpURLConnection proxyConn = (HttpURLConnection) url.openConnection();
|
||||
proxyConn.connect();
|
||||
try {
|
||||
proxyConn.getResponseCode();
|
||||
} catch (ConnectException e) {
|
||||
// Connection Exception is expected as we have set
|
||||
// appReportFetcher.answer = 6, which does not set anything for
|
||||
// original tracking url field in the app report.
|
||||
}
|
||||
String appAddressInRm =
|
||||
WebAppUtils.getResolvedRMWebAppURLWithScheme(configuration) +
|
||||
"/cluster" + "/app/" + app.toString();
|
||||
assertTrue("Webapp proxy servlet should have redirected to RM",
|
||||
proxyConn.getURL().toString().equals(appAddressInRm));
|
||||
|
||||
//set AHS_ENBALED = true to simulate getting the app report from AHS
|
||||
configuration.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
true);
|
||||
proxyConn = (HttpURLConnection) url.openConnection();
|
||||
proxyConn.connect();
|
||||
try {
|
||||
proxyConn.getResponseCode();
|
||||
} catch (ConnectException e) {
|
||||
// Connection Exception is expected as we have set
|
||||
// appReportFetcher.answer = 6, which does not set anything for
|
||||
// original tracking url field in the app report.
|
||||
}
|
||||
String appAddressInAhs = WebAppUtils.getHttpSchemePrefix(configuration) +
|
||||
WebAppUtils.getAHSWebAppURLWithoutScheme(configuration) +
|
||||
"/applicationhistory" + "/apps/" + app.toString();
|
||||
assertTrue("Webapp proxy servlet should have redirected to AHS",
|
||||
proxyConn.getURL().toString().equals(appAddressInAhs));
|
||||
}
|
||||
finally {
|
||||
proxy.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=5000)
|
||||
public void testWebAppProxyPassThroughHeaders() throws Exception {
|
||||
Configuration configuration = new Configuration();
|
||||
|
@ -398,49 +456,70 @@ public class TestWebAppProxyServlet {
|
|||
}
|
||||
|
||||
private class AppReportFetcherForTest extends AppReportFetcher {
|
||||
|
||||
int answer = 0;
|
||||
|
||||
public AppReportFetcherForTest(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public ApplicationReport getApplicationReport(ApplicationId appId)
|
||||
public FetchedAppReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException {
|
||||
if (answer == 0) {
|
||||
return getDefaultApplicationReport(appId);
|
||||
} else if (answer == 1) {
|
||||
return null;
|
||||
} else if (answer == 2) {
|
||||
ApplicationReport result = getDefaultApplicationReport(appId);
|
||||
result.setUser("user");
|
||||
FetchedAppReport result = getDefaultApplicationReport(appId);
|
||||
result.getApplicationReport().setUser("user");
|
||||
return result;
|
||||
} else if (answer == 3) {
|
||||
ApplicationReport result = getDefaultApplicationReport(appId);
|
||||
result.setYarnApplicationState(YarnApplicationState.KILLED);
|
||||
FetchedAppReport result = getDefaultApplicationReport(appId);
|
||||
result.getApplicationReport().
|
||||
setYarnApplicationState(YarnApplicationState.KILLED);
|
||||
return result;
|
||||
} else if (answer == 4) {
|
||||
throw new ApplicationNotFoundException("Application is not found");
|
||||
} else if (answer == 5) {
|
||||
// test user-provided path and query parameter can be appended to the
|
||||
// original tracking url
|
||||
ApplicationReport result = getDefaultApplicationReport(appId);
|
||||
result.setOriginalTrackingUrl("localhost:" + originalPort
|
||||
+ "/foo/bar?a=b#main");
|
||||
result.setYarnApplicationState(YarnApplicationState.FINISHED);
|
||||
FetchedAppReport result = getDefaultApplicationReport(appId);
|
||||
result.getApplicationReport().setOriginalTrackingUrl("localhost:"
|
||||
+ originalPort + "/foo/bar?a=b#main");
|
||||
result.getApplicationReport().
|
||||
setYarnApplicationState(YarnApplicationState.FINISHED);
|
||||
return result;
|
||||
} else if (answer == 6) {
|
||||
return getDefaultApplicationReport(appId, false);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private ApplicationReport getDefaultApplicationReport(ApplicationId appId) {
|
||||
/*
|
||||
* If this method is called with isTrackingUrl=false, no tracking url
|
||||
* will set in the app report. Hence, there will be a connection exception
|
||||
* when the prxyCon tries to connect.
|
||||
*/
|
||||
private FetchedAppReport getDefaultApplicationReport(ApplicationId appId,
|
||||
boolean isTrackingUrl) {
|
||||
FetchedAppReport fetchedReport;
|
||||
ApplicationReport result = new ApplicationReportPBImpl();
|
||||
result.setApplicationId(appId);
|
||||
result.setOriginalTrackingUrl("localhost:" + originalPort + "/foo/bar");
|
||||
result.setYarnApplicationState(YarnApplicationState.RUNNING);
|
||||
result.setUser(CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER);
|
||||
return result;
|
||||
if (isTrackingUrl) {
|
||||
result.setOriginalTrackingUrl("localhost:" + originalPort + "/foo/bar");
|
||||
}
|
||||
if(configuration.getBoolean(YarnConfiguration.
|
||||
APPLICATION_HISTORY_ENABLED, false)) {
|
||||
fetchedReport = new FetchedAppReport(result, AppReportSource.AHS);
|
||||
} else {
|
||||
fetchedReport = new FetchedAppReport(result, AppReportSource.RM);
|
||||
}
|
||||
return fetchedReport;
|
||||
}
|
||||
|
||||
private FetchedAppReport getDefaultApplicationReport(ApplicationId appId) {
|
||||
return getDefaultApplicationReport(appId, true);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue