diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
new file mode 100644
index 00000000000..42361a3ad96
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * This class is for maintaining the various Router Federation Interceptor
+ * activity statistics and publishing them through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Metrics for Router Federation Interceptor", context = "fedr")
+public final class RouterMetrics {
+
+  private static final MetricsInfo RECORD_INFO =
+      info("RouterMetrics", "Router Federation Interceptor");
+  private static AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+  // Metrics for operation failed
+  @Metric("# of applications failed to be submitted")
+  private MutableGaugeInt numAppsFailedSubmitted;
+  @Metric("# of applications failed to be created")
+  private MutableGaugeInt numAppsFailedCreated;
+  @Metric("# of applications failed to be killed")
+  private MutableGaugeInt numAppsFailedKilled;
+  @Metric("# of application reports failed to be retrieved")
+  private MutableGaugeInt numAppsFailedRetrieved;
+
+  // Aggregate metrics are shared, and don't have to be looked up per call
+  @Metric("Total number of successful Submitted apps and latency(ms)")
+  private MutableRate totalSucceededAppsSubmitted;
+  @Metric("Total number of successful Killed apps and latency(ms)")
+  private MutableRate totalSucceededAppsKilled;
+  @Metric("Total number of successful Created apps and latency(ms)")
+  private MutableRate totalSucceededAppsCreated;
+  @Metric("Total number of successful Retrieved app reports and latency(ms)")
+  private MutableRate totalSucceededAppsRetrieved;
+
+  /**
+   * Provide quantile counters for all latencies.
+   */
+  private MutableQuantiles submitApplicationLatency;
+  private MutableQuantiles getNewApplicationLatency;
+  private MutableQuantiles killApplicationLatency;
+  private MutableQuantiles getApplicationReportLatency;
+
+  private static volatile RouterMetrics INSTANCE = null;
+  private static MetricsRegistry registry;
+
+  private RouterMetrics() {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "Router");
+    getNewApplicationLatency = registry.newQuantiles("getNewApplicationLatency",
+        "latency of get new application", "ops", "latency", 10);
+    submitApplicationLatency = registry.newQuantiles("submitApplicationLatency",
+        "latency of submit application", "ops", "latency", 10);
+    killApplicationLatency = registry.newQuantiles("killApplicationLatency",
+        "latency of kill application", "ops", "latency", 10);
+    getApplicationReportLatency =
+        registry.newQuantiles("getApplicationReportLatency",
+            "latency of get application report", "ops", "latency", 10);
+  }
+
+  public static RouterMetrics getMetrics() {
+    if (!isInitialized.get()) {
+      synchronized (RouterMetrics.class) {
+        if (INSTANCE == null) {
+          INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
+              "Metrics for the Yarn Router", new RouterMetrics());
+          isInitialized.set(true);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  synchronized static void destroy() {
+    isInitialized.set(false);
+    INSTANCE = null;
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsCreated() {
+    return totalSucceededAppsCreated.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsSubmitted() {
+    return totalSucceededAppsSubmitted.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsKilled() {
+    return totalSucceededAppsKilled.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsRetrieved() {
+    return totalSucceededAppsRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsCreated() {
+    return totalSucceededAppsCreated.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsSubmitted() {
+    return totalSucceededAppsSubmitted.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsKilled() {
+    return totalSucceededAppsKilled.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededGetAppReport() {
+    return totalSucceededAppsRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedCreated() {
+    return numAppsFailedCreated.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedSubmitted() {
+    return numAppsFailedSubmitted.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedKilled() {
+    return numAppsFailedKilled.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedRetrieved() {
+    return numAppsFailedRetrieved.value();
+  }
+
+  public void succeededAppsCreated(long duration) {
+    totalSucceededAppsCreated.add(duration);
+    getNewApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsSubmitted(long duration) {
+    totalSucceededAppsSubmitted.add(duration);
+    submitApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsKilled(long duration) {
+    totalSucceededAppsKilled.add(duration);
+    killApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsRetrieved(long duration) {
+    totalSucceededAppsRetrieved.add(duration);
+    getApplicationReportLatency.add(duration);
+  }
+
+  public void incrAppsFailedCreated() {
+    numAppsFailedCreated.incr();
+  }
+
+  public void incrAppsFailedSubmitted() {
+    numAppsFailedSubmitted.incr();
+  }
+
+  public void incrAppsFailedKilled() {
+    numAppsFailedKilled.incr();
+  }
+
+  public void incrAppsFailedRetrieved() {
+    numAppsFailedRetrieved.incr();
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 7268ebd949a..3a36eec66ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -98,7 +98,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSub
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,6 +133,8 @@ public class FederationClientInterceptor
   private FederationStateStoreFacade federationFacade;
   private Random rand;
   private RouterPolicyFacade policyFacade;
+  private RouterMetrics routerMetrics;
+  private final Clock clock = new MonotonicClock();
 
   @Override
   public void init(String userName) {
@@ -153,7 +158,7 @@ public class FederationClientInterceptor
 
     clientRMProxies = new ConcurrentHashMap();
-
+    routerMetrics = RouterMetrics.getMetrics();
   }
 
   @Override
@@ -220,6 +225,9 @@ public class FederationClientInterceptor
   @Override
   public GetNewApplicationResponse getNewApplication(
       GetNewApplicationRequest request) throws YarnException, IOException {
+
+    long startTime = clock.getTime();
+
     Map subClustersActive =
         federationFacade.getSubClusters(true);
 
@@ -238,6 +246,9 @@ public class FederationClientInterceptor
       }
 
       if (response != null) {
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -247,6 +258,7 @@ public class FederationClientInterceptor
       }
     }
 
+    routerMetrics.incrAppsFailedCreated();
     String errMsg = "Fail to create a new application.";
     LOG.error(errMsg);
     throw new YarnException(errMsg);
@@ -320,9 +332,13 @@ public class FederationClientInterceptor
   @Override
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) throws YarnException, IOException {
+
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationSubmissionContext() == null
         || request.getApplicationSubmissionContext()
             .getApplicationId() == null) {
+      routerMetrics.incrAppsFailedSubmitted();
       RouterServerUtil
           .logAndThrowException("Missing submitApplication request or "
               + "applicationSubmissionContex information.", null);
@@ -350,6 +366,7 @@ public class FederationClientInterceptor
       subClusterId =
           federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedSubmitted();
       String message = "Unable to insert the ApplicationId " + applicationId
           + " into the FederationStateStore";
       RouterServerUtil.logAndThrowException(message, e);
@@ -368,6 +385,7 @@ public class FederationClientInterceptor
         LOG.info("Application " + applicationId
             + " already submitted on SubCluster " + subClusterId);
       } else {
+        routerMetrics.incrAppsFailedSubmitted();
         RouterServerUtil.logAndThrowException(message, e);
       }
     }
@@ -388,6 +406,8 @@ public class FederationClientInterceptor
       LOG.info("Application "
          + request.getApplicationSubmissionContext().getApplicationName()
          + " with appId " + applicationId + " submitted on " + subClusterId);
+      long stopTime = clock.getTime();
+      routerMetrics.succeededAppsSubmitted(stopTime - startTime);
       return response;
     } else {
       // Empty response from the ResourceManager.
@@ -396,6 +416,7 @@ public class FederationClientInterceptor
       }
     }
 
+    routerMetrics.incrAppsFailedSubmitted();
    String errMsg = "Application "
        + request.getApplicationSubmissionContext().getApplicationName()
        + " with appId " + applicationId + " failed to be submitted.";
@@ -423,7 +444,10 @@ public class FederationClientInterceptor
   public KillApplicationResponse forceKillApplication(
       KillApplicationRequest request) throws YarnException, IOException {
 
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedKilled();
       RouterServerUtil.logAndThrowException(
           "Missing forceKillApplication request or ApplicationId.", null);
     }
@@ -434,6 +458,7 @@ public class FederationClientInterceptor
       subClusterId = federationFacade
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedKilled();
       RouterServerUtil.logAndThrowException("Application " + applicationId
           + " does not exist in FederationStateStore", e);
     }
@@ -447,6 +472,7 @@ public class FederationClientInterceptor
           + subClusterId);
       response = clientRMProxy.forceKillApplication(request);
     } catch (Exception e) {
+      routerMetrics.incrAppsFailedKilled();
       LOG.error("Unable to kill the application report for "
           + request.getApplicationId() + "to SubCluster "
           + subClusterId.getId(), e);
@@ -458,6 +484,8 @@ public class FederationClientInterceptor
           + applicationId + " to SubCluster " + subClusterId.getId());
     }
 
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsKilled(stopTime - startTime);
     return response;
   }
 
@@ -481,7 +509,10 @@ public class FederationClientInterceptor
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnException, IOException {
 
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedRetrieved();
       RouterServerUtil.logAndThrowException(
           "Missing getApplicationReport request or applicationId information.",
           null);
@@ -493,6 +524,7 @@ public class FederationClientInterceptor
       subClusterId = federationFacade
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       RouterServerUtil
           .logAndThrowException("Application " + request.getApplicationId()
               + " does not exist in FederationStateStore", e);
@@ -505,6 +537,7 @@ public class FederationClientInterceptor
     try {
       response = clientRMProxy.getApplicationReport(request);
     } catch (Exception e) {
+      routerMetrics.incrAppsFailedRetrieved();
       LOG.error("Unable to get the application report for "
           + request.getApplicationId() + "to SubCluster "
           + subClusterId.getId(), e);
@@ -517,6 +550,8 @@ public class FederationClientInterceptor
           + subClusterId.getId());
     }
 
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
     return response;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 8ecc19dacbe..4c7d4b18077 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -18,7 +18,19 @@
 package org.apache.hadoop.yarn.server.router.webapp;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -36,20 +48,42 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
@@ -66,6 +100,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   private FederationStateStoreFacade federationFacade;
   private Random rand;
   private RouterPolicyFacade policyFacade;
+  private RouterMetrics routerMetrics;
+  private final Clock clock = new MonotonicClock();
 
   private Map interceptors;
 
@@ -88,6 +124,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
 
     interceptors = new HashMap();
+    routerMetrics = RouterMetrics.getMetrics();
   }
 
   private SubClusterId getRandomActiveSubCluster(
@@ -191,10 +228,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   @Override
   public Response createNewApplication(HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
+
+    long startTime = clock.getTime();
+
     Map subClustersActive;
     try {
       subClustersActive = federationFacade.getSubClusters(true);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedCreated();
       return Response.status(Status.INTERNAL_SERVER_ERROR)
           .entity(e.getLocalizedMessage()).build();
     }
@@ -207,6 +248,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     try {
       subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedCreated();
       return Response.status(Status.SERVICE_UNAVAILABLE)
           .entity(e.getLocalizedMessage()).build();
     }
@@ -226,6 +268,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       }
 
       if (response != null && response.getStatus() == 200) {
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
+
        return response;
      } else {
        // Empty response from the ResourceManager.
@@ -236,6 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     String errMsg = "Fail to create a new application.";
     LOG.error(errMsg);
+    routerMetrics.incrAppsFailedCreated();
     return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
   }
 
@@ -308,7 +355,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   public Response submitApplication(ApplicationSubmissionContextInfo newApp,
       HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
+
+    long startTime = clock.getTime();
+
     if (newApp == null || newApp.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedSubmitted();
       String errMsg = "Missing ApplicationSubmissionContextInfo or "
           + "applicationSubmissionContex information.";
       return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
@@ -318,6 +369,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     try {
       applicationId = ApplicationId.fromString(newApp.getApplicationId());
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedSubmitted();
       return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
           .build();
     }
@@ -333,6 +385,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     try {
       subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedSubmitted();
       return Response.status(Status.SERVICE_UNAVAILABLE)
           .entity(e.getLocalizedMessage()).build();
     }
@@ -349,6 +402,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       subClusterId =
           federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedSubmitted();
       String errMsg = "Unable to insert the ApplicationId " + applicationId
           + " into the FederationStateStore";
       return Response.status(Status.SERVICE_UNAVAILABLE)
@@ -367,6 +421,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         subClusterIdInStateStore =
             federationFacade.getApplicationHomeSubCluster(applicationId);
       } catch (YarnException e1) {
+        routerMetrics.incrAppsFailedSubmitted();
         return Response.status(Status.SERVICE_UNAVAILABLE)
             .entity(e1.getLocalizedMessage()).build();
       }
@@ -374,6 +429,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         LOG.info("Application " + applicationId
             + " already submitted on SubCluster " + subClusterId);
       } else {
+        routerMetrics.incrAppsFailedSubmitted();
         return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
             .build();
       }
@@ -384,6 +440,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     try {
       subClusterInfo = federationFacade.getSubCluster(subClusterId);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedSubmitted();
       return Response.status(Status.SERVICE_UNAVAILABLE)
           .entity(e.getLocalizedMessage()).build();
     }
@@ -401,6 +458,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     if (response != null && response.getStatus() == 202) {
       LOG.info("Application " + context.getApplicationName() + " with appId "
           + applicationId + " submitted on " + subClusterId);
+
+      long stopTime = clock.getTime();
+      routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+
       return response;
     } else {
       // Empty response from the ResourceManager.
@@ -409,6 +470,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       }
     }
 
+    routerMetrics.incrAppsFailedSubmitted();
     String errMsg = "Application " + newApp.getApplicationName()
         + " with appId " + applicationId + " failed to be submitted.";
     LOG.error(errMsg);
@@ -435,10 +497,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   public AppInfo getApp(HttpServletRequest hsr, String appId,
       Set unselectedFields) {
 
+    long startTime = clock.getTime();
+
     ApplicationId applicationId = null;
     try {
       applicationId = ApplicationId.fromString(appId);
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       return null;
     }
 
@@ -448,16 +513,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       subClusterId =
           federationFacade.getApplicationHomeSubCluster(applicationId);
       if (subClusterId == null) {
+        routerMetrics.incrAppsFailedRetrieved();
         return null;
       }
       subClusterInfo = federationFacade.getSubCluster(subClusterId);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       return null;
     }
 
-    return getOrCreateInterceptorForSubCluster(subClusterId,
+    AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId,
         subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
             unselectedFields);
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
+
+    return response;
   }
 
   /**
@@ -481,23 +553,37 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       String appId) throws AuthorizationException, YarnException,
       InterruptedException, IOException {
 
+    long startTime = clock.getTime();
+
     ApplicationId applicationId = null;
     try {
       applicationId = ApplicationId.fromString(appId);
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedKilled();
       return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
           .build();
     }
 
-    SubClusterId subClusterId =
-        federationFacade.getApplicationHomeSubCluster(applicationId);
+    SubClusterInfo subClusterInfo = null;
+    SubClusterId subClusterId = null;
+    try {
+      subClusterId =
+          federationFacade.getApplicationHomeSubCluster(applicationId);
+      subClusterInfo = federationFacade.getSubCluster(subClusterId);
+    } catch (YarnException e) {
+      routerMetrics.incrAppsFailedKilled();
+      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+          .build();
+    }
 
-    SubClusterInfo subClusterInfo =
-        federationFacade.getSubCluster(subClusterId);
-
-    return getOrCreateInterceptorForSubCluster(subClusterId,
+    Response response = getOrCreateInterceptorForSubCluster(subClusterId,
         subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
             hsr, appId);
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
+
+    return response;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
new file mode 100644
index 00000000000..3cdafd832d9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class validates the correctness of Router Federation Interceptor
+ * Metrics.
+ */
+public class TestRouterMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterMetrics.class);
+
+  // All the operations in the bad subcluster fail.
+  private MockBadSubCluster badSubCluster = new MockBadSubCluster();
+  // All the operations in the good subcluster succeed.
+  private MockGoodSubCluster goodSubCluster = new MockGoodSubCluster();
+
+  private static RouterMetrics metrics = RouterMetrics.getMetrics();
+
+  @BeforeClass
+  public static void init() {
+
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0, metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
+
+    Assert.assertEquals(0, metrics.getAppsFailedCreated());
+    Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
+    Assert.assertEquals(0, metrics.getAppsFailedKilled());
+    Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  /**
+   * This test validates the correctness of the metric: Created Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsCreated() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsCreated();
+
+    goodSubCluster.getNewApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsCreated(), 0);
+
+    goodSubCluster.getNewApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsCreated(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to create Apps.
+   */
+  @Test
+  public void testAppsFailedCreated() {
+
+    long totalBadbefore = metrics.getAppsFailedCreated();
+
+    badSubCluster.getNewApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedCreated());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Submitted Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsSubmitted() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsSubmitted();
+
+    goodSubCluster.submitApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsSubmitted(), 0);
+
+    goodSubCluster.submitApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsSubmitted(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to submit Apps.
+   */
+  @Test
+  public void testAppsFailedSubmitted() {
+
+    long totalBadbefore = metrics.getAppsFailedSubmitted();
+
+    badSubCluster.submitApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedSubmitted());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Killed Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsKilled() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsKilled();
+
+    goodSubCluster.forceKillApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsKilled(), 0);
+
+    goodSubCluster.forceKillApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsKilled(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to kill Apps.
+   */
+  @Test
+  public void testAppsFailedKilled() {
+
+    long totalBadbefore = metrics.getAppsFailedKilled();
+
+    badSubCluster.forceKillApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedKilled());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Retrieved Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsReport() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsRetrieved();
+
+    goodSubCluster.getApplicationReport(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(100, metrics.getLatencySucceededGetAppReport(), 0);
+
+    goodSubCluster.getApplicationReport(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(150, metrics.getLatencySucceededGetAppReport(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to retrieve Apps.
+   */
+  @Test
+  public void testAppsReportFailed() {
+
+    long totalBadbefore = metrics.getAppsFailedRetrieved();
+
+    badSubCluster.getApplicationReport();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
+  }
+
+  // Records failures for all calls
+  private class MockBadSubCluster {
+    public void getNewApplication() {
+      LOG.info("Mocked: failed getNewApplication call");
+      metrics.incrAppsFailedCreated();
+    }
+
+    public void submitApplication() {
+      LOG.info("Mocked: failed submitApplication call");
+      metrics.incrAppsFailedSubmitted();
+    }
+
+    public void forceKillApplication() {
+      LOG.info("Mocked: failed forceKillApplication call");
+      metrics.incrAppsFailedKilled();
+    }
+
+    public void getApplicationReport() {
+      LOG.info("Mocked: failed getApplicationReport call");
+      metrics.incrAppsFailedRetrieved();
+    }
+  }
+
+  // Records successes for all calls
+  private class MockGoodSubCluster {
+    public void getNewApplication(long duration) {
+      LOG.info("Mocked: successful getNewApplication call with duration {}",
+          duration);
+      metrics.succeededAppsCreated(duration);
+    }
+
+    public void submitApplication(long duration) {
+      LOG.info("Mocked: successful submitApplication call with duration {}",
+          duration);
+      metrics.succeededAppsSubmitted(duration);
+    }
+
+    public void forceKillApplication(long duration) {
+      LOG.info("Mocked: successful forceKillApplication call with duration {}",
+          duration);
+      metrics.succeededAppsKilled(duration);
+    }
+
+    public void getApplicationReport(long duration) {
+      LOG.info("Mocked: successful getApplicationReport call with duration {}",
+          duration);
+      metrics.succeededAppsRetrieved(duration);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index d918149e6a1..fb6cdd8cc90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -276,13 +276,11 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     AppState appState = new AppState("KILLED");
-    try {
-      interceptor.updateAppState(appState, null, appId.toString());
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().equals("Application " + appId + " does not exist"));
-    }
+
+    Response response =
+        interceptor.updateAppState(appState, null, appId.toString());
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
   }
 
   /**
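For reviewers: both interceptors in this patch follow the same timing pattern, namely read a start time from a MonotonicClock, invoke the downstream RM, and then record either a success latency or a failure count on RouterMetrics. A minimal sketch of that pattern is shown below; it is illustrative only and not part of the patch. The class name and the doWork() placeholder are hypothetical, while RouterMetrics, Clock and MonotonicClock come from the code above.

import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;

public class RouterMetricsUsageSketch {

  private final Clock clock = new MonotonicClock();
  private final RouterMetrics routerMetrics = RouterMetrics.getMetrics();

  public String getNewApplicationLikeCall() throws YarnException {
    long startTime = clock.getTime();
    String response = doWork(); // hypothetical stand-in for the RM invocation
    if (response != null) {
      // Success: the recorded duration feeds both the MutableRate totals
      // and the MutableQuantiles latency metric.
      routerMetrics.succeededAppsCreated(clock.getTime() - startTime);
      return response;
    }
    // Failure: only the failure gauge is incremented, no latency is recorded.
    routerMetrics.incrAppsFailedCreated();
    throw new YarnException("Fail to create a new application.");
  }

  private String doWork() {
    return "new-application-id"; // placeholder result
  }
}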