diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 121e5344fdb..76050d067f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -74,6 +75,8 @@ public class Router extends CompositeService { */ public static final int SHUTDOWN_HOOK_PRIORITY = 30; + private static final String METRICS_NAME = "Router"; + public Router() { super(Router.class.getName()); } @@ -95,6 +98,8 @@ public class Router extends CompositeService { webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, YarnConfiguration.ROUTER_BIND_HOST, WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf)); + // Metrics + DefaultMetricsSystem.initialize(METRICS_NAME); super.serviceInit(conf); } @@ -118,6 +123,7 @@ public class Router extends CompositeService { return; } super.serviceStop(); + DefaultMetricsSystem.shutdown(); } protected void shutDown() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index abd8ca6ec10..72ed02fd9a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -129,7 +129,9 @@ public class DefaultRequestInterceptorREST public NodesInfo getNodes(String states) { // states will be part of additionalParam Map additionalParam = new HashMap(); - additionalParam.put(RMWSConsts.STATES, new String[] {states}); + if (states != null && !states.isEmpty()) { + additionalParam.put(RMWSConsts.STATES, new String[] {states}); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, NodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null, @@ -226,9 +228,11 @@ public class DefaultRequestInterceptorREST public LabelsToNodesInfo getLabelsToNodes(Set labels) throws IOException { // labels will be part of additionalParam - Map additionalParam = new HashMap(); - additionalParam.put(RMWSConsts.LABELS, - labels.toArray(new String[labels.size()])); + Map additionalParam = new HashMap<>(); + if (labels != null && !labels.isEmpty()) { + additionalParam.put(RMWSConsts.LABELS, + labels.toArray(new String[labels.size()])); + } return RouterWebServiceUtil.genericForward(webAppAddress, null, LabelsToNodesInfo.class, HTTPMethods.GET, RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 5adcc626042..6e676348490 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; +import java.security.Principal; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -26,12 +27,15 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -48,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -121,29 +126,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public void init(String user) { federationFacade = FederationStateStoreFacade.getInstance(); - rand = new Random(System.currentTimeMillis()); + rand = new Random(); final Configuration conf = this.getConf(); try { - policyFacade = new RouterPolicyFacade(conf, federationFacade, - this.federationFacade.getSubClusterResolver(), null); + SubClusterResolver subClusterResolver = + this.federationFacade.getSubClusterResolver(); + policyFacade = new RouterPolicyFacade( + conf, federationFacade, subClusterResolver, null); } catch (FederationPolicyInitializationException e) { - LOG.error(e.getMessage()); + throw new YarnRuntimeException(e); } - numSubmitRetries = - conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); + numSubmitRetries = conf.getInt( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); - interceptors = new HashMap(); + interceptors = new HashMap<>(); routerMetrics = RouterMetrics.getMetrics(); - threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat("FederationInterceptorREST #%d").build()); + threadpool = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("FederationInterceptorREST #%d") + .build()); - returnPartialReport = - conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); + returnPartialReport = conf.getBoolean( + YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED); } private SubClusterId getRandomActiveSubCluster( @@ -156,8 +165,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } List list = new ArrayList<>(activeSubclusters.keySet()); - FederationPolicyUtils.validateSubClusterAvailability(list, - blackListSubClusters); + FederationPolicyUtils.validateSubClusterAvailability( + list, blackListSubClusters); if (blackListSubClusters != null) { @@ -176,8 +185,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { if (interceptors.containsKey(subClusterId)) { return interceptors.get(subClusterId); } else { - LOG.error("The interceptor for SubCluster " + subClusterId - + " does not exist in the cache."); + LOG.error( + "The interceptor for SubCluster {} does not exist in the cache.", + subClusterId); return null; } } @@ -187,9 +197,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { final Configuration conf = this.getConf(); - String interceptorClassName = - conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); + String interceptorClassName = conf.get( + YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS); DefaultRequestInterceptorREST interceptorInstance = null; try { Class interceptorClass = conf.getClassByName(interceptorClassName); @@ -210,7 +220,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { e); } - interceptorInstance.setWebAppAddress(webAppAddress); + interceptorInstance.setWebAppAddress("http://" + webAppAddress); interceptorInstance.setSubClusterId(subClusterId); interceptors.put(subClusterId, interceptorInstance); return interceptorInstance; @@ -272,8 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { .entity(e.getLocalizedMessage()).build(); } - LOG.debug( - "getNewApplication try #" + i + " on SubCluster " + subClusterId); + LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId, @@ -282,11 +291,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { response = interceptor.createNewApplication(hsr); } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster " - + subClusterId.getId(), e); + LOG.warn("Unable to create a new ApplicationId in SubCluster {}", + subClusterId.getId(), e); } - if (response != null && response.getStatus() == 200) { + if (response != null && + response.getStatus() == HttpServletResponse.SC_OK) { long stopTime = clock.getTime(); routerMetrics.succeededAppsCreated(stopTime - startTime); @@ -302,7 +312,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String errMsg = "Fail to create a new application."; LOG.error(errMsg); routerMetrics.incrAppsFailedCreated(); - return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build(); + return Response + .status(Status.INTERNAL_SERVER_ERROR) + .entity(errMsg) + .build(); } /** @@ -381,7 +394,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Missing ApplicationSubmissionContextInfo or " + "applicationSubmissionContex information."; - return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); + return Response + .status(Status.BAD_REQUEST) + .entity(errMsg) + .build(); } ApplicationId applicationId = null; @@ -389,7 +405,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { applicationId = ApplicationId.fromString(newApp.getApplicationId()); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -405,11 +423,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterId = policyFacade.getHomeSubcluster(context, blacklist); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } - LOG.info("submitApplication appId" + applicationId + " try #" + i - + " on SubCluster " + subClusterId); + LOG.info("submitApplication appId {} try #{} on SubCluster {}", + applicationId, i, subClusterId); ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); @@ -424,8 +444,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Unable to insert the ApplicationId " + applicationId + " into the FederationStateStore"; - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(errMsg + " " + e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg + " " + e.getLocalizedMessage()) + .build(); } } else { try { @@ -441,15 +463,19 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { federationFacade.getApplicationHomeSubCluster(applicationId); } catch (YarnException e1) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e1.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e1.getLocalizedMessage()) + .build(); } if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application " + applicationId - + " already submitted on SubCluster " + subClusterId); + LOG.info("Application {} already submitted on SubCluster {}", + applicationId, subClusterId); } else { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg) + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) .build(); } } @@ -460,8 +486,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(e.getLocalizedMessage()) + .build(); } Response response = null; @@ -470,13 +498,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp, hsr); } catch (Exception e) { - LOG.warn("Unable to submit the application " + applicationId - + "to SubCluster " + subClusterId.getId(), e); + LOG.warn("Unable to submit the application {} to SubCluster {}", + applicationId, subClusterId.getId(), e); } - if (response != null && response.getStatus() == 202) { - LOG.info("Application " + context.getApplicationName() + " with appId " - + applicationId + " submitted on " + subClusterId); + if (response != null && + response.getStatus() == HttpServletResponse.SC_ACCEPTED) { + LOG.info("Application {} with appId {} submitted on {}", + context.getApplicationName(), applicationId, subClusterId); long stopTime = clock.getTime(); routerMetrics.succeededAppsSubmitted(stopTime - startTime); @@ -493,7 +522,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { String errMsg = "Application " + newApp.getApplicationName() + " with appId " + applicationId + " failed to be submitted."; LOG.error(errMsg); - return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build(); + return Response + .status(Status.SERVICE_UNAVAILABLE) + .entity(errMsg) + .build(); } /** @@ -541,9 +573,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { return null; } - AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId, - subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId, - unselectedFields); + DefaultRequestInterceptorREST interceptor = + getOrCreateInterceptorForSubCluster( + subClusterId, subClusterInfo.getRMWebServiceAddress()); + AppInfo response = interceptor.getApp(hsr, appId, unselectedFields); long stopTime = clock.getTime(); routerMetrics.succeededAppsRetrieved(stopTime - startTime); @@ -579,7 +612,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { applicationId = ApplicationId.fromString(appId); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -591,7 +626,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { subClusterInfo = federationFacade.getSubCluster(subClusterId); } catch (YarnException e) { routerMetrics.incrAppsFailedKilled(); - return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()) + return Response + .status(Status.BAD_REQUEST) + .entity(e.getLocalizedMessage()) .build(); } @@ -642,26 +679,28 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService compSvc = - new ExecutorCompletionService(this.threadpool); + CompletionService compSvc = + new ExecutorCompletionService<>(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { + // HttpServletRequest does not work with ExecutorCompletionService. + // Create a duplicate hsr. + final HttpServletRequest hsrCopy = clone(hsr); compSvc.submit(new Callable() { @Override public AppsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); - AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery, - finalStatusQuery, userQuery, queueQuery, count, startedBegin, - startedEnd, finishBegin, finishEnd, applicationTypes, - applicationTags, unselectedFields); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); + AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery, + statesQuery, finalStatusQuery, userQuery, queueQuery, count, + startedBegin, startedEnd, finishBegin, finishEnd, + applicationTypes, applicationTags, unselectedFields); if (rmApps == null) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return appReport."); + LOG.error("Subcluster {} failed to return appReport.", + info.getSubClusterId()); return null; } return rmApps; @@ -670,8 +709,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Collect all the responses in parallel - - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); AppsInfo appsResponse = future.get(); @@ -684,7 +722,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } } catch (Throwable e) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.warn("Failed to get application report ", e); + LOG.warn("Failed to get application report", e); } } @@ -693,9 +731,41 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Merge all the application reports got from all the available YARN RMs + return RouterWebServiceUtil.mergeAppsInfo( + apps.getApps(), returnPartialReport); + } - return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), - returnPartialReport); + /** + * Get a copy of a HTTP request. This is for thread safety. + * @param hsr HTTP servlet request to copy. + * @return Copy of the HTTP request. + */ + private HttpServletRequestWrapper clone(final HttpServletRequest hsr) { + if (hsr == null) { + return null; + } + return new HttpServletRequestWrapper(hsr) { + public Map getParameterMap() { + return hsr.getParameterMap(); + } + public String getPathInfo() { + return hsr.getPathInfo(); + } + public String getRemoteUser() { + return hsr.getRemoteUser(); + } + public Principal getUserPrincipal() { + return hsr.getUserPrincipal(); + } + public String getHeader(String value) { + // we override only Accept + if (value.equals(HttpHeaders.ACCEPT)) { + return RouterWebServiceUtil.getMediaTypeFromHttpServletRequest( + hsr, AppsInfo.class); + } + return null; + } + }; } /** @@ -729,8 +799,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -738,14 +807,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public NodeInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodeInfo nodeInfo = interceptor.getNode(nodeId); return nodeInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodeInfo."); + LOG.error("Subcluster {} failed to return nodeInfo.", + info.getSubClusterId()); return null; } } @@ -754,7 +823,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel NodeInfo nodeInfo = null; - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); NodeInfo nodeResponse = future.get(); @@ -763,8 +832,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { if (nodeResponse != null) { // Check if the node was already found in a different SubCluster and // it has an old health report - if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse - .getLastHealthUpdate()) { + if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < + nodeResponse.getLastHealthUpdate()) { nodeInfo = nodeResponse; } } @@ -806,13 +875,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { try { subClustersActive = federationFacade.getSubClusters(true); } catch (YarnException e) { - LOG.error(e.getMessage()); + LOG.error("Cannot get nodes: {}", e.getMessage()); return new NodesInfo(); } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -820,14 +888,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public NodesInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { NodesInfo nodesInfo = interceptor.getNodes(states); return nodesInfo; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return nodesInfo."); + LOG.error("Subcluster {} failed to return nodesInfo.", + info.getSubClusterId()); return null; } } @@ -836,7 +904,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); NodesInfo nodesResponse = future.get(); @@ -870,8 +938,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { } // Send the requests in parallel - - ExecutorCompletionService compSvc = + CompletionService compSvc = new ExecutorCompletionService(this.threadpool); for (final SubClusterInfo info : subClustersActive.values()) { @@ -879,14 +946,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public ClusterMetricsInfo call() { DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(info.getSubClusterId(), - info.getClientRMServiceAddress()); + getOrCreateInterceptorForSubCluster( + info.getSubClusterId(), info.getRMWebServiceAddress()); try { ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo(); return metrics; } catch (Exception e) { - LOG.error("Subcluster " + info.getSubClusterId() - + " failed to return Cluster Metrics."); + LOG.error("Subcluster {} failed to return Cluster Metrics.", + info.getSubClusterId()); return null; } } @@ -895,7 +962,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { // Collect all the responses in parallel - for (int i = 0; i < subClustersActive.values().size(); i++) { + for (int i = 0; i < subClustersActive.size(); i++) { try { Future future = compSvc.take(); ClusterMetricsInfo metricsResponse = future.get(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index 76435f0551d..40bdbd83c69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.router.webapp; +import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -28,6 +31,7 @@ import java.util.Map; import java.util.Map.Entry; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; @@ -45,6 +49,8 @@ import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.sun.jersey.api.ConflictException; import com.sun.jersey.api.client.Client; @@ -52,8 +58,6 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource.Builder; import com.sun.jersey.core.util.MultivaluedMapImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The Router webservice util class. @@ -85,9 +89,11 @@ public final class RouterWebServiceUtil { * call in case the call has no servlet request * @return the retrieved entity from the REST call */ - protected static T genericForward(String webApp, HttpServletRequest hsr, - final Class returnType, HTTPMethods method, String targetPath, - Object formParam, Map additionalParam) { + protected static T genericForward( + final String webApp, final HttpServletRequest hsr, + final Class returnType, final HTTPMethods method, + final String targetPath, final Object formParam, + final Map additionalParam) { UserGroupInformation callerUGI = null; @@ -121,14 +127,22 @@ public final class RouterWebServiceUtil { ClientResponse response = RouterWebServiceUtil.invokeRMWebService( webApp, targetPath, method, - (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam); + (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam, + getMediaTypeFromHttpServletRequest(hsr, returnType)); if (Response.class.equals(returnType)) { return (T) RouterWebServiceUtil.clientResponseToResponse(response); } // YARN RM can answer with Status.OK or it throws an exception - if (response.getStatus() == 200) { + if (response.getStatus() == SC_OK) { return response.getEntity(returnType); } + if (response.getStatus() == SC_NO_CONTENT) { + try { + return returnType.getConstructor().newInstance(); + } catch (RuntimeException | ReflectiveOperationException e) { + LOG.error("Cannot create empty entity for {}", returnType, e); + } + } RouterWebServiceUtil.retrieveException(response); return null; } @@ -147,7 +161,7 @@ public final class RouterWebServiceUtil { */ private static ClientResponse invokeRMWebService(String webApp, String path, HTTPMethods method, String additionalPath, - Map queryParams, Object formParam) { + Map queryParams, Object formParam, String mediaType) { Client client = Client.create(); WebResource webResource = client.resource(webApp).path(path); @@ -168,14 +182,12 @@ public final class RouterWebServiceUtil { webResource = webResource.queryParams(paramMap); } - // I can forward the call in JSON or XML since the Router will convert it - // again in Object before send it back to the client Builder builder = null; if (formParam != null) { - builder = webResource.entity(formParam, MediaType.APPLICATION_XML); - builder = builder.accept(MediaType.APPLICATION_XML); + builder = webResource.entity(formParam, mediaType); + builder = builder.accept(mediaType); } else { - builder = webResource.accept(MediaType.APPLICATION_XML); + builder = webResource.accept(mediaType); } ClientResponse response = null; @@ -428,4 +440,25 @@ public final class RouterWebServiceUtil { + metricsResponse.getShutdownNodes()); } + /** + * Extract from HttpServletRequest the MediaType in output. + */ + protected static String getMediaTypeFromHttpServletRequest( + HttpServletRequest request, final Class returnType) { + if (request == null) { + // By default we return XML for REST call without HttpServletRequest + return MediaType.APPLICATION_XML; + } + // TODO + if (!returnType.equals(Response.class)) { + return MediaType.APPLICATION_XML; + } + String header = request.getHeader(HttpHeaders.ACCEPT); + if (header == null || header.equals("*")) { + // By default we return JSON + return MediaType.APPLICATION_JSON; + } + return header; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index bbb83268274..b3272527ab6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -158,12 +158,19 @@ public class RouterWebServices implements RMWebServiceProtocol { } @VisibleForTesting - protected RequestInterceptorChainWrapper getInterceptorChain() { + protected RequestInterceptorChainWrapper getInterceptorChain( + final HttpServletRequest hsr) { String user = ""; + if (hsr != null) { + user = hsr.getRemoteUser(); + } try { - user = UserGroupInformation.getCurrentUser().getUserName(); + if (user == null || user.equals("")) { + // Yarn Router user + user = UserGroupInformation.getCurrentUser().getUserName(); + } } catch (IOException e) { - LOG.error("IOException " + e.getMessage()); + LOG.error("Cannot get user: {}", e.getMessage()); } if (!userPipelineMap.containsKey(user)) { initializePipeline(user); @@ -316,7 +323,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public ClusterInfo getClusterInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterInfo(); } @@ -327,7 +334,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public ClusterMetricsInfo getClusterMetricsInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getClusterMetricsInfo(); } @@ -338,7 +345,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public SchedulerTypeInfo getSchedulerInfo() { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getSchedulerInfo(); } @@ -350,7 +357,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time, @Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr); } @@ -361,7 +368,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNodes(states); } @@ -372,7 +379,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Override public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getNode(nodeId); } @@ -396,7 +403,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.APPLICATION_TAGS) Set applicationTags, @QueryParam(RMWSConsts.DESELECTS) Set unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery, finalStatusQuery, userQuery, queueQuery, count, startedBegin, startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags, @@ -411,7 +418,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.NODEID) String nodeId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getActivities(hsr, nodeId); } @@ -424,7 +431,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.APP_ID) String appId, @QueryParam(RMWSConsts.MAX_TIME) String time) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time); } @@ -438,7 +445,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.STATES) Set stateQueries, @QueryParam(RMWSConsts.APPLICATION_TYPES) Set typeQueries) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries, typeQueries); } @@ -452,7 +459,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @QueryParam(RMWSConsts.DESELECTS) Set unselectedFields) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields); } @@ -464,7 +471,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppState getAppState(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppState(hsr, appId); } @@ -478,7 +485,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppState(targetState, hsr, appId); } @@ -491,7 +498,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getNodeToLabels(hsr); } @@ -503,7 +510,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public LabelsToNodesInfo getLabelsToNodes( @QueryParam(RMWSConsts.LABELS) Set labels) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(null); return pipeline.getRootInterceptor().getLabelsToNodes(labels); } @@ -516,7 +523,7 @@ public class RouterWebServices implements RMWebServiceProtocol { final NodeToLabelsEntryList newNodeToLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels, hsr); } @@ -531,7 +538,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName, hsr, nodeId); } @@ -544,7 +551,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getClusterNodeLabels(hsr); } @@ -556,7 +563,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels, hsr); } @@ -570,7 +577,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.LABELS) Set oldNodeLabels, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .removeFromCluserNodeLabels(oldNodeLabels, hsr); } @@ -583,7 +590,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.NODEID) String nodeId) throws IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId); } @@ -595,7 +602,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppPriority getAppPriority(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppPriority(hsr, appId); } @@ -609,7 +616,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor() .updateApplicationPriority(targetPriority, hsr, appId); } @@ -622,7 +629,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppQueue getAppQueue(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppQueue(hsr, appId); } @@ -636,7 +643,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr, appId); } @@ -649,7 +656,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response createNewApplication(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewApplication(hsr); } @@ -662,7 +669,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitApplication(newApp, hsr); } @@ -675,7 +682,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr); } @@ -687,7 +694,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr); } @@ -700,7 +707,7 @@ public class RouterWebServices implements RMWebServiceProtocol { throws AuthorizationException, IOException, InterruptedException, Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().cancelDelegationToken(hsr); } @@ -712,7 +719,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public Response createNewReservation(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().createNewReservation(hsr); } @@ -725,7 +732,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().submitReservation(resContext, hsr); } @@ -738,7 +745,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateReservation(resContext, hsr); } @@ -751,7 +758,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().deleteReservation(resContext, hsr); } @@ -768,7 +775,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations, @Context HttpServletRequest hsr) throws Exception { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().listReservation(queue, reservationId, startTime, endTime, includeResourceAllocations, hsr); } @@ -782,7 +789,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type); } @@ -794,7 +801,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId); } @@ -808,7 +815,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException, YarnException, InterruptedException, IOException { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout, hsr, appId); } @@ -821,7 +828,7 @@ public class RouterWebServices implements RMWebServiceProtocol { public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr, @PathParam(RMWSConsts.APPID) String appId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppAttempts(hsr, appId); } @@ -834,7 +841,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getAppAttempt(req, res, appId, appAttemptId); } @@ -848,7 +855,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPID) String appId, @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainers(req, res, appId, appAttemptId); } @@ -863,7 +870,7 @@ public class RouterWebServices implements RMWebServiceProtocol { @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId, @PathParam(RMWSConsts.CONTAINERID) String containerId) { init(); - RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + RequestInterceptorChainWrapper pipeline = getInterceptorChain(req); return pipeline.getRootInterceptor().getContainer(req, res, appId, appAttemptId, containerId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index 7d420844a42..9480850d328 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -20,15 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; -import java.security.PrivilegedExceptionAction; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -128,487 +128,263 @@ public abstract class BaseRouterWebServicesTest { protected ClusterInfo get(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.get(); - } - }); + // HSR is not used here + return routerWebService.get(); } protected ClusterInfo getClusterInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterInfo run() throws Exception { - return routerWebService.getClusterInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterInfo(); } protected ClusterMetricsInfo getClusterMetricsInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ClusterMetricsInfo run() throws Exception { - return routerWebService.getClusterMetricsInfo(); - } - }); + // HSR is not used here + return routerWebService.getClusterMetricsInfo(); } protected SchedulerTypeInfo getSchedulerInfo(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public SchedulerTypeInfo run() throws Exception { - return routerWebService.getSchedulerInfo(); - } - }); + // HSR is not used here + return routerWebService.getSchedulerInfo(); } protected String dumpSchedulerLogs(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public String run() throws Exception { - return routerWebService.dumpSchedulerLogs(null, null); - } - }); + return routerWebService.dumpSchedulerLogs(null, + createHttpServletRequest(user)); } protected NodesInfo getNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodesInfo run() throws Exception { - return routerWebService.getNodes(null); - } - }); + return routerWebService.getNodes(null); } protected NodeInfo getNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeInfo run() throws Exception { - return routerWebService.getNode(null); - } - }); + return routerWebService.getNode(null); } protected AppsInfo getApps(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppsInfo run() throws Exception { - return routerWebService.getApps(null, null, null, null, null, null, - null, null, null, null, null, null, null, null); - } - }); + return routerWebService.getApps(createHttpServletRequest(user), null, null, + null, null, null, null, null, null, null, null, null, null, null); } protected ActivitiesInfo getActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ActivitiesInfo run() throws Exception { - return routerWebService.getActivities(null, null); - } - }); + return routerWebService.getActivities( + createHttpServletRequest(user), null); } protected AppActivitiesInfo getAppActivities(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppActivitiesInfo run() throws Exception { - return routerWebService.getAppActivities(null, null, null); - } - }); + return routerWebService.getAppActivities( + createHttpServletRequest(user), null, null); } protected ApplicationStatisticsInfo getAppStatistics(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ApplicationStatisticsInfo run() throws Exception { - return routerWebService.getAppStatistics(null, null, null); - } - }); + return routerWebService.getAppStatistics( + createHttpServletRequest(user), null, null); } protected AppInfo getApp(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppInfo run() throws Exception { - return routerWebService.getApp(null, null, null); - } - }); + return routerWebService.getApp(createHttpServletRequest(user), null, null); } protected AppState getAppState(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppState run() throws Exception { - return routerWebService.getAppState(null, null); - } - }); + return routerWebService.getAppState(createHttpServletRequest(user), null); } protected Response updateAppState(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppState(null, null, null); - } - }); + return routerWebService.updateAppState( + null, createHttpServletRequest(user), null); } protected NodeToLabelsInfo getNodeToLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeToLabelsInfo run() throws Exception { - return routerWebService.getNodeToLabels(null); - } - }); + return routerWebService.getNodeToLabels(createHttpServletRequest(user)); } protected LabelsToNodesInfo getLabelsToNodes(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public LabelsToNodesInfo run() throws Exception { - return routerWebService.getLabelsToNodes(null); - } - }); + return routerWebService.getLabelsToNodes(null); } protected Response replaceLabelsOnNodes(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNodes(null, null); - } - }); + return routerWebService.replaceLabelsOnNodes( + null, createHttpServletRequest(user)); } protected Response replaceLabelsOnNode(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.replaceLabelsOnNode(null, null, null); - } - }); + return routerWebService.replaceLabelsOnNode( + null, createHttpServletRequest(user), null); } protected NodeLabelsInfo getClusterNodeLabels(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getClusterNodeLabels(null); - } - }); + return routerWebService.getClusterNodeLabels( + createHttpServletRequest(user)); } protected Response addToClusterNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.addToClusterNodeLabels(null, null); - } - }); + return routerWebService.addToClusterNodeLabels( + null, createHttpServletRequest(user)); } protected Response removeFromCluserNodeLabels(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.removeFromCluserNodeLabels(null, null); - } - }); + return routerWebService.removeFromCluserNodeLabels( + null, createHttpServletRequest(user)); } protected NodeLabelsInfo getLabelsOnNode(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public NodeLabelsInfo run() throws Exception { - return routerWebService.getLabelsOnNode(null, null); - } - }); + return routerWebService.getLabelsOnNode( + createHttpServletRequest(user), null); } protected AppPriority getAppPriority(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppPriority run() throws Exception { - return routerWebService.getAppPriority(null, null); - } - }); + return routerWebService.getAppPriority( + createHttpServletRequest(user), null); } protected Response updateApplicationPriority(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationPriority(null, null, null); - } - }); + return routerWebService.updateApplicationPriority( + null, createHttpServletRequest(user), null); } protected AppQueue getAppQueue(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppQueue run() throws Exception { - return routerWebService.getAppQueue(null, null); - } - }); + return routerWebService.getAppQueue(createHttpServletRequest(user), null); } protected Response updateAppQueue(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateAppQueue(null, null, null); - } - }); + return routerWebService.updateAppQueue( + null, createHttpServletRequest(user), null); } protected Response createNewApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.createNewApplication(null); - } - }); + return routerWebService.createNewApplication( + createHttpServletRequest(user)); } protected Response submitApplication(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.submitApplication(null, null); - } - }); + return routerWebService.submitApplication( + null, createHttpServletRequest(user)); } protected Response postDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationToken(null, null); - } - }); + return routerWebService.postDelegationToken( + null, createHttpServletRequest(user)); } protected Response postDelegationTokenExpiration(String user) throws AuthorizationException, IOException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.postDelegationTokenExpiration(null); - } - }); + return routerWebService.postDelegationTokenExpiration( + createHttpServletRequest(user)); } protected Response cancelDelegationToken(String user) throws AuthorizationException, IOException, InterruptedException, Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.cancelDelegationToken(null); - } - }); + return routerWebService.cancelDelegationToken( + createHttpServletRequest(user)); } protected Response createNewReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.createNewReservation(null); - } - }); + return routerWebService.createNewReservation( + createHttpServletRequest(user)); } protected Response submitReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.submitReservation(null, null); - } - }); + return routerWebService.submitReservation( + null, createHttpServletRequest(user)); } protected Response updateReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateReservation(null, null); - } - }); + return routerWebService.updateReservation( + null, createHttpServletRequest(user)); } protected Response deleteReservation(String user) throws AuthorizationException, IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.deleteReservation(null, null); - } - }); + return routerWebService.deleteReservation( + null, createHttpServletRequest(user)); } protected Response listReservation(String user) throws Exception { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.listReservation(null, null, 0, 0, false, - null); - } - }); + return routerWebService.listReservation( + null, null, 0, 0, false, createHttpServletRequest(user)); } protected AppTimeoutInfo getAppTimeout(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppTimeoutInfo run() throws Exception { - return routerWebService.getAppTimeout(null, null, null); - } - }); + return routerWebService.getAppTimeout( + createHttpServletRequest(user), null, null); } protected AppTimeoutsInfo getAppTimeouts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppTimeoutsInfo run() throws Exception { - return routerWebService.getAppTimeouts(null, null); - } - }); + return routerWebService.getAppTimeouts( + createHttpServletRequest(user), null); } protected Response updateApplicationTimeout(String user) throws AuthorizationException, YarnException, InterruptedException, IOException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public Response run() throws Exception { - return routerWebService.updateApplicationTimeout(null, null, null); - } - }); + return routerWebService.updateApplicationTimeout( + null, createHttpServletRequest(user), null); } protected AppAttemptsInfo getAppAttempts(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppAttemptsInfo run() throws Exception { - return routerWebService.getAppAttempts(null, null); - } - }); + return routerWebService.getAppAttempts( + createHttpServletRequest(user), null); } protected AppAttemptInfo getAppAttempt(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public AppAttemptInfo run() throws Exception { - return routerWebService.getAppAttempt(null, null, null, null); - } - }); + return routerWebService.getAppAttempt( + createHttpServletRequest(user), null, null, null); } protected ContainersInfo getContainers(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ContainersInfo run() throws Exception { - return routerWebService.getContainers(null, null, null, null); - } - }); + return routerWebService.getContainers( + createHttpServletRequest(user), null, null, null); } protected ContainerInfo getContainer(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public ContainerInfo run() throws Exception { - return routerWebService.getContainer(null, null, null, null, null); - } - }); + return routerWebService.getContainer( + createHttpServletRequest(user), null, null, null, null); } protected RequestInterceptorChainWrapper getInterceptorChain(String user) throws IOException, InterruptedException { - return UserGroupInformation.createRemoteUser(user) - .doAs(new PrivilegedExceptionAction() { - @Override - public RequestInterceptorChainWrapper run() throws Exception { - return routerWebService.getInterceptorChain(); - } - }); + HttpServletRequest request = createHttpServletRequest(user); + return routerWebService.getInterceptorChain(request); } + private HttpServletRequest createHttpServletRequest(String user) { + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(user); + return request; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java index d32013f34b2..6c0938cc2ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.File; import java.io.IOException; +import java.util.List; /** * Helper class to start a new process. @@ -28,13 +29,23 @@ public class JavaProcess { private Process process = null; - public JavaProcess(Class klass) throws IOException, InterruptedException { + public JavaProcess(Class clazz) throws IOException, InterruptedException { + this(clazz, null); + } + + public JavaProcess(Class clazz, List addClasspaths) + throws IOException, InterruptedException { String javaHome = System.getProperty("java.home"); String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; String classpath = System.getProperty("java.class.path"); classpath = classpath.concat("./src/test/resources"); - String className = klass.getCanonicalName(); + if (addClasspaths != null) { + for (String addClasspath : addClasspaths) { + classpath = classpath.concat(File.pathSeparatorChar + addClasspath); + } + } + String className = clazz.getCanonicalName(); ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className); builder.inheritIO(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java index d7b1a0f94bc..cf4a0442cec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java @@ -18,20 +18,78 @@ package org.apache.hadoop.yarn.server.router.webapp; +import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED; +import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static javax.servlet.http.HttpServletResponse.SC_OK; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static javax.ws.rs.core.MediaType.APPLICATION_XML; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_PRIORITY; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_QUEUE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_APPID_STATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_TIMEOUTS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_ID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APP_STATISTICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.GET_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.INFO; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.LABEL_MAPPINGS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.METRICS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_GETLABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES_NODEID_REPLACE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REMOVE_NODE_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.REPLACE_NODE_TO_LABELS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_DELETE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_SUBMIT; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_UPDATE; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_ACTIVITIES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_APP_ACTIVITIES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.SCHEDULER_LOGS; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.STATES; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.TIME; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.POST; +import static org.apache.hadoop.yarn.server.router.webapp.HTTPMethods.PUT; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getNMWebAppURLWithoutScheme; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRMWebAppURLWithScheme; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.getRouterWebAppURLWithScheme; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; - -import javax.ws.rs.core.MediaType; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; @@ -63,13 +121,12 @@ import org.apache.hadoop.yarn.server.router.Router; import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.codehaus.jettison.json.JSONException; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; @@ -87,109 +144,127 @@ import net.jcip.annotations.NotThreadSafe; @NotThreadSafe public class TestRouterWebServicesREST { + /** The number of concurrent submissions for multi-thread test. */ + private static final int NUM_THREADS_TESTS = 100; + + private static String userName = "test"; private static JavaProcess rm; private static JavaProcess nm; private static JavaProcess router; - private static Configuration conf; + private static String rmAddress; + private static String routerAddress; + private static String nmAddress; - private static final int STATUS_OK = 200; - private static final int STATUS_ACCEPTED = 202; - private static final int STATUS_BADREQUEST = 400; - private static final int STATUS_ERROR = 500; + private static Configuration conf; /** * Wait until the webservice is up and running. */ - private static void waitWebAppRunning(String address, String path) { - while (true) { - Client clientToRouter = Client.create(); - WebResource toRouter = clientToRouter.resource(address).path(path); - try { - ClientResponse response = toRouter.accept(MediaType.APPLICATION_JSON) - .get(ClientResponse.class); - if (response.getStatus() == STATUS_OK) { - // process is up and running - return; + private static void waitWebAppRunning( + final String address, final String path) { + try { + final Client clientToRouter = Client.create(); + final WebResource toRouter = clientToRouter + .resource(address) + .path(path); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + ClientResponse response = toRouter + .accept(APPLICATION_JSON) + .get(ClientResponse.class); + if (response.getStatus() == SC_OK) { + // process is up and running + return true; + } + } catch (ClientHandlerException e) { + // process is not up and running + } + return false; } - } catch (ClientHandlerException e) { - // process is not up and running - continue; - } + }, 1000, 10 * 1000); + } catch (Exception e) { + fail("Web app not running"); } } @BeforeClass public static void setUp() throws Exception { conf = new YarnConfiguration(); - rm = new JavaProcess(ResourceManager.class); + + List addClasspath = new LinkedList<>(); + addClasspath.add("../hadoop-yarn-server-timelineservice/target/classes"); + rm = new JavaProcess(ResourceManager.class, addClasspath); + rmAddress = getRMWebAppURLWithScheme(conf); + waitWebAppRunning(rmAddress, RM_WEB_SERVICE_PATH); + router = new JavaProcess(Router.class); + routerAddress = getRouterWebAppURLWithScheme(conf); + waitWebAppRunning(routerAddress, RM_WEB_SERVICE_PATH); + nm = new JavaProcess(NodeManager.class); - - // The tests cannot start if all the service are not up and running. - waitWebAppRunning(WebAppUtils.getRMWebAppURLWithScheme(conf), - RMWSConsts.RM_WEB_SERVICE_PATH); - - waitWebAppRunning(WebAppUtils.getRouterWebAppURLWithScheme(conf), - RMWSConsts.RM_WEB_SERVICE_PATH); - - waitWebAppRunning("http://" + WebAppUtils.getNMWebAppURLWithoutScheme(conf), - "/ws/v1/node"); + nmAddress = "http://" + getNMWebAppURLWithoutScheme(conf); + waitWebAppRunning(nmAddress, "/ws/v1/node"); } @AfterClass public static void stop() throws Exception { - nm.stop(); - router.stop(); - rm.stop(); + if (nm != null) { + nm.stop(); + } + if (router != null) { + router.stop(); + } + if (rm != null) { + rm.stop(); + } } /** * Performs 2 GET calls one to RM and the one to Router. In positive case, it * returns the 2 answers in a list. */ - private static List performGetCalls(String path, Class returnType, - String queryName, String queryValue) - throws IOException, InterruptedException { + private static List performGetCalls(final String path, + final Class returnType, final String queryName, + final String queryValue) throws IOException, InterruptedException { Client clientToRouter = Client.create(); - WebResource toRouter = clientToRouter - .resource(WebAppUtils.getRouterWebAppURLWithScheme(conf)).path(path); + WebResource toRouter = clientToRouter.resource(routerAddress).path(path); Client clientToRM = Client.create(); - WebResource toRM = clientToRM - .resource(WebAppUtils.getRMWebAppURLWithScheme(conf)).path(path); + WebResource toRM = clientToRM.resource(rmAddress).path(path); - Builder toRouterBuilder; - Builder toRMBuilder; + final Builder toRouterBuilder; + final Builder toRMBuilder; if (queryValue != null && queryName != null) { - toRouterBuilder = toRouter.queryParam(queryName, queryValue) - .accept(MediaType.APPLICATION_XML); - toRMBuilder = toRM.queryParam(queryName, queryValue) - .accept(MediaType.APPLICATION_XML); + toRouterBuilder = toRouter + .queryParam(queryName, queryValue) + .accept(APPLICATION_XML); + toRMBuilder = toRM + .queryParam(queryName, queryValue) + .accept(APPLICATION_XML); } else { - toRouterBuilder = toRouter.accept(MediaType.APPLICATION_XML); - toRMBuilder = toRM.accept(MediaType.APPLICATION_XML); + toRouterBuilder = toRouter.accept(APPLICATION_XML); + toRMBuilder = toRM.accept(APPLICATION_XML); } return UserGroupInformation.createRemoteUser(userName) .doAs(new PrivilegedExceptionAction>() { @Override public List run() throws Exception { - ClientResponse response = toRouterBuilder.get(ClientResponse.class); + ClientResponse response = + toRouterBuilder.get(ClientResponse.class); ClientResponse response2 = toRMBuilder.get(ClientResponse.class); - if (response.getStatus() == STATUS_OK - && response2.getStatus() == STATUS_OK) { - List responses = new ArrayList(); - responses.add(response.getEntity(returnType)); - responses.add(response2.getEntity(returnType)); - return responses; - } else { - Assert.fail(); - } - return null; + assertEquals(SC_OK, response.getStatus()); + assertEquals(SC_OK, response2.getStatus()); + List responses = new ArrayList<>(); + responses.add(response.getEntity(returnType)); + responses.add(response2.getEntity(returnType)); + return responses; } }); } @@ -197,9 +272,9 @@ public class TestRouterWebServicesREST { /** * Performs a POST/PUT/DELETE call to Router and returns the ClientResponse. */ - private static ClientResponse performCall(String webAddress, String queryKey, - String queryValue, Object context, HTTPMethods method) - throws IOException, InterruptedException { + private static ClientResponse performCall(final String webAddress, + final String queryKey, final String queryValue, final Object context, + final HTTPMethods method) throws IOException, InterruptedException { return UserGroupInformation.createRemoteUser(userName) .doAs(new PrivilegedExceptionAction() { @@ -207,22 +282,20 @@ public class TestRouterWebServicesREST { public ClientResponse run() throws Exception { Client clientToRouter = Client.create(); WebResource toRouter = clientToRouter - .resource(WebAppUtils.getRouterWebAppURLWithScheme(conf)) + .resource(routerAddress) .path(webAddress); - WebResource toRouterWR; + WebResource toRouterWR = toRouter; if (queryKey != null && queryValue != null) { - toRouterWR = toRouter.queryParam(queryKey, queryValue); - } else { - toRouterWR = toRouter; + toRouterWR = toRouterWR.queryParam(queryKey, queryValue); } Builder builder = null; if (context != null) { - builder = toRouterWR.entity(context, MediaType.APPLICATION_JSON); - builder = builder.accept(MediaType.APPLICATION_JSON); + builder = toRouterWR.entity(context, APPLICATION_JSON); + builder = builder.accept(APPLICATION_JSON); } else { - builder = toRouter.accept(MediaType.APPLICATION_JSON); + builder = toRouter.accept(APPLICATION_JSON); } ClientResponse response = null; @@ -250,19 +323,20 @@ public class TestRouterWebServicesREST { * This test validates the correctness of {@link RMWebServiceProtocol#get()} * inside Router. */ - @Test(timeout = 1000) - public void testInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testInfoXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH, ClusterInfo.class, null, null); + RM_WEB_SERVICE_PATH, ClusterInfo.class, null, null); ClusterInfo routerResponse = responses.get(0); ClusterInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getRMVersion(), + assertEquals( + rmResponse.getRMVersion(), routerResponse.getRMVersion()); } @@ -270,20 +344,20 @@ public class TestRouterWebServicesREST { * This test validates the correctness of * {@link RMWebServiceProtocol#getClusterInfo()} inside Router. */ - @Test(timeout = 1000) - public void testClusterInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testClusterInfoXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.INFO, - ClusterInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + INFO, ClusterInfo.class, null, null); ClusterInfo routerResponse = responses.get(0); ClusterInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getRMVersion(), + assertEquals( + rmResponse.getRMVersion(), routerResponse.getRMVersion()); } @@ -291,41 +365,41 @@ public class TestRouterWebServicesREST { * This test validates the correctness of * {@link RMWebServiceProtocol#getClusterMetricsInfo()} inside Router. */ - @Test(timeout = 1000) - public void testMetricsInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testMetricsInfoXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.METRICS, - ClusterMetricsInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + METRICS, ClusterMetricsInfo.class, null, null); ClusterMetricsInfo routerResponse = responses.get(0); ClusterMetricsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getActiveNodes(), + assertEquals( + rmResponse.getActiveNodes(), routerResponse.getActiveNodes()); } - /** + /* * This test validates the correctness of * {@link RMWebServiceProtocol#getSchedulerInfo()} inside Router. */ - @Test(timeout = 1000) - public void testSchedulerInfoXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testSchedulerInfoXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER, - SchedulerTypeInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + SCHEDULER, SchedulerTypeInfo.class, null, null); SchedulerTypeInfo routerResponse = responses.get(0); SchedulerTypeInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getSchedulerInfo().getSchedulerType(), + assertEquals( + rmResponse.getSchedulerInfo().getSchedulerType(), routerResponse.getSchedulerInfo().getSchedulerType()); } @@ -333,20 +407,41 @@ public class TestRouterWebServicesREST { * This test validates the correctness of * {@link RMWebServiceProtocol#getNodes()} inside Router. */ - @Test(timeout = 1000) - public void testNodesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNodesEmptyXML() throws Exception { - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, - NodesInfo.class, RMWSConsts.STATES, "LOST"); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, null, null); NodesInfo routerResponse = responses.get(0); NodesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodes().size(), + assertEquals( + rmResponse.getNodes().size(), + routerResponse.getNodes().size()); + } + + /** + * This test validates the correctness of + * {@link RMWebServiceProtocol#getNodes()} inside Router. + */ + @Test(timeout = 2000) + public void testNodesXML() throws Exception { + + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + NODES, NodesInfo.class, STATES, "LOST"); + + NodesInfo routerResponse = responses.get(0); + NodesInfo rmResponse = responses.get(1); + + assertNotNull(routerResponse); + assertNotNull(rmResponse); + + assertEquals( + rmResponse.getNodes().size(), routerResponse.getNodes().size()); } @@ -354,80 +449,83 @@ public class TestRouterWebServicesREST { * This test validates the correctness of * {@link RMWebServiceProtocol#getNode()} inside Router. */ - @Test(timeout = 1000) - public void testNodeXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNodeXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" + getNodeId(), + RM_WEB_SERVICE_PATH + format(NODES_NODEID, getNodeId()), NodeInfo.class, null, null); NodeInfo routerResponse = responses.get(0); NodeInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getVersion(), routerResponse.getVersion()); + assertEquals( + rmResponse.getVersion(), + routerResponse.getVersion()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getActivities()} inside Router. */ - @Test(timeout = 1000) - public void testActiviesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testActiviesXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_ACTIVITIES, + RM_WEB_SERVICE_PATH + SCHEDULER_ACTIVITIES, ActivitiesInfo.class, null, null); ActivitiesInfo routerResponse = responses.get(0); ActivitiesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppActivities()} inside Router. */ - @Test(timeout = 1000) - public void testAppActivitiesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppActivitiesXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_APP_ACTIVITIES, - AppActivitiesInfo.class, RMWSConsts.APP_ID, appId); + RM_WEB_SERVICE_PATH + SCHEDULER_APP_ACTIVITIES, + AppActivitiesInfo.class, APP_ID, appId); AppActivitiesInfo routerResponse = responses.get(0); AppActivitiesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppStatistics()} inside Router. */ - @Test(timeout = 1000) - public void testAppStatisticsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppStatisticsXML() throws Exception { submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APP_STATISTICS, - ApplicationStatisticsInfo.class, RMWSConsts.STATES, "RUNNING"); + RM_WEB_SERVICE_PATH + APP_STATISTICS, + ApplicationStatisticsInfo.class, STATES, "RUNNING"); ApplicationStatisticsInfo routerResponse = responses.get(0); ApplicationStatisticsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getStatItems().size(), + assertEquals( + rmResponse.getStatItems().size(), routerResponse.getStatItems().size()); } @@ -435,813 +533,820 @@ public class TestRouterWebServicesREST { * This test validates the correctness of * {@link RMWebServiceProtocol#dumpSchedulerLogs()} inside Router. */ - @Test(timeout = 1000) - public void testDumpSchedulerLogsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testDumpSchedulerLogsXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_LOGS, - null, null, null, HTTPMethods.PUT); + performCall(RM_WEB_SERVICE_PATH + SCHEDULER_LOGS, + null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + SCHEDULER_LOGS, TIME, "1", null, POST); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_LOGS, - RMWSConsts.TIME, "1", null, HTTPMethods.POST); - - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#createNewApplication()} inside Router. */ - @Test(timeout = 1000) - public void testNewApplicationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNewApplicationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS_NEW_APPLICATION, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null, + null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS_NEW_APPLICATION, null, - null, null, HTTPMethods.POST); - - if (response.getStatus() == STATUS_OK) { - NewApplication ci = response.getEntity(NewApplication.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION, null, + null, null, POST); + assertEquals(SC_OK, response.getStatus()); + NewApplication ci = response.getEntity(NewApplication.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#submitApplication()} inside Router. */ - @Test(timeout = 1000) - public void testSubmitApplicationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testSubmitApplicationXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, null, - null, null, HTTPMethods.PUT); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + APPS, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); context.setApplicationId(getNewApplicationId().getApplicationId()); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, null, - null, context, HTTPMethods.POST); - - if (response.getStatus() == STATUS_ACCEPTED) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + APPS, null, null, context, POST); + assertEquals(SC_ACCEPTED, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getApps()} inside Router. */ - @Test(timeout = 1000) - public void testAppsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppsXML() throws Exception { submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS, - AppsInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null); AppsInfo routerResponse = responses.get(0); AppsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getApps().size(), - rmResponse.getApps().size()); + assertEquals( + rmResponse.getApps().size(), + routerResponse.getApps().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getApp()} inside Router. */ - @Test(timeout = 1000) - public void testAppXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId, + RM_WEB_SERVICE_PATH + format(APPS_APPID, appId), AppInfo.class, null, null); AppInfo routerResponse = responses.get(0); AppInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAMHostHttpAddress(), - rmResponse.getAMHostHttpAddress()); + assertEquals( + rmResponse.getAMHostHttpAddress(), + routerResponse.getAMHostHttpAddress()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppAttempts()} inside Router. */ - @Test(timeout = 1000) - public void testAppAttemptXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppAttemptXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.ATTEMPTS, + RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId), AppAttemptsInfo.class, null, null); AppAttemptsInfo routerResponse = responses.get(0); AppAttemptsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAttempts().size(), - rmResponse.getAttempts().size()); + assertEquals( + rmResponse.getAttempts().size(), + routerResponse.getAttempts().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppState()} inside Router. */ - @Test(timeout = 1000) - public void testAppStateXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppStateXML() throws Exception { String appId = submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.STATE, AppState.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId), + AppState.class, null, null); AppState routerResponse = responses.get(0); AppState rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getState(), rmResponse.getState()); + assertEquals( + rmResponse.getState(), + routerResponse.getState()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateAppState()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppStateXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppStateXML() throws Exception { String appId = submitApplication(); + String pathApp = + RM_WEB_SERVICE_PATH + format(APPS_APPID_STATE, appId); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.STATE, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + pathApp, null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - AppState appState = new AppState("KILLED"); - ClientResponse response = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.STATE, null, null, - appState, HTTPMethods.PUT); + ClientResponse response = performCall( + pathApp, null, null, appState, PUT); - if (response.getStatus() == STATUS_ACCEPTED) { - AppState ci = response.getEntity(AppState.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_ACCEPTED, response.getStatus()); + AppState ci = response.getEntity(AppState.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppPriority()} inside Router. */ - @Test(timeout = 1000) - public void testAppPriorityXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppPriorityXML() throws Exception { String appId = submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.PRIORITY, AppPriority.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + AppPriority.class, null, null); AppPriority routerResponse = responses.get(0); AppPriority rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getPriority(), rmResponse.getPriority()); + assertEquals(rmResponse.getPriority(), routerResponse.getPriority()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateApplicationPriority()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppPriorityXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppPriorityXML() throws Exception { String appId = submitApplication(); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.PRIORITY, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - AppPriority appPriority = new AppPriority(1); - ClientResponse response = - performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.PRIORITY, - null, null, appPriority, HTTPMethods.PUT); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_PRIORITY, appId), + null, null, appPriority, PUT); - if (response.getStatus() == STATUS_OK) { - AppPriority ci = response.getEntity(AppPriority.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + AppPriority ci = response.getEntity(AppPriority.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppQueue()} inside Router. */ - @Test(timeout = 1000) - public void testAppQueueXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppQueueXML() throws Exception { String appId = submitApplication(); - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.QUEUE, AppQueue.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + AppQueue.class, null, null); AppQueue routerResponse = responses.get(0); AppQueue rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getQueue(), rmResponse.getQueue()); + assertEquals(rmResponse.getQueue(), routerResponse.getQueue()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateAppQueue()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppQueueXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppQueueXML() throws Exception { String appId = submitApplication(); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.QUEUE, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - AppQueue appQueue = new AppQueue("default"); - ClientResponse response = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.QUEUE, null, null, - appQueue, HTTPMethods.PUT); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + format(APPS_APPID_QUEUE, appId), + null, null, appQueue, PUT); - if (response.getStatus() == STATUS_OK) { - AppQueue ci = response.getEntity(AppQueue.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + AppQueue ci = response.getEntity(AppQueue.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppTimeouts()} inside Router. */ - @Test(timeout = 1000) - public void testAppTimeoutsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppTimeoutsXML() throws Exception { String appId = submitApplication(); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.TIMEOUTS, + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId), AppTimeoutsInfo.class, null, null); AppTimeoutsInfo routerResponse = responses.get(0); AppTimeoutsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAppTimeouts().size(), - rmResponse.getAppTimeouts().size()); + assertEquals( + rmResponse.getAppTimeouts().size(), + routerResponse.getAppTimeouts().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getAppTimeout()} inside Router. */ - @Test(timeout = 1000) - public void testAppTimeoutXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAppTimeoutXML() throws Exception { String appId = submitApplication(); - + String pathApp = RM_WEB_SERVICE_PATH + format(APPS_TIMEOUTS, appId); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.TIMEOUTS + "/" + "LIFETIME", - AppTimeoutInfo.class, null, null); + pathApp + "/" + "LIFETIME", AppTimeoutInfo.class, null, null); AppTimeoutInfo routerResponse = responses.get(0); AppTimeoutInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getExpireTime(), rmResponse.getExpireTime()); + assertEquals( + rmResponse.getExpireTime(), + routerResponse.getExpireTime()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateApplicationTimeout()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateAppTimeoutsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateAppTimeoutsXML() throws Exception { String appId = submitApplication(); // Test with a wrong HTTP method - ClientResponse badResponse = performCall(RMWSConsts.RM_WEB_SERVICE_PATH - + RMWSConsts.APPS + "/" + appId + "/" + RMWSConsts.TIMEOUT, null, null, - null, HTTPMethods.POST); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId), + null, null, null, POST); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); - // Test with the correct HTTP method - - // Create a bad request + // Test with a bad request AppTimeoutInfo appTimeoutInfo = new AppTimeoutInfo(); - ClientResponse response = - performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.TIMEOUT, - null, null, appTimeoutInfo, HTTPMethods.PUT); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + format(APPS_TIMEOUT, appId), + null, null, appTimeoutInfo, PUT); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#createNewReservation()} inside Router. */ - @Test(timeout = 1000) - public void testNewReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testNewReservationXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_NEW, - null, null, null, HTTPMethods.PUT); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + RESERVATION_NEW, + null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + RESERVATION_NEW, + null, null, null, POST); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_NEW, - null, null, null, HTTPMethods.POST); - - if (response.getStatus() == STATUS_OK) { - NewReservation ci = response.getEntity(NewReservation.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + NewReservation ci = response.getEntity(NewReservation.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#submitReservation()} inside Router. */ - @Test(timeout = 1000) - public void testSubmitReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testSubmitReservationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_SUBMIT, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, + null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - ReservationSubmissionRequestInfo context = new ReservationSubmissionRequestInfo(); context.setReservationId(getNewReservationId().getReservationId()); // ReservationDefinition is null ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_SUBMIT, null, - null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + RESERVATION_SUBMIT, null, null, context, POST); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#updateReservation()} inside Router. */ - @Test(timeout = 1000) - public void testUpdateReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testUpdateReservationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_UPDATE, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - String reservationId = getNewReservationId().getReservationId(); ReservationUpdateRequestInfo context = new ReservationUpdateRequestInfo(); context.setReservationId(reservationId); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_UPDATE, null, - null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + RESERVATION_UPDATE, null, null, context, POST); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#deleteReservation()} inside Router. */ - @Test(timeout = 1000) - public void testDeleteReservationXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testDeleteReservationXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_DELETE, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - String reservationId = getNewReservationId().getReservationId(); ReservationDeleteRequestInfo context = new ReservationDeleteRequestInfo(); context.setReservationId(reservationId); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_DELETE, null, - null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + RESERVATION_DELETE, null, null, context, POST); - if (response.getStatus() == STATUS_BADREQUEST) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_BAD_REQUEST, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getNodeToLabels()} inside Router. */ - @Test(timeout = 1000) - public void testGetNodeToLabelsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetNodeToLabelsXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.GET_NODE_TO_LABELS, + RM_WEB_SERVICE_PATH + GET_NODE_TO_LABELS, NodeToLabelsInfo.class, null, null); NodeToLabelsInfo routerResponse = responses.get(0); NodeToLabelsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodeToLabels().size(), - rmResponse.getNodeToLabels().size()); + assertEquals( + rmResponse.getNodeToLabels().size(), + routerResponse.getNodeToLabels().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getClusterNodeLabels()} inside Router. */ - @Test(timeout = 1000) - public void testGetClusterNodeLabelsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetClusterNodeLabelsXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.GET_NODE_LABELS, + RM_WEB_SERVICE_PATH + GET_NODE_LABELS, NodeLabelsInfo.class, null, null); NodeLabelsInfo routerResponse = responses.get(0); NodeLabelsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodeLabels().size(), - rmResponse.getNodeLabels().size()); + assertEquals( + rmResponse.getNodeLabels().size(), + routerResponse.getNodeLabels().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getLabelsOnNode()} inside Router. */ - @Test(timeout = 1000) - public void testGetLabelsOnNodeXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetLabelsOnNodeXML() throws Exception { - List responses = - performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" - + getNodeId() + "/" + RMWSConsts.GET_LABELS, - NodeLabelsInfo.class, null, null); + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + format(NODES_NODEID_GETLABELS, getNodeId()), + NodeLabelsInfo.class, null, null); NodeLabelsInfo routerResponse = responses.get(0); NodeLabelsInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getNodeLabels().size(), - rmResponse.getNodeLabels().size()); + assertEquals( + rmResponse.getNodeLabels().size(), + routerResponse.getNodeLabels().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#getLabelsToNodes()} inside Router. */ - @Test(timeout = 1000) - public void testGetLabelsMappingXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetLabelsMappingEmptyXML() throws Exception { List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, + RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, LabelsToNodesInfo.class, null, null); LabelsToNodesInfo routerResponse = responses.get(0); LabelsToNodesInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getLabelsToNodes().size(), - rmResponse.getLabelsToNodes().size()); + assertEquals( + rmResponse.getLabelsToNodes().size(), + routerResponse.getLabelsToNodes().size()); + } + + /** + * This test validates the correctness of + * {@link RMWebServiceProtocol#getLabelsToNodes()} inside Router. + */ + @Test(timeout = 2000) + public void testGetLabelsMappingXML() throws Exception { + + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + LABEL_MAPPINGS, + LabelsToNodesInfo.class, LABELS, "label1"); + + LabelsToNodesInfo routerResponse = responses.get(0); + LabelsToNodesInfo rmResponse = responses.get(1); + + assertNotNull(routerResponse); + assertNotNull(rmResponse); + + assertEquals( + rmResponse.getLabelsToNodes().size(), + routerResponse.getLabelsToNodes().size()); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#addToClusterNodeLabels()} inside Router. */ - @Test(timeout = 1000) - public void testAddToClusterNodeLabelsXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testAddToClusterNodeLabelsXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.ADD_NODE_LABELS, - null, null, null, HTTPMethods.PUT); + ClientResponse badResponse = performCall( + RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, + null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - List nodeLabels = new ArrayList(); + List nodeLabels = new ArrayList<>(); nodeLabels.add(NodeLabel.newInstance("default")); NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels); - ClientResponse response = - performCall(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.ADD_NODE_LABELS, - null, null, context, HTTPMethods.POST); + ClientResponse response = performCall( + RM_WEB_SERVICE_PATH + ADD_NODE_LABELS, null, null, context, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#removeFromCluserNodeLabels()} inside Router. */ - @Test(timeout = 1000) + @Test(timeout = 2000) public void testRemoveFromCluserNodeLabelsXML() - throws JSONException, Exception { + throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REMOVE_NODE_LABELS, null, - null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - addNodeLabel(); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REMOVE_NODE_LABELS, - RMWSConsts.LABELS, "default", null, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + REMOVE_NODE_LABELS, + LABELS, "default", null, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#replaceLabelsOnNodes()} inside Router. */ - @Test(timeout = 1000) - public void testReplaceLabelsOnNodesXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testReplaceLabelsOnNodesXML() throws Exception { // Test with a wrong HTTP method ClientResponse badResponse = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REPLACE_NODE_TO_LABELS, - null, null, null, HTTPMethods.PUT); + RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - addNodeLabel(); NodeToLabelsEntryList context = new NodeToLabelsEntryList(); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.REPLACE_NODE_TO_LABELS, - null, null, context, HTTPMethods.POST); + RM_WEB_SERVICE_PATH + REPLACE_NODE_TO_LABELS, + null, null, context, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of * {@link RMWebServiceProtocol#replaceLabelsOnNode()} inside Router. */ - @Test(timeout = 1000) - public void testReplaceLabelsOnNodeXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testReplaceLabelsOnNodeXML() throws Exception { // Test with a wrong HTTP method - ClientResponse badResponse = - performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" - + getNodeId() + "/replace-labels", - null, null, null, HTTPMethods.PUT); + String pathNode = RM_WEB_SERVICE_PATH + + format(NODES_NODEID_REPLACE_LABELS, getNodeId()); + ClientResponse badResponse = performCall( + pathNode, null, null, null, PUT); - Assert.assertEquals(STATUS_ERROR, badResponse.getStatus()); + assertEquals(SC_INTERNAL_SERVER_ERROR, badResponse.getStatus()); // Test with the correct HTTP method - addNodeLabel(); ClientResponse response = performCall( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES + "/" + getNodeId() - + "/replace-labels", - RMWSConsts.LABELS, "default", null, HTTPMethods.POST); + pathNode, LABELS, "default", null, POST); - if (response.getStatus() == STATUS_OK) { - String ci = response.getEntity(String.class); - Assert.assertNotNull(ci); - } else { - Assert.fail(); - } + assertEquals(SC_OK, response.getStatus()); + String ci = response.getEntity(String.class); + assertNotNull(ci); } /** * This test validates the correctness of {@link WebServices#getAppAttempt} * inside Router. */ - @Test(timeout = 1000) - public void testGetAppAttemptXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetAppAttemptXML() throws Exception { String appId = submitApplication(); - + String pathAttempts = RM_WEB_SERVICE_PATH + format( + APPS_APPID_APPATTEMPTS_APPATTEMPTID, appId, getAppAttempt(appId)); List responses = performGetCalls( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId + "/" - + RMWSConsts.APPATTEMPTS + "/" + getAppAttempt(appId), - AppAttemptInfo.class, null, null); + pathAttempts, AppAttemptInfo.class, null, null); AppAttemptInfo routerResponse = responses.get(0); AppAttemptInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getAppAttemptId(), - rmResponse.getAppAttemptId()); + assertEquals( + rmResponse.getAppAttemptId(), + routerResponse.getAppAttemptId()); } /** * This test validates the correctness of {@link WebServices#getContainers} * inside Router. */ - @Test(timeout = 1000) - public void testGetContainersXML() throws JSONException, Exception { + @Test(timeout = 2000) + public void testGetContainersXML() throws Exception { String appId = submitApplication(); - - List responses = - performGetCalls(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" - + appId + "/" + RMWSConsts.APPATTEMPTS + "/" + getAppAttempt(appId) - + "/" + RMWSConsts.CONTAINERS, ContainersInfo.class, null, null); + String pathAttempts = RM_WEB_SERVICE_PATH + format( + APPS_APPID_APPATTEMPTS_APPATTEMPTID_CONTAINERS, + appId, getAppAttempt(appId)); + List responses = performGetCalls( + pathAttempts, ContainersInfo.class, null, null); ContainersInfo routerResponse = responses.get(0); ContainersInfo rmResponse = responses.get(1); - Assert.assertNotNull(routerResponse); - Assert.assertNotNull(rmResponse); + assertNotNull(routerResponse); + assertNotNull(rmResponse); - Assert.assertEquals(rmResponse.getContainers().size(), - rmResponse.getContainers().size()); + assertEquals( + rmResponse.getContainers().size(), + routerResponse.getContainers().size()); + } + + @Test(timeout = 60000) + public void testGetAppsMultiThread() throws Exception { + final int iniNumApps = getNumApps(); + + // This submits an application + testGetContainersXML(); + // This submits an application + testAppsXML(); + + // Wait at most 10 seconds until we see all the applications + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + // Check if we have the 2 apps we submitted + return getNumApps() == iniNumApps + 2; + } catch (Exception e) { + fail(); + } + return false; + } + }, 100, 10 * 1000); + + // Multithreaded getApps() + ExecutorService threadpool = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("REST Tester #%d") + .build()); + CompletionService svc = new ExecutorCompletionService<>(threadpool); + try { + // Submit a bunch of operations concurrently + for (int i = 0; i < NUM_THREADS_TESTS; i++) { + svc.submit(new Callable() { + @Override + public Void call() throws Exception { + assertEquals(iniNumApps + 2, getNumApps()); + return null; + } + }); + } + } finally { + threadpool.shutdown(); + } + + assertEquals(iniNumApps + 2, getNumApps()); + } + + /** + * Get the number of applications in the system. + * @return Number of applications in the system + * @throws Exception If we cannot get the applications. + */ + private int getNumApps() throws Exception { + List responses = performGetCalls( + RM_WEB_SERVICE_PATH + APPS, AppsInfo.class, null, null); + AppsInfo routerResponse = responses.get(0); + AppsInfo rmResponse = responses.get(1); + assertEquals(rmResponse.getApps().size(), routerResponse.getApps().size()); + return rmResponse.getApps().size(); } private String getNodeId() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + NODES); ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + toRM.accept(APPLICATION_XML).get(ClientResponse.class); NodesInfo ci = response.getEntity(NodesInfo.class); return ci.getNodes().get(0).getNodeId(); } private NewApplication getNewApplicationId() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)).path( - RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS_NEW_APPLICATION); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION); ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + toRM.accept(APPLICATION_XML).post(ClientResponse.class); return response.getEntity(NewApplication.class); } @@ -1252,47 +1357,61 @@ public class TestRouterWebServicesREST { context.setApplicationId(appId); Client clientToRouter = Client.create(); - WebResource toRM = - clientToRouter.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS); - toRM.entity(context, MediaType.APPLICATION_XML) - .accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + WebResource toRM = clientToRouter.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + APPS); + toRM.entity(context, APPLICATION_XML) + .accept(APPLICATION_XML) + .post(ClientResponse.class); return appId; } private NewReservation getNewReservationId() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.RESERVATION_NEW); - ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + RESERVATION_NEW); + ClientResponse response = toRM. + accept(APPLICATION_XML) + .post(ClientResponse.class); return response.getEntity(NewReservation.class); } private String addNodeLabel() { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.ADD_NODE_LABELS); - List nodeLabels = new ArrayList(); + WebResource toRM = clientToRM.resource(rmAddress) + .path(RM_WEB_SERVICE_PATH + ADD_NODE_LABELS); + List nodeLabels = new ArrayList<>(); nodeLabels.add(NodeLabel.newInstance("default")); NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels); - ClientResponse response = toRM.entity(context, MediaType.APPLICATION_XML) - .accept(MediaType.APPLICATION_XML).post(ClientResponse.class); + ClientResponse response = toRM + .entity(context, APPLICATION_XML) + .accept(APPLICATION_XML) + .post(ClientResponse.class); return response.getEntity(String.class); } private String getAppAttempt(String appId) { Client clientToRM = Client.create(); - WebResource toRM = - clientToRM.resource(WebAppUtils.getRMWebAppURLWithScheme(conf)) - .path(RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.APPS + "/" + appId - + "/" + RMWSConsts.ATTEMPTS); - ClientResponse response = - toRM.accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + String pathAppAttempt = + RM_WEB_SERVICE_PATH + format(APPS_APPID_APPATTEMPTS, appId); + WebResource toRM = clientToRM.resource(rmAddress) + .path(pathAppAttempt); + ClientResponse response = toRM + .accept(APPLICATION_XML) + .get(ClientResponse.class); AppAttemptsInfo ci = response.getEntity(AppAttemptsInfo.class); return ci.getAttempts().get(0).getAppAttemptId(); } + /** + * Convert format using {name} (HTTP base) into %s (Java based). + * @param format Initial format using {}. + * @param args Arguments for the format. + * @return New format using %s. + */ + private static String format(String format, Object... args) { + Pattern p = Pattern.compile("\\{.*?}"); + Matcher m = p.matcher(format); + String newFormat = m.replaceAll("%s"); + return String.format(newFormat, args); + } }