YARN-6923. Metrics for Federation Router. (Giovanni Matteo Fumarola via asuresh)
(cherry picked from commit ae8fb13b31
)
This commit is contained in:
parent
ac090b38ad
commit
2aacb9d3fb
|
@ -0,0 +1,203 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.*;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
|
||||
/**
|
||||
* This class is for maintaining the various Router Federation Interceptor
|
||||
* activity statistics and publishing them through the metrics interfaces.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(about = "Metrics for Router Federation Interceptor", context = "fedr")
|
||||
public final class RouterMetrics {
|
||||
|
||||
private static final MetricsInfo RECORD_INFO =
|
||||
info("RouterMetrics", "Router Federation Interceptor");
|
||||
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||
|
||||
// Metrics for operation failed
|
||||
@Metric("# of applications failed to be submitted")
|
||||
private MutableGaugeInt numAppsFailedSubmitted;
|
||||
@Metric("# of applications failed to be created")
|
||||
private MutableGaugeInt numAppsFailedCreated;
|
||||
@Metric("# of applications failed to be killed")
|
||||
private MutableGaugeInt numAppsFailedKilled;
|
||||
@Metric("# of application reports failed to be retrieved")
|
||||
private MutableGaugeInt numAppsFailedRetrieved;
|
||||
|
||||
// Aggregate metrics are shared, and don't have to be looked up per call
|
||||
@Metric("Total number of successful Submitted apps and latency(ms)")
|
||||
private MutableRate totalSucceededAppsSubmitted;
|
||||
@Metric("Total number of successful Killed apps and latency(ms)")
|
||||
private MutableRate totalSucceededAppsKilled;
|
||||
@Metric("Total number of successful Created apps and latency(ms)")
|
||||
private MutableRate totalSucceededAppsCreated;
|
||||
@Metric("Total number of successful Retrieved app reports and latency(ms)")
|
||||
private MutableRate totalSucceededAppsRetrieved;
|
||||
|
||||
/**
|
||||
* Provide quantile counters for all latencies.
|
||||
*/
|
||||
private MutableQuantiles submitApplicationLatency;
|
||||
private MutableQuantiles getNewApplicationLatency;
|
||||
private MutableQuantiles killApplicationLatency;
|
||||
private MutableQuantiles getApplicationReportLatency;
|
||||
|
||||
private static volatile RouterMetrics INSTANCE = null;
|
||||
private static MetricsRegistry registry;
|
||||
|
||||
private RouterMetrics() {
|
||||
registry = new MetricsRegistry(RECORD_INFO);
|
||||
registry.tag(RECORD_INFO, "Router");
|
||||
getNewApplicationLatency = registry.newQuantiles("getNewApplicationLatency",
|
||||
"latency of get new application", "ops", "latency", 10);
|
||||
submitApplicationLatency = registry.newQuantiles("submitApplicationLatency",
|
||||
"latency of submit application", "ops", "latency", 10);
|
||||
killApplicationLatency = registry.newQuantiles("killApplicationLatency",
|
||||
"latency of kill application", "ops", "latency", 10);
|
||||
getApplicationReportLatency =
|
||||
registry.newQuantiles("getApplicationReportLatency",
|
||||
"latency of get application report", "ops", "latency", 10);
|
||||
}
|
||||
|
||||
public static RouterMetrics getMetrics() {
|
||||
if (!isInitialized.get()) {
|
||||
synchronized (RouterMetrics.class) {
|
||||
if (INSTANCE == null) {
|
||||
INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
|
||||
"Metrics for the Yarn Router", new RouterMetrics());
|
||||
isInitialized.set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized static void destroy() {
|
||||
isInitialized.set(false);
|
||||
INSTANCE = null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededAppsCreated() {
|
||||
return totalSucceededAppsCreated.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededAppsSubmitted() {
|
||||
return totalSucceededAppsSubmitted.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededAppsKilled() {
|
||||
return totalSucceededAppsKilled.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededAppsRetrieved() {
|
||||
return totalSucceededAppsRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsCreated() {
|
||||
return totalSucceededAppsCreated.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsSubmitted() {
|
||||
return totalSucceededAppsSubmitted.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsKilled() {
|
||||
return totalSucceededAppsKilled.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededGetAppReport() {
|
||||
return totalSucceededAppsRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedCreated() {
|
||||
return numAppsFailedCreated.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedSubmitted() {
|
||||
return numAppsFailedSubmitted.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedKilled() {
|
||||
return numAppsFailedKilled.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedRetrieved() {
|
||||
return numAppsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public void succeededAppsCreated(long duration) {
|
||||
totalSucceededAppsCreated.add(duration);
|
||||
getNewApplicationLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededAppsSubmitted(long duration) {
|
||||
totalSucceededAppsSubmitted.add(duration);
|
||||
submitApplicationLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededAppsKilled(long duration) {
|
||||
totalSucceededAppsKilled.add(duration);
|
||||
killApplicationLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededAppsRetrieved(long duration) {
|
||||
totalSucceededAppsRetrieved.add(duration);
|
||||
getApplicationReportLatency.add(duration);
|
||||
}
|
||||
|
||||
public void incrAppsFailedCreated() {
|
||||
numAppsFailedCreated.incr();
|
||||
}
|
||||
|
||||
public void incrAppsFailedSubmitted() {
|
||||
numAppsFailedSubmitted.incr();
|
||||
}
|
||||
|
||||
public void incrAppsFailedKilled() {
|
||||
numAppsFailedKilled.incr();
|
||||
}
|
||||
|
||||
public void incrAppsFailedRetrieved() {
|
||||
numAppsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
}
|
|
@ -98,7 +98,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSub
|
|||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -130,6 +133,8 @@ public class FederationClientInterceptor
|
|||
private FederationStateStoreFacade federationFacade;
|
||||
private Random rand;
|
||||
private RouterPolicyFacade policyFacade;
|
||||
private RouterMetrics routerMetrics;
|
||||
private final Clock clock = new MonotonicClock();
|
||||
|
||||
@Override
|
||||
public void init(String userName) {
|
||||
|
@ -153,7 +158,7 @@ public class FederationClientInterceptor
|
|||
|
||||
clientRMProxies =
|
||||
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
|
||||
|
||||
routerMetrics = RouterMetrics.getMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -220,6 +225,9 @@ public class FederationClientInterceptor
|
|||
@Override
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request) throws YarnException, IOException {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive =
|
||||
federationFacade.getSubClusters(true);
|
||||
|
||||
|
@ -238,6 +246,9 @@ public class FederationClientInterceptor
|
|||
}
|
||||
|
||||
if (response != null) {
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsCreated(stopTime - startTime);
|
||||
return response;
|
||||
} else {
|
||||
// Empty response from the ResourceManager.
|
||||
|
@ -247,6 +258,7 @@ public class FederationClientInterceptor
|
|||
|
||||
}
|
||||
|
||||
routerMetrics.incrAppsFailedCreated();
|
||||
String errMsg = "Fail to create a new application.";
|
||||
LOG.error(errMsg);
|
||||
throw new YarnException(errMsg);
|
||||
|
@ -320,9 +332,13 @@ public class FederationClientInterceptor
|
|||
@Override
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request) throws YarnException, IOException {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
if (request == null || request.getApplicationSubmissionContext() == null
|
||||
|| request.getApplicationSubmissionContext()
|
||||
.getApplicationId() == null) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
RouterServerUtil
|
||||
.logAndThrowException("Missing submitApplication request or "
|
||||
+ "applicationSubmissionContex information.", null);
|
||||
|
@ -350,6 +366,7 @@ public class FederationClientInterceptor
|
|||
subClusterId =
|
||||
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
String message = "Unable to insert the ApplicationId " + applicationId
|
||||
+ " into the FederationStateStore";
|
||||
RouterServerUtil.logAndThrowException(message, e);
|
||||
|
@ -368,6 +385,7 @@ public class FederationClientInterceptor
|
|||
LOG.info("Application " + applicationId
|
||||
+ " already submitted on SubCluster " + subClusterId);
|
||||
} else {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
RouterServerUtil.logAndThrowException(message, e);
|
||||
}
|
||||
}
|
||||
|
@ -388,6 +406,8 @@ public class FederationClientInterceptor
|
|||
LOG.info("Application "
|
||||
+ request.getApplicationSubmissionContext().getApplicationName()
|
||||
+ " with appId " + applicationId + " submitted on " + subClusterId);
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsSubmitted(stopTime - startTime);
|
||||
return response;
|
||||
} else {
|
||||
// Empty response from the ResourceManager.
|
||||
|
@ -396,6 +416,7 @@ public class FederationClientInterceptor
|
|||
}
|
||||
}
|
||||
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
String errMsg = "Application "
|
||||
+ request.getApplicationSubmissionContext().getApplicationName()
|
||||
+ " with appId " + applicationId + " failed to be submitted.";
|
||||
|
@ -423,7 +444,10 @@ public class FederationClientInterceptor
|
|||
public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request) throws YarnException, IOException {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
if (request == null || request.getApplicationId() == null) {
|
||||
routerMetrics.incrAppsFailedKilled();
|
||||
RouterServerUtil.logAndThrowException(
|
||||
"Missing forceKillApplication request or ApplicationId.", null);
|
||||
}
|
||||
|
@ -434,6 +458,7 @@ public class FederationClientInterceptor
|
|||
subClusterId = federationFacade
|
||||
.getApplicationHomeSubCluster(request.getApplicationId());
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedKilled();
|
||||
RouterServerUtil.logAndThrowException("Application " + applicationId
|
||||
+ " does not exist in FederationStateStore", e);
|
||||
}
|
||||
|
@ -447,6 +472,7 @@ public class FederationClientInterceptor
|
|||
+ subClusterId);
|
||||
response = clientRMProxy.forceKillApplication(request);
|
||||
} catch (Exception e) {
|
||||
routerMetrics.incrAppsFailedKilled();
|
||||
LOG.error("Unable to kill the application report for "
|
||||
+ request.getApplicationId() + "to SubCluster "
|
||||
+ subClusterId.getId(), e);
|
||||
|
@ -458,6 +484,8 @@ public class FederationClientInterceptor
|
|||
+ applicationId + " to SubCluster " + subClusterId.getId());
|
||||
}
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsKilled(stopTime - startTime);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -481,7 +509,10 @@ public class FederationClientInterceptor
|
|||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request) throws YarnException, IOException {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
if (request == null || request.getApplicationId() == null) {
|
||||
routerMetrics.incrAppsFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(
|
||||
"Missing getApplicationReport request or applicationId information.",
|
||||
null);
|
||||
|
@ -493,6 +524,7 @@ public class FederationClientInterceptor
|
|||
subClusterId = federationFacade
|
||||
.getApplicationHomeSubCluster(request.getApplicationId());
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedRetrieved();
|
||||
RouterServerUtil
|
||||
.logAndThrowException("Application " + request.getApplicationId()
|
||||
+ " does not exist in FederationStateStore", e);
|
||||
|
@ -505,6 +537,7 @@ public class FederationClientInterceptor
|
|||
try {
|
||||
response = clientRMProxy.getApplicationReport(request);
|
||||
} catch (Exception e) {
|
||||
routerMetrics.incrAppsFailedRetrieved();
|
||||
LOG.error("Unable to get the application report for "
|
||||
+ request.getApplicationId() + "to SubCluster "
|
||||
+ subClusterId.getId(), e);
|
||||
|
@ -517,6 +550,8 @@ public class FederationClientInterceptor
|
|||
+ subClusterId.getId());
|
||||
}
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsRetrieved(stopTime - startTime);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,19 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.router.webapp;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
|
@ -36,20 +48,42 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
|
||||
|
@ -66,6 +100,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
private FederationStateStoreFacade federationFacade;
|
||||
private Random rand;
|
||||
private RouterPolicyFacade policyFacade;
|
||||
private RouterMetrics routerMetrics;
|
||||
private final Clock clock = new MonotonicClock();
|
||||
|
||||
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
|
||||
|
||||
|
@ -88,6 +124,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
|
||||
|
||||
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
|
||||
routerMetrics = RouterMetrics.getMetrics();
|
||||
}
|
||||
|
||||
private SubClusterId getRandomActiveSubCluster(
|
||||
|
@ -191,10 +228,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
@Override
|
||||
public Response createNewApplication(HttpServletRequest hsr)
|
||||
throws AuthorizationException, IOException, InterruptedException {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive;
|
||||
try {
|
||||
subClustersActive = federationFacade.getSubClusters(true);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedCreated();
|
||||
return Response.status(Status.INTERNAL_SERVER_ERROR)
|
||||
.entity(e.getLocalizedMessage()).build();
|
||||
}
|
||||
|
@ -207,6 +248,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
try {
|
||||
subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedCreated();
|
||||
return Response.status(Status.SERVICE_UNAVAILABLE)
|
||||
.entity(e.getLocalizedMessage()).build();
|
||||
}
|
||||
|
@ -226,6 +268,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
}
|
||||
|
||||
if (response != null && response.getStatus() == 200) {
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsCreated(stopTime - startTime);
|
||||
|
||||
return response;
|
||||
} else {
|
||||
// Empty response from the ResourceManager.
|
||||
|
@ -236,6 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
|
||||
String errMsg = "Fail to create a new application.";
|
||||
LOG.error(errMsg);
|
||||
routerMetrics.incrAppsFailedCreated();
|
||||
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
||||
}
|
||||
|
||||
|
@ -308,7 +355,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
|
||||
HttpServletRequest hsr)
|
||||
throws AuthorizationException, IOException, InterruptedException {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
if (newApp == null || newApp.getApplicationId() == null) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
String errMsg = "Missing ApplicationSubmissionContextInfo or "
|
||||
+ "applicationSubmissionContex information.";
|
||||
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
|
||||
|
@ -318,6 +369,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
try {
|
||||
applicationId = ApplicationId.fromString(newApp.getApplicationId());
|
||||
} catch (IllegalArgumentException e) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
|
||||
.build();
|
||||
}
|
||||
|
@ -333,6 +385,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
try {
|
||||
subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
return Response.status(Status.SERVICE_UNAVAILABLE)
|
||||
.entity(e.getLocalizedMessage()).build();
|
||||
}
|
||||
|
@ -349,6 +402,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
subClusterId =
|
||||
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
String errMsg = "Unable to insert the ApplicationId " + applicationId
|
||||
+ " into the FederationStateStore";
|
||||
return Response.status(Status.SERVICE_UNAVAILABLE)
|
||||
|
@ -367,6 +421,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
subClusterIdInStateStore =
|
||||
federationFacade.getApplicationHomeSubCluster(applicationId);
|
||||
} catch (YarnException e1) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
return Response.status(Status.SERVICE_UNAVAILABLE)
|
||||
.entity(e1.getLocalizedMessage()).build();
|
||||
}
|
||||
|
@ -374,6 +429,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
LOG.info("Application " + applicationId
|
||||
+ " already submitted on SubCluster " + subClusterId);
|
||||
} else {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
|
||||
.build();
|
||||
}
|
||||
|
@ -384,6 +440,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
try {
|
||||
subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
return Response.status(Status.SERVICE_UNAVAILABLE)
|
||||
.entity(e.getLocalizedMessage()).build();
|
||||
}
|
||||
|
@ -401,6 +458,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
if (response != null && response.getStatus() == 202) {
|
||||
LOG.info("Application " + context.getApplicationName() + " with appId "
|
||||
+ applicationId + " submitted on " + subClusterId);
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsSubmitted(stopTime - startTime);
|
||||
|
||||
return response;
|
||||
} else {
|
||||
// Empty response from the ResourceManager.
|
||||
|
@ -409,6 +470,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
}
|
||||
}
|
||||
|
||||
routerMetrics.incrAppsFailedSubmitted();
|
||||
String errMsg = "Application " + newApp.getApplicationName()
|
||||
+ " with appId " + applicationId + " failed to be submitted.";
|
||||
LOG.error(errMsg);
|
||||
|
@ -435,10 +497,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
public AppInfo getApp(HttpServletRequest hsr, String appId,
|
||||
Set<String> unselectedFields) {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
ApplicationId applicationId = null;
|
||||
try {
|
||||
applicationId = ApplicationId.fromString(appId);
|
||||
} catch (IllegalArgumentException e) {
|
||||
routerMetrics.incrAppsFailedRetrieved();
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -448,16 +513,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
subClusterId =
|
||||
federationFacade.getApplicationHomeSubCluster(applicationId);
|
||||
if (subClusterId == null) {
|
||||
routerMetrics.incrAppsFailedRetrieved();
|
||||
return null;
|
||||
}
|
||||
subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedRetrieved();
|
||||
return null;
|
||||
}
|
||||
|
||||
return getOrCreateInterceptorForSubCluster(subClusterId,
|
||||
AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId,
|
||||
subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
|
||||
unselectedFields);
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsRetrieved(stopTime - startTime);
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -481,23 +553,37 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
String appId) throws AuthorizationException, YarnException,
|
||||
InterruptedException, IOException {
|
||||
|
||||
long startTime = clock.getTime();
|
||||
|
||||
ApplicationId applicationId = null;
|
||||
try {
|
||||
applicationId = ApplicationId.fromString(appId);
|
||||
} catch (IllegalArgumentException e) {
|
||||
routerMetrics.incrAppsFailedKilled();
|
||||
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
|
||||
.build();
|
||||
}
|
||||
|
||||
SubClusterId subClusterId =
|
||||
federationFacade.getApplicationHomeSubCluster(applicationId);
|
||||
SubClusterInfo subClusterInfo = null;
|
||||
SubClusterId subClusterId = null;
|
||||
try {
|
||||
subClusterId =
|
||||
federationFacade.getApplicationHomeSubCluster(applicationId);
|
||||
subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrAppsFailedKilled();
|
||||
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
|
||||
.build();
|
||||
}
|
||||
|
||||
SubClusterInfo subClusterInfo =
|
||||
federationFacade.getSubCluster(subClusterId);
|
||||
|
||||
return getOrCreateInterceptorForSubCluster(subClusterId,
|
||||
Response response = getOrCreateInterceptorForSubCluster(subClusterId,
|
||||
subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
|
||||
hsr, appId);
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededAppsRetrieved(stopTime - startTime);
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,248 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class validates the correctness of Router Federation Interceptor
|
||||
* Metrics.
|
||||
*/
|
||||
public class TestRouterMetrics {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestRouterMetrics.class);
|
||||
|
||||
// All the operations in the bad subcluster failed.
|
||||
private MockBadSubCluster badSubCluster = new MockBadSubCluster();
|
||||
// All the operations in the bad subcluster succeed.
|
||||
private MockGoodSubCluster goodSubCluster = new MockGoodSubCluster();
|
||||
|
||||
private static RouterMetrics metrics = RouterMetrics.getMetrics();
|
||||
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
|
||||
LOG.info("Test: aggregate metrics are initialized correctly");
|
||||
|
||||
Assert.assertEquals(0, metrics.getNumSucceededAppsCreated());
|
||||
Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
|
||||
Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
|
||||
Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
|
||||
|
||||
Assert.assertEquals(0, metrics.getAppsFailedCreated());
|
||||
Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
|
||||
Assert.assertEquals(0, metrics.getAppsFailedKilled());
|
||||
Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
|
||||
|
||||
LOG.info("Test: aggregate metrics are updated correctly");
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Created Apps
|
||||
* successfully.
|
||||
*/
|
||||
@Test
|
||||
public void testSucceededAppsCreated() {
|
||||
|
||||
long totalGoodBefore = metrics.getNumSucceededAppsCreated();
|
||||
|
||||
goodSubCluster.getNewApplication(100);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededAppsCreated());
|
||||
Assert.assertEquals(100, metrics.getLatencySucceededAppsCreated(), 0);
|
||||
|
||||
goodSubCluster.getNewApplication(200);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededAppsCreated());
|
||||
Assert.assertEquals(150, metrics.getLatencySucceededAppsCreated(), 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Failed to create Apps.
|
||||
*/
|
||||
@Test
|
||||
public void testAppsFailedCreated() {
|
||||
|
||||
long totalBadbefore = metrics.getAppsFailedCreated();
|
||||
|
||||
badSubCluster.getNewApplication();
|
||||
|
||||
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedCreated());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Submitted Apps
|
||||
* successfully.
|
||||
*/
|
||||
@Test
|
||||
public void testSucceededAppsSubmitted() {
|
||||
|
||||
long totalGoodBefore = metrics.getNumSucceededAppsSubmitted();
|
||||
|
||||
goodSubCluster.submitApplication(100);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededAppsSubmitted());
|
||||
Assert.assertEquals(100, metrics.getLatencySucceededAppsSubmitted(), 0);
|
||||
|
||||
goodSubCluster.submitApplication(200);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededAppsSubmitted());
|
||||
Assert.assertEquals(150, metrics.getLatencySucceededAppsSubmitted(), 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Failed to submit Apps.
|
||||
*/
|
||||
@Test
|
||||
public void testAppsFailedSubmitted() {
|
||||
|
||||
long totalBadbefore = metrics.getAppsFailedSubmitted();
|
||||
|
||||
badSubCluster.submitApplication();
|
||||
|
||||
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedSubmitted());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Killed Apps
|
||||
* successfully.
|
||||
*/
|
||||
@Test
|
||||
public void testSucceededAppsKilled() {
|
||||
|
||||
long totalGoodBefore = metrics.getNumSucceededAppsKilled();
|
||||
|
||||
goodSubCluster.forceKillApplication(100);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededAppsKilled());
|
||||
Assert.assertEquals(100, metrics.getLatencySucceededAppsKilled(), 0);
|
||||
|
||||
goodSubCluster.forceKillApplication(200);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededAppsKilled());
|
||||
Assert.assertEquals(150, metrics.getLatencySucceededAppsKilled(), 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Failed to kill Apps.
|
||||
*/
|
||||
@Test
|
||||
public void testAppsFailedKilled() {
|
||||
|
||||
long totalBadbefore = metrics.getAppsFailedKilled();
|
||||
|
||||
badSubCluster.forceKillApplication();
|
||||
|
||||
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedKilled());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Retrieved Apps
|
||||
* successfully.
|
||||
*/
|
||||
@Test
|
||||
public void testSucceededAppsReport() {
|
||||
|
||||
long totalGoodBefore = metrics.getNumSucceededAppsRetrieved();
|
||||
|
||||
goodSubCluster.getApplicationReport(100);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededAppsRetrieved());
|
||||
Assert.assertEquals(100, metrics.getLatencySucceededGetAppReport(), 0);
|
||||
|
||||
goodSubCluster.getApplicationReport(200);
|
||||
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededAppsRetrieved());
|
||||
Assert.assertEquals(150, metrics.getLatencySucceededGetAppReport(), 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of the metric: Failed to retrieve Apps.
|
||||
*/
|
||||
@Test
|
||||
public void testAppsReportFailed() {
|
||||
|
||||
long totalBadbefore = metrics.getAppsFailedRetrieved();
|
||||
|
||||
badSubCluster.getApplicationReport();
|
||||
|
||||
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
|
||||
}
|
||||
|
||||
// Records failures for all calls
|
||||
private class MockBadSubCluster {
|
||||
public void getNewApplication() {
|
||||
LOG.info("Mocked: failed getNewApplication call");
|
||||
metrics.incrAppsFailedCreated();
|
||||
}
|
||||
|
||||
public void submitApplication() {
|
||||
LOG.info("Mocked: failed submitApplication call");
|
||||
metrics.incrAppsFailedSubmitted();
|
||||
}
|
||||
|
||||
public void forceKillApplication() {
|
||||
LOG.info("Mocked: failed forceKillApplication call");
|
||||
metrics.incrAppsFailedKilled();
|
||||
}
|
||||
|
||||
public void getApplicationReport() {
|
||||
LOG.info("Mocked: failed getApplicationReport call");
|
||||
metrics.incrAppsFailedRetrieved();
|
||||
}
|
||||
}
|
||||
|
||||
// Records successes for all calls
|
||||
private class MockGoodSubCluster {
|
||||
public void getNewApplication(long duration) {
|
||||
LOG.info("Mocked: successful getNewApplication call with duration {}",
|
||||
duration);
|
||||
metrics.succeededAppsCreated(duration);
|
||||
}
|
||||
|
||||
public void submitApplication(long duration) {
|
||||
LOG.info("Mocked: successful submitApplication call with duration {}",
|
||||
duration);
|
||||
metrics.succeededAppsSubmitted(duration);
|
||||
}
|
||||
|
||||
public void forceKillApplication(long duration) {
|
||||
LOG.info("Mocked: successful forceKillApplication call with duration {}",
|
||||
duration);
|
||||
metrics.succeededAppsKilled(duration);
|
||||
}
|
||||
|
||||
public void getApplicationReport(long duration) {
|
||||
LOG.info("Mocked: successful getApplicationReport call with duration {}",
|
||||
duration);
|
||||
metrics.succeededAppsRetrieved(duration);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -276,13 +276,11 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
AppState appState = new AppState("KILLED");
|
||||
try {
|
||||
interceptor.updateAppState(appState, null, appId.toString());
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().equals("Application " + appId + " does not exist"));
|
||||
}
|
||||
|
||||
Response response =
|
||||
interceptor.updateAppState(appState, null, appId.toString());
|
||||
Assert.assertEquals(BAD_REQUEST, response.getStatus());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue