YARN-7276. Federation Router Web Service fixes. Contributed by Inigo Goiri.

(cherry picked from commit 8180ab436a19b8e253c3b6c4f392daa32680e187)
This commit is contained in:
Inigo Goiri 2017-11-01 13:21:15 -07:00
parent efa708f546
commit d6a5f8a06d
9 changed files with 1019 additions and 989 deletions

View File

@ -85,6 +85,12 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -74,6 +75,8 @@ public class Router extends CompositeService {
*/ */
public static final int SHUTDOWN_HOOK_PRIORITY = 30; public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final String METRICS_NAME = "Router";
public Router() { public Router() {
super(Router.class.getName()); super(Router.class.getName());
} }
@ -95,6 +98,8 @@ protected void serviceInit(Configuration config) throws Exception {
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
YarnConfiguration.ROUTER_BIND_HOST, YarnConfiguration.ROUTER_BIND_HOST,
WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf)); WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf));
// Metrics
DefaultMetricsSystem.initialize(METRICS_NAME);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -118,6 +123,7 @@ protected void serviceStop() throws Exception {
return; return;
} }
super.serviceStop(); super.serviceStop();
DefaultMetricsSystem.shutdown();
} }
protected void shutDown() { protected void shutDown() {

View File

@ -129,7 +129,9 @@ public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
public NodesInfo getNodes(String states) { public NodesInfo getNodes(String states) {
// states will be part of additionalParam // states will be part of additionalParam
Map<String, String[]> additionalParam = new HashMap<String, String[]>(); Map<String, String[]> additionalParam = new HashMap<String, String[]>();
additionalParam.put(RMWSConsts.STATES, new String[] {states}); if (states != null && !states.isEmpty()) {
additionalParam.put(RMWSConsts.STATES, new String[] {states});
}
return RouterWebServiceUtil.genericForward(webAppAddress, null, return RouterWebServiceUtil.genericForward(webAppAddress, null,
NodesInfo.class, HTTPMethods.GET, NodesInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null,
@ -226,9 +228,11 @@ public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels) public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
throws IOException { throws IOException {
// labels will be part of additionalParam // labels will be part of additionalParam
Map<String, String[]> additionalParam = new HashMap<String, String[]>(); Map<String, String[]> additionalParam = new HashMap<>();
additionalParam.put(RMWSConsts.LABELS, if (labels != null && !labels.isEmpty()) {
labels.toArray(new String[labels.size()])); additionalParam.put(RMWSConsts.LABELS,
labels.toArray(new String[labels.size()]));
}
return RouterWebServiceUtil.genericForward(webAppAddress, null, return RouterWebServiceUtil.genericForward(webAppAddress, null,
LabelsToNodesInfo.class, HTTPMethods.GET, LabelsToNodesInfo.class, HTTPMethods.GET,
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.router.webapp; package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException; import java.io.IOException;
import java.security.Principal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -26,12 +27,15 @@
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
@ -48,6 +52,7 @@
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -121,29 +126,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
@Override @Override
public void init(String user) { public void init(String user) {
federationFacade = FederationStateStoreFacade.getInstance(); federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random(System.currentTimeMillis()); rand = new Random();
final Configuration conf = this.getConf(); final Configuration conf = this.getConf();
try { try {
policyFacade = new RouterPolicyFacade(conf, federationFacade, SubClusterResolver subClusterResolver =
this.federationFacade.getSubClusterResolver(), null); this.federationFacade.getSubClusterResolver();
policyFacade = new RouterPolicyFacade(
conf, federationFacade, subClusterResolver, null);
} catch (FederationPolicyInitializationException e) { } catch (FederationPolicyInitializationException e) {
LOG.error(e.getMessage()); throw new YarnRuntimeException(e);
} }
numSubmitRetries = numSubmitRetries = conf.getInt(
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>(); interceptors = new HashMap<>();
routerMetrics = RouterMetrics.getMetrics(); routerMetrics = RouterMetrics.getMetrics();
threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder() threadpool = HadoopExecutors.newCachedThreadPool(
.setNameFormat("FederationInterceptorREST #%d").build()); new ThreadFactoryBuilder()
.setNameFormat("FederationInterceptorREST #%d")
.build());
returnPartialReport = returnPartialReport = conf.getBoolean(
conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
} }
private SubClusterId getRandomActiveSubCluster( private SubClusterId getRandomActiveSubCluster(
@ -156,8 +165,8 @@ private SubClusterId getRandomActiveSubCluster(
} }
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet()); List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
FederationPolicyUtils.validateSubClusterAvailability(list, FederationPolicyUtils.validateSubClusterAvailability(
blackListSubClusters); list, blackListSubClusters);
if (blackListSubClusters != null) { if (blackListSubClusters != null) {
@ -176,8 +185,9 @@ protected DefaultRequestInterceptorREST getInterceptorForSubCluster(
if (interceptors.containsKey(subClusterId)) { if (interceptors.containsKey(subClusterId)) {
return interceptors.get(subClusterId); return interceptors.get(subClusterId);
} else { } else {
LOG.error("The interceptor for SubCluster " + subClusterId LOG.error(
+ " does not exist in the cache."); "The interceptor for SubCluster {} does not exist in the cache.",
subClusterId);
return null; return null;
} }
} }
@ -187,9 +197,9 @@ private DefaultRequestInterceptorREST createInterceptorForSubCluster(
final Configuration conf = this.getConf(); final Configuration conf = this.getConf();
String interceptorClassName = String interceptorClassName = conf.get(
conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
DefaultRequestInterceptorREST interceptorInstance = null; DefaultRequestInterceptorREST interceptorInstance = null;
try { try {
Class<?> interceptorClass = conf.getClassByName(interceptorClassName); Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
@ -210,7 +220,7 @@ private DefaultRequestInterceptorREST createInterceptorForSubCluster(
e); e);
} }
interceptorInstance.setWebAppAddress(webAppAddress); interceptorInstance.setWebAppAddress("http://" + webAppAddress);
interceptorInstance.setSubClusterId(subClusterId); interceptorInstance.setSubClusterId(subClusterId);
interceptors.put(subClusterId, interceptorInstance); interceptors.put(subClusterId, interceptorInstance);
return interceptorInstance; return interceptorInstance;
@ -272,8 +282,7 @@ public Response createNewApplication(HttpServletRequest hsr)
.entity(e.getLocalizedMessage()).build(); .entity(e.getLocalizedMessage()).build();
} }
LOG.debug( LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
"getNewApplication try #" + i + " on SubCluster " + subClusterId);
DefaultRequestInterceptorREST interceptor = DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(subClusterId, getOrCreateInterceptorForSubCluster(subClusterId,
@ -282,11 +291,12 @@ public Response createNewApplication(HttpServletRequest hsr)
try { try {
response = interceptor.createNewApplication(hsr); response = interceptor.createNewApplication(hsr);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Unable to create a new ApplicationId in SubCluster " LOG.warn("Unable to create a new ApplicationId in SubCluster {}",
+ subClusterId.getId(), e); subClusterId.getId(), e);
} }
if (response != null && response.getStatus() == 200) { if (response != null &&
response.getStatus() == HttpServletResponse.SC_OK) {
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsCreated(stopTime - startTime); routerMetrics.succeededAppsCreated(stopTime - startTime);
@ -302,7 +312,10 @@ public Response createNewApplication(HttpServletRequest hsr)
String errMsg = "Fail to create a new application."; String errMsg = "Fail to create a new application.";
LOG.error(errMsg); LOG.error(errMsg);
routerMetrics.incrAppsFailedCreated(); routerMetrics.incrAppsFailedCreated();
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build(); return Response
.status(Status.INTERNAL_SERVER_ERROR)
.entity(errMsg)
.build();
} }
/** /**
@ -381,7 +394,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Missing ApplicationSubmissionContextInfo or " String errMsg = "Missing ApplicationSubmissionContextInfo or "
+ "applicationSubmissionContex information."; + "applicationSubmissionContex information.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); return Response
.status(Status.BAD_REQUEST)
.entity(errMsg)
.build();
} }
ApplicationId applicationId = null; ApplicationId applicationId = null;
@ -389,7 +405,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
applicationId = ApplicationId.fromString(newApp.getApplicationId()); applicationId = ApplicationId.fromString(newApp.getApplicationId());
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) return Response
.status(Status.BAD_REQUEST)
.entity(e.getLocalizedMessage())
.build(); .build();
} }
@ -405,11 +423,13 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
subClusterId = policyFacade.getHomeSubcluster(context, blacklist); subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE) return Response
.entity(e.getLocalizedMessage()).build(); .status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage())
.build();
} }
LOG.info("submitApplication appId" + applicationId + " try #" + i LOG.info("submitApplication appId {} try #{} on SubCluster {}",
+ " on SubCluster " + subClusterId); applicationId, i, subClusterId);
ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
@ -424,8 +444,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Unable to insert the ApplicationId " + applicationId String errMsg = "Unable to insert the ApplicationId " + applicationId
+ " into the FederationStateStore"; + " into the FederationStateStore";
return Response.status(Status.SERVICE_UNAVAILABLE) return Response
.entity(errMsg + " " + e.getLocalizedMessage()).build(); .status(Status.SERVICE_UNAVAILABLE)
.entity(errMsg + " " + e.getLocalizedMessage())
.build();
} }
} else { } else {
try { try {
@ -441,15 +463,19 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
federationFacade.getApplicationHomeSubCluster(applicationId); federationFacade.getApplicationHomeSubCluster(applicationId);
} catch (YarnException e1) { } catch (YarnException e1) {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE) return Response
.entity(e1.getLocalizedMessage()).build(); .status(Status.SERVICE_UNAVAILABLE)
.entity(e1.getLocalizedMessage())
.build();
} }
if (subClusterId == subClusterIdInStateStore) { if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application " + applicationId LOG.info("Application {} already submitted on SubCluster {}",
+ " already submitted on SubCluster " + subClusterId); applicationId, subClusterId);
} else { } else {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg) return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(errMsg)
.build(); .build();
} }
} }
@ -460,8 +486,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
subClusterInfo = federationFacade.getSubCluster(subClusterId); subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE) return Response
.entity(e.getLocalizedMessage()).build(); .status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage())
.build();
} }
Response response = null; Response response = null;
@ -470,13 +498,14 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp, subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
hsr); hsr);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Unable to submit the application " + applicationId LOG.warn("Unable to submit the application {} to SubCluster {}",
+ "to SubCluster " + subClusterId.getId(), e); applicationId, subClusterId.getId(), e);
} }
if (response != null && response.getStatus() == 202) { if (response != null &&
LOG.info("Application " + context.getApplicationName() + " with appId " response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
+ applicationId + " submitted on " + subClusterId); LOG.info("Application {} with appId {} submitted on {}",
context.getApplicationName(), applicationId, subClusterId);
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsSubmitted(stopTime - startTime); routerMetrics.succeededAppsSubmitted(stopTime - startTime);
@ -493,7 +522,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
String errMsg = "Application " + newApp.getApplicationName() String errMsg = "Application " + newApp.getApplicationName()
+ " with appId " + applicationId + " failed to be submitted."; + " with appId " + applicationId + " failed to be submitted.";
LOG.error(errMsg); LOG.error(errMsg);
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build(); return Response
.status(Status.SERVICE_UNAVAILABLE)
.entity(errMsg)
.build();
} }
/** /**
@ -541,9 +573,10 @@ public AppInfo getApp(HttpServletRequest hsr, String appId,
return null; return null;
} }
AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId, DefaultRequestInterceptorREST interceptor =
subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId, getOrCreateInterceptorForSubCluster(
unselectedFields); subClusterId, subClusterInfo.getRMWebServiceAddress());
AppInfo response = interceptor.getApp(hsr, appId, unselectedFields);
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsRetrieved(stopTime - startTime); routerMetrics.succeededAppsRetrieved(stopTime - startTime);
@ -579,7 +612,9 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr,
applicationId = ApplicationId.fromString(appId); applicationId = ApplicationId.fromString(appId);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
routerMetrics.incrAppsFailedKilled(); routerMetrics.incrAppsFailedKilled();
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) return Response
.status(Status.BAD_REQUEST)
.entity(e.getLocalizedMessage())
.build(); .build();
} }
@ -591,7 +626,9 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr,
subClusterInfo = federationFacade.getSubCluster(subClusterId); subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrAppsFailedKilled(); routerMetrics.incrAppsFailedKilled();
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) return Response
.status(Status.BAD_REQUEST)
.entity(e.getLocalizedMessage())
.build(); .build();
} }
@ -644,26 +681,28 @@ public AppsInfo getApps(final HttpServletRequest hsr, final String stateQuery,
} }
// Send the requests in parallel // Send the requests in parallel
CompletionService<AppsInfo> compSvc =
ExecutorCompletionService<AppsInfo> compSvc = new ExecutorCompletionService<>(this.threadpool);
new ExecutorCompletionService<AppsInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) { for (final SubClusterInfo info : subClustersActive.values()) {
// HttpServletRequest does not work with ExecutorCompletionService.
// Create a duplicate hsr.
final HttpServletRequest hsrCopy = clone(hsr);
compSvc.submit(new Callable<AppsInfo>() { compSvc.submit(new Callable<AppsInfo>() {
@Override @Override
public AppsInfo call() { public AppsInfo call() {
DefaultRequestInterceptorREST interceptor = DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(), getOrCreateInterceptorForSubCluster(
info.getClientRMServiceAddress()); info.getSubClusterId(), info.getRMWebServiceAddress());
AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery, AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery,
finalStatusQuery, userQuery, queueQuery, count, startedBegin, statesQuery, finalStatusQuery, userQuery, queueQuery, count,
startedEnd, finishBegin, finishEnd, applicationTypes, startedBegin, startedEnd, finishBegin, finishEnd,
applicationTags, unselectedFields); applicationTypes, applicationTags, unselectedFields);
if (rmApps == null) { if (rmApps == null) {
routerMetrics.incrMultipleAppsFailedRetrieved(); routerMetrics.incrMultipleAppsFailedRetrieved();
LOG.error("Subcluster " + info.getSubClusterId() LOG.error("Subcluster {} failed to return appReport.",
+ " failed to return appReport."); info.getSubClusterId());
return null; return null;
} }
return rmApps; return rmApps;
@ -672,8 +711,7 @@ public AppsInfo call() {
} }
// Collect all the responses in parallel // Collect all the responses in parallel
for (int i = 0; i < subClustersActive.size(); i++) {
for (int i = 0; i < subClustersActive.values().size(); i++) {
try { try {
Future<AppsInfo> future = compSvc.take(); Future<AppsInfo> future = compSvc.take();
AppsInfo appsResponse = future.get(); AppsInfo appsResponse = future.get();
@ -686,7 +724,7 @@ public AppsInfo call() {
} }
} catch (Throwable e) { } catch (Throwable e) {
routerMetrics.incrMultipleAppsFailedRetrieved(); routerMetrics.incrMultipleAppsFailedRetrieved();
LOG.warn("Failed to get application report ", e); LOG.warn("Failed to get application report", e);
} }
} }
@ -695,9 +733,42 @@ public AppsInfo call() {
} }
// Merge all the application reports got from all the available Yarn RMs // Merge all the application reports got from all the available Yarn RMs
return RouterWebServiceUtil.mergeAppsInfo(
apps.getApps(), returnPartialReport);
}
return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), /**
returnPartialReport); * Get a copy of a HTTP request. This is for thread safety.
* @param hsr HTTP servlet request to copy.
* @return Copy of the HTTP request.
*/
private HttpServletRequestWrapper clone(final HttpServletRequest hsr) {
if (hsr == null) {
return null;
}
return new HttpServletRequestWrapper(hsr) {
@SuppressWarnings("unchecked")
public Map<String, String[]> getParameterMap() {
return (Map<String, String[]>) hsr.getParameterMap();
}
public String getPathInfo() {
return hsr.getPathInfo();
}
public String getRemoteUser() {
return hsr.getRemoteUser();
}
public Principal getUserPrincipal() {
return hsr.getUserPrincipal();
}
public String getHeader(String value) {
// we override only Accept
if (value.equals(HttpHeaders.ACCEPT)) {
return RouterWebServiceUtil.getMediaTypeFromHttpServletRequest(
hsr, AppsInfo.class);
}
return null;
}
};
} }
/** /**
@ -731,8 +802,7 @@ public NodeInfo getNode(final String nodeId) {
} }
// Send the requests in parallel // Send the requests in parallel
CompletionService<NodeInfo> compSvc =
ExecutorCompletionService<NodeInfo> compSvc =
new ExecutorCompletionService<NodeInfo>(this.threadpool); new ExecutorCompletionService<NodeInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) { for (final SubClusterInfo info : subClustersActive.values()) {
@ -740,14 +810,14 @@ public NodeInfo getNode(final String nodeId) {
@Override @Override
public NodeInfo call() { public NodeInfo call() {
DefaultRequestInterceptorREST interceptor = DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(), getOrCreateInterceptorForSubCluster(
info.getClientRMServiceAddress()); info.getSubClusterId(), info.getRMWebServiceAddress());
try { try {
NodeInfo nodeInfo = interceptor.getNode(nodeId); NodeInfo nodeInfo = interceptor.getNode(nodeId);
return nodeInfo; return nodeInfo;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Subcluster " + info.getSubClusterId() LOG.error("Subcluster {} failed to return nodeInfo.",
+ " failed to return nodeInfo."); info.getSubClusterId());
return null; return null;
} }
} }
@ -756,7 +826,7 @@ public NodeInfo call() {
// Collect all the responses in parallel // Collect all the responses in parallel
NodeInfo nodeInfo = null; NodeInfo nodeInfo = null;
for (int i = 0; i < subClustersActive.values().size(); i++) { for (int i = 0; i < subClustersActive.size(); i++) {
try { try {
Future<NodeInfo> future = compSvc.take(); Future<NodeInfo> future = compSvc.take();
NodeInfo nodeResponse = future.get(); NodeInfo nodeResponse = future.get();
@ -765,8 +835,8 @@ public NodeInfo call() {
if (nodeResponse != null) { if (nodeResponse != null) {
// Check if the node was already found in a different SubCluster and // Check if the node was already found in a different SubCluster and
// it has an old health report // it has an old health report
if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse if (nodeInfo == null || nodeInfo.getLastHealthUpdate() <
.getLastHealthUpdate()) { nodeResponse.getLastHealthUpdate()) {
nodeInfo = nodeResponse; nodeInfo = nodeResponse;
} }
} }
@ -808,13 +878,12 @@ public NodesInfo getNodes(final String states) {
try { try {
subClustersActive = federationFacade.getSubClusters(true); subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) { } catch (YarnException e) {
LOG.error(e.getMessage()); LOG.error("Cannot get nodes: {}", e.getMessage());
return new NodesInfo(); return new NodesInfo();
} }
// Send the requests in parallel // Send the requests in parallel
CompletionService<NodesInfo> compSvc =
ExecutorCompletionService<NodesInfo> compSvc =
new ExecutorCompletionService<NodesInfo>(this.threadpool); new ExecutorCompletionService<NodesInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) { for (final SubClusterInfo info : subClustersActive.values()) {
@ -822,14 +891,14 @@ public NodesInfo getNodes(final String states) {
@Override @Override
public NodesInfo call() { public NodesInfo call() {
DefaultRequestInterceptorREST interceptor = DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(), getOrCreateInterceptorForSubCluster(
info.getClientRMServiceAddress()); info.getSubClusterId(), info.getRMWebServiceAddress());
try { try {
NodesInfo nodesInfo = interceptor.getNodes(states); NodesInfo nodesInfo = interceptor.getNodes(states);
return nodesInfo; return nodesInfo;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Subcluster " + info.getSubClusterId() LOG.error("Subcluster {} failed to return nodesInfo.",
+ " failed to return nodesInfo."); info.getSubClusterId());
return null; return null;
} }
} }
@ -838,7 +907,7 @@ public NodesInfo call() {
// Collect all the responses in parallel // Collect all the responses in parallel
for (int i = 0; i < subClustersActive.values().size(); i++) { for (int i = 0; i < subClustersActive.size(); i++) {
try { try {
Future<NodesInfo> future = compSvc.take(); Future<NodesInfo> future = compSvc.take();
NodesInfo nodesResponse = future.get(); NodesInfo nodesResponse = future.get();
@ -872,8 +941,7 @@ public ClusterMetricsInfo getClusterMetricsInfo() {
} }
// Send the requests in parallel // Send the requests in parallel
CompletionService<ClusterMetricsInfo> compSvc =
ExecutorCompletionService<ClusterMetricsInfo> compSvc =
new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool); new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool);
for (final SubClusterInfo info : subClustersActive.values()) { for (final SubClusterInfo info : subClustersActive.values()) {
@ -881,14 +949,14 @@ public ClusterMetricsInfo getClusterMetricsInfo() {
@Override @Override
public ClusterMetricsInfo call() { public ClusterMetricsInfo call() {
DefaultRequestInterceptorREST interceptor = DefaultRequestInterceptorREST interceptor =
getOrCreateInterceptorForSubCluster(info.getSubClusterId(), getOrCreateInterceptorForSubCluster(
info.getClientRMServiceAddress()); info.getSubClusterId(), info.getRMWebServiceAddress());
try { try {
ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo(); ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo();
return metrics; return metrics;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Subcluster " + info.getSubClusterId() LOG.error("Subcluster {} failed to return Cluster Metrics.",
+ " failed to return Cluster Metrics."); info.getSubClusterId());
return null; return null;
} }
} }
@ -897,7 +965,7 @@ public ClusterMetricsInfo call() {
// Collect all the responses in parallel // Collect all the responses in parallel
for (int i = 0; i < subClustersActive.values().size(); i++) { for (int i = 0; i < subClustersActive.size(); i++) {
try { try {
Future<ClusterMetricsInfo> future = compSvc.take(); Future<ClusterMetricsInfo> future = compSvc.take();
ClusterMetricsInfo metricsResponse = future.get(); ClusterMetricsInfo metricsResponse = future.get();

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.router.webapp; package org.apache.hadoop.yarn.server.router.webapp;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
@ -28,13 +31,12 @@
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
@ -47,6 +49,8 @@
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.jersey.api.ConflictException; import com.sun.jersey.api.ConflictException;
import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.Client;
@ -62,8 +66,8 @@ public final class RouterWebServiceUtil {
private static String user = "YarnRouter"; private static String user = "YarnRouter";
private static final Log LOG = private static final Logger LOG =
LogFactory.getLog(RouterWebServiceUtil.class.getName()); LoggerFactory.getLogger(RouterWebServiceUtil.class.getName());
private final static String PARTIAL_REPORT = "Partial Report "; private final static String PARTIAL_REPORT = "Partial Report ";
@ -85,9 +89,10 @@ private RouterWebServiceUtil() {
* call in case the call has no servlet request * call in case the call has no servlet request
* @return the retrieved entity from the REST call * @return the retrieved entity from the REST call
*/ */
protected static <T> T genericForward(final String webApp, protected static <T> T genericForward(
final HttpServletRequest hsr, final Class<T> returnType, final String webApp, final HttpServletRequest hsr,
final HTTPMethods method, final String targetPath, final Object formParam, final Class<T> returnType, final HTTPMethods method,
final String targetPath, final Object formParam,
final Map<String, String[]> additionalParam) { final Map<String, String[]> additionalParam) {
UserGroupInformation callerUGI = null; UserGroupInformation callerUGI = null;
@ -122,14 +127,22 @@ public T run() {
ClientResponse response = RouterWebServiceUtil.invokeRMWebService( ClientResponse response = RouterWebServiceUtil.invokeRMWebService(
webApp, targetPath, method, webApp, targetPath, method,
(hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam); (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam,
getMediaTypeFromHttpServletRequest(hsr, returnType));
if (Response.class.equals(returnType)) { if (Response.class.equals(returnType)) {
return (T) RouterWebServiceUtil.clientResponseToResponse(response); return (T) RouterWebServiceUtil.clientResponseToResponse(response);
} }
// YARN RM can answer with Status.OK or it throws an exception // YARN RM can answer with Status.OK or it throws an exception
if (response.getStatus() == 200) { if (response.getStatus() == SC_OK) {
return response.getEntity(returnType); return response.getEntity(returnType);
} }
if (response.getStatus() == SC_NO_CONTENT) {
try {
return returnType.getConstructor().newInstance();
} catch (RuntimeException | ReflectiveOperationException e) {
LOG.error("Cannot create empty entity for {}", returnType, e);
}
}
RouterWebServiceUtil.retrieveException(response); RouterWebServiceUtil.retrieveException(response);
return null; return null;
} }
@ -148,7 +161,7 @@ public T run() {
*/ */
private static ClientResponse invokeRMWebService(String webApp, String path, private static ClientResponse invokeRMWebService(String webApp, String path,
HTTPMethods method, String additionalPath, HTTPMethods method, String additionalPath,
Map<String, String[]> queryParams, Object formParam) { Map<String, String[]> queryParams, Object formParam, String mediaType) {
Client client = Client.create(); Client client = Client.create();
WebResource webResource = client.resource(webApp).path(path); WebResource webResource = client.resource(webApp).path(path);
@ -169,14 +182,12 @@ private static ClientResponse invokeRMWebService(String webApp, String path,
webResource = webResource.queryParams(paramMap); webResource = webResource.queryParams(paramMap);
} }
// I can forward the call in JSON or XML since the Router will convert it
// again in Object before send it back to the client
Builder builder = null; Builder builder = null;
if (formParam != null) { if (formParam != null) {
builder = webResource.entity(formParam, MediaType.APPLICATION_XML); builder = webResource.entity(formParam, mediaType);
builder = builder.accept(MediaType.APPLICATION_XML); builder = builder.accept(mediaType);
} else { } else {
builder = webResource.accept(MediaType.APPLICATION_XML); builder = webResource.accept(mediaType);
} }
ClientResponse response = null; ClientResponse response = null;
@ -429,4 +440,25 @@ public static void mergeMetrics(ClusterMetricsInfo metrics,
+ metricsResponse.getShutdownNodes()); + metricsResponse.getShutdownNodes());
} }
/**
* Extract from HttpServletRequest the MediaType in output.
*/
protected static <T> String getMediaTypeFromHttpServletRequest(
HttpServletRequest request, final Class<T> returnType) {
if (request == null) {
// By default we return XML for REST call without HttpServletRequest
return MediaType.APPLICATION_XML;
}
// TODO
if (!returnType.equals(Response.class)) {
return MediaType.APPLICATION_XML;
}
String header = request.getHeader(HttpHeaders.ACCEPT);
if (header == null || header.equals("*")) {
// By default we return JSON
return MediaType.APPLICATION_JSON;
}
return header;
}
} }

View File

@ -157,12 +157,19 @@ private void init() {
} }
@VisibleForTesting @VisibleForTesting
protected RequestInterceptorChainWrapper getInterceptorChain() { protected RequestInterceptorChainWrapper getInterceptorChain(
final HttpServletRequest hsr) {
String user = ""; String user = "";
if (hsr != null) {
user = hsr.getRemoteUser();
}
try { try {
user = UserGroupInformation.getCurrentUser().getUserName(); if (user == null || user.equals("")) {
// Yarn Router user
user = UserGroupInformation.getCurrentUser().getUserName();
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("IOException " + e.getMessage()); LOG.error("Cannot get user: {}", e.getMessage());
} }
if (!userPipelineMap.containsKey(user)) { if (!userPipelineMap.containsKey(user)) {
initializePipeline(user); initializePipeline(user);
@ -313,7 +320,7 @@ public ClusterInfo get() {
@Override @Override
public ClusterInfo getClusterInfo() { public ClusterInfo getClusterInfo() {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
return pipeline.getRootInterceptor().getClusterInfo(); return pipeline.getRootInterceptor().getClusterInfo();
} }
@ -323,7 +330,7 @@ public ClusterInfo getClusterInfo() {
@Override @Override
public ClusterMetricsInfo getClusterMetricsInfo() { public ClusterMetricsInfo getClusterMetricsInfo() {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
return pipeline.getRootInterceptor().getClusterMetricsInfo(); return pipeline.getRootInterceptor().getClusterMetricsInfo();
} }
@ -333,7 +340,7 @@ public ClusterMetricsInfo getClusterMetricsInfo() {
@Override @Override
public SchedulerTypeInfo getSchedulerInfo() { public SchedulerTypeInfo getSchedulerInfo() {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
return pipeline.getRootInterceptor().getSchedulerInfo(); return pipeline.getRootInterceptor().getSchedulerInfo();
} }
@ -344,7 +351,7 @@ public SchedulerTypeInfo getSchedulerInfo() {
public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time, public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time,
@Context HttpServletRequest hsr) throws IOException { @Context HttpServletRequest hsr) throws IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr); return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr);
} }
@ -354,7 +361,7 @@ public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time,
@Override @Override
public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) { public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
return pipeline.getRootInterceptor().getNodes(states); return pipeline.getRootInterceptor().getNodes(states);
} }
@ -364,7 +371,7 @@ public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) {
@Override @Override
public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) { public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
return pipeline.getRootInterceptor().getNode(nodeId); return pipeline.getRootInterceptor().getNode(nodeId);
} }
@ -387,7 +394,7 @@ public AppsInfo getApps(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags, @QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags,
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) { @QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery, return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery,
finalStatusQuery, userQuery, queueQuery, count, startedBegin, finalStatusQuery, userQuery, queueQuery, count, startedBegin,
startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags, startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags,
@ -401,7 +408,7 @@ public AppsInfo getApps(@Context HttpServletRequest hsr,
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.NODEID) String nodeId) { @QueryParam(RMWSConsts.NODEID) String nodeId) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getActivities(hsr, nodeId); return pipeline.getRootInterceptor().getActivities(hsr, nodeId);
} }
@ -413,7 +420,7 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APP_ID) String appId, @QueryParam(RMWSConsts.APP_ID) String appId,
@QueryParam(RMWSConsts.MAX_TIME) String time) { @QueryParam(RMWSConsts.MAX_TIME) String time) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time); return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time);
} }
@ -426,7 +433,7 @@ public ApplicationStatisticsInfo getAppStatistics(
@QueryParam(RMWSConsts.STATES) Set<String> stateQueries, @QueryParam(RMWSConsts.STATES) Set<String> stateQueries,
@QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> typeQueries) { @QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> typeQueries) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries, return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries,
typeQueries); typeQueries);
} }
@ -439,7 +446,7 @@ public AppInfo getApp(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPID) String appId,
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) { @QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields); return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields);
} }
@ -450,7 +457,7 @@ public AppInfo getApp(@Context HttpServletRequest hsr,
public AppState getAppState(@Context HttpServletRequest hsr, public AppState getAppState(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppState(hsr, appId); return pipeline.getRootInterceptor().getAppState(hsr, appId);
} }
@ -463,7 +470,7 @@ public Response updateAppState(AppState targetState,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException { YarnException, InterruptedException, IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().updateAppState(targetState, hsr, return pipeline.getRootInterceptor().updateAppState(targetState, hsr,
appId); appId);
} }
@ -475,7 +482,7 @@ public Response updateAppState(AppState targetState,
public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr) public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr)
throws IOException { throws IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getNodeToLabels(hsr); return pipeline.getRootInterceptor().getNodeToLabels(hsr);
} }
@ -486,7 +493,7 @@ public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr)
public LabelsToNodesInfo getLabelsToNodes( public LabelsToNodesInfo getLabelsToNodes(
@QueryParam(RMWSConsts.LABELS) Set<String> labels) throws IOException { @QueryParam(RMWSConsts.LABELS) Set<String> labels) throws IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
return pipeline.getRootInterceptor().getLabelsToNodes(labels); return pipeline.getRootInterceptor().getLabelsToNodes(labels);
} }
@ -498,7 +505,7 @@ public Response replaceLabelsOnNodes(
final NodeToLabelsEntryList newNodeToLabels, final NodeToLabelsEntryList newNodeToLabels,
@Context HttpServletRequest hsr) throws Exception { @Context HttpServletRequest hsr) throws Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels, return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels,
hsr); hsr);
} }
@ -512,7 +519,7 @@ public Response replaceLabelsOnNode(
@Context HttpServletRequest hsr, @Context HttpServletRequest hsr,
@PathParam(RMWSConsts.NODEID) String nodeId) throws Exception { @PathParam(RMWSConsts.NODEID) String nodeId) throws Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName, return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName,
hsr, nodeId); hsr, nodeId);
} }
@ -524,7 +531,7 @@ public Response replaceLabelsOnNode(
public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr) public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr)
throws IOException { throws IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getClusterNodeLabels(hsr); return pipeline.getRootInterceptor().getClusterNodeLabels(hsr);
} }
@ -535,7 +542,7 @@ public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr)
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
@Context HttpServletRequest hsr) throws Exception { @Context HttpServletRequest hsr) throws Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels, return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels,
hsr); hsr);
} }
@ -548,7 +555,7 @@ public Response removeFromCluserNodeLabels(
@QueryParam(RMWSConsts.LABELS) Set<String> oldNodeLabels, @QueryParam(RMWSConsts.LABELS) Set<String> oldNodeLabels,
@Context HttpServletRequest hsr) throws Exception { @Context HttpServletRequest hsr) throws Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor() return pipeline.getRootInterceptor()
.removeFromCluserNodeLabels(oldNodeLabels, hsr); .removeFromCluserNodeLabels(oldNodeLabels, hsr);
} }
@ -560,7 +567,7 @@ public Response removeFromCluserNodeLabels(
public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr, public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.NODEID) String nodeId) throws IOException { @PathParam(RMWSConsts.NODEID) String nodeId) throws IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId); return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId);
} }
@ -571,7 +578,7 @@ public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr,
public AppPriority getAppPriority(@Context HttpServletRequest hsr, public AppPriority getAppPriority(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppPriority(hsr, appId); return pipeline.getRootInterceptor().getAppPriority(hsr, appId);
} }
@ -584,7 +591,7 @@ public Response updateApplicationPriority(AppPriority targetPriority,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException { YarnException, InterruptedException, IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor() return pipeline.getRootInterceptor()
.updateApplicationPriority(targetPriority, hsr, appId); .updateApplicationPriority(targetPriority, hsr, appId);
} }
@ -596,7 +603,7 @@ public Response updateApplicationPriority(AppPriority targetPriority,
public AppQueue getAppQueue(@Context HttpServletRequest hsr, public AppQueue getAppQueue(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppQueue(hsr, appId); return pipeline.getRootInterceptor().getAppQueue(hsr, appId);
} }
@ -609,7 +616,7 @@ public Response updateAppQueue(AppQueue targetQueue,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException { YarnException, InterruptedException, IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr, return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr,
appId); appId);
} }
@ -621,7 +628,7 @@ public Response updateAppQueue(AppQueue targetQueue,
public Response createNewApplication(@Context HttpServletRequest hsr) public Response createNewApplication(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().createNewApplication(hsr); return pipeline.getRootInterceptor().createNewApplication(hsr);
} }
@ -633,7 +640,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
@Context HttpServletRequest hsr) @Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().submitApplication(newApp, hsr); return pipeline.getRootInterceptor().submitApplication(newApp, hsr);
} }
@ -645,7 +652,7 @@ public Response postDelegationToken(DelegationToken tokenData,
@Context HttpServletRequest hsr) throws AuthorizationException, @Context HttpServletRequest hsr) throws AuthorizationException,
IOException, InterruptedException, Exception { IOException, InterruptedException, Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr); return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr);
} }
@ -656,7 +663,7 @@ public Response postDelegationToken(DelegationToken tokenData,
public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr) public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, Exception { throws AuthorizationException, IOException, Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr); return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr);
} }
@ -668,7 +675,7 @@ public Response cancelDelegationToken(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException, throws AuthorizationException, IOException, InterruptedException,
Exception { Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().cancelDelegationToken(hsr); return pipeline.getRootInterceptor().cancelDelegationToken(hsr);
} }
@ -679,7 +686,7 @@ public Response cancelDelegationToken(@Context HttpServletRequest hsr)
public Response createNewReservation(@Context HttpServletRequest hsr) public Response createNewReservation(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().createNewReservation(hsr); return pipeline.getRootInterceptor().createNewReservation(hsr);
} }
@ -691,7 +698,7 @@ public Response submitReservation(ReservationSubmissionRequestInfo resContext,
@Context HttpServletRequest hsr) @Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().submitReservation(resContext, hsr); return pipeline.getRootInterceptor().submitReservation(resContext, hsr);
} }
@ -703,7 +710,7 @@ public Response updateReservation(ReservationUpdateRequestInfo resContext,
@Context HttpServletRequest hsr) @Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().updateReservation(resContext, hsr); return pipeline.getRootInterceptor().updateReservation(resContext, hsr);
} }
@ -715,7 +722,7 @@ public Response deleteReservation(ReservationDeleteRequestInfo resContext,
@Context HttpServletRequest hsr) @Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().deleteReservation(resContext, hsr); return pipeline.getRootInterceptor().deleteReservation(resContext, hsr);
} }
@ -731,7 +738,7 @@ public Response listReservation(
@QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations, @QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations,
@Context HttpServletRequest hsr) throws Exception { @Context HttpServletRequest hsr) throws Exception {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().listReservation(queue, reservationId, return pipeline.getRootInterceptor().listReservation(queue, reservationId,
startTime, endTime, includeResourceAllocations, hsr); startTime, endTime, includeResourceAllocations, hsr);
} }
@ -744,7 +751,7 @@ public AppTimeoutInfo getAppTimeout(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException { @PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type); return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type);
} }
@ -755,7 +762,7 @@ public AppTimeoutInfo getAppTimeout(@Context HttpServletRequest hsr,
public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr, public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId); return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId);
} }
@ -768,7 +775,7 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException { YarnException, InterruptedException, IOException {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout, return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout,
hsr, appId); hsr, appId);
} }
@ -780,7 +787,7 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr, public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) { @PathParam(RMWSConsts.APPID) String appId) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppAttempts(hsr, appId); return pipeline.getRootInterceptor().getAppAttempts(hsr, appId);
} }
@ -792,7 +799,7 @@ public org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo getAppAttempt(
@PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(req);
return pipeline.getRootInterceptor().getAppAttempt(req, res, appId, return pipeline.getRootInterceptor().getAppAttempt(req, res, appId,
appAttemptId); appAttemptId);
} }
@ -805,7 +812,7 @@ public ContainersInfo getContainers(@Context HttpServletRequest req,
@PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(req);
return pipeline.getRootInterceptor().getContainers(req, res, appId, return pipeline.getRootInterceptor().getContainers(req, res, appId,
appAttemptId); appAttemptId);
} }
@ -819,7 +826,7 @@ public ContainerInfo getContainer(@Context HttpServletRequest req,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId,
@PathParam(RMWSConsts.CONTAINERID) String containerId) { @PathParam(RMWSConsts.CONTAINERID) String containerId) {
init(); init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(req);
return pipeline.getRootInterceptor().getContainer(req, res, appId, return pipeline.getRootInterceptor().getContainer(req, res, appId,
appAttemptId, containerId); appAttemptId, containerId);
} }

View File

@ -20,15 +20,15 @@
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -128,487 +128,263 @@ protected RouterWebServices getRouterWebServices() {
protected ClusterInfo get(String user) protected ClusterInfo get(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) // HSR is not used here
.doAs(new PrivilegedExceptionAction<ClusterInfo>() { return routerWebService.get();
@Override
public ClusterInfo run() throws Exception {
return routerWebService.get();
}
});
} }
protected ClusterInfo getClusterInfo(String user) protected ClusterInfo getClusterInfo(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) // HSR is not used here
.doAs(new PrivilegedExceptionAction<ClusterInfo>() { return routerWebService.getClusterInfo();
@Override
public ClusterInfo run() throws Exception {
return routerWebService.getClusterInfo();
}
});
} }
protected ClusterMetricsInfo getClusterMetricsInfo(String user) protected ClusterMetricsInfo getClusterMetricsInfo(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) // HSR is not used here
.doAs(new PrivilegedExceptionAction<ClusterMetricsInfo>() { return routerWebService.getClusterMetricsInfo();
@Override
public ClusterMetricsInfo run() throws Exception {
return routerWebService.getClusterMetricsInfo();
}
});
} }
protected SchedulerTypeInfo getSchedulerInfo(String user) protected SchedulerTypeInfo getSchedulerInfo(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) // HSR is not used here
.doAs(new PrivilegedExceptionAction<SchedulerTypeInfo>() { return routerWebService.getSchedulerInfo();
@Override
public SchedulerTypeInfo run() throws Exception {
return routerWebService.getSchedulerInfo();
}
});
} }
protected String dumpSchedulerLogs(String user) protected String dumpSchedulerLogs(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.dumpSchedulerLogs(null,
.doAs(new PrivilegedExceptionAction<String>() { createHttpServletRequest(user));
@Override
public String run() throws Exception {
return routerWebService.dumpSchedulerLogs(null, null);
}
});
} }
protected NodesInfo getNodes(String user) protected NodesInfo getNodes(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getNodes(null);
.doAs(new PrivilegedExceptionAction<NodesInfo>() {
@Override
public NodesInfo run() throws Exception {
return routerWebService.getNodes(null);
}
});
} }
protected NodeInfo getNode(String user) protected NodeInfo getNode(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getNode(null);
.doAs(new PrivilegedExceptionAction<NodeInfo>() {
@Override
public NodeInfo run() throws Exception {
return routerWebService.getNode(null);
}
});
} }
protected AppsInfo getApps(String user) protected AppsInfo getApps(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getApps(createHttpServletRequest(user), null, null,
.doAs(new PrivilegedExceptionAction<AppsInfo>() { null, null, null, null, null, null, null, null, null, null, null);
@Override
public AppsInfo run() throws Exception {
return routerWebService.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
}
});
} }
protected ActivitiesInfo getActivities(String user) protected ActivitiesInfo getActivities(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getActivities(
.doAs(new PrivilegedExceptionAction<ActivitiesInfo>() { createHttpServletRequest(user), null);
@Override
public ActivitiesInfo run() throws Exception {
return routerWebService.getActivities(null, null);
}
});
} }
protected AppActivitiesInfo getAppActivities(String user) protected AppActivitiesInfo getAppActivities(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppActivities(
.doAs(new PrivilegedExceptionAction<AppActivitiesInfo>() { createHttpServletRequest(user), null, null);
@Override
public AppActivitiesInfo run() throws Exception {
return routerWebService.getAppActivities(null, null, null);
}
});
} }
protected ApplicationStatisticsInfo getAppStatistics(String user) protected ApplicationStatisticsInfo getAppStatistics(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppStatistics(
.doAs(new PrivilegedExceptionAction<ApplicationStatisticsInfo>() { createHttpServletRequest(user), null, null);
@Override
public ApplicationStatisticsInfo run() throws Exception {
return routerWebService.getAppStatistics(null, null, null);
}
});
} }
protected AppInfo getApp(String user) protected AppInfo getApp(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getApp(createHttpServletRequest(user), null, null);
.doAs(new PrivilegedExceptionAction<AppInfo>() {
@Override
public AppInfo run() throws Exception {
return routerWebService.getApp(null, null, null);
}
});
} }
protected AppState getAppState(String user) protected AppState getAppState(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppState(createHttpServletRequest(user), null);
.doAs(new PrivilegedExceptionAction<AppState>() {
@Override
public AppState run() throws Exception {
return routerWebService.getAppState(null, null);
}
});
} }
protected Response updateAppState(String user) throws AuthorizationException, protected Response updateAppState(String user) throws AuthorizationException,
YarnException, InterruptedException, IOException { YarnException, InterruptedException, IOException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.updateAppState(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user), null);
@Override
public Response run() throws Exception {
return routerWebService.updateAppState(null, null, null);
}
});
} }
protected NodeToLabelsInfo getNodeToLabels(String user) protected NodeToLabelsInfo getNodeToLabels(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getNodeToLabels(createHttpServletRequest(user));
.doAs(new PrivilegedExceptionAction<NodeToLabelsInfo>() {
@Override
public NodeToLabelsInfo run() throws Exception {
return routerWebService.getNodeToLabels(null);
}
});
} }
protected LabelsToNodesInfo getLabelsToNodes(String user) protected LabelsToNodesInfo getLabelsToNodes(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getLabelsToNodes(null);
.doAs(new PrivilegedExceptionAction<LabelsToNodesInfo>() {
@Override
public LabelsToNodesInfo run() throws Exception {
return routerWebService.getLabelsToNodes(null);
}
});
} }
protected Response replaceLabelsOnNodes(String user) throws Exception { protected Response replaceLabelsOnNodes(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.replaceLabelsOnNodes(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.replaceLabelsOnNodes(null, null);
}
});
} }
protected Response replaceLabelsOnNode(String user) throws Exception { protected Response replaceLabelsOnNode(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.replaceLabelsOnNode(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user), null);
@Override
public Response run() throws Exception {
return routerWebService.replaceLabelsOnNode(null, null, null);
}
});
} }
protected NodeLabelsInfo getClusterNodeLabels(String user) protected NodeLabelsInfo getClusterNodeLabels(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getClusterNodeLabels(
.doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() { createHttpServletRequest(user));
@Override
public NodeLabelsInfo run() throws Exception {
return routerWebService.getClusterNodeLabels(null);
}
});
} }
protected Response addToClusterNodeLabels(String user) throws Exception { protected Response addToClusterNodeLabels(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.addToClusterNodeLabels(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.addToClusterNodeLabels(null, null);
}
});
} }
protected Response removeFromCluserNodeLabels(String user) throws Exception { protected Response removeFromCluserNodeLabels(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.removeFromCluserNodeLabels(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.removeFromCluserNodeLabels(null, null);
}
});
} }
protected NodeLabelsInfo getLabelsOnNode(String user) protected NodeLabelsInfo getLabelsOnNode(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getLabelsOnNode(
.doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() { createHttpServletRequest(user), null);
@Override
public NodeLabelsInfo run() throws Exception {
return routerWebService.getLabelsOnNode(null, null);
}
});
} }
protected AppPriority getAppPriority(String user) protected AppPriority getAppPriority(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppPriority(
.doAs(new PrivilegedExceptionAction<AppPriority>() { createHttpServletRequest(user), null);
@Override
public AppPriority run() throws Exception {
return routerWebService.getAppPriority(null, null);
}
});
} }
protected Response updateApplicationPriority(String user) protected Response updateApplicationPriority(String user)
throws AuthorizationException, YarnException, InterruptedException, throws AuthorizationException, YarnException, InterruptedException,
IOException { IOException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.updateApplicationPriority(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user), null);
@Override
public Response run() throws Exception {
return routerWebService.updateApplicationPriority(null, null, null);
}
});
} }
protected AppQueue getAppQueue(String user) protected AppQueue getAppQueue(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppQueue(createHttpServletRequest(user), null);
.doAs(new PrivilegedExceptionAction<AppQueue>() {
@Override
public AppQueue run() throws Exception {
return routerWebService.getAppQueue(null, null);
}
});
} }
protected Response updateAppQueue(String user) throws AuthorizationException, protected Response updateAppQueue(String user) throws AuthorizationException,
YarnException, InterruptedException, IOException { YarnException, InterruptedException, IOException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.updateAppQueue(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user), null);
@Override
public Response run() throws Exception {
return routerWebService.updateAppQueue(null, null, null);
}
});
} }
protected Response createNewApplication(String user) protected Response createNewApplication(String user)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.createNewApplication(
.doAs(new PrivilegedExceptionAction<Response>() { createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.createNewApplication(null);
}
});
} }
protected Response submitApplication(String user) protected Response submitApplication(String user)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.submitApplication(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.submitApplication(null, null);
}
});
} }
protected Response postDelegationToken(String user) protected Response postDelegationToken(String user)
throws AuthorizationException, IOException, InterruptedException, throws AuthorizationException, IOException, InterruptedException,
Exception { Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.postDelegationToken(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.postDelegationToken(null, null);
}
});
} }
protected Response postDelegationTokenExpiration(String user) protected Response postDelegationTokenExpiration(String user)
throws AuthorizationException, IOException, Exception { throws AuthorizationException, IOException, Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.postDelegationTokenExpiration(
.doAs(new PrivilegedExceptionAction<Response>() { createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.postDelegationTokenExpiration(null);
}
});
} }
protected Response cancelDelegationToken(String user) protected Response cancelDelegationToken(String user)
throws AuthorizationException, IOException, InterruptedException, throws AuthorizationException, IOException, InterruptedException,
Exception { Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.cancelDelegationToken(
.doAs(new PrivilegedExceptionAction<Response>() { createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.cancelDelegationToken(null);
}
});
} }
protected Response createNewReservation(String user) protected Response createNewReservation(String user)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.createNewReservation(
.doAs(new PrivilegedExceptionAction<Response>() { createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.createNewReservation(null);
}
});
} }
protected Response submitReservation(String user) protected Response submitReservation(String user)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.submitReservation(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.submitReservation(null, null);
}
});
} }
protected Response updateReservation(String user) protected Response updateReservation(String user)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.updateReservation(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.updateReservation(null, null);
}
});
} }
protected Response deleteReservation(String user) protected Response deleteReservation(String user)
throws AuthorizationException, IOException, InterruptedException { throws AuthorizationException, IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.deleteReservation(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.deleteReservation(null, null);
}
});
} }
protected Response listReservation(String user) throws Exception { protected Response listReservation(String user) throws Exception {
return UserGroupInformation.createRemoteUser(user) return routerWebService.listReservation(
.doAs(new PrivilegedExceptionAction<Response>() { null, null, 0, 0, false, createHttpServletRequest(user));
@Override
public Response run() throws Exception {
return routerWebService.listReservation(null, null, 0, 0, false,
null);
}
});
} }
protected AppTimeoutInfo getAppTimeout(String user) protected AppTimeoutInfo getAppTimeout(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppTimeout(
.doAs(new PrivilegedExceptionAction<AppTimeoutInfo>() { createHttpServletRequest(user), null, null);
@Override
public AppTimeoutInfo run() throws Exception {
return routerWebService.getAppTimeout(null, null, null);
}
});
} }
protected AppTimeoutsInfo getAppTimeouts(String user) protected AppTimeoutsInfo getAppTimeouts(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppTimeouts(
.doAs(new PrivilegedExceptionAction<AppTimeoutsInfo>() { createHttpServletRequest(user), null);
@Override
public AppTimeoutsInfo run() throws Exception {
return routerWebService.getAppTimeouts(null, null);
}
});
} }
protected Response updateApplicationTimeout(String user) protected Response updateApplicationTimeout(String user)
throws AuthorizationException, YarnException, InterruptedException, throws AuthorizationException, YarnException, InterruptedException,
IOException { IOException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.updateApplicationTimeout(
.doAs(new PrivilegedExceptionAction<Response>() { null, createHttpServletRequest(user), null);
@Override
public Response run() throws Exception {
return routerWebService.updateApplicationTimeout(null, null, null);
}
});
} }
protected AppAttemptsInfo getAppAttempts(String user) protected AppAttemptsInfo getAppAttempts(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppAttempts(
.doAs(new PrivilegedExceptionAction<AppAttemptsInfo>() { createHttpServletRequest(user), null);
@Override
public AppAttemptsInfo run() throws Exception {
return routerWebService.getAppAttempts(null, null);
}
});
} }
protected AppAttemptInfo getAppAttempt(String user) protected AppAttemptInfo getAppAttempt(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getAppAttempt(
.doAs(new PrivilegedExceptionAction<AppAttemptInfo>() { createHttpServletRequest(user), null, null, null);
@Override
public AppAttemptInfo run() throws Exception {
return routerWebService.getAppAttempt(null, null, null, null);
}
});
} }
protected ContainersInfo getContainers(String user) protected ContainersInfo getContainers(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getContainers(
.doAs(new PrivilegedExceptionAction<ContainersInfo>() { createHttpServletRequest(user), null, null, null);
@Override
public ContainersInfo run() throws Exception {
return routerWebService.getContainers(null, null, null, null);
}
});
} }
protected ContainerInfo getContainer(String user) protected ContainerInfo getContainer(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) return routerWebService.getContainer(
.doAs(new PrivilegedExceptionAction<ContainerInfo>() { createHttpServletRequest(user), null, null, null, null);
@Override
public ContainerInfo run() throws Exception {
return routerWebService.getContainer(null, null, null, null, null);
}
});
} }
protected RequestInterceptorChainWrapper getInterceptorChain(String user) protected RequestInterceptorChainWrapper getInterceptorChain(String user)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user) HttpServletRequest request = createHttpServletRequest(user);
.doAs(new PrivilegedExceptionAction<RequestInterceptorChainWrapper>() { return routerWebService.getInterceptorChain(request);
@Override
public RequestInterceptorChainWrapper run() throws Exception {
return routerWebService.getInterceptorChain();
}
});
} }
private HttpServletRequest createHttpServletRequest(String user) {
HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getRemoteUser()).thenReturn(user);
return request;
}
} }

View File

@ -20,6 +20,7 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List;
/** /**
* Helper class to start a new process. * Helper class to start a new process.
@ -28,13 +29,23 @@ public class JavaProcess {
private Process process = null; private Process process = null;
public JavaProcess(Class<?> klass) throws IOException, InterruptedException { public JavaProcess(Class<?> clazz) throws IOException, InterruptedException {
this(clazz, null);
}
public JavaProcess(Class<?> clazz, List<String> addClasspaths)
throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home"); String javaHome = System.getProperty("java.home");
String javaBin = String javaBin =
javaHome + File.separator + "bin" + File.separator + "java"; javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path"); String classpath = System.getProperty("java.class.path");
classpath = classpath.concat("./src/test/resources"); classpath = classpath.concat("./src/test/resources");
String className = klass.getCanonicalName(); if (addClasspaths != null) {
for (String addClasspath : addClasspaths) {
classpath = classpath.concat(File.pathSeparatorChar + addClasspath);
}
}
String className = clazz.getCanonicalName();
ProcessBuilder builder = ProcessBuilder builder =
new ProcessBuilder(javaBin, "-cp", classpath, className); new ProcessBuilder(javaBin, "-cp", classpath, className);
builder.inheritIO(); builder.inheritIO();