From 8180ab436a19b8e253c3b6c4f392daa32680e187 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Wed, 1 Nov 2017 13:21:15 -0700 Subject: [PATCH] YARN-7276. Federation Router Web Service fixes. Contributed by Inigo Goiri. --- .../hadoop-yarn-server-router/pom.xml | 6 + .../hadoop/yarn/server/router/Router.java | 6 + .../webapp/DefaultRequestInterceptorREST.java | 12 +- .../webapp/FederationInterceptorREST.java | 254 ++-- .../router/webapp/RouterWebServiceUtil.java | 62 +- .../router/webapp/RouterWebServices.java | 97 +- .../webapp/BaseRouterWebServicesTest.java | 400 ++---- .../server/router/webapp/JavaProcess.java | 15 +- .../webapp/TestRouterWebServicesREST.java | 1156 +++++++++-------- 9 files changed, 1019 insertions(+), 989 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml index bcdf3d936d0..5b3ee436579 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml @@ -85,6 +85,12 @@ test + + org.apache.hadoop + hadoop-yarn-server-timelineservice + test + + org.mockito mockito-all diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 121e5344fdb..76050d067f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -74,6 +75,8 @@ public class Router extends CompositeService { */ public static final int SHUTDOWN_HOOK_PRIORITY = 30; + private static final String METRICS_NAME = "Router"; + public Router() { super(Router.class.getName()); } @@ -95,6 +98,8 @@ protected void serviceInit(Configuration config) throws Exception { webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, YarnConfiguration.ROUTER_BIND_HOST, WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf)); + // Metrics + DefaultMetricsSystem.initialize(METRICS_NAME); super.serviceInit(conf); } @@ -118,6 +123,7 @@ protected void serviceStop() throws Exception { return; } super.serviceStop(); + DefaultMetricsSystem.shutdown(); } protected void shutDown() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index abd8ca6ec10..72ed02fd9a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -129,7 +129,9 @@ public String dumpSchedulerLogs(String time, HttpServletRequest hsr) public NodesInfo getNodes(String states) { // states will be part of additionalParam Map additionalParam = new HashMap(); - additionalParam.put(RMWSConsts.STATES, new String[] {states}); + if (states != null && !states.isEmpty()) { + additionalParam.put(RMWSConsts.STATES, new String[] {states}); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, NodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null, @@ -226,9 +228,11 @@ public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) public LabelsToNodesInfo getLabelsToNodes(Set labels) throws IOException { // labels will be part of additionalParam - Map additionalParam = new HashMap(); - additionalParam.put(RMWSConsts.LABELS, - labels.toArray(new String[labels.size()])); + Map additionalParam = new HashMap<>(); + if (labels != null && !labels.isEmpty()) { + additionalParam.put(RMWSConsts.LABELS, + labels.toArray(new String[labels.size()])); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, LabelsToNodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 6ba8ade01ea..2860d109623 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.Principal; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -26,12 +27,15 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -48,6 +52,7 @@ import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -121,29 +126,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public void init(String user) { federationFacade = FederationStateStoreFacade.getInstance(); - rand = new Random(System.currentTimeMillis()); + rand = new Random(); final Configuration conf = this.getConf(); try { - policyFacade = new RouterPolicyFacade(conf, federationFacade, - this.federationFacade.getSubClusterResolver(), null); + SubClusterResolver subClusterResolver = + this.federationFacade.getSubClusterResolver(); + policyFacade = new RouterPolicyFacade( + conf, federationFacade, subClusterResolver, null); } catch (FederationPolicyInitializationException e) { - LOG.error(e.getMessage()); + throw new YarnRuntimeException(e); } - numSubmitRetries = - conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); + numSubmitRetries = conf.getInt( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); - interceptors = new HashMap(); + interceptors = new HashMap<>(); routerMetrics = RouterMetrics.getMetrics(); - threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat("FederationInterceptorREST #%d").build()); + threadpool = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("FederationInterceptorREST #%d") + .build()); - returnPartialReport = - conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); + returnPartialReport = conf.getBoolean( + YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); } private SubClusterId getRandomActiveSubCluster( @@ -156,8 +165,8 @@ private SubClusterId getRandomActiveSubCluster( } List list = new ArrayList<>(activeSubclusters.keySet()); - FederationPolicyUtils.validateSubClusterAvailability(list, - blackListSubClusters); + FederationPolicyUtils.validateSubClusterAvailability( + list, blackListSubClusters); if (blackListSubClusters != null) { @@ -176,8 +185,9 @@ protected DefaultRequestInterceptorREST getInterceptorForSubCluster( if (interceptors.containsKey(subClusterId)) { return interceptors.get(subClusterId); } else { - LOG.error("The interceptor for SubCluster " + subClusterId - + " does not exist in the cache."); + LOG.error( + "The interceptor for SubCluster {} does not exist in the cache.", + subClusterId); return null; } } @@ -187,9 +197,9 @@ private DefaultRequestInterceptorREST createInterceptorForSubCluster( final Configuration conf = this.getConf(); - String interceptorClassName = - conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); + String interceptorClassName = conf.get( + YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); DefaultRequestInterceptorREST interceptorInstance = null; try { Class interceptorClass = conf.getClassByName(interceptorClassName); @@ -210,7 +220,7 @@ private DefaultRequestInterceptorREST createInterceptorForSubCluster( e); } - interceptorInstance.setWebAppAddress(webAppAddress); + interceptorInstance.setWebAppAddress("http://" + webAppAddress); interceptorInstance.setSubClusterId(subClusterId); interceptors.put(subClusterId, interceptorInstance); return interceptorInstance; @@ -272,8 +282,7 @@ public Response createNewApplication(HttpServletRequest hsr) .entity(e.getLocalizedMessage()).build(); } - LOG.debug( - "getNewApplication try #" + i + " on SubCluster " + subClusterId); + LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId, @@ -282,11 +291,12 @@ public Response createNewApplication(HttpServletRequest hsr) try { response = interceptor.createNewApplication(hsr); } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster " - + subClusterId.getId(), e); + LOG.warn("Unable to create a new ApplicationId in SubCluster {}", + subClusterId.getId(), e); } - if (response != null && response.getStatus() == 200) { + if (response != null && + response.getStatus() == HttpServletResponse.SC_OK) { long stopTime = clock.getTime(); routerMetrics.succeededAppsCreated(stopTime - startTime); @@ -302,7 +312,10 @@ public Response createNewApplication(HttpServletRequest hsr) String errMsg = "Fail to create a new application."; LOG.error(errMsg); routerMetrics.incrAppsFailedCreated(); - return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build(); + return Response + .status(Status.INTERNAL_SERVER_ERROR) + .entity(errMsg) + .build(); } /** @@ -381,7 +394,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Missing ApplicationSubmissionContextInfo or " + "applicationSubmissionContex information."; - return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); + return Response + .status(Status.BAD_REQUEST) + .entity(errMsg) + .build(); } ApplicationId applicationId = null; @@ -389,7 +405,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, applicationId = ApplicationId.fromString(newApp.getApplicationId()); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -405,11 +423,13 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, subClusterId = policyFacade.getHomeSubcluster(context, blacklist); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } - LOG.info("submitApplication appId" + applicationId + " try #" + i - + " on SubCluster " + subClusterId); + LOG.info("submitApplication appId {} try #{} on SubCluster {}", + applicationId, i, subClusterId); ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); @@ -424,8 +444,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Unable to insert the ApplicationId " + applicationId + " into the FederationStateStore"; - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(errMsg + " " + e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg + " " + e.getLocalizedMessage()) + .build(); } } else { try { @@ -441,15 +463,19 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, federationFacade.getApplicationHomeSubCluster(applicationId); } catch (YarnException e1) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e1.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e1.getLocalizedMessage()) + .build(); } if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application " + applicationId - + " already submitted on SubCluster " + subClusterId); + LOG.info("Application {} already submitted on SubCluster {}", + applicationId, subClusterId); } else { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg) + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) .build(); } } @@ -460,8 +486,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } Response response = null; @@ -470,13 +498,14 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp, hsr); } catch (Exception e) { - LOG.warn("Unable to submit the application " + applicationId - + "to SubCluster " + subClusterId.getId(), e); + LOG.warn("Unable to submit the application {} to SubCluster {}", + applicationId, subClusterId.getId(), e); } - if (response != null && response.getStatus() == 202) { - LOG.info("Application " + context.getApplicationName() + " with appId " - + applicationId + " submitted on " + subClusterId); + if (response != null && + response.getStatus() == HttpServletResponse.SC_ACCEPTED) { + LOG.info("Application {} with appId {} submitted on {}", + context.getApplicationName(), applicationId, subClusterId); long stopTime = clock.getTime(); routerMetrics.succeededAppsSubmitted(stopTime - startTime); @@ -493,7 +522,10 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, String errMsg = "Application " + newApp.getApplicationName() + " with appId " + applicationId + " failed to be submitted."; LOG.error(errMsg); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) + .build(); } /** @@ -541,9 +573,10 @@ public AppInfo getApp(HttpServletRequest hsr, String appId, return null; } - AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId, - subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId, - unselectedFields); + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterId, subClusterInfo.getRMWebServiceAddress()); + AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); long stopTime = clock.getTime(); routerMetrics.succeededAppsRetrieved(stopTime - startTime); @@ -579,7 +612,9 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr, applicationId = ApplicationId.fromString(appId); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -591,7 +626,9 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr, subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -644,26 +681,28 @@ public AppsInfo getApps(final HttpServletRequest hsr, final String stateQuery, } // Send the requests in parallel - - ExecutorCompletionService compSvc = - new ExecutorCompletionService(this.threadpool); + CompletionService compSvc = + new ExecutorCompletionService<>(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { + // HttpServletRequest does not work with ExecutorCompletionService. + // Create a duplicate hsr. + final HttpServletRequest hsrCopy = clone(hsr); compSvc.submit(new Callable() { @Override public AppsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); - AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery, - finalStatusQuery, userQuery, queueQuery, count, startedBegin, - startedEnd, finishBegin, finishEnd, applicationTypes, - applicationTags, unselectedFields); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); + AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, + statesQuery, finalStatusQuery, userQuery, queueQuery, count, + startedBegin, startedEnd, finishBegin, finishEnd, + applicationTypes, applicationTags, unselectedFields); if (rmApps == null) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return appReport."); + LOG.error("Subcluster {} failed to return appReport.", + info.getSubClusterId()); return null; } return rmApps; @@ -672,8 +711,7 @@ public AppsInfo call() { } // Collect all the responses in parallel - - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); AppsInfo appsResponse = future.get(); @@ -686,7 +724,7 @@ public AppsInfo call() { } } catch (Throwable e) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.warn("Failed to get application report ", e); + LOG.warn("Failed to get application report", e); } } @@ -695,9 +733,42 @@ public AppsInfo call() { } // Merge all the application reports got from all the available Yarn RMs + return RouterWebServiceUtil.mergeAppsInfo( + apps.getApps(), returnPartialReport); + } - return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), - returnPartialReport); + /** + * Get a copy of a HTTP request. This is for thread safety. + * @param hsr HTTP servlet request to copy. + * @return Copy of the HTTP request. + */ + private HttpServletRequestWrapper clone(final HttpServletRequest hsr) { + if (hsr == null) { + return null; + } + return new HttpServletRequestWrapper(hsr) { + @SuppressWarnings("unchecked") + public Map getParameterMap() { + return (Map) hsr.getParameterMap(); + } + public String getPathInfo() { + return hsr.getPathInfo(); + } + public String getRemoteUser() { + return hsr.getRemoteUser(); + } + public Principal getUserPrincipal() { + return hsr.getUserPrincipal(); + } + public String getHeader(String value) { + // we override only Accept + if (value.equals(HttpHeaders.ACCEPT)) { + return RouterWebServiceUtil.getMediaTypeFromHttpServletRequest( + hsr, AppsInfo.class); + } + return null; + } + }; } /** @@ -731,8 +802,7 @@ public NodeInfo getNode(final String nodeId) { } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -740,14 +810,14 @@ public NodeInfo getNode(final String nodeId) { @Override public NodeInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodeInfo nodeInfo = interceptor.getNode(nodeId); return nodeInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodeInfo."); + LOG.error("Subcluster {} failed to return nodeInfo.", + info.getSubClusterId()); return null; } } @@ -756,7 +826,7 @@ public NodeInfo call() { // Collect all the responses in parallel NodeInfo nodeInfo = null; - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); NodeInfo nodeResponse = future.get(); @@ -765,8 +835,8 @@ public NodeInfo call() { if (nodeResponse != null) { // Check if the node was already found in a different SubCluster and // it has an old health report - if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse - .getLastHealthUpdate()) { + if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < + nodeResponse.getLastHealthUpdate()) { nodeInfo = nodeResponse; } } @@ -808,13 +878,12 @@ public NodesInfo getNodes(final String states) { try { subClustersActive = federationFacade.getSubClusters(true); } catch (YarnException e) { - LOG.error(e.getMessage()); + LOG.error("Cannot get nodes: {}", e.getMessage()); return new NodesInfo(); } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -822,14 +891,14 @@ public NodesInfo getNodes(final String states) { @Override public NodesInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodesInfo nodesInfo = interceptor.getNodes(states); return nodesInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodesInfo."); + LOG.error("Subcluster {} failed to return nodesInfo.", + info.getSubClusterId()); return null; } } @@ -838,7 +907,7 @@ public NodesInfo call() { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); NodesInfo nodesResponse = future.get(); @@ -872,8 +941,7 @@ public ClusterMetricsInfo getClusterMetricsInfo() { } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -881,14 +949,14 @@ public ClusterMetricsInfo getClusterMetricsInfo() { @Override public ClusterMetricsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo(); return metrics; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return Cluster Metrics."); + LOG.error("Subcluster {} failed to return Cluster Metrics.", + info.getSubClusterId()); return null; } } @@ -897,7 +965,7 @@ public ClusterMetricsInfo call() { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); ClusterMetricsInfo metricsResponse = future.get(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index efc3ea31d30..40bdbd83c69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.router.webapp; +import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -28,13 +31,12 @@ import java.util.Map.Entry; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; @@ -47,6 +49,8 @@ import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.sun.jersey.api.ConflictException; import com.sun.jersey.api.client.Client; @@ -62,8 +66,8 @@ public final class RouterWebServiceUtil { private static String user = "YarnRouter"; - private static final Log LOG = - LogFactory.getLog(RouterWebServiceUtil.class.getName()); + private static final Logger LOG = + LoggerFactory.getLogger(RouterWebServiceUtil.class.getName()); private final static String PARTIAL_REPORT = "Partial Report "; @@ -85,9 +89,10 @@ private RouterWebServiceUtil() { * call in case the call has no servlet request * @return the retrieved entity from the REST call */ - protected static T genericForward(final String webApp, - final HttpServletRequest hsr, final Class returnType, - final HTTPMethods method, final String targetPath, final Object formParam, + protected static T genericForward( + final String webApp, final HttpServletRequest hsr, + final Class returnType, final HTTPMethods method, + final String targetPath, final Object formParam, final Map additionalParam) { UserGroupInformation callerUGI = null; @@ -122,14 +127,22 @@ public T run() { ClientResponse response = RouterWebServiceUtil.invokeRMWebService( webApp, targetPath, method, - (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam); + (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam, + getMediaTypeFromHttpServletRequest(hsr, returnType)); if (Response.class.equals(returnType)) { return (T) RouterWebServiceUtil.clientResponseToResponse(response); } // YARN RM can answer with Status.OK or it throws an exception - if (response.getStatus() == 200) { + if (response.getStatus() == SC_OK) { return response.getEntity(returnType); } + if (response.getStatus() == SC_NO_CONTENT) { + try { + return returnType.getConstructor().newInstance(); + } catch (RuntimeException | ReflectiveOperationException e) { + LOG.error("Cannot create empty entity for {}", returnType, e); + } + } RouterWebServiceUtil.retrieveException(response); return null; } @@ -148,7 +161,7 @@ public T run() { */ private static ClientResponse invokeRMWebService(String webApp, String path, HTTPMethods method, String additionalPath, - Map queryParams, Object formParam) { + Map queryParams, Object formParam, String mediaType) { Client client = Client.create(); WebResource webResource = client.resource(webApp).path(path); @@ -169,14 +182,12 @@ private static ClientResponse invokeRMWebService(String webApp, String path, webResource = webResource.queryParams(paramMap); } - // I can forward the call in JSON or XML since the Router will convert it - // again in Object before send it back to the client Builder builder = null; if (formParam != null) { - builder = webResource.entity(formParam, MediaType.APPLICATION_XML); - builder = builder.accept(MediaType.APPLICATION_XML); + builder = webResource.entity(formParam, mediaType); + builder = builder.accept(mediaType); } else { - builder = webResource.accept(MediaType.APPLICATION_XML); + builder = webResource.accept(mediaType); } ClientResponse response = null; @@ -429,4 +440,25 @@ public static void mergeMetrics(ClusterMetricsInfo metrics, + metricsResponse.getShutdownNodes()); } + /** + * Extract from HttpServletRequest the MediaType in output. + */ + protected static String getMediaTypeFromHttpServletRequest( + HttpServletRequest request, final Class returnType) { + if (request == null) { + // By default we return XML for REST call without HttpServletRequest + return MediaType.APPLICATION_XML; + } + // TODO + if (!returnType.equals(Response.class)) { + return MediaType.APPLICATION_XML; + } + String header = request.getHeader(HttpHeaders.ACCEPT); + if (header == null || header.equals("*")) { + // By default we return JSON + return MediaType.APPLICATION_JSON; + } + return header; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index 4bb6271f192..14e7b3bce7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -157,12 +157,19 @@ private void init() { } @VisibleForTesting - protected RequestInterceptorChainWrapper getInterceptorChain() { + protected RequestInterceptorChainWrapper getInterceptorChain( + final HttpServletRequest hsr) { String user = ""; + if (hsr != null) { + user = hsr.getRemoteUser(); + } try { - user = UserGroupInformation.getCurrentUser().getUserName(); + if (user == null || user.equals("")) { + // Yarn Router user + user = UserGroupInformation.getCurrentUser().getUserName(); + } } catch (IOException e) { - LOG.error("IOException " + e.getMessage()); + LOG.error("Cannot get user: {}", e.getMessage()); } if (!userPipelineMap.containsKey(user)) { initializePipeline(user); @@ -313,7 +320,7 @@ public ClusterInfo get() { @Override public ClusterInfo getClusterInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterInfo(); } @@ -323,7 +330,7 @@ public ClusterInfo getClusterInfo() { @Override public ClusterMetricsInfo getClusterMetricsInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterMetricsInfo(); } @@ -333,7 +340,7 @@ public ClusterMetricsInfo getClusterMetricsInfo() { @Override public SchedulerTypeInfo getSchedulerInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getSchedulerInfo(); } @@ -344,7 +351,7 @@ public SchedulerTypeInfo getSchedulerInfo() { public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time, @Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr); } @@ -354,7 +361,7 @@ public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time, @Override public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNodes(states); } @@ -364,7 +371,7 @@ public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) { @Override public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNode(nodeId); } @@ -387,7 +394,7 @@ public AppsInfo getApps(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.APPLICATION_TAGS) Set applicationTags, @QueryParam(RMWSConsts.DESELECTS) Set unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery, finalStatusQuery, userQuery, queueQuery, count, startedBegin, startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags, @@ -401,7 +408,7 @@ public AppsInfo getApps(@Context HttpServletRequest hsr, public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getActivities(hsr, nodeId); } @@ -413,7 +420,7 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.APP_ID) String appId, @QueryParam(RMWSConsts.MAX_TIME) String time) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time); } @@ -426,7 +433,7 @@ public ApplicationStatisticsInfo getAppStatistics( @QueryParam(RMWSConsts.STATES) Set stateQueries, @QueryParam(RMWSConsts.APPLICATION_TYPES) Set typeQueries) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries, typeQueries); } @@ -439,7 +446,7 @@ public AppInfo getApp(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId, @QueryParam(RMWSConsts.DESELECTS) Set unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields); } @@ -450,7 +457,7 @@ public AppInfo getApp(@Context HttpServletRequest hsr, public AppState getAppState(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppState(hsr, appId); } @@ -463,7 +470,7 @@ public Response updateAppState(AppState targetState, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppState(targetState, hsr, appId); } @@ -475,7 +482,7 @@ public Response updateAppState(AppState targetState, public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getNodeToLabels(hsr); } @@ -486,7 +493,7 @@ public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr) public LabelsToNodesInfo getLabelsToNodes( @QueryParam(RMWSConsts.LABELS) Set labels) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getLabelsToNodes(labels); } @@ -498,7 +505,7 @@ public Response replaceLabelsOnNodes( final NodeToLabelsEntryList newNodeToLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels, hsr); } @@ -512,7 +519,7 @@ public Response replaceLabelsOnNode( @Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName, hsr, nodeId); } @@ -524,7 +531,7 @@ public Response replaceLabelsOnNode( public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getClusterNodeLabels(hsr); } @@ -535,7 +542,7 @@ public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr) public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels, hsr); } @@ -548,7 +555,7 @@ public Response removeFromCluserNodeLabels( @QueryParam(RMWSConsts.LABELS) Set oldNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .removeFromCluserNodeLabels(oldNodeLabels, hsr); } @@ -560,7 +567,7 @@ public Response removeFromCluserNodeLabels( public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId); } @@ -571,7 +578,7 @@ public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr, public AppPriority getAppPriority(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppPriority(hsr, appId); } @@ -584,7 +591,7 @@ public Response updateApplicationPriority(AppPriority targetPriority, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .updateApplicationPriority(targetPriority, hsr, appId); } @@ -596,7 +603,7 @@ public Response updateApplicationPriority(AppPriority targetPriority, public AppQueue getAppQueue(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppQueue(hsr, appId); } @@ -609,7 +616,7 @@ public Response updateAppQueue(AppQueue targetQueue, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr, appId); } @@ -621,7 +628,7 @@ public Response updateAppQueue(AppQueue targetQueue, public Response createNewApplication(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewApplication(hsr); } @@ -633,7 +640,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitApplication(newApp, hsr); } @@ -645,7 +652,7 @@ public Response postDelegationToken(DelegationToken tokenData, @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr); } @@ -656,7 +663,7 @@ public Response postDelegationToken(DelegationToken tokenData, public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr); } @@ -668,7 +675,7 @@ public Response cancelDelegationToken(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().cancelDelegationToken(hsr); } @@ -679,7 +686,7 @@ public Response cancelDelegationToken(@Context HttpServletRequest hsr) public Response createNewReservation(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewReservation(hsr); } @@ -691,7 +698,7 @@ public Response submitReservation(ReservationSubmissionRequestInfo resContext, @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitReservation(resContext, hsr); } @@ -703,7 +710,7 @@ public Response updateReservation(ReservationUpdateRequestInfo resContext, @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateReservation(resContext, hsr); } @@ -715,7 +722,7 @@ public Response deleteReservation(ReservationDeleteRequestInfo resContext, @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().deleteReservation(resContext, hsr); } @@ -731,7 +738,7 @@ public Response listReservation( @QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().listReservation(queue, reservationId, startTime, endTime, includeResourceAllocations, hsr); } @@ -744,7 +751,7 @@ public AppTimeoutInfo getAppTimeout(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type); } @@ -755,7 +762,7 @@ public AppTimeoutInfo getAppTimeout(@Context HttpServletRequest hsr, public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId); } @@ -768,7 +775,7 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout, hsr, appId); } @@ -780,7 +787,7 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppAttempts(hsr, appId); } @@ -792,7 +799,7 @@ public org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo getAppAttempt( @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getAppAttempt(req, res, appId, appAttemptId); } @@ -805,7 +812,7 @@ public ContainersInfo getContainers(@Context HttpServletRequest req, @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainers(req, res, appId, appAttemptId); } @@ -819,7 +826,7 @@ public ContainerInfo getContainer(@Context HttpServletRequest req, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId, @PathParam(RMWSConsts.CONTAINERID) String containerId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainer(req, res, appId, appAttemptId, containerId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index 7d420844a42..9480850d328 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -20,15 +20,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; -import java.security.PrivilegedExceptionAction; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -128,487 +128,263 @@ protected RouterWebServices getRouterWebServices() { protected ClusterInfo get(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.get(); - } - }); + // HSR is not used here + return routerWebService.get(); } protected ClusterInfo getClusterInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.getClusterInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterInfo(); } protected ClusterMetricsInfo getClusterMetricsInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterMetricsInfo run() throws Exception { - return routerWebService.getClusterMetricsInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterMetricsInfo(); } protected SchedulerTypeInfo getSchedulerInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public SchedulerTypeInfo run() throws Exception { - return routerWebService.getSchedulerInfo(); - } - }); + // HSR is not used here + return routerWebService.getSchedulerInfo(); } protected String dumpSchedulerLogs(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public String run() throws Exception { - return routerWebService.dumpSchedulerLogs(null, null); - } - }); + return routerWebService.dumpSchedulerLogs(null, + createHttpServletRequest(user)); } protected NodesInfo getNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodesInfo run() throws Exception { - return routerWebService.getNodes(null); - } - }); + return routerWebService.getNodes(null); } protected NodeInfo getNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeInfo run() throws Exception { - return routerWebService.getNode(null); - } - }); + return routerWebService.getNode(null); } protected AppsInfo getApps(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppsInfo run() throws Exception { - return routerWebService.getApps(null, null, null, null, null, null, - null, null, null, null, null, null, null, null); - } - }); + return routerWebService.getApps(createHttpServletRequest(user), null, null, + null, null, null, null, null, null, null, null, null, null, null); } protected ActivitiesInfo getActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ActivitiesInfo run() throws Exception { - return routerWebService.getActivities(null, null); - } - }); + return routerWebService.getActivities( + createHttpServletRequest(user), null); } protected AppActivitiesInfo getAppActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppActivitiesInfo run() throws Exception { - return routerWebService.getAppActivities(null, null, null); - } - }); + return routerWebService.getAppActivities( + createHttpServletRequest(user), null, null); } protected ApplicationStatisticsInfo getAppStatistics(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ApplicationStatisticsInfo run() throws Exception { - return routerWebService.getAppStatistics(null, null, null); - } - }); + return routerWebService.getAppStatistics( + createHttpServletRequest(user), null, null); } protected AppInfo getApp(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppInfo run() throws Exception { - return routerWebService.getApp(null, null, null); - } - }); + return routerWebService.getApp(createHttpServletRequest(user), null, null); } protected AppState getAppState(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppState run() throws Exception { - return routerWebService.getAppState(null, null); - } - }); + return routerWebService.getAppState(createHttpServletRequest(user), null); } protected Response updateAppState(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppState(null, null, null); - } - }); + return routerWebService.updateAppState( + null, createHttpServletRequest(user), null); } protected NodeToLabelsInfo getNodeToLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeToLabelsInfo run() throws Exception { - return routerWebService.getNodeToLabels(null); - } - }); + return routerWebService.getNodeToLabels(createHttpServletRequest(user)); } protected LabelsToNodesInfo getLabelsToNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public LabelsToNodesInfo run() throws Exception { - return routerWebService.getLabelsToNodes(null); - } - }); + return routerWebService.getLabelsToNodes(null); } protected Response replaceLabelsOnNodes(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNodes(null, null); - } - }); + return routerWebService.replaceLabelsOnNodes( + null, createHttpServletRequest(user)); } protected Response replaceLabelsOnNode(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNode(null, null, null); - } - }); + return routerWebService.replaceLabelsOnNode( + null, createHttpServletRequest(user), null); } protected NodeLabelsInfo getClusterNodeLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getClusterNodeLabels(null); - } - }); + return routerWebService.getClusterNodeLabels( + createHttpServletRequest(user)); } protected Response addToClusterNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.addToClusterNodeLabels(null, null); - } - }); + return routerWebService.addToClusterNodeLabels( + null, createHttpServletRequest(user)); } protected Response removeFromCluserNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.removeFromCluserNodeLabels(null, null); - } - }); + return routerWebService.removeFromCluserNodeLabels( + null, createHttpServletRequest(user)); } protected NodeLabelsInfo getLabelsOnNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getLabelsOnNode(null, null); - } - }); + return routerWebService.getLabelsOnNode( + createHttpServletRequest(user), null); } protected AppPriority getAppPriority(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppPriority run() throws Exception { - return routerWebService.getAppPriority(null, null); - } - }); + return routerWebService.getAppPriority( + createHttpServletRequest(user), null); } protected Response updateApplicationPriority(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationPriority(null, null, null); - } - }); + return routerWebService.updateApplicationPriority( + null, createHttpServletRequest(user), null); } protected AppQueue getAppQueue(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppQueue run() throws Exception { - return routerWebService.getAppQueue(null, null); - } - }); + return routerWebService.getAppQueue(createHttpServletRequest(user), null); } protected Response updateAppQueue(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppQueue(null, null, null); - } - }); + return routerWebService.updateAppQueue( + null, createHttpServletRequest(user), null); } protected Response createNewApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.createNewApplication(null); - } - }); + return routerWebService.createNewApplication( + createHttpServletRequest(user)); } protected Response submitApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.submitApplication(null, null); - } - }); + return routerWebService.submitApplication( + null, createHttpServletRequest(user)); } protected Response postDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationToken(null, null); - } - }); + return routerWebService.postDelegationToken( + null, createHttpServletRequest(user)); } protected Response postDelegationTokenExpiration(String user) throws AuthorizationException, IOException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationTokenExpiration(null); - } - }); + return routerWebService.postDelegationTokenExpiration( + createHttpServletRequest(user)); } protected Response cancelDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.cancelDelegationToken(null); - } - }); + return routerWebService.cancelDelegationToken( + createHttpServletRequest(user)); } protected Response createNewReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.createNewReservation(null); - } - }); + return routerWebService.createNewReservation( + createHttpServletRequest(user)); } protected Response submitReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.submitReservation(null, null); - } - }); + return routerWebService.submitReservation( + null, createHttpServletRequest(user)); } protected Response updateReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateReservation(null, null); - } - }); + return routerWebService.updateReservation( + null, createHttpServletRequest(user)); } protected Response deleteReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.deleteReservation(null, null); - } - }); + return routerWebService.deleteReservation( + null, createHttpServletRequest(user)); } protected Response listReservation(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.listReservation(null, null, 0, 0, false, - null); - } - }); + return routerWebService.listReservation( + null, null, 0, 0, false, createHttpServletRequest(user)); } protected AppTimeoutInfo getAppTimeout(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppTimeoutInfo run() throws Exception { - return routerWebService.getAppTimeout(null, null, null); - } - }); + return routerWebService.getAppTimeout( + createHttpServletRequest(user), null, null); } protected AppTimeoutsInfo getAppTimeouts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppTimeoutsInfo run() throws Exception { - return routerWebService.getAppTimeouts(null, null); - } - }); + return routerWebService.getAppTimeouts( + createHttpServletRequest(user), null); } protected Response updateApplicationTimeout(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationTimeout(null, null, null); - } - }); + return routerWebService.updateApplicationTimeout( + null, createHttpServletRequest(user), null); } protected AppAttemptsInfo getAppAttempts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppAttemptsInfo run() throws Exception { - return routerWebService.getAppAttempts(null, null); - } - }); + return routerWebService.getAppAttempts( + createHttpServletRequest(user), null); } protected AppAttemptInfo getAppAttempt(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppAttemptInfo run() throws Exception { - return routerWebService.getAppAttempt(null, null, null, null); - } - }); + return routerWebService.getAppAttempt( + createHttpServletRequest(user), null, null, null); } protected ContainersInfo getContainers(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ContainersInfo run() throws Exception { - return routerWebService.getContainers(null, null, null, null); - } - }); + return routerWebService.getContainers( + createHttpServletRequest(user), null, null, null); } protected ContainerInfo getContainer(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ContainerInfo run() throws Exception { - return routerWebService.getContainer(null, null, null, null, null); - } - }); + return routerWebService.getContainer( + createHttpServletRequest(user), null, null, null, null); } protected RequestInterceptorChainWrapper getInterceptorChain(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RequestInterceptorChainWrapper run() throws Exception { - return routerWebService.getInterceptorChain(); - } - }); + HttpServletRequest request = createHttpServletRequest(user); + return routerWebService.getInterceptorChain(request); } + private HttpServletRequest createHttpServletRequest(String user) { + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(user); + return request; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java index d32013f34b2..6c0938cc2ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.util.List; /** * Helper class to start a new process. @@ -28,13 +29,23 @@ public class JavaProcess { private Process process = null; - public JavaProcess(Class klass) throws IOException, InterruptedException { + public JavaProcess(Class clazz) throws IOException, InterruptedException { + this(clazz, null); + } + + public JavaProcess(Class clazz, List addClasspaths) + throws IOException, InterruptedException { String javaHome = System.getProperty("java.home"); String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; String classpath = System.getProperty("java.class.path"); classpath = classpath.concat("./src/test/resources"); - String className = klass.getCanonicalName(); + if (addClasspaths != null) { + for (String addClasspath : addClasspaths) { + classpath = classpath.concat(File.pathSeparatorChar + addClasspath); + } + } + String className = clazz.getCanonicalName(); ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className); builder.inheritIO(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java index 4878dc4b785..f90a7634cb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java @@ -18,21 +18,79 @@ package org.apache.hadoop.yarn.server.router.webapp; +import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED; +import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static javax.ws.rs.core.MediaType.APPLICATION_XML; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_PRIORITY; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_QUEUE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_STATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_ID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_STATISTICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.INFO; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_REPLACE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REMOVE_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REPLACE_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_DELETE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_SUBMIT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_UPDATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_ACTIVITIES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_APP_ACTIVITIES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_LOGS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.STATES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.TIME; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getNMWebAppURLWithoutScheme; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRMWebAppURLWithScheme; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRouterWebAppURLWithScheme; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; - -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.core.MediaType; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; @@ -64,13 +122,12 @@ import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.codehaus.jettison.json.JSONException; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; @@ -88,109 +145,127 @@ @NotThreadSafe public class TestRouterWebServicesREST { + /** The number of concurrent submissions for multi-thread test. */ + private static final int NUM_THREADS_TESTS = 100; + + private static String userName = "test"; private static JavaProcess rm; private static JavaProcess nm; private static JavaProcess router; - private static Configuration conf; + private static String rmAddress; + private static String routerAddress; + private static String nmAddress; - private static final int STATUS_OK = 200; - private static final int STATUS_ACCEPTED = 202; - private static final int STATUS_BADREQUEST = 400; - private static final int STATUS_ERROR = 500; + private static Configuration conf; /** * Wait until the webservice is up and running. */ - private static void waitWebAppRunning(String address, String path) { - while (true) { - Client clientToRouter = Client.create(); - WebResource toRouter = clientToRouter.resource(address).path(path); - try { - ClientResponse response = toRouter.accept(MediaType.APPLICATION_JSON) - .get(ClientResponse.class); - if (response.getStatus() == STATUS_OK) { - // process is up and running - return; + private static void waitWebAppRunning( + final String address, final String path) { + try { + final Client clientToRouter = Client.create(); + final WebResource toRouter = clientToRouter + .resource(address) + .path(path); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + ClientResponse response = toRouter + .accept(APPLICATION_JSON) + .get(ClientResponse.class); + if (response.getStatus() == SC_OK) { + // process is up and running + return true; + } + } catch (ClientHandlerException e) { + // process is not up and running + } + return false; } - } catch (ClientHandlerException e) { - // process is not up and running - continue; - } + }, 1000, 10 * 1000); + } catch (Exception e) { + fail("Web app not running"); } } @BeforeClass public static void setUp() throws Exception { conf = new YarnConfiguration(); - rm = new JavaProcess(ResourceManager.class); + + List addClasspath = new LinkedList<>(); + addClasspath.add("../hadoop-yarn-server-timelineservice/target/classes"); + rm = new JavaProcess(ResourceManager.class, addClasspath); + rmAddress = getRMWebAppURLWithScheme(conf); + waitWebAppRunning(rmAddress, RM_WEB_SERVICE_PATH); + router = new JavaProcess(Router.class); + routerAddress = getRouterWebAppURLWithScheme(conf); + waitWebAppRunning(routerAddress, RM_WEB_SERVICE_PATH); + nm = new JavaProcess(NodeManager.class); - - // The tests cannot start if all the service are not up and running. - waitWebAppRunning(WebAppUtils.getRMWebAppURLWithScheme(conf), - RMWSConsts.RM_WEB_SERVICE_PATH); - - waitWebAppRunning(WebAppUtils.getRouterWebAppURLWithScheme(conf), - RMWSConsts.RM_WEB_SERVICE_PATH); - - waitWebAppRunning("http://" + WebAppUtils.getNMWebAppURLWithoutScheme(conf), - "/ws/v1/node"); + nmAddress = "http://" + getNMWebAppURLWithoutScheme(conf); + waitWebAppRunning(nmAddress, "/ws/v1/node"); } @AfterClass public static void stop() throws Exception { - nm.stop(); - router.stop(); - rm.stop(); + if (nm != null) { + nm.stop(); + } + if (router != null) { + router.stop(); + } + if (rm != null) { + rm.stop(); + } } /** * Performs 2 GET calls one to RM and the one to Router. In positive case, it * returns the 2 answers in a list. */ - private static List performGetCalls(String path, - final Class returnType, String queryName, String queryValue) - throws IOException, InterruptedException { + private static List performGetCalls(final String path, + final Class returnType, final String queryName, + final String queryValue) throws IOException, InterruptedException { Client clientToRouter = Client.create(); - WebResource toRouter = clientToRouter - .resource(WebAppUtils.getRouterWebAppURLWithScheme(conf)).path(path); + WebResource toRouter = clientToRouter.resource(routerAddress).path(path); Client clientToRM = Client.create(); - WebResource toRM = clientToRM - .resource(WebAppUtils.getRMWebAppURLWithScheme(conf)).path(path); + WebResource toRM = clientToRM.resource(rmAddress).path(path); final Builder toRouterBuilder; final Builder toRMBuilder; if (queryValue != null && queryName != null) { - toRouterBuilder = toRouter.queryParam(queryName, queryValue) - .accept(MediaType.APPLICATION_XML); - toRMBuilder = toRM.queryParam(queryName, queryValue) - .accept(MediaType.APPLICATION_XML); + toRouterBuilder = toRouter + .queryParam(queryName, queryValue) + .accept(APPLICATION_XML); + toRMBuilder = toRM + .queryParam(queryName, queryValue) + .accept(APPLICATION_XML); } else { - toRouterBuilder = toRouter.accept(MediaType.APPLICATION_XML); - toRMBuilder = toRM.accept(MediaType.APPLICATION_XML); + toRouterBuilder = toRouter.accept(APPLICATION_XML); + toRMBuilder = toRM.accept(APPLICATION_XML); } return UserGroupInformation.createRemoteUser(userName) .doAs(new PrivilegedExceptionAction>() { @Override public List run() throws Exception { - ClientResponse response = toRouterBuilder.get(ClientResponse.class); + ClientResponse response = + toRouterBuilder.get(ClientResponse.class); ClientResponse response2 = toRMBuilder.get(ClientResponse.class); - if (response.getStatus() == STATUS_OK - && response2.getStatus() == STATUS_OK) { - List responses = new ArrayList(); - responses.add(response.getEntity(returnType)); - responses.add(response2.getEntity(returnType)); - return responses; - } else { - Assert.fail(); - } - return null; + assertEquals(SC_OK, response.getStatus()); + assertEquals(SC_OK, response2.getStatus()); + List responses = new ArrayList<>(); + responses.add(response.getEntity(returnType)); + responses.add(response2.getEntity(returnType)); + return responses; } }); } @@ -208,22 +283,20 @@ private static ClientResponse performCall(final String webAddress, public ClientResponse run() throws Exception { Client clientToRouter = Client.create(); WebResource toRouter = clientToRouter - .resource(WebAppUtils.getRouterWebAppURLWithScheme(conf)) + .resource(routerAddress) .path(webAddress); - WebResource toRouterWR; + WebResource toRouterWR = toRouter; if (queryKey != null && queryValue != null) { - toRouterWR = toRouter.queryParam(queryKey, queryValue); - } else { - toRouterWR = toRouter; + toRouterWR = toRouterWR.queryParam(queryKey, queryValue); } Builder builder = null; if (context != null) { - builder = toRouterWR.entity(context, MediaType.APPLICATION_JSON); - builder = builder.accept(MediaType.APPLICATION_JSON); + builder = toRouterWR.entity(context, APPLICATION_JSON); + builder = builder.accept(APPLICATION_JSON); } else { - builder = toRouter.accept(MediaType.APPLICATION_JSON); + builder = toRouter.accept(APPLICATION_JSON); } ClientResponse response = null; @@ -251,19 +324,20 @@ public ClientResponse run() throws Exception { * This test validates the correctness of {@link RMWebServiceProtocol#get()} * inside Router. */ - @Test(timeout = 1000) - public void testInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testInfoXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH, ClusterInfo.class, null, null); + RM_WEB_SERVICE_PATH, ClusterInfo.class, null, null); ClusterInfo routerResponse = responses.get(0); ClusterInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getRMVersion(), + assertEquals( + rmResponse.getRMVersion(), routerResponse.getRMVersion()); } @@ -271,20 +345,20 @@ public void testInfoXML() throws JSONException, Exception { * This test validates the correctness of * {@link RMWebServiceProtocol#getClusterInfo()} inside Router. */ - @Test(timeout = 1000) - public void testClusterInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testClusterInfoXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.INFO, - ClusterInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + INFO, ClusterInfo.class, null, null); ClusterInfo routerResponse = responses.get(0); ClusterInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getRMVersion(), + assertEquals( + rmResponse.getRMVersion(), routerResponse.getRMVersion()); } @@ -292,41 +366,41 @@ public void testClusterInfoXML() throws JSONException, Exception { * This test validates the correctness of * {@link RMWebServiceProtocol#getClusterMetricsInfo()} inside Router. */ - @Test(timeout = 1000) - public void testMetricsInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testMetricsInfoXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.METRICS, - ClusterMetricsInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + METRICS, ClusterMetricsInfo.class, null, null); ClusterMetricsInfo routerResponse = responses.get(0); ClusterMetricsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getActiveNodes(), + assertEquals( + rmResponse.getActiveNodes(), routerResponse.getActiveNodes()); } - /** + /* * This test validates the correctness of * {@link RMWebServiceProtocol#getSchedulerInfo()} inside Router. */ - @Test(timeout = 1000) - public void testSchedulerInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testSchedulerInfoXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER, - SchedulerTypeInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + SCHEDULER, SchedulerTypeInfo.class, null, null); SchedulerTypeInfo routerResponse = responses.get(0); SchedulerTypeInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getSchedulerInfo().getSchedulerType(), + assertEquals( + rmResponse.getSchedulerInfo().getSchedulerType(), routerResponse.getSchedulerInfo().getSchedulerType()); } @@ -334,20 +408,41 @@ public void testSchedulerInfoXML() throws JSONException, Exception { * This test validates the correctness of * {@link RMWebServiceProtocol#getNodes()} inside Router. */ - @Test(timeout = 1000) - public void testNodesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNodesEmptyXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, - NodesInfo.class, RMWSConsts.STATES, "LOST"); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, null, null); NodesInfo routerResponse = responses.get(0); NodesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodes().size(), + assertEquals( + rmResponse.getNodes().size(), + routerResponse.getNodes().size()); + } + + /** + * This test validates the correctness of + * {@link RMWebServiceProtocol#getNodes()} inside Router. + */ + @Test(timeout = 2000) + public void testNodesXML() throws Exception { + + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, STATES, "LOST"); + + NodesInfo routerResponse = responses.get(0); + NodesInfo rmResponse = responses.get(1); + + assertNotNull(routerResponse); + assertNotNull(rmResponse); + + assertEquals( + rmResponse.getNodes().size(), routerResponse.getNodes().size()); } @@ -355,80 +450,83 @@ public void testNodesXML() throws JSONException, Exception { * This test validates the correctness of * {@link RMWebServiceProtocol#getNode()} inside Router. */ - @Test(timeout = 1000) - public void testNodeXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNodeXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" + getNodeId(), + RM_WEB_SERVICE_PATH + format(NODES_NODEID, getNodeId()), NodeInfo.class, null, null); NodeInfo routerResponse = responses.get(0); NodeInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getVersion(), routerResponse.getVersion()); + assertEquals( + rmResponse.getVersion(), + routerResponse.getVersion()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getActivities()} inside Router. */ - @Test(timeout = 1000) - public void testActiviesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testActiviesXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_ACTIVITIES, + RM_WEB_SERVICE_PATH + SCHEDULER_ACTIVITIES, ActivitiesInfo.class, null, null); ActivitiesInfo routerResponse = responses.get(0); ActivitiesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppActivities()} inside Router. */ - @Test(timeout = 1000) - public void testAppActivitiesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppActivitiesXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_APP_ACTIVITIES, - AppActivitiesInfo.class, RMWSConsts.APP_ID, appId); + RM_WEB_SERVICE_PATH + SCHEDULER_APP_ACTIVITIES, + AppActivitiesInfo.class, APP_ID, appId); AppActivitiesInfo routerResponse = responses.get(0); AppActivitiesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppStatistics()} inside Router. */ - @Test(timeout = 1000) - public void testAppStatisticsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppStatisticsXML() throws Exception { submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APP_STATISTICS, - ApplicationStatisticsInfo.class, RMWSConsts.STATES, "RUNNING"); + RM_WEB_SERVICE_PATH + APP_STATISTICS, + ApplicationStatisticsInfo.class, STATES, "RUNNING"); ApplicationStatisticsInfo routerResponse = responses.get(0); ApplicationStatisticsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getStatItems().size(), + assertEquals( + rmResponse.getStatItems().size(), routerResponse.getStatItems().size()); } @@ -436,810 +534,818 @@ public void testAppStatisticsXML() throws JSONException, Exception { * This test validates the correctness of * {@link RMWebServiceProtocol#dumpSchedulerLogs()} inside Router. */ - @Test(timeout = 1000) - public void testDumpSchedulerLogsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testDumpSchedulerLogsXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_LOGS, - null, null, null, HTTPMethods.PUT); + performCall(RM_WEB_SERVICE_PATH + SCHEDULER_LOGS, + null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + SCHEDULER_LOGS, TIME, "1", null, POST); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_LOGS, - RMWSConsts.TIME, "1", null, HTTPMethods.POST); - - if (response.getStatus() != HttpServletResponse.SC_NO_CONTENT) { - Assert.fail(); - } + assertEquals(SC_NO_CONTENT, response.getStatus()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#createNewApplication()} inside Router. */ - @Test(timeout = 1000) - public void testNewApplicationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNewApplicationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS_NEW_APPLICATION, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null, + null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS_NEW_APPLICATION, null, - null, null, HTTPMethods.POST); - - if (response.getStatus() == STATUS_OK) { - NewApplication ci = response.getEntity(NewApplication.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null, + null, null, POST); + assertEquals(SC_OK, response.getStatus()); + NewApplication ci = response.getEntity(NewApplication.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#submitApplication()} inside Router. */ - @Test(timeout = 1000) - public void testSubmitApplicationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testSubmitApplicationXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, null, - null, null, HTTPMethods.PUT); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + APPS, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); context.setApplicationId(getNewApplicationId().getApplicationId()); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, null, - null, context, HTTPMethods.POST); - - if (response.getStatus() == STATUS_ACCEPTED) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + APPS, null, null, context, POST); + assertEquals(SC_ACCEPTED, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getApps()} inside Router. */ - @Test(timeout = 1000) - public void testAppsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppsXML() throws Exception { submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, - AppsInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null); AppsInfo routerResponse = responses.get(0); AppsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getApps().size(), - rmResponse.getApps().size()); + assertEquals( + rmResponse.getApps().size(), + routerResponse.getApps().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getApp()} inside Router. */ - @Test(timeout = 1000) - public void testAppXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId, + RM_WEB_SERVICE_PATH + format(APPS_APPID, appId), AppInfo.class, null, null); AppInfo routerResponse = responses.get(0); AppInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAMHostHttpAddress(), - rmResponse.getAMHostHttpAddress()); + assertEquals( + rmResponse.getAMHostHttpAddress(), + routerResponse.getAMHostHttpAddress()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppAttempts()} inside Router. */ - @Test(timeout = 1000) - public void testAppAttemptXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppAttemptXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.ATTEMPTS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId), AppAttemptsInfo.class, null, null); AppAttemptsInfo routerResponse = responses.get(0); AppAttemptsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAttempts().size(), - rmResponse.getAttempts().size()); + assertEquals( + rmResponse.getAttempts().size(), + routerResponse.getAttempts().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppState()} inside Router. */ - @Test(timeout = 1000) - public void testAppStateXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppStateXML() throws Exception { String appId = submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.STATE, AppState.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId), + AppState.class, null, null); AppState routerResponse = responses.get(0); AppState rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getState(), rmResponse.getState()); + assertEquals( + rmResponse.getState(), + routerResponse.getState()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateAppState()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppStateXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppStateXML() throws Exception { String appId = submitApplication(); + String pathApp = + RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.STATE, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + pathApp, null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - AppState appState = new AppState("KILLED"); - ClientResponse response = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.STATE, null, null, - appState, HTTPMethods.PUT); + ClientResponse response = performCall( + pathApp, null, null, appState, PUT); - if (response.getStatus() == STATUS_ACCEPTED) { - AppState ci = response.getEntity(AppState.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_ACCEPTED, response.getStatus()); + AppState ci = response.getEntity(AppState.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppPriority()} inside Router. */ - @Test(timeout = 1000) - public void testAppPriorityXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppPriorityXML() throws Exception { String appId = submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.PRIORITY, AppPriority.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + AppPriority.class, null, null); AppPriority routerResponse = responses.get(0); AppPriority rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getPriority(), rmResponse.getPriority()); + assertEquals(rmResponse.getPriority(), routerResponse.getPriority()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateApplicationPriority()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppPriorityXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppPriorityXML() throws Exception { String appId = submitApplication(); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.PRIORITY, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - AppPriority appPriority = new AppPriority(1); - ClientResponse response = - performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.PRIORITY, - null, null, appPriority, HTTPMethods.PUT); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + null, null, appPriority, PUT); - if (response.getStatus() == STATUS_OK) { - AppPriority ci = response.getEntity(AppPriority.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + AppPriority ci = response.getEntity(AppPriority.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppQueue()} inside Router. */ - @Test(timeout = 1000) - public void testAppQueueXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppQueueXML() throws Exception { String appId = submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.QUEUE, AppQueue.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + AppQueue.class, null, null); AppQueue routerResponse = responses.get(0); AppQueue rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getQueue(), rmResponse.getQueue()); + assertEquals(rmResponse.getQueue(), routerResponse.getQueue()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateAppQueue()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppQueueXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppQueueXML() throws Exception { String appId = submitApplication(); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.QUEUE, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - AppQueue appQueue = new AppQueue("default"); - ClientResponse response = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.QUEUE, null, null, - appQueue, HTTPMethods.PUT); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + null, null, appQueue, PUT); - if (response.getStatus() == STATUS_OK) { - AppQueue ci = response.getEntity(AppQueue.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + AppQueue ci = response.getEntity(AppQueue.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppTimeouts()} inside Router. */ - @Test(timeout = 1000) - public void testAppTimeoutsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppTimeoutsXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.TIMEOUTS, + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId), AppTimeoutsInfo.class, null, null); AppTimeoutsInfo routerResponse = responses.get(0); AppTimeoutsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAppTimeouts().size(), - rmResponse.getAppTimeouts().size()); + assertEquals( + rmResponse.getAppTimeouts().size(), + routerResponse.getAppTimeouts().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppTimeout()} inside Router. */ - @Test(timeout = 1000) - public void testAppTimeoutXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppTimeoutXML() throws Exception { String appId = submitApplication(); - + String pathApp = RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.TIMEOUTS + "/" + "LIFETIME", - AppTimeoutInfo.class, null, null); + pathApp + "/" + "LIFETIME", AppTimeoutInfo.class, null, null); AppTimeoutInfo routerResponse = responses.get(0); AppTimeoutInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getExpireTime(), rmResponse.getExpireTime()); + assertEquals( + rmResponse.getExpireTime(), + routerResponse.getExpireTime()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateApplicationTimeout()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppTimeoutsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppTimeoutsXML() throws Exception { String appId = submitApplication(); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.TIMEOUT, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId), + null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); - // Test with the correct HTTP method - - // Create a bad request + // Test with a bad request AppTimeoutInfo appTimeoutInfo = new AppTimeoutInfo(); - ClientResponse response = - performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.TIMEOUT, - null, null, appTimeoutInfo, HTTPMethods.PUT); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId), + null, null, appTimeoutInfo, PUT); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#createNewReservation()} inside Router. */ - @Test(timeout = 1000) - public void testNewReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNewReservationXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_NEW, - null, null, null, HTTPMethods.PUT); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + RESERVATION_NEW, + null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + RESERVATION_NEW, + null, null, null, POST); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_NEW, - null, null, null, HTTPMethods.POST); - - if (response.getStatus() == STATUS_OK) { - NewReservation ci = response.getEntity(NewReservation.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + NewReservation ci = response.getEntity(NewReservation.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#submitReservation()} inside Router. */ - @Test(timeout = 1000) - public void testSubmitReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testSubmitReservationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_SUBMIT, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, + null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - ReservationSubmissionRequestInfo context = new ReservationSubmissionRequestInfo(); context.setReservationId(getNewReservationId().getReservationId()); // ReservationDefinition is null ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_SUBMIT, null, - null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, null, context, POST); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateReservation()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateReservationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_UPDATE, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - String reservationId = getNewReservationId().getReservationId(); ReservationUpdateRequestInfo context = new ReservationUpdateRequestInfo(); context.setReservationId(reservationId); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_UPDATE, null, - null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, context, POST); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#deleteReservation()} inside Router. */ - @Test(timeout = 1000) - public void testDeleteReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testDeleteReservationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_DELETE, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - String reservationId = getNewReservationId().getReservationId(); ReservationDeleteRequestInfo context = new ReservationDeleteRequestInfo(); context.setReservationId(reservationId); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_DELETE, null, - null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, context, POST); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getNodeToLabels()} inside Router. */ - @Test(timeout = 1000) - public void testGetNodeToLabelsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetNodeToLabelsXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.GET_NODE_TO_LABELS, + RM_WEB_SERVICE_PATH + GET_NODE_TO_LABELS, NodeToLabelsInfo.class, null, null); NodeToLabelsInfo routerResponse = responses.get(0); NodeToLabelsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodeToLabels().size(), - rmResponse.getNodeToLabels().size()); + assertEquals( + rmResponse.getNodeToLabels().size(), + routerResponse.getNodeToLabels().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getClusterNodeLabels()} inside Router. */ - @Test(timeout = 1000) - public void testGetClusterNodeLabelsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetClusterNodeLabelsXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.GET_NODE_LABELS, + RM_WEB_SERVICE_PATH + GET_NODE_LABELS, NodeLabelsInfo.class, null, null); NodeLabelsInfo routerResponse = responses.get(0); NodeLabelsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodeLabels().size(), - rmResponse.getNodeLabels().size()); + assertEquals( + rmResponse.getNodeLabels().size(), + routerResponse.getNodeLabels().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getLabelsOnNode()} inside Router. */ - @Test(timeout = 1000) - public void testGetLabelsOnNodeXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetLabelsOnNodeXML() throws Exception { - List responses = - performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" - + getNodeId() + "/" + RMWSConsts.GET_LABELS, - NodeLabelsInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(NODES_NODEID_GETLABELS, getNodeId()), + NodeLabelsInfo.class, null, null); NodeLabelsInfo routerResponse = responses.get(0); NodeLabelsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodeLabels().size(), - rmResponse.getNodeLabels().size()); + assertEquals( + rmResponse.getNodeLabels().size(), + routerResponse.getNodeLabels().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getLabelsToNodes()} inside Router. */ - @Test(timeout = 1000) - public void testGetLabelsMappingXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetLabelsMappingEmptyXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, + RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, LabelsToNodesInfo.class, null, null); LabelsToNodesInfo routerResponse = responses.get(0); LabelsToNodesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getLabelsToNodes().size(), - rmResponse.getLabelsToNodes().size()); + assertEquals( + rmResponse.getLabelsToNodes().size(), + routerResponse.getLabelsToNodes().size()); + } + + /** + * This test validates the correctness of + * {@link RMWebServiceProtocol#getLabelsToNodes()} inside Router. + */ + @Test(timeout = 2000) + public void testGetLabelsMappingXML() throws Exception { + + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, + LabelsToNodesInfo.class, LABELS, "label1"); + + LabelsToNodesInfo routerResponse = responses.get(0); + LabelsToNodesInfo rmResponse = responses.get(1); + + assertNotNull(routerResponse); + assertNotNull(rmResponse); + + assertEquals( + rmResponse.getLabelsToNodes().size(), + routerResponse.getLabelsToNodes().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#addToClusterNodeLabels()} inside Router. */ - @Test(timeout = 1000) - public void testAddToClusterNodeLabelsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAddToClusterNodeLabelsXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.ADD_NODE_LABELS, - null, null, null, HTTPMethods.PUT); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, + null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - List nodeLabels = new ArrayList(); + List nodeLabels = new ArrayList<>(); nodeLabels.add(NodeLabel.newInstance("default")); NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.ADD_NODE_LABELS, - null, null, context, HTTPMethods.POST); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, null, null, context, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#removeFromCluserNodeLabels()} inside Router. */ - @Test(timeout = 1000) + @Test(timeout = 2000) public void testRemoveFromCluserNodeLabelsXML() - throws JSONException, Exception { + throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REMOVE_NODE_LABELS, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - addNodeLabel(); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REMOVE_NODE_LABELS, - RMWSConsts.LABELS, "default", null, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS, + LABELS, "default", null, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#replaceLabelsOnNodes()} inside Router. */ - @Test(timeout = 1000) - public void testReplaceLabelsOnNodesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testReplaceLabelsOnNodesXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REPLACE_NODE_TO_LABELS, - null, null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - addNodeLabel(); NodeToLabelsEntryList context = new NodeToLabelsEntryList(); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REPLACE_NODE_TO_LABELS, - null, null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS, + null, null, context, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#replaceLabelsOnNode()} inside Router. */ - @Test(timeout = 1000) - public void testReplaceLabelsOnNodeXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testReplaceLabelsOnNodeXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" - + getNodeId() + "/replace-labels", - null, null, null, HTTPMethods.PUT); + String pathNode = RM_WEB_SERVICE_PATH + + format(NODES_NODEID_REPLACE_LABELS, getNodeId()); + ClientResponse badResponse = performCall( + pathNode, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - addNodeLabel(); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" + getNodeId() - + "/replace-labels", - RMWSConsts.LABELS, "default", null, HTTPMethods.POST); + pathNode, LABELS, "default", null, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of {@link WebServices#getAppAttempt} * inside Router. */ - @Test(timeout = 1000) - public void testGetAppAttemptXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetAppAttemptXML() throws Exception { String appId = submitApplication(); - + String pathAttempts = RM_WEB_SERVICE_PATH + format( + APPS_APPID_APPATTEMPTS_APPATTEMPTID, appId, getAppAttempt(appId)); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.APPATTEMPTS + "/" + getAppAttempt(appId), - AppAttemptInfo.class, null, null); + pathAttempts, AppAttemptInfo.class, null, null); AppAttemptInfo routerResponse = responses.get(0); AppAttemptInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAppAttemptId(), - rmResponse.getAppAttemptId()); + assertEquals( + rmResponse.getAppAttemptId(), + routerResponse.getAppAttemptId()); } /** * This test validates the correctness of {@link WebServices#getContainers} * inside Router. */ - @Test(timeout = 1000) - public void testGetContainersXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetContainersXML() throws Exception { String appId = submitApplication(); - - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.APPATTEMPTS + "/" + getAppAttempt(appId) - + "/" + RMWSConsts.CONTAINERS, ContainersInfo.class, null, null); + String pathAttempts = RM_WEB_SERVICE_PATH + format( + APPS_APPID_APPATTEMPTS_APPATTEMPTID_CONTAINERS, + appId, getAppAttempt(appId)); + List responses = performGetCalls( + pathAttempts, ContainersInfo.class, null, null); ContainersInfo routerResponse = responses.get(0); ContainersInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getContainers().size(), - rmResponse.getContainers().size()); + assertEquals( + rmResponse.getContainers().size(), + routerResponse.getContainers().size()); + } + + @Test(timeout = 60000) + public void testGetAppsMultiThread() throws Exception { + final int iniNumApps = getNumApps(); + + // This submits an application + testGetContainersXML(); + // This submits an application + testAppsXML(); + + // Wait at most 10 seconds until we see all the applications + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + // Check if we have the 2 apps we submitted + return getNumApps() == iniNumApps + 2; + } catch (Exception e) { + fail(); + } + return false; + } + }, 100, 10 * 1000); + + // Multithreaded getApps() + ExecutorService threadpool = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("REST Tester #%d") + .build()); + CompletionService svc = new ExecutorCompletionService<>(threadpool); + try { + // Submit a bunch of operations concurrently + for (int i = 0; i < NUM_THREADS_TESTS; i++) { + svc.submit(new Callable() { + @Override + public Void call() throws Exception { + assertEquals(iniNumApps + 2, getNumApps()); + return null; + } + }); + } + } finally { + threadpool.shutdown(); + } + + assertEquals(iniNumApps + 2, getNumApps()); + } + + /** + * Get the number of applications in the system. + * @return Number of applications in the system + * @throws Exception If we cannot get the applications. + */ + private int getNumApps() throws Exception { + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null); + AppsInfo routerResponse = responses.get(0); + AppsInfo rmResponse = responses.get(1); + assertEquals(rmResponse.getApps().size(), routerResponse.getApps().size()); + return rmResponse.getApps().size(); } private String getNodeId() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + NODES); ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + toRM.accept(APPLICATION_XML).get(ClientResponse.class); NodesInfo ci = response.getEntity(NodesInfo.class); return ci.getNodes().get(0).getNodeId(); } private NewApplication getNewApplicationId() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)).path( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS_NEW_APPLICATION); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION); ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + toRM.accept(APPLICATION_XML).post(ClientResponse.class); return response.getEntity(NewApplication.class); } @@ -1250,47 +1356,61 @@ private String submitApplication() { context.setApplicationId(appId); Client clientToRouter = Client.create(); - WebResource toRM = - clientToRouter.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS); - toRM.entity(context, MediaType.APPLICATION_XML) - .accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + WebResource toRM = clientToRouter.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + APPS); + toRM.entity(context, APPLICATION_XML) + .accept(APPLICATION_XML) + .post(ClientResponse.class); return appId; } private NewReservation getNewReservationId() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_NEW); - ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + RESERVATION_NEW); + ClientResponse response = toRM. + accept(APPLICATION_XML) + .post(ClientResponse.class); return response.getEntity(NewReservation.class); } private String addNodeLabel() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.ADD_NODE_LABELS); - List nodeLabels = new ArrayList(); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + ADD_NODE_LABELS); + List nodeLabels = new ArrayList<>(); nodeLabels.add(NodeLabel.newInstance("default")); NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels); - ClientResponse response = toRM.entity(context, MediaType.APPLICATION_XML) - .accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + ClientResponse response = toRM + .entity(context, APPLICATION_XML) + .accept(APPLICATION_XML) + .post(ClientResponse.class); return response.getEntity(String.class); } private String getAppAttempt(String appId) { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId - + "/" + RMWSConsts.ATTEMPTS); - ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + String pathAppAttempt = + RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId); + WebResource toRM = clientToRM.resource(rmAddress) + .path(pathAppAttempt); + ClientResponse response = toRM + .accept(APPLICATION_XML) + .get(ClientResponse.class); AppAttemptsInfo ci = response.getEntity(AppAttemptsInfo.class); return ci.getAttempts().get(0).getAppAttemptId(); } -} + /** + * Convert format using {name} (HTTP base) into %s (Java based). + * @param format Initial format using {}. + * @param args Arguments for the format. + * @return New format using %s. + */ + private static String format(String format, Object... args) { + Pattern p = Pattern.compile("\\{.*?}"); + Matcher m = p.matcher(format); + String newFormat = m.replaceAll("%s"); + return String.format(newFormat, args); + } +} \ No newline at end of file