YARN-8041. [Router] Federation: Improve Router REST API Metrics. (#4938)

This commit is contained in:
slfan1989 2022-10-14 07:54:36 +08:00 committed by GitHub
parent 1962851356
commit 5b52123c9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1124 additions and 167 deletions

View File

@ -107,6 +107,20 @@ public final class RouterMetrics {
private MutableGaugeInt numDeleteReservationFailedRetrieved;
@Metric("# of listReservation failed to be retrieved")
private MutableGaugeInt numListReservationFailedRetrieved;
@Metric("# of getAppActivities failed to be retrieved")
private MutableGaugeInt numGetAppActivitiesFailedRetrieved;
@Metric("# of getAppStatistics failed to be retrieved")
private MutableGaugeInt numGetAppStatisticsFailedRetrieved;
@Metric("# of getAppPriority failed to be retrieved")
private MutableGaugeInt numGetAppPriorityFailedRetrieved;
@Metric("# of getAppQueue failed to be retrieved")
private MutableGaugeInt numGetAppQueueFailedRetrieved;
@Metric("# of updateAppQueue failed to be retrieved")
private MutableGaugeInt numUpdateAppQueueFailedRetrieved;
@Metric("# of getAppTimeout failed to be retrieved")
private MutableGaugeInt numGetAppTimeoutFailedRetrieved;
@Metric("# of getAppTimeouts failed to be retrieved")
private MutableGaugeInt numGetAppTimeoutsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -175,6 +189,20 @@ public final class RouterMetrics {
private MutableRate totalSucceededDeleteReservationRetrieved;
@Metric("Total number of successful Retrieved ListReservation and latency(ms)")
private MutableRate totalSucceededListReservationRetrieved;
@Metric("Total number of successful Retrieved GetAppActivities and latency(ms)")
private MutableRate totalSucceededGetAppActivitiesRetrieved;
@Metric("Total number of successful Retrieved GetAppStatistics and latency(ms)")
private MutableRate totalSucceededGetAppStatisticsRetrieved;
@Metric("Total number of successful Retrieved GetAppPriority and latency(ms)")
private MutableRate totalSucceededGetAppPriorityRetrieved;
@Metric("Total number of successful Retrieved GetAppQueue and latency(ms)")
private MutableRate totalSucceededGetAppQueueRetrieved;
@Metric("Total number of successful Retrieved UpdateAppQueue and latency(ms)")
private MutableRate totalSucceededUpdateAppQueueRetrieved;
@Metric("Total number of successful Retrieved GetAppTimeout and latency(ms)")
private MutableRate totalSucceededGetAppTimeoutRetrieved;
@Metric("Total number of successful Retrieved GetAppTimeouts and latency(ms)")
private MutableRate totalSucceededGetAppTimeoutsRetrieved;
/**
* Provide quantile counters for all latencies.
@ -212,6 +240,13 @@ public final class RouterMetrics {
private MutableQuantiles updateReservationLatency;
private MutableQuantiles deleteReservationLatency;
private MutableQuantiles listReservationLatency;
private MutableQuantiles getAppActivitiesLatency;
private MutableQuantiles getAppStatisticsLatency;
private MutableQuantiles getAppPriorityLatency;
private MutableQuantiles getAppQueueLatency;
private MutableQuantiles getUpdateQueueLatency;
private MutableQuantiles getAppTimeoutLatency;
private MutableQuantiles getAppTimeoutsLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -342,6 +377,27 @@ public final class RouterMetrics {
listReservationLatency =
registry.newQuantiles("listReservationLatency",
"latency of list reservation timeouts", "ops", "latency", 10);
getAppActivitiesLatency = registry.newQuantiles("getAppActivitiesLatency",
"latency of get app activities timeouts", "ops", "latency", 10);
getAppStatisticsLatency = registry.newQuantiles("getAppStatisticsLatency",
"latency of get app statistics timeouts", "ops", "latency", 10);
getAppPriorityLatency = registry.newQuantiles("getAppPriorityLatency",
"latency of get app priority timeouts", "ops", "latency", 10);
getAppQueueLatency = registry.newQuantiles("getAppQueueLatency",
"latency of get app queue timeouts", "ops", "latency", 10);
getUpdateQueueLatency = registry.newQuantiles("getUpdateQueueLatency",
"latency of update app queue timeouts", "ops", "latency", 10);
getAppTimeoutLatency = registry.newQuantiles("getAppTimeoutLatency",
"latency of get apptimeout timeouts", "ops", "latency", 10);
getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency",
"latency of get apptimeouts timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -528,6 +584,41 @@ public final class RouterMetrics {
return totalSucceededListReservationRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetAppActivitiesRetrieved() {
return totalSucceededGetAppActivitiesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetAppStatisticsRetrieved() {
return totalSucceededGetAppStatisticsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetAppPriorityRetrieved() {
return totalSucceededGetAppPriorityRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetAppQueueRetrieved() {
return totalSucceededGetAppQueueRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededUpdateAppQueueRetrieved() {
return totalSucceededUpdateAppQueueRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetAppTimeoutRetrieved() {
return totalSucceededGetAppTimeoutRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetAppTimeoutsRetrieved() {
return totalSucceededGetAppTimeoutsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -693,6 +784,41 @@ public final class RouterMetrics {
return totalSucceededListReservationRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppActivitiesRetrieved() {
return totalSucceededGetAppActivitiesRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppStatisticsRetrieved() {
return totalSucceededGetAppStatisticsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppPriorityRetrieved() {
return totalSucceededGetAppPriorityRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppQueueRetrieved() {
return totalSucceededGetAppQueueRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededUpdateAppQueueRetrieved() {
return totalSucceededUpdateAppQueueRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppTimeoutRetrieved() {
return totalSucceededGetAppTimeoutRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppTimeoutsRetrieved() {
return totalSucceededGetAppTimeoutsRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -846,6 +972,34 @@ public final class RouterMetrics {
return numListReservationFailedRetrieved.value();
}
public int getAppActivitiesFailedRetrieved() {
return numGetAppActivitiesFailedRetrieved.value();
}
public int getAppStatisticsFailedRetrieved() {
return numGetAppStatisticsFailedRetrieved.value();
}
public int getAppPriorityFailedRetrieved() {
return numGetAppPriorityFailedRetrieved.value();
}
public int getAppQueueFailedRetrieved() {
return numGetAppQueueFailedRetrieved.value();
}
public int getUpdateAppQueueFailedRetrieved() {
return numUpdateAppQueueFailedRetrieved.value();
}
public int getAppTimeoutFailedRetrieved() {
return numGetAppTimeoutFailedRetrieved.value();
}
public int getAppTimeoutsFailedRetrieved() {
return numGetAppTimeoutsFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -1011,6 +1165,41 @@ public final class RouterMetrics {
listReservationLatency.add(duration);
}
public void succeededGetAppActivitiesRetrieved(long duration) {
totalSucceededGetAppActivitiesRetrieved.add(duration);
getAppActivitiesLatency.add(duration);
}
public void succeededGetAppStatisticsRetrieved(long duration) {
totalSucceededGetAppStatisticsRetrieved.add(duration);
getAppStatisticsLatency.add(duration);
}
public void succeededGetAppPriorityRetrieved(long duration) {
totalSucceededGetAppPriorityRetrieved.add(duration);
getAppPriorityLatency.add(duration);
}
public void succeededGetAppQueueRetrieved(long duration) {
totalSucceededGetAppQueueRetrieved.add(duration);
getAppQueueLatency.add(duration);
}
public void succeededUpdateAppQueueRetrieved(long duration) {
totalSucceededUpdateAppQueueRetrieved.add(duration);
getUpdateQueueLatency.add(duration);
}
public void succeededGetAppTimeoutRetrieved(long duration) {
totalSucceededGetAppTimeoutRetrieved.add(duration);
getAppTimeoutLatency.add(duration);
}
public void succeededGetAppTimeoutsRetrieved(long duration) {
totalSucceededGetAppTimeoutsRetrieved.add(duration);
getAppTimeoutsLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -1063,11 +1252,11 @@ public final class RouterMetrics {
numGetQueueUserAclsFailedRetrieved.incr();
}
public void incrContainerReportFailedRetrieved() {
public void incrGetContainerReportFailedRetrieved() {
numGetContainerReportFailedRetrieved.incr();
}
public void incrContainerFailedRetrieved() {
public void incrGetContainersFailedRetrieved() {
numGetContainersFailedRetrieved.incr();
}
@ -1142,4 +1331,32 @@ public final class RouterMetrics {
public void incrListReservationFailedRetrieved() {
numListReservationFailedRetrieved.incr();
}
public void incrGetAppActivitiesFailedRetrieved() {
numGetAppActivitiesFailedRetrieved.incr();
}
public void incrGetAppStatisticsFailedRetrieved() {
numGetAppStatisticsFailedRetrieved.incr();
}
public void incrGetAppPriorityFailedRetrieved() {
numGetAppPriorityFailedRetrieved.incr();
}
public void incrGetAppQueueFailedRetrieved() {
numGetAppQueueFailedRetrieved.incr();
}
public void incrUpdateAppQueueFailedRetrieved() {
numUpdateAppQueueFailedRetrieved.incr();
}
public void incrGetAppTimeoutFailedRetrieved() {
numGetAppTimeoutFailedRetrieved.incr();
}
public void incrGetAppTimeoutsFailedRetrieved() {
numGetAppTimeoutsFailedRetrieved.incr();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.router;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -44,6 +45,14 @@ import java.io.IOException;
@Unstable
public final class RouterServerUtil {
private static final String APPLICATION_ID_PREFIX = "application_";
private static final String APP_ATTEMPT_ID_PREFIX = "appattempt_";
private static final String CONTAINER_PREFIX = "container_";
private static final String EPOCH_PREFIX = "e";
/** Disable constructor. */
private RouterServerUtil() {
}
@ -181,6 +190,28 @@ public final class RouterServerUtil {
}
}
/**
* Throws an IOException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws IOException on failure
*/
@Public
@Unstable
public static void logAndThrowIOException(Throwable t, String errMsgFormat, Object... args)
throws IOException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new IOException(msg, t);
} else {
LOG.error(msg);
throw new IOException(msg);
}
}
/**
* Throws an RunTimeException due to an error.
*
@ -222,4 +253,197 @@ public final class RouterServerUtil {
throw new RuntimeException(msg);
}
}
/**
* Throws an RunTimeException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @return RuntimeException
*/
@Public
@Unstable
public static RuntimeException logAndReturnRunTimeException(
Throwable t, String errMsgFormat, Object... args) {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
return new RuntimeException(msg, t);
} else {
LOG.error(msg);
return new RuntimeException(msg);
}
}
/**
* Throws an RunTimeException due to an error.
*
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @return RuntimeException
*/
@Public
@Unstable
public static RuntimeException logAndReturnRunTimeException(
String errMsgFormat, Object... args) {
return logAndReturnRunTimeException(null, errMsgFormat, args);
}
/**
* Check applicationId is accurate.
*
* We need to ensure that applicationId cannot be empty and
* can be converted to ApplicationId object normally.
*
* @param applicationId applicationId of type string
* @throws IllegalArgumentException If the format of the applicationId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateApplicationId(String applicationId)
throws IllegalArgumentException {
// Make Sure applicationId is not empty.
if (applicationId == null || applicationId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
// Make sure the prefix information of applicationId is accurate.
if (!applicationId.startsWith(APPLICATION_ID_PREFIX)) {
throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+ applicationId + ". The valid ApplicationId should start with prefix application");
}
// Check the split position of the string.
int pos1 = APPLICATION_ID_PREFIX.length() - 1;
int pos2 = applicationId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid ApplicationId: " + applicationId);
}
// Confirm that the parsed rmId and appId are numeric types.
String rmId = applicationId.substring(pos1 + 1, pos2);
String appId = applicationId.substring(pos2 + 1);
if(!NumberUtils.isDigits(rmId) || !NumberUtils.isDigits(appId)){
throw new IllegalArgumentException("Invalid ApplicationId: " + applicationId);
}
}
/**
* Check appAttemptId is accurate.
*
* We need to ensure that appAttemptId cannot be empty and
* can be converted to ApplicationAttemptId object normally.
*
* @param appAttemptId appAttemptId of type string.
* @throws IllegalArgumentException If the format of the appAttemptId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateApplicationAttemptId(String appAttemptId)
throws IllegalArgumentException {
// Make Sure appAttemptId is not empty.
if (appAttemptId == null || appAttemptId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
}
// Make sure the prefix information of appAttemptId is accurate.
if (!appAttemptId.startsWith(APP_ATTEMPT_ID_PREFIX)) {
throw new IllegalArgumentException("Invalid AppAttemptId prefix: " + appAttemptId);
}
// Check the split position of the string.
int pos1 = APP_ATTEMPT_ID_PREFIX.length() - 1;
int pos2 = appAttemptId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid AppAttemptId: " + appAttemptId);
}
int pos3 = appAttemptId.indexOf('_', pos2 + 1);
if (pos3 < 0) {
throw new IllegalArgumentException("Invalid AppAttemptId: " + appAttemptId);
}
// Confirm that the parsed rmId and appId and attemptId are numeric types.
String rmId = appAttemptId.substring(pos1 + 1, pos2);
String appId = appAttemptId.substring(pos2 + 1, pos3);
String attemptId = appAttemptId.substring(pos3 + 1);
if (!NumberUtils.isDigits(rmId) || !NumberUtils.isDigits(appId)
|| !NumberUtils.isDigits(attemptId)) {
throw new IllegalArgumentException("Invalid AppAttemptId: " + appAttemptId);
}
}
/**
* Check containerId is accurate.
*
* We need to ensure that containerId cannot be empty and
* can be converted to ContainerId object normally.
*
* @param containerId containerId of type string.
* @throws IllegalArgumentException If the format of the appAttemptId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateContainerId(String containerId)
throws IllegalArgumentException {
// Make Sure containerId is not empty.
if (containerId == null || containerId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
}
// Make sure the prefix information of containerId is accurate.
if (!containerId.startsWith(CONTAINER_PREFIX)) {
throw new IllegalArgumentException("Invalid ContainerId prefix: " + containerId);
}
// Check the split position of the string.
int pos1 = CONTAINER_PREFIX.length() - 1;
String epoch = "0";
if (containerId.regionMatches(pos1 + 1, EPOCH_PREFIX, 0, EPOCH_PREFIX.length())) {
int pos2 = containerId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
String epochStr = containerId.substring(pos1 + 1 + EPOCH_PREFIX.length(), pos2);
epoch = epochStr;
// rewind the current position
pos1 = pos2;
}
int pos2 = containerId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
int pos3 = containerId.indexOf('_', pos2 + 1);
if (pos3 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
int pos4 = containerId.indexOf('_', pos3 + 1);
if (pos4 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
// Confirm that the parsed appId and clusterTimestamp and attemptId and cid and epoch
// are numeric types.
String appId = containerId.substring(pos2 + 1, pos3);
String clusterTimestamp = containerId.substring(pos1 + 1, pos2);
String attemptId = containerId.substring(pos3 + 1, pos4);
String cid = containerId.substring(pos4 + 1);
if (!NumberUtils.isDigits(appId) || !NumberUtils.isDigits(clusterTimestamp)
|| !NumberUtils.isDigits(attemptId) || !NumberUtils.isDigits(cid)
|| !NumberUtils.isDigits(epoch)) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
}
}

View File

@ -855,7 +855,7 @@ public class FederationClientInterceptor
try {
response = clientRMProxy.moveApplicationAcrossQueues(request);
} catch (Exception e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
routerMetrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to moveApplicationAcrossQueues for " +
applicationId + " to SubCluster " + subClusterId.getId(), e);
}
@ -1174,7 +1174,7 @@ public class FederationClientInterceptor
try {
response = clientRMProxy.getApplicationAttemptReport(request);
} catch (Exception e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
routerMetrics.incrAppAttemptReportFailedRetrieved();
String msg = String.format(
"Unable to get the applicationAttempt report for %s to SubCluster %s.",
request.getApplicationAttemptId(), subClusterId.getId());
@ -1237,7 +1237,7 @@ public class FederationClientInterceptor
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException {
if(request == null || request.getContainerId() == null){
routerMetrics.incrContainerReportFailedRetrieved();
routerMetrics.incrGetContainerReportFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getContainerReport request " +
"or containerId", null);
}
@ -1249,7 +1249,7 @@ public class FederationClientInterceptor
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException ex) {
routerMetrics.incrContainerReportFailedRetrieved();
routerMetrics.incrGetContainerReportFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " + applicationId +
" does not exist in FederationStateStore.", ex);
}
@ -1260,7 +1260,7 @@ public class FederationClientInterceptor
try {
response = clientRMProxy.getContainerReport(request);
} catch (Exception ex) {
routerMetrics.incrContainerReportFailedRetrieved();
routerMetrics.incrGetContainerReportFailedRetrieved();
LOG.error("Unable to get the container report for {} from SubCluster {}.",
applicationId, subClusterId.getId(), ex);
}
@ -1280,7 +1280,7 @@ public class FederationClientInterceptor
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException {
if (request == null || request.getApplicationAttemptId() == null) {
routerMetrics.incrContainerFailedRetrieved();
routerMetrics.incrGetContainersFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getContainers request or ApplicationAttemptId.", null);
}
@ -1291,7 +1291,7 @@ public class FederationClientInterceptor
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException ex) {
routerMetrics.incrContainerFailedRetrieved();
routerMetrics.incrGetContainersFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " + applicationId +
" does not exist in FederationStateStore.", ex);
}
@ -1302,7 +1302,7 @@ public class FederationClientInterceptor
try {
response = clientRMProxy.getContainers(request);
} catch (Exception ex) {
routerMetrics.incrContainerFailedRetrieved();
routerMetrics.incrGetContainersFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get the containers for " +
applicationId + " from SubCluster " + subClusterId.getId(), ex);
}

View File

@ -44,10 +44,12 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -1000,11 +1002,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
nodes.addAll(nodesInfo.getNodes());
});
} catch (NotFoundException e) {
LOG.error("Get all active sub cluster(s) error.", e);
LOG.error("get all active sub cluster(s) error.", e);
} catch (YarnException e) {
LOG.error("getNodes error.", e);
LOG.error("getNodes by states = {} error.", states, e);
} catch (IOException e) {
LOG.error("getNodes error with io error.", e);
LOG.error("getNodes by states = {} error with io error.", states, e);
}
// Delete duplicate from all the node reports got from all the available
@ -1170,32 +1172,45 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
// Only verify the app_id,
// because the specific subCluster needs to be found according to the app_id,
// and other verifications are directly handed over to the corresponding subCluster RM
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppActivitiesFailedRetrieved();
throw e;
}
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
final HttpServletRequest hsrCopy = clone(hsr);
return interceptor.getAppActivities(hsrCopy, appId, time, requestPriorities,
allocationRequestIds, groupBy, limit, actions, summarize);
AppActivitiesInfo appActivitiesInfo = interceptor.getAppActivities(hsrCopy, appId, time,
requestPriorities, allocationRequestIds, groupBy, limit, actions, summarize);
if (appActivitiesInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetAppActivitiesRetrieved(stopTime - startTime);
return appActivitiesInfo;
}
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e, "Unable to get subCluster by appId: %s.",
appId);
routerMetrics.incrGetAppActivitiesFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get subCluster by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppActivities Failed.", e);
routerMetrics.incrGetAppActivitiesFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"getAppActivities by appId = %s error .", appId);
}
return null;
routerMetrics.incrGetAppActivitiesFailedRetrieved();
throw new RuntimeException("getAppActivities Failed.");
}
@Override
public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
Set<String> stateQueries, Set<String> typeQueries) {
try {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class};
@ -1203,19 +1218,38 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args);
Map<SubClusterInfo, ApplicationStatisticsInfo> appStatisticsMap = invokeConcurrent(
subClustersActive.values(), remoteMethod, ApplicationStatisticsInfo.class);
return RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values());
ApplicationStatisticsInfo applicationStatisticsInfo =
RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values());
if (applicationStatisticsInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetAppStatisticsRetrieved(stopTime - startTime);
return applicationStatisticsInfo;
}
} catch (NotFoundException e) {
routerMetrics.incrGetAppStatisticsFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
} catch (IOException e) {
RouterServerUtil.logAndThrowRunTimeException(e, "Get all active sub cluster(s) error.");
routerMetrics.incrGetAppStatisticsFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"getAppStatistics error by stateQueries = %s, typeQueries = %s with io error.",
StringUtils.join(stateQueries, ","), StringUtils.join(typeQueries, ","));
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException(e, "getAppStatistics error.");
routerMetrics.incrGetAppStatisticsFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"getAppStatistics by stateQueries = %s, typeQueries = %s with yarn error.",
StringUtils.join(stateQueries, ","), StringUtils.join(typeQueries, ","));
}
return null;
routerMetrics.incrGetAppStatisticsFailedRetrieved();
throw RouterServerUtil.logAndReturnRunTimeException(
"getAppStatistics by stateQueries = %s, typeQueries = %s Failed.",
StringUtils.join(stateQueries, ","), StringUtils.join(typeQueries, ","));
}
@Override
public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
throws IOException {
try {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{HttpServletRequest.class};
@ -1223,27 +1257,35 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
ClientMethod remoteMethod = new ClientMethod("getNodeToLabels", argsClasses, args);
Map<SubClusterInfo, NodeToLabelsInfo> nodeToLabelsInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, NodeToLabelsInfo.class);
return RouterWebServiceUtil.mergeNodeToLabels(nodeToLabelsInfoMap);
NodeToLabelsInfo nodeToLabelsInfo =
RouterWebServiceUtil.mergeNodeToLabels(nodeToLabelsInfoMap);
if (nodeToLabelsInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime);
return nodeToLabelsInfo;
}
} catch (NotFoundException e) {
LOG.error("Get all active sub cluster(s) error.", e);
throw new IOException("Get all active sub cluster(s) error.", e);
routerMetrics.incrNodeToLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
} catch (YarnException e) {
LOG.error("getNodeToLabels error.", e);
throw new IOException("getNodeToLabels error.", e);
routerMetrics.incrNodeToLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("getNodeToLabels error.", e);
}
routerMetrics.incrGetAppStatisticsFailedRetrieved();
throw new RuntimeException("getNodeToLabels Failed.");
}
@Override
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
throws IOException {
try {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
Class[] argsClasses = new Class[]{Set.class};
Object[] args = new Object[]{labels};
ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", argsClasses, args);
Map<SubClusterInfo, LabelsToNodesInfo> labelsToNodesInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, LabelsToNodesInfo.class);
Map<NodeLabelInfo, NodeIDsInfo> labelToNodesMap = new HashMap<>();
labelsToNodesInfoMap.values().forEach(labelsToNode -> {
Map<NodeLabelInfo, NodeIDsInfo> values = labelsToNode.getLabelsToNodes();
@ -1255,13 +1297,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
labelToNodesMap.put(key, newValue);
}
});
return new LabelsToNodesInfo(labelToNodesMap);
LabelsToNodesInfo labelsToNodesInfo = new LabelsToNodesInfo(labelToNodesMap);
if (labelsToNodesInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
return labelsToNodesInfo;
}
} catch (NotFoundException e) {
RouterServerUtil.logAndThrowIOException("Get all active sub cluster(s) error.", e);
routerMetrics.incrLabelsToNodesFailedRetrieved();
RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
} catch (YarnException e) {
RouterServerUtil.logAndThrowIOException("getLabelsToNodes error.", e);
routerMetrics.incrLabelsToNodesFailedRetrieved();
RouterServerUtil.logAndThrowIOException(
e, "getLabelsToNodes by labels = %s with yarn error.", StringUtils.join(labels, ","));
}
return null;
routerMetrics.incrLabelsToNodesFailedRetrieved();
throw RouterServerUtil.logAndReturnRunTimeException(
"getLabelsToNodes by labels = %s Failed.", StringUtils.join(labels, ","));
}
@Override
@ -1280,6 +1332,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
throws IOException {
try {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{HttpServletRequest.class};
@ -1289,13 +1342,21 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class);
Set<NodeLabel> hashSets = Sets.newHashSet();
nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels()));
return new NodeLabelsInfo(hashSets);
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo(hashSets);
if (nodeLabelsInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime);
return nodeLabelsInfo;
}
} catch (NotFoundException e) {
RouterServerUtil.logAndThrowIOException("Get all active sub cluster(s) error.", e);
routerMetrics.incrClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
} catch (YarnException e) {
RouterServerUtil.logAndThrowIOException("getClusterNodeLabels error.", e);
routerMetrics.incrClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("getClusterNodeLabels with yarn error.", e);
}
return null;
routerMetrics.incrClusterNodeLabelsFailedRetrieved();
throw new RuntimeException("getClusterNodeLabels Failed.");
}
@Override
@ -1314,45 +1375,68 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
throws IOException {
try {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class};
Object[] args = new Object[]{hsrCopy, nodeId};
ClientMethod remoteMethod = new ClientMethod("getLabelsOnNode", argsClasses, args);
Map<SubClusterInfo, NodeLabelsInfo> nodeToLabelsInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class);
invokeConcurrent(subClustersActive.values(), remoteMethod, NodeLabelsInfo.class);
Set<NodeLabel> hashSets = Sets.newHashSet();
nodeToLabelsInfoMap.values().forEach(item -> hashSets.addAll(item.getNodeLabels()));
return new NodeLabelsInfo(hashSets);
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo(hashSets);
if (nodeLabelsInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
return nodeLabelsInfo;
}
} catch (NotFoundException e) {
RouterServerUtil.logAndThrowIOException("Get all active sub cluster(s) error.", e);
routerMetrics.incrLabelsToNodesFailedRetrieved();
RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
} catch (YarnException e) {
RouterServerUtil.logAndThrowIOException("getClusterNodeLabels error.", e);
routerMetrics.incrLabelsToNodesFailedRetrieved();
RouterServerUtil.logAndThrowIOException(
e, "getLabelsOnNode nodeId = %s with yarn error.", nodeId);
}
return null;
routerMetrics.incrLabelsToNodesFailedRetrieved();
throw RouterServerUtil.logAndReturnRunTimeException(
"getLabelsOnNode by nodeId = %s Failed.", nodeId);
}
@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppPriorityFailedRetrieved();
throw e;
}
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppPriority(hsr, appId);
AppPriority appPriority = interceptor.getAppPriority(hsr, appId);
if (appPriority != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetAppPriorityRetrieved(stopTime - startTime);
return appPriority;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppPriority appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppPriority Failed.", e);
routerMetrics.incrGetAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getAppPriority error.", e);
}
return null;
routerMetrics.incrGetAppPriorityFailedRetrieved();
throw new RuntimeException("getAppPriority Failed.");
}
@Override
@ -1360,50 +1444,74 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
throw e;
}
if (targetPriority == null) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the targetPriority is empty or null.");
}
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateApplicationPriority(targetPriority, hsr, appId);
Response response = interceptor.updateApplicationPriority(targetPriority, hsr, appId);
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime);
return response;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the updateApplicationPriority appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateApplicationPriority Failed.", e);
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("updateApplicationPriority error.", e);
}
return null;
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
throw new RuntimeException("updateApplicationPriority Failed.");
}
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppQueueFailedRetrieved();
throw e;
}
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppQueue(hsr, appId);
AppQueue queue = interceptor.getAppQueue(hsr, appId);
if (queue != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetAppQueueRetrieved((stopTime - startTime));
return queue;
}
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get queue by appId: %s.", appId);
routerMetrics.incrGetAppQueueFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e, "Unable to get queue by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppQueue Failed.", e);
routerMetrics.incrGetAppQueueFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getAppQueue error.", e);
}
return null;
routerMetrics.incrGetAppQueueFailedRetrieved();
throw new RuntimeException("getAppQueue Failed.");
}
@Override
@ -1411,27 +1519,40 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrUpdateAppQueueFailedRetrieved();
throw e;
}
if (targetQueue == null) {
routerMetrics.incrUpdateAppQueueFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the targetQueue is null.");
}
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateAppQueue(targetQueue, hsr, appId);
Response response = interceptor.updateAppQueue(targetQueue, hsr, appId);
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededUpdateAppQueueRetrieved(stopTime - startTime);
return response;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrUpdateAppQueueFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to update app queue by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateAppQueue Failed.", e);
routerMetrics.incrUpdateAppQueueFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("updateAppQueue error.", e);
}
return null;
routerMetrics.incrUpdateAppQueueFailedRetrieved();
throw new RuntimeException("updateAppQueue Failed.");
}
@Override
@ -1497,7 +1618,16 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}
// Check that the appId format is accurate
try {
ReservationId.parseReservationId(reservationId);
} catch (IllegalArgumentException e) {
routerMetrics.incrListReservationFailedRetrieved();
throw e;
}
try {
long startTime1 = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
@ -1505,11 +1635,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
Response response = interceptor.listReservation(queue, reservationId, startTime, endTime,
includeResourceAllocations, hsrCopy);
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededListReservationRetrieved(stopTime - startTime1);
return response;
}
} catch (YarnException e) {
routerMetrics.incrListReservationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("listReservation Failed.", e);
RouterServerUtil.logAndThrowRunTimeException("listReservation error.", e);
}
routerMetrics.incrListReservationFailedRetrieved();
@ -1521,47 +1653,80 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
String type) throws AuthorizationException {
if (appId == null || appId.isEmpty()) {
routerMetrics.incrGetAppTimeoutFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
// Check that the appId format is accurate
try {
ApplicationId.fromString(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppTimeoutFailedRetrieved();
throw e;
}
if (type == null || type.isEmpty()) {
routerMetrics.incrGetAppTimeoutFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the type is empty or null.");
}
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppTimeout(hsr, appId, type);
AppTimeoutInfo appTimeoutInfo = interceptor.getAppTimeout(hsr, appId, type);
if (appTimeoutInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetAppTimeoutRetrieved((stopTime - startTime));
return appTimeoutInfo;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppTimeoutFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppTimeout appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppTimeout Failed.", e);
routerMetrics.incrGetAppTimeoutFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getAppTimeout error.", e);
}
return null;
routerMetrics.incrGetAppTimeoutFailedRetrieved();
throw new RuntimeException("getAppTimeout Failed.");
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppTimeoutsFailedRetrieved();
throw e;
}
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppTimeouts(hsr, appId);
AppTimeoutsInfo appTimeoutsInfo = interceptor.getAppTimeouts(hsr, appId);
if (appTimeoutsInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetAppTimeoutsRetrieved((stopTime - startTime));
return appTimeoutsInfo;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrGetAppTimeoutsFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppTimeouts appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppTimeouts Failed.", e);
routerMetrics.incrGetAppTimeoutsFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getAppTimeouts error.", e);
}
return null;
routerMetrics.incrGetAppTimeoutsFailedRetrieved();
throw new RuntimeException("getAppTimeouts Failed.");
}
@Override
@ -1569,47 +1734,76 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
throw e;
}
if (appTimeout == null) {
routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
throw new IllegalArgumentException("Parameter error, the appTimeout is null.");
}
try {
long startTime = Time.now();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateApplicationTimeout(appTimeout, hsr, appId);
Response response = interceptor.updateApplicationTimeout(appTimeout, hsr, appId);
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededUpdateAppTimeoutsRetrieved((stopTime - startTime));
return response;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the updateApplicationTimeout appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateApplicationTimeout Failed.", e);
routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
RouterServerUtil.logAndThrowRunTimeException("updateApplicationTimeout error.", e);
}
return null;
routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
throw new RuntimeException("updateApplicationTimeout Failed.");
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
// Check that the appId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
} catch (IllegalArgumentException e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
throw e;
}
try {
long startTime = Time.now();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempts(hsr, appId);
AppAttemptsInfo appAttemptsInfo = interceptor.getAppAttempts(hsr, appId);
if (appAttemptsInfo != null) {
long stopTime = Time.now();
routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime);
return appAttemptsInfo;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the AppAttempt appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppAttempts Failed.", e);
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getAppAttempts error.", e);
}
return null;
routerMetrics.incrAppAttemptsFailedRetrieved();
throw new RuntimeException("getAppAttempts Failed.");
}
@Override
@ -1622,59 +1816,87 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
if (appAttemptId == null || appAttemptId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
}
// Check that the appId/appAttemptId format is accurate
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
RouterServerUtil.validateApplicationId(appId);
RouterServerUtil.validateApplicationAttemptId(appAttemptId);
} catch (IllegalArgumentException e) {
routerMetrics.incrAppAttemptReportFailedRetrieved();
throw e;
}
// Call the getAppAttempt method
try {
long startTime = Time.now();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempt(req, res, appId, appAttemptId);
AppAttemptInfo appAttemptInfo = interceptor.getAppAttempt(req, res, appId, appAttemptId);
if (appAttemptInfo != null) {
long stopTime = Time.now();
routerMetrics.succeededAppAttemptReportRetrieved(stopTime - startTime);
return appAttemptInfo;
}
} catch (IllegalArgumentException e) {
routerMetrics.incrAppAttemptReportFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the AppAttempt appId: %s, appAttemptId: %s.", appId, appAttemptId);
"Unable to getAppAttempt by appId: %s, appAttemptId: %s.", appId, appAttemptId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
routerMetrics.incrAppAttemptReportFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"getAppAttempt error, appId: %s, appAttemptId: %s.", appId, appAttemptId);
}
return null;
routerMetrics.incrAppAttemptReportFailedRetrieved();
throw RouterServerUtil.logAndReturnRunTimeException(
"getAppAttempt failed, appId: %s, appAttemptId: %s.", appId, appAttemptId);
}
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
ContainersInfo containersInfo = new ContainersInfo();
Map<SubClusterId, SubClusterInfo> subClustersActive;
// Check that the appId/appAttemptId format is accurate
try {
subClustersActive = getActiveSubclusters();
} catch (NotFoundException e) {
LOG.error("Get all active sub cluster(s) error.", e);
return containersInfo;
RouterServerUtil.validateApplicationId(appId);
RouterServerUtil.validateApplicationAttemptId(appAttemptId);
} catch (IllegalArgumentException e) {
routerMetrics.incrGetContainersFailedRetrieved();
throw e;
}
try {
long startTime = clock.getTime();
ContainersInfo containersInfo = new ContainersInfo();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
Class[] argsClasses = new Class[]{
HttpServletRequest.class, HttpServletResponse.class, String.class, String.class};
Object[] args = new Object[]{req, res, appId, appAttemptId};
ClientMethod remoteMethod = new ClientMethod("getContainers", argsClasses, args);
Map<SubClusterInfo, ContainersInfo> containersInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, ContainersInfo.class);
if (containersInfoMap != null) {
if (containersInfoMap != null && !containersInfoMap.isEmpty()) {
containersInfoMap.values().forEach(containers ->
containersInfo.addAll(containers.getContainers()));
}
} catch (Exception ex) {
LOG.error("Failed to return GetContainers.", ex);
if (containersInfo != null) {
long stopTime = clock.getTime();
routerMetrics.succeededGetContainersRetrieved(stopTime - startTime);
return containersInfo;
}
} catch (NotFoundException e) {
routerMetrics.incrGetContainersFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e, "getContainers error, appId = %s, " +
" appAttemptId = %s, Probably getActiveSubclusters error.", appId, appAttemptId);
} catch (IOException | YarnException e) {
routerMetrics.incrGetContainersFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e, "getContainers error, appId = %s, " +
" appAttemptId = %s.", appId, appAttemptId);
}
return containersInfo;
routerMetrics.incrGetContainersFailedRetrieved();
throw RouterServerUtil.logAndReturnRunTimeException(
"getContainers failed, appId: %s, appAttemptId: %s.", appId, appAttemptId);
}
@Override
@ -1682,32 +1904,45 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
HttpServletResponse res, String appId, String appAttemptId,
String containerId) {
if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
if (appAttemptId == null || appAttemptId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
}
if (containerId == null || containerId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
// FederationInterceptorREST#getContainer is logically
// the same as FederationClientInterceptor#getContainerReport,
// so use the same Metric.
// Check that the appId/appAttemptId/containerId format is accurate
try {
RouterServerUtil.validateApplicationId(appId);
RouterServerUtil.validateApplicationAttemptId(appAttemptId);
RouterServerUtil.validateContainerId(containerId);
} catch (IllegalArgumentException e) {
routerMetrics.incrGetContainerReportFailedRetrieved();
throw e;
}
try {
long startTime = Time.now();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getContainer(req, res, appId, appAttemptId, containerId);
ContainerInfo containerInfo =
interceptor.getContainer(req, res, appId, appAttemptId, containerId);
if (containerInfo != null) {
long stopTime = Time.now();
routerMetrics.succeededGetContainerReportRetrieved(stopTime - startTime);
return containerInfo;
}
} catch (IllegalArgumentException e) {
String msg = String.format(
"Unable to get the AppAttempt appId: %s, appAttemptId: %s, containerId: %s.", appId,
appAttemptId, containerId);
routerMetrics.incrGetContainerReportFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(msg, e);
} catch (YarnException e) {
routerMetrics.incrGetContainerReportFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
}
return null;
routerMetrics.incrGetContainerReportFailedRetrieved();
throw new RuntimeException("getContainer Failed.");
}
@Override
@ -1735,31 +1970,45 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
public Response signalToContainer(String containerId, String command,
HttpServletRequest req) {
if (containerId == null || containerId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
// Check if containerId is empty or null
try {
RouterServerUtil.validateContainerId(containerId);
} catch (IllegalArgumentException e) {
routerMetrics.incrSignalToContainerFailedRetrieved();
throw e;
}
// Check if command is empty or null
if (command == null || command.isEmpty()) {
routerMetrics.incrSignalToContainerFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the command is empty or null.");
}
try {
long startTime = Time.now();
ContainerId containerIdObj = ContainerId.fromString(containerId);
ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId.toString());
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.signalToContainer(containerId, command, req);
Response response = interceptor.signalToContainer(containerId, command, req);
if (response != null) {
long stopTime = Time.now();
routerMetrics.succeededSignalToContainerRetrieved(stopTime - startTime);
return response;
}
} catch (YarnException e) {
routerMetrics.incrSignalToContainerFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("signalToContainer Failed.", e);
} catch (AuthorizationException e) {
routerMetrics.incrSignalToContainerFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("signalToContainer Author Failed.", e);
}
return null;
routerMetrics.incrSignalToContainerFailedRetrieved();
throw new RuntimeException("signalToContainer Failed.");
}
@Override
@ -1777,6 +2026,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
// Send the requests in parallel
CompletionService<R> compSvc = new ExecutorCompletionService<>(this.threadpool);
// Error Msg
for (final SubClusterInfo info : clusterIds) {
compSvc.submit(() -> {
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
@ -1831,6 +2081,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
subClusterInfo = federationFacade.getSubCluster(subClusterId);
return subClusterInfo;
} catch (IllegalArgumentException e){
throw new IllegalArgumentException(e);
} catch (YarnException e) {
RouterServerUtil.logAndThrowException(e,
"Get HomeSubClusterInfo by applicationId %s failed.", appId);

View File

@ -386,12 +386,12 @@ public class TestRouterMetrics {
public void getContainerReport() {
LOG.info("Mocked: failed getContainerReport call");
metrics.incrContainerReportFailedRetrieved();
metrics.incrGetContainerReportFailedRetrieved();
}
public void getContainer() {
public void getContainers() {
LOG.info("Mocked: failed getContainer call");
metrics.incrContainerFailedRetrieved();
metrics.incrGetContainersFailedRetrieved();
}
public void getResourceTypeInfo() {
@ -478,6 +478,41 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getListReservationFailed call");
metrics.incrListReservationFailedRetrieved();
}
public void getAppActivitiesFailed() {
LOG.info("Mocked: failed getAppActivitiesFailed call");
metrics.incrGetAppActivitiesFailedRetrieved();
}
public void getAppStatisticsFailed() {
LOG.info("Mocked: failed getAppStatisticsFailed call");
metrics.incrGetAppStatisticsFailedRetrieved();
}
public void getAppPriorityFailed() {
LOG.info("Mocked: failed getAppPriorityFailed call");
metrics.incrGetAppPriorityFailedRetrieved();
}
public void getAppQueueFailed() {
LOG.info("Mocked: failed getAppQueueFailed call");
metrics.incrGetAppQueueFailedRetrieved();
}
public void getUpdateQueueFailed() {
LOG.info("Mocked: failed getUpdateQueueFailed call");
metrics.incrUpdateAppQueueFailedRetrieved();
}
public void getAppTimeoutFailed() {
LOG.info("Mocked: failed getAppTimeoutFailed call");
metrics.incrGetAppTimeoutFailedRetrieved();
}
public void getAppTimeoutsFailed() {
LOG.info("Mocked: failed getAppTimeoutsFailed call");
metrics.incrGetAppTimeoutsFailedRetrieved();
}
}
// Records successes for all calls
@ -564,7 +599,7 @@ public class TestRouterMetrics {
metrics.succeededGetContainerReportRetrieved(duration);
}
public void getContainer(long duration) {
public void getContainers(long duration) {
LOG.info("Mocked: successful getContainer call with duration {}", duration);
metrics.succeededGetContainersRetrieved(duration);
}
@ -653,6 +688,41 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful getListReservation call with duration {}", duration);
metrics.succeededListReservationRetrieved(duration);
}
public void getAppActivitiesRetrieved(long duration) {
LOG.info("Mocked: successful getAppActivities call with duration {}", duration);
metrics.succeededGetAppActivitiesRetrieved(duration);
}
public void getAppStatisticsRetrieved(long duration) {
LOG.info("Mocked: successful getAppStatistics call with duration {}", duration);
metrics.succeededGetAppStatisticsRetrieved(duration);
}
public void getAppPriorityRetrieved(long duration) {
LOG.info("Mocked: successful getAppPriority call with duration {}", duration);
metrics.succeededGetAppPriorityRetrieved(duration);
}
public void getAppQueueRetrieved(long duration) {
LOG.info("Mocked: successful getAppQueue call with duration {}", duration);
metrics.succeededGetAppQueueRetrieved(duration);
}
public void getUpdateQueueRetrieved(long duration) {
LOG.info("Mocked: successful getUpdateQueue call with duration {}", duration);
metrics.succeededUpdateAppQueueRetrieved(duration);
}
public void getAppTimeoutRetrieved(long duration) {
LOG.info("Mocked: successful getAppTimeout call with duration {}", duration);
metrics.succeededGetAppTimeoutRetrieved(duration);
}
public void getAppTimeoutsRetrieved(long duration) {
LOG.info("Mocked: successful getAppTimeouts call with duration {}", duration);
metrics.succeededGetAppTimeoutsRetrieved(duration);
}
}
@Test
@ -827,12 +897,12 @@ public class TestRouterMetrics {
@Test
public void testSucceededGetContainers() {
long totalGoodBefore = metrics.getNumSucceededGetContainersRetrieved();
goodSubCluster.getContainer(150);
goodSubCluster.getContainers(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetContainersRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetContainersRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getContainer(300);
goodSubCluster.getContainers(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetContainersRetrieved());
Assert.assertEquals(225, metrics.getLatencySucceededGetContainersRetrieved(),
@ -840,9 +910,9 @@ public class TestRouterMetrics {
}
@Test
public void testGetContainerFailed() {
public void testGetContainersFailed() {
long totalBadBefore = metrics.getContainersFailedRetrieved();
badSubCluster.getContainer();
badSubCluster.getContainers();
Assert.assertEquals(totalBadBefore + 1, metrics.getContainersFailedRetrieved());
}
@ -1234,4 +1304,165 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadBefore + 1,
metrics.getListReservationFailedRetrieved());
}
@Test
public void testGetAppActivitiesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetAppActivitiesRetrieved();
goodSubCluster.getAppActivitiesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetAppActivitiesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAppActivitiesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getAppActivitiesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetAppActivitiesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetAppActivitiesRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetAppActivitiesRetrievedFailed() {
long totalBadBefore = metrics.getAppActivitiesFailedRetrieved();
badSubCluster.getAppActivitiesFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getAppActivitiesFailedRetrieved());
}
@Test
public void testGetAppStatisticsLatencyRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetAppStatisticsRetrieved();
goodSubCluster.getAppStatisticsRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetAppStatisticsRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAppStatisticsRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getAppStatisticsRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetAppStatisticsRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetAppStatisticsRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetAppStatisticsRetrievedFailed() {
long totalBadBefore = metrics.getAppStatisticsFailedRetrieved();
badSubCluster.getAppStatisticsFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getAppStatisticsFailedRetrieved());
}
@Test
public void testGetAppPriorityLatencyRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetAppPriorityRetrieved();
goodSubCluster.getAppPriorityRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetAppPriorityRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getAppPriorityRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetAppPriorityRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetAppPriorityRetrievedFailed() {
long totalBadBefore = metrics.getAppPriorityFailedRetrieved();
badSubCluster.getAppPriorityFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getAppPriorityFailedRetrieved());
}
@Test
public void testGetAppQueueLatencyRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetAppQueueRetrieved();
goodSubCluster.getAppQueueRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetAppQueueRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAppQueueRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getAppQueueRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetAppQueueRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetAppQueueRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetAppQueueRetrievedFailed() {
long totalBadBefore = metrics.getAppQueueFailedRetrieved();
badSubCluster.getAppQueueFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getAppQueueFailedRetrieved());
}
@Test
public void testUpdateAppQueueLatencyRetrieved() {
long totalGoodBefore = metrics.getNumSucceededUpdateAppQueueRetrieved();
goodSubCluster.getUpdateQueueRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededUpdateAppQueueRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededUpdateAppQueueRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getUpdateQueueRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededUpdateAppQueueRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededUpdateAppQueueRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testUpdateAppQueueRetrievedFailed() {
long totalBadBefore = metrics.getUpdateAppQueueFailedRetrieved();
badSubCluster.getUpdateQueueFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getUpdateAppQueueFailedRetrieved());
}
@Test
public void testGetAppTimeoutLatencyRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetAppTimeoutRetrieved();
goodSubCluster.getAppTimeoutRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetAppTimeoutRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAppTimeoutRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getAppTimeoutRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetAppTimeoutRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetAppTimeoutRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetAppTimeoutRetrievedFailed() {
long totalBadBefore = metrics.getAppTimeoutFailedRetrieved();
badSubCluster.getAppTimeoutFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getAppTimeoutFailedRetrieved());
}
@Test
public void testGetAppTimeoutsLatencyRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetAppTimeoutsRetrieved();
goodSubCluster.getAppTimeoutsRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetAppTimeoutsRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getAppTimeoutsRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetAppTimeoutsRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetAppTimeoutsRetrievedFailed() {
long totalBadBefore = metrics.getAppTimeoutsFailedRetrieved();
badSubCluster.getAppTimeoutsFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getAppTimeoutsFailedRetrieved());
}
}

View File

@ -117,6 +117,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.mockito.Mockito;
import org.slf4j.Logger;
@ -343,6 +344,23 @@ public class MockDefaultRequestInterceptorREST
throw new RuntimeException("RM is stopped");
}
// Try format conversion for app_id
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(appId);
} catch (Exception e) {
throw new BadRequestException(e);
}
// Try format conversion for app_attempt_id
ApplicationAttemptId applicationAttemptId = null;
try {
applicationAttemptId =
ApplicationAttemptId.fromString(appAttemptId);
} catch (Exception e) {
throw new BadRequestException(e);
}
// We avoid to check if the Application exists in the system because we need
// to validate that each subCluster returns 1 container.
ContainersInfo containers = new ContainersInfo();
@ -453,8 +471,7 @@ public class MockDefaultRequestInterceptorREST
throw new RuntimeException("RM is stopped");
}
ContainerId newContainerId = ContainerId.newContainerId(
ApplicationAttemptId.fromString(appAttemptId), Integer.valueOf(containerId));
ContainerId newContainerId = ContainerId.fromString(containerId);
Resource allocatedResource = Resource.newInstance(1024, 2);
@ -505,15 +522,15 @@ public class MockDefaultRequestInterceptorREST
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(appAttemptId);
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
"user", "queue", "appname", "host", 124, null,
applicationId, attemptId, "user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
ApplicationAttemptId.newInstance(applicationId, Integer.parseInt(appAttemptId)),
"host", 124, "url", "oUrl", "diagnostics",
attemptId, "host", 124, "url", "oUrl", "diagnostics",
YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId(
newApplicationReport.getCurrentApplicationAttemptId(), 1));

View File

@ -30,6 +30,7 @@ import java.util.Collections;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.Time;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@ -634,23 +636,28 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
}
@Test
public void testGetContainersNotExists() {
public void testGetContainersNotExists() throws Exception {
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ContainersInfo response = interceptor.getContainers(null, null, appId.toString(), null);
Assert.assertTrue(response.getContainers().isEmpty());
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the appAttemptId is empty or null.",
() -> interceptor.getContainers(null, null, appId.toString(), null));
}
@Test
public void testGetContainersWrongFormat() {
ContainersInfo response = interceptor.getContainers(null, null, "Application_wrong_id", null);
Assert.assertNotNull(response);
Assert.assertTrue(response.getContainers().isEmpty());
public void testGetContainersWrongFormat() throws Exception {
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
response = interceptor.getContainers(null, null, appId.toString(), "AppAttempt_wrong_id");
ApplicationAttemptId appAttempt = ApplicationAttemptId.newInstance(appId, 1);
Assert.assertTrue(response.getContainers().isEmpty());
// Test Case 1: appId is wrong format, appAttemptId is accurate.
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Invalid ApplicationId prefix: Application_wrong_id. " +
"The valid ApplicationId should start with prefix application",
() -> interceptor.getContainers(null, null, "Application_wrong_id", appAttempt.toString()));
// Test Case2: appId is accurate, appAttemptId is wrong format.
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Invalid AppAttemptId prefix: AppAttempt_wrong_id",
() -> interceptor.getContainers(null, null, appId.toString(), "AppAttempt_wrong_id"));
}
@Test
@ -739,20 +746,28 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
}
@Test
public void testGetContainer()
throws IOException, InterruptedException, YarnException {
// Submit application to multiSubCluster
public void testGetContainer() throws Exception {
//
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId appContainerId = ContainerId.newContainerId(appAttemptId, 1);
String applicationId = appId.toString();
String attemptId = appAttemptId.toString();
String containerId = appContainerId.toString();
// Submit application to multiSubCluster
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(applicationId);
Assert.assertNotNull(interceptor.submitApplication(context, null));
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
// Test Case1: Wrong ContainerId
LambdaTestUtils.intercept(IllegalArgumentException.class, "Invalid ContainerId prefix: 0",
() -> interceptor.getContainer(null, null, applicationId, attemptId, "0"));
ContainerInfo containerInfo = interceptor.getContainer(null, null,
appId.toString(), appAttemptId.toString(), "0");
// Test Case2: Correct ContainerId
ContainerInfo containerInfo = interceptor.getContainer(null, null, applicationId,
attemptId, containerId);
Assert.assertNotNull(containerInfo);
}
@ -800,9 +815,10 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
// Generate ApplicationAttemptId information
Assert.assertNotNull(interceptor.submitApplication(context, null));
ApplicationAttemptId expectAppAttemptId = ApplicationAttemptId.newInstance(appId, 1);
String appAttemptId = expectAppAttemptId.toString();
org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo
appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), "1");
appAttemptInfo = interceptor.getAppAttempt(null, null, appId.toString(), appAttemptId);
Assert.assertNotNull(appAttemptInfo);
Assert.assertEquals(expectAppAttemptId.toString(), appAttemptInfo.getAppAttemptId());
@ -1154,4 +1170,4 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
WebAppUtils.getHttpSchemePrefix(this.getConf()) + webAppAddress;
Assert.assertEquals(expectedHttpsWebAddress, webAppAddressWithScheme2);
}
}
}