YARN-11180. Refactor some code of getNewApplication, submitApplication etc. (#4618)

This commit is contained in:
slfan1989 2022-07-29 23:23:11 +08:00 committed by GitHub
parent e994635a95
commit 2680f17eb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 282 additions and 374 deletions

View File

@ -49,6 +49,8 @@ public static class AuditConstants {
public static final String SUBMIT_NEW_APP = "Submit New App"; public static final String SUBMIT_NEW_APP = "Submit New App";
public static final String FORCE_KILL_APP = "Force Kill App"; public static final String FORCE_KILL_APP = "Force Kill App";
public static final String GET_APP_REPORT = "Get Application Report"; public static final String GET_APP_REPORT = "Get Application Report";
public static final String TARGET_CLIENT_RM_SERVICE = "RouterClientRMService";
public static final String UNKNOWN = "UNKNOWN";
} }
/** /**

View File

@ -137,6 +137,13 @@
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_NEW_APP;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_REPORT;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.FORCE_KILL_APP;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.TARGET_CLIENT_RM_SERVICE;
import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UNKNOWN;
/** /**
* Extends the {@code AbstractRequestInterceptorClient} class and provides an * Extends the {@code AbstractRequestInterceptorClient} class and provides an
* implementation for federation of YARN RM and scaling an application across * implementation for federation of YARN RM and scaling an application across
@ -228,8 +235,8 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
UserGroupInformation realUser = user; UserGroupInformation realUser = user;
if (serviceAuthEnabled) { if (serviceAuthEnabled) {
realUser = UserGroupInformation.createProxyUser(user.getShortUserName(), realUser = UserGroupInformation.createProxyUser(
UserGroupInformation.getLoginUser()); user.getShortUserName(), UserGroupInformation.getLoginUser());
} }
clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(), clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
ApplicationClientProtocol.class, subClusterId, realUser); ApplicationClientProtocol.class, subClusterId, realUser);
@ -237,21 +244,17 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
RouterServerUtil.logAndThrowException( RouterServerUtil.logAndThrowException(
"Unable to create the interface to reach the SubCluster " + subClusterId, e); "Unable to create the interface to reach the SubCluster " + subClusterId, e);
} }
clientRMProxies.put(subClusterId, clientRMProxy); clientRMProxies.put(subClusterId, clientRMProxy);
return clientRMProxy; return clientRMProxy;
} }
private SubClusterId getRandomActiveSubCluster( private SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubclusters) Map<SubClusterId, SubClusterInfo> activeSubClusters) throws YarnException {
throws YarnException { if (activeSubClusters == null || activeSubClusters.isEmpty()) {
if (activeSubclusters == null || activeSubclusters.size() < 1) {
RouterServerUtil.logAndThrowException( RouterServerUtil.logAndThrowException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
} }
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet()); List<SubClusterId> list = new ArrayList<>(activeSubClusters.keySet());
return list.get(rand.nextInt(list.size())); return list.get(rand.nextInt(list.size()));
} }
@ -276,45 +279,43 @@ private SubClusterId getRandomActiveSubCluster(
public GetNewApplicationResponse getNewApplication( public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException { GetNewApplicationRequest request) throws YarnException, IOException {
long startTime = clock.getTime(); if (request == null) {
routerMetrics.incrAppsFailedCreated();
String errMsg = "Missing getNewApplication request.";
RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN,
TARGET_CLIENT_RM_SERVICE, errMsg);
RouterServerUtil.logAndThrowException(errMsg, null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true); federationFacade.getSubClusters(true);
for (int i = 0; i < numSubmitRetries; ++i) { for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); LOG.info("getNewApplication try #{} on SubCluster {}.", i, subClusterId);
ApplicationClientProtocol clientRMProxy = ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null; GetNewApplicationResponse response = null;
try { try {
response = clientRMProxy.getNewApplication(request); response = clientRMProxy.getNewApplication(request);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Unable to create a new ApplicationId in SubCluster " LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e);
+ subClusterId.getId(), e);
}
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededAppsCreated(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(),
RouterAuditLogger.AuditConstants.GET_NEW_APP,
"RouterClientRMService", response.getApplicationId());
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
subClustersActive.remove(subClusterId); subClustersActive.remove(subClusterId);
} }
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededAppsCreated(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP,
TARGET_CLIENT_RM_SERVICE, response.getApplicationId());
return response;
}
} }
routerMetrics.incrAppsFailedCreated(); routerMetrics.incrAppsFailedCreated();
String errMsg = "Fail to create a new application."; String errMsg = "Failed to create a new application.";
RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN,
RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN", TARGET_CLIENT_RM_SERVICE, errMsg);
"RouterClientRMService", errMsg);
throw new YarnException(errMsg); throw new YarnException(errMsg);
} }
@ -387,21 +388,20 @@ public GetNewApplicationResponse getNewApplication(
public SubmitApplicationResponse submitApplication( public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException { SubmitApplicationRequest request) throws YarnException, IOException {
long startTime = clock.getTime();
if (request == null || request.getApplicationSubmissionContext() == null if (request == null || request.getApplicationSubmissionContext() == null
|| request.getApplicationSubmissionContext() || request.getApplicationSubmissionContext().getApplicationId() == null) {
.getApplicationId() == null) {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
String errMsg = String errMsg =
"Missing submitApplication request or applicationSubmissionContext " "Missing submitApplication request or applicationSubmissionContext information.";
+ "information."; RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
RouterAuditLogger.logFailure(user.getShortUserName(), TARGET_CLIENT_RM_SERVICE, errMsg);
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", RouterServerUtil.logAndThrowException(errMsg, null);
"RouterClientRMService", errMsg);
throw new YarnException(errMsg);
} }
SubmitApplicationResponse response = null;
long startTime = clock.getTime();
ApplicationId applicationId = ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId(); request.getApplicationSubmissionContext().getApplicationId();
@ -411,8 +411,9 @@ public SubmitApplicationResponse submitApplication(
SubClusterId subClusterId = policyFacade.getHomeSubcluster( SubClusterId subClusterId = policyFacade.getHomeSubcluster(
request.getApplicationSubmissionContext(), blacklist); request.getApplicationSubmissionContext(), blacklist);
LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i,
subClusterId); LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
applicationId, i, subClusterId);
ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
@ -425,12 +426,12 @@ public SubmitApplicationResponse submitApplication(
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
String message = "Unable to insert the ApplicationId " + applicationId String message =
+ " into the FederationStateStore"; String.format("Unable to insert the ApplicationId %s into the FederationStateStore.",
RouterAuditLogger.logFailure(user.getShortUserName(), applicationId);
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
"RouterClientRMService", message, applicationId, subClusterId); TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
throw new YarnException(message, e); RouterServerUtil.logAndThrowException(message, e);
} }
} else { } else {
try { try {
@ -438,19 +439,19 @@ public SubmitApplicationResponse submitApplication(
// the new subClusterId we have selected // the new subClusterId we have selected
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) { } catch (YarnException e) {
String message = "Unable to update the ApplicationId " + applicationId String message =
+ " into the FederationStateStore"; String.format("Unable to update the ApplicationId %s into the FederationStateStore.",
applicationId);
SubClusterId subClusterIdInStateStore = SubClusterId subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId); federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) { if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application {} already submitted on SubCluster {}.", applicationId, LOG.info("Application {} already submitted on SubCluster {}.",
subClusterId); applicationId, subClusterId);
} else { } else {
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
"RouterClientRMService", message, applicationId, subClusterId); RouterServerUtil.logAndThrowException(message, e);
throw new YarnException(message, e);
} }
} }
} }
@ -458,7 +459,6 @@ public SubmitApplicationResponse submitApplication(
ApplicationClientProtocol clientRMProxy = ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId); getClientRMProxyForSubCluster(subClusterId);
SubmitApplicationResponse response = null;
try { try {
response = clientRMProxy.submitApplication(request); response = clientRMProxy.submitApplication(request);
} catch (Exception e) { } catch (Exception e) {
@ -472,9 +472,8 @@ public SubmitApplicationResponse submitApplication(
applicationId, subClusterId); applicationId, subClusterId);
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsSubmitted(stopTime - startTime); routerMetrics.succeededAppsSubmitted(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(), RouterAuditLogger.logSuccess(user.getShortUserName(), SUBMIT_NEW_APP,
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, TARGET_CLIENT_RM_SERVICE, applicationId, subClusterId);
"RouterClientRMService", applicationId, subClusterId);
return response; return response;
} else { } else {
// Empty response from the ResourceManager. // Empty response from the ResourceManager.
@ -484,13 +483,11 @@ public SubmitApplicationResponse submitApplication(
} }
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Application " String msg = String.format("Application %s with appId %s failed to be submitted.",
+ request.getApplicationSubmissionContext().getApplicationName() request.getApplicationSubmissionContext().getApplicationName(), applicationId);
+ " with appId " + applicationId + " failed to be submitted."; RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
RouterAuditLogger.logFailure(user.getShortUserName(), TARGET_CLIENT_RM_SERVICE, msg, applicationId);
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", throw new YarnException(msg);
"RouterClientRMService", errMsg, applicationId);
throw new YarnException(errMsg);
} }
/** /**
@ -513,16 +510,16 @@ public SubmitApplicationResponse submitApplication(
public KillApplicationResponse forceKillApplication( public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException { KillApplicationRequest request) throws YarnException, IOException {
long startTime = clock.getTime();
if (request == null || request.getApplicationId() == null) { if (request == null || request.getApplicationId() == null) {
routerMetrics.incrAppsFailedKilled(); routerMetrics.incrAppsFailedKilled();
String message = "Missing forceKillApplication request or ApplicationId."; String msg = "Missing forceKillApplication request or ApplicationId.";
RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN", TARGET_CLIENT_RM_SERVICE, msg);
"RouterClientRMService", message); RouterServerUtil.logAndThrowException(msg, null);
throw new YarnException(message);
} }
long startTime = clock.getTime();
ApplicationId applicationId = request.getApplicationId(); ApplicationId applicationId = request.getApplicationId();
SubClusterId subClusterId = null; SubClusterId subClusterId = null;
@ -531,12 +528,11 @@ public KillApplicationResponse forceKillApplication(
.getApplicationHomeSubCluster(request.getApplicationId()); .getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrAppsFailedKilled(); routerMetrics.incrAppsFailedKilled();
String msg = "Application " + applicationId String msg =
+ " does not exist in FederationStateStore"; String.format("Application %s does not exist in FederationStateStore.", applicationId);
RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN", TARGET_CLIENT_RM_SERVICE, msg, applicationId);
"RouterClientRMService", msg, applicationId); RouterServerUtil.logAndThrowException(msg, e);
throw new YarnException(msg, e);
} }
ApplicationClientProtocol clientRMProxy = ApplicationClientProtocol clientRMProxy =
@ -548,11 +544,10 @@ public KillApplicationResponse forceKillApplication(
response = clientRMProxy.forceKillApplication(request); response = clientRMProxy.forceKillApplication(request);
} catch (Exception e) { } catch (Exception e) {
routerMetrics.incrAppsFailedKilled(); routerMetrics.incrAppsFailedKilled();
RouterAuditLogger.logFailure(user.getShortUserName(), String msg = "Unable to kill the application report.";
RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN", RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
"RouterClientRMService", "Unable to kill the application report", TARGET_CLIENT_RM_SERVICE, msg, applicationId, subClusterId);
applicationId, subClusterId); RouterServerUtil.logAndThrowException(msg, e);
throw e;
} }
if (response == null) { if (response == null) {
@ -562,9 +557,8 @@ public KillApplicationResponse forceKillApplication(
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsKilled(stopTime - startTime); routerMetrics.succeededAppsKilled(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(), RouterAuditLogger.logSuccess(user.getShortUserName(), FORCE_KILL_APP,
RouterAuditLogger.AuditConstants.FORCE_KILL_APP, TARGET_CLIENT_RM_SERVICE, applicationId);
"RouterClientRMService", applicationId);
return response; return response;
} }
@ -588,18 +582,15 @@ public KillApplicationResponse forceKillApplication(
public GetApplicationReportResponse getApplicationReport( public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException { GetApplicationReportRequest request) throws YarnException, IOException {
long startTime = clock.getTime();
if (request == null || request.getApplicationId() == null) { if (request == null || request.getApplicationId() == null) {
routerMetrics.incrAppsFailedRetrieved(); routerMetrics.incrAppsFailedRetrieved();
String errMsg = String errMsg = "Missing getApplicationReport request or applicationId information.";
"Missing getApplicationReport request or applicationId information."; RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
RouterAuditLogger.logFailure(user.getShortUserName(), TARGET_CLIENT_RM_SERVICE, errMsg);
RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN", RouterServerUtil.logAndThrowException(errMsg, null);
"RouterClientRMService", errMsg);
throw new YarnException(errMsg);
} }
long startTime = clock.getTime();
SubClusterId subClusterId = null; SubClusterId subClusterId = null;
try { try {
@ -607,29 +598,26 @@ public GetApplicationReportResponse getApplicationReport(
.getApplicationHomeSubCluster(request.getApplicationId()); .getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrAppsFailedRetrieved(); routerMetrics.incrAppsFailedRetrieved();
String errMsg = "Application " + request.getApplicationId() String errMsg = String.format("Application %s does not exist in FederationStateStore.",
+ " does not exist in FederationStateStore"; request.getApplicationId());
RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN", TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId());
"RouterClientRMService", errMsg, request.getApplicationId()); RouterServerUtil.logAndThrowException(errMsg, e);
throw new YarnException(errMsg, e);
} }
ApplicationClientProtocol clientRMProxy = ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId); getClientRMProxyForSubCluster(subClusterId);
GetApplicationReportResponse response = null; GetApplicationReportResponse response = null;
try { try {
response = clientRMProxy.getApplicationReport(request); response = clientRMProxy.getApplicationReport(request);
} catch (Exception e) { } catch (Exception e) {
routerMetrics.incrAppsFailedRetrieved(); routerMetrics.incrAppsFailedRetrieved();
String errMsg = "Unable to get the application report for " + request String errMsg = String.format("Unable to get the application report for %s to SubCluster %s.",
.getApplicationId() + "to SubCluster " + subClusterId.getId(); request.getApplicationId(), subClusterId.getId());
RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN", TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId(), subClusterId);
"RouterClientRMService", errMsg, request.getApplicationId(), RouterServerUtil.logAndThrowException(errMsg, e);
subClusterId);
throw e;
} }
if (response == null) { if (response == null) {
@ -637,12 +625,10 @@ public GetApplicationReportResponse getApplicationReport(
+ "the application {} to SubCluster {}.", + "the application {} to SubCluster {}.",
request.getApplicationId(), subClusterId.getId()); request.getApplicationId(), subClusterId.getId());
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsRetrieved(stopTime - startTime); routerMetrics.succeededAppsRetrieved(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(), RouterAuditLogger.logSuccess(user.getShortUserName(), GET_APP_REPORT,
RouterAuditLogger.AuditConstants.GET_APP_REPORT, TARGET_CLIENT_RM_SERVICE, request.getApplicationId());
"RouterClientRMService", request.getApplicationId());
return response; return response;
} }
@ -670,56 +656,48 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException { throws YarnException, IOException {
if (request == null) { if (request == null) {
routerMetrics.incrMultipleAppsFailedRetrieved(); routerMetrics.incrMultipleAppsFailedRetrieved();
RouterServerUtil.logAndThrowException( RouterServerUtil.logAndThrowException("Missing getApplications request.", null);
"Missing getApplications request.",
null);
} }
long startTime = clock.getTime(); long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subclusters = Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true); federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getApplications", ClientMethod remoteMethod = new ClientMethod("getApplications",
new Class[] {GetApplicationsRequest.class}, new Object[] {request}); new Class[] {GetApplicationsRequest.class}, new Object[] {request});
Map<SubClusterId, GetApplicationsResponse> applications; Map<SubClusterId, GetApplicationsResponse> applications = null;
try { try {
applications = invokeConcurrent(subclusters.keySet(), remoteMethod, applications = invokeConcurrent(subclusters.keySet(), remoteMethod,
GetApplicationsResponse.class); GetApplicationsResponse.class);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrMultipleAppsFailedRetrieved(); routerMetrics.incrMultipleAppsFailedRetrieved();
LOG.error("Unable to get applications due to exception.", ex); RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex);
throw ex;
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime); routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
// Merge the Application Reports // Merge the Application Reports
return RouterYarnClientUtils.mergeApplications(applications.values(), return RouterYarnClientUtils.mergeApplications(applications.values(), returnPartialReport);
returnPartialReport);
} }
@Override @Override
public GetClusterMetricsResponse getClusterMetrics( public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException, IOException { GetClusterMetricsRequest request) throws YarnException, IOException {
if (request == null) {
routerMetrics.incrGetClusterMetricsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getClusterMetrics request.", null);
}
long startTime = clock.getTime(); long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics", ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class}, new Object[] {request}); new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics; Collection<GetClusterMetricsResponse> clusterMetrics = null;
try { try {
clusterMetrics = invokeConcurrent(subclusters.keySet(), remoteMethod, clusterMetrics = invokeAppClientProtocolMethod(
GetClusterMetricsResponse.class); true, remoteMethod, GetClusterMetricsResponse.class);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrGetClusterMetricsFailedRetrieved(); routerMetrics.incrGetClusterMetricsFailedRetrieved();
LOG.error("Unable to get cluster metrics due to exception.", ex); RouterServerUtil.logAndThrowException("Unable to get cluster metrics due to exception.", ex);
throw ex;
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededGetClusterMetricsRetrieved(stopTime - startTime); routerMetrics.succeededGetClusterMetricsRetrieved(stopTime - startTime);
return RouterYarnClientUtils.merge(clusterMetrics.values()); return RouterYarnClientUtils.merge(clusterMetrics);
} }
<R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds, <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
@ -804,8 +782,7 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
clusterNodes.put(subClusterId, response); clusterNodes.put(subClusterId, response);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrClusterNodesFailedRetrieved(); routerMetrics.incrClusterNodesFailedRetrieved();
LOG.error("Unable to get cluster nodes due to exception.", ex); RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
throw ex;
} }
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
@ -850,14 +827,13 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
long startTime = clock.getTime(); long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getQueueUserAcls", ClientMethod remoteMethod = new ClientMethod("getQueueUserAcls",
new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request}); new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request});
Collection<GetQueueUserAclsInfoResponse> queueUserAcls; Collection<GetQueueUserAclsInfoResponse> queueUserAcls = null;
try { try {
queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod, queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod,
GetQueueUserAclsInfoResponse.class); GetQueueUserAclsInfoResponse.class);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrQueueUserAclsFailedRetrieved(); routerMetrics.incrQueueUserAclsFailedRetrieved();
LOG.error("Unable to get queue user Acls due to exception.", ex); RouterServerUtil.logAndThrowException("Unable to get queue user Acls due to exception.", ex);
throw ex;
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededGetQueueUserAclsRetrieved(stopTime - startTime); routerMetrics.succeededGetQueueUserAclsRetrieved(stopTime - startTime);
@ -931,14 +907,14 @@ public ReservationListResponse listReservations(
long startTime = clock.getTime(); long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("listReservations", ClientMethod remoteMethod = new ClientMethod("listReservations",
new Class[] {ReservationListRequest.class}, new Object[] {request}); new Class[] {ReservationListRequest.class}, new Object[] {request});
Collection<ReservationListResponse> listResponses; Collection<ReservationListResponse> listResponses = null;
try { try {
listResponses = invokeAppClientProtocolMethod(true, remoteMethod, listResponses = invokeAppClientProtocolMethod(true, remoteMethod,
ReservationListResponse.class); ReservationListResponse.class);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrListReservationsFailedRetrieved(); routerMetrics.incrListReservationsFailedRetrieved();
LOG.error("Unable to list reservations node due to exception.", ex); RouterServerUtil.logAndThrowException(
throw ex; "Unable to list reservations node due to exception.", ex);
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededListReservationsRetrieved(stopTime - startTime); routerMetrics.succeededListReservationsRetrieved(stopTime - startTime);
@ -986,14 +962,13 @@ public GetNodesToLabelsResponse getNodeToLabels(
long startTime = clock.getTime(); long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getNodeToLabels", ClientMethod remoteMethod = new ClientMethod("getNodeToLabels",
new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request}); new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request});
Collection<GetNodesToLabelsResponse> clusterNodes; Collection<GetNodesToLabelsResponse> clusterNodes = null;
try { try {
clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod, clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod,
GetNodesToLabelsResponse.class); GetNodesToLabelsResponse.class);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrNodeToLabelsFailedRetrieved(); routerMetrics.incrNodeToLabelsFailedRetrieved();
LOG.error("Unable to get label node due to exception.", ex); RouterServerUtil.logAndThrowException("Unable to get node label due to exception.", ex);
throw ex;
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime); routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime);
@ -1011,14 +986,13 @@ public GetLabelsToNodesResponse getLabelsToNodes(
long startTime = clock.getTime(); long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
Collection<GetLabelsToNodesResponse> labelNodes; Collection<GetLabelsToNodesResponse> labelNodes = null;
try { try {
labelNodes = invokeAppClientProtocolMethod(true, remoteMethod, labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
GetLabelsToNodesResponse.class); GetLabelsToNodesResponse.class);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrLabelsToNodesFailedRetrieved(); routerMetrics.incrLabelsToNodesFailedRetrieved();
LOG.error("Unable to get label node due to exception.", ex); RouterServerUtil.logAndThrowException("Unable to get label node due to exception.", ex);
throw ex;
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime); routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
@ -1036,14 +1010,14 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
long startTime = clock.getTime(); long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels", ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
Collection<GetClusterNodeLabelsResponse> nodeLabels; Collection<GetClusterNodeLabelsResponse> nodeLabels = null;
try { try {
nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod, nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
GetClusterNodeLabelsResponse.class); GetClusterNodeLabelsResponse.class);
} catch (Exception ex) { } catch (Exception ex) {
routerMetrics.incrClusterNodeLabelsFailedRetrieved(); routerMetrics.incrClusterNodeLabelsFailedRetrieved();
LOG.error("Unable to get cluster nodeLabels due to exception.", ex); RouterServerUtil.logAndThrowException("Unable to get cluster nodeLabels due to exception.",
throw ex; ex);
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime); routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime);
@ -1096,15 +1070,15 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
ApplicationClientProtocol clientRMProxy = ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId); getClientRMProxyForSubCluster(subClusterId);
GetApplicationAttemptReportResponse response; GetApplicationAttemptReportResponse response = null;
try { try {
response = clientRMProxy.getApplicationAttemptReport(request); response = clientRMProxy.getApplicationAttemptReport(request);
} catch (Exception e) { } catch (Exception e) {
routerMetrics.incrAppAttemptsFailedRetrieved(); routerMetrics.incrAppAttemptsFailedRetrieved();
LOG.error("Unable to get the applicationAttempt report for {} " String msg = String.format(
+ "to SubCluster {}, error = {}.", "Unable to get the applicationAttempt report for %s to SubCluster %s.",
request.getApplicationAttemptId(), subClusterId.getId(), e); request.getApplicationAttemptId(), subClusterId.getId());
throw e; RouterServerUtil.logAndThrowException(msg, e);
} }
if (response == null) { if (response == null) {
@ -1328,8 +1302,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved(); routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " + RouterServerUtil.logAndThrowException("Application " +
request.getApplicationId() + request.getApplicationId() + " does not exist in FederationStateStore.", e);
" does not exist in FederationStateStore.", e);
} }
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);

View File

@ -156,14 +156,13 @@ public void setUp() {
stateStore = new MemoryFederationStateStore(); stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf()); stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore, FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf());
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore); stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf()); interceptor.setConf(this.getConf());
interceptor.init(user); interceptor.init(user);
subClusters = new ArrayList<SubClusterId>(); subClusters = new ArrayList<>();
try { try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) { for (int i = 0; i < NUM_SUBCLUSTER; i++) {
@ -212,17 +211,16 @@ protected YarnConfiguration createConfiguration() {
* ApplicationId has to belong to one of the SubCluster in the cluster. * ApplicationId has to belong to one of the SubCluster in the cluster.
*/ */
@Test @Test
public void testGetNewApplication() public void testGetNewApplication() throws YarnException, IOException {
throws YarnException, IOException, InterruptedException { LOG.info("Test FederationClientInterceptor: Get New Application.");
LOG.info("Test FederationClientInterceptor: Get New Application");
GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
GetNewApplicationResponse response = interceptor.getNewApplication(request); GetNewApplicationResponse response = interceptor.getNewApplication(request);
Assert.assertNotNull(response); Assert.assertNotNull(response);
Assert.assertNotNull(response.getApplicationId()); Assert.assertNotNull(response.getApplicationId());
Assert.assertTrue(response.getApplicationId() Assert.assertEquals(response.getApplicationId().getClusterTimestamp(),
.getClusterTimestamp() == ResourceManager.getClusterTimeStamp()); ResourceManager.getClusterTimeStamp());
} }
/** /**
@ -232,10 +230,9 @@ public void testGetNewApplication()
@Test @Test
public void testSubmitApplication() public void testSubmitApplication()
throws YarnException, IOException { throws YarnException, IOException {
LOG.info("Test FederationClientInterceptor: Submit Application"); LOG.info("Test FederationClientInterceptor: Submit Application.");
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request); SubmitApplicationResponse response = interceptor.submitApplication(request);
@ -249,14 +246,12 @@ public void testSubmitApplication()
private SubmitApplicationRequest mockSubmitApplicationRequest( private SubmitApplicationRequest mockSubmitApplicationRequest(
ApplicationId appId) { ApplicationId appId) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
ApplicationSubmissionContext context = ApplicationSubmissionContext ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
.newInstance(appId, MockApps.newAppName(), "default", appId, MockApps.newAppName(), "default",
Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1, Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
Resources.createResource( Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
"MockApp"); "MockApp");
SubmitApplicationRequest request = SubmitApplicationRequest SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context);
.newInstance(context);
return request; return request;
} }
@ -297,37 +292,27 @@ public void testSubmitApplicationMultipleSubmission()
*/ */
@Test @Test
public void testSubmitApplicationEmptyRequest() public void testSubmitApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException { throws Exception {
LOG.info( LOG.info("Test FederationClientInterceptor: Submit Application - Empty.");
"Test FederationClientInterceptor: Submit Application - Empty");
try { // null request1
interceptor.submitApplication(null); LambdaTestUtils.intercept(YarnException.class,
Assert.fail(); "Missing submitApplication request or applicationSubmissionContext information.",
} catch (YarnException e) { () -> interceptor.submitApplication(null));
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or " // null request2
+ "applicationSubmissionContext information.")); LambdaTestUtils.intercept(YarnException.class,
} "Missing submitApplication request or applicationSubmissionContext information.",
try { () -> interceptor.submitApplication(SubmitApplicationRequest.newInstance(null)));
interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
Assert.fail(); // null request3
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContext information."));
}
try {
ApplicationSubmissionContext context = ApplicationSubmissionContext ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(null, "", "", null, null, false, false, -1, null, null); .newInstance(null, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request = SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context); SubmitApplicationRequest.newInstance(context);
interceptor.submitApplication(request); LambdaTestUtils.intercept(YarnException.class,
Assert.fail(); "Missing submitApplication request or applicationSubmissionContext information.",
} catch (YarnException e) { () -> interceptor.submitApplication(request));
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContext information."));
}
} }
/** /**
@ -337,7 +322,7 @@ public void testSubmitApplicationEmptyRequest()
@Test @Test
public void testForceKillApplication() public void testForceKillApplication()
throws YarnException, IOException, InterruptedException { throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: Force Kill Application"); LOG.info("Test FederationClientInterceptor: Force Kill Application.");
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
@ -349,10 +334,8 @@ public void testForceKillApplication()
Assert.assertNotNull(response); Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
KillApplicationRequest requestKill = KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId);
KillApplicationRequest.newInstance(appId); KillApplicationResponse responseKill = interceptor.forceKillApplication(requestKill);
KillApplicationResponse responseKill =
interceptor.forceKillApplication(requestKill);
Assert.assertNotNull(responseKill); Assert.assertNotNull(responseKill);
} }
@ -361,22 +344,17 @@ public void testForceKillApplication()
* application does not exist in StateStore. * application does not exist in StateStore.
*/ */
@Test @Test
public void testForceKillApplicationNotExists() public void testForceKillApplicationNotExists() throws Exception {
throws YarnException, IOException, InterruptedException { LOG.info("Test FederationClientInterceptor: Force Kill Application - Not Exists");
LOG.info("Test FederationClientInterceptor: "
+ "Force Kill Application - Not Exists");
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
KillApplicationRequest requestKill = KillApplicationRequest requestKill =
KillApplicationRequest.newInstance(appId); KillApplicationRequest.newInstance(appId);
try {
interceptor.forceKillApplication(requestKill); LambdaTestUtils.intercept(YarnException.class,
Assert.fail(); "Application " + appId + " does not exist in FederationStateStore.",
} catch (YarnException e) { () -> interceptor.forceKillApplication(requestKill));
Assert.assertTrue(e.getMessage().equals(
"Application " + appId + " does not exist in FederationStateStore"));
}
} }
/** /**
@ -385,24 +363,19 @@ public void testForceKillApplicationNotExists()
*/ */
@Test @Test
public void testForceKillApplicationEmptyRequest() public void testForceKillApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException { throws Exception {
LOG.info( LOG.info("Test FederationClientInterceptor: Force Kill Application - Empty.");
"Test FederationClientInterceptor: Force Kill Application - Empty");
try { // null request1
interceptor.forceKillApplication(null); LambdaTestUtils.intercept(YarnException.class,
Assert.fail(); "Missing forceKillApplication request or ApplicationId.",
} catch (YarnException e) { () -> interceptor.forceKillApplication(null));
Assert.assertTrue(e.getMessage().startsWith(
"Missing forceKillApplication request or ApplicationId.")); // null request2
} KillApplicationRequest killRequest = KillApplicationRequest.newInstance(null);
try { LambdaTestUtils.intercept(YarnException.class,
interceptor "Missing forceKillApplication request or ApplicationId.",
.forceKillApplication(KillApplicationRequest.newInstance(null)); () -> interceptor.forceKillApplication(killRequest));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith(
"Missing forceKillApplication request or ApplicationId."));
}
} }
/** /**
@ -439,20 +412,14 @@ public void testGetApplicationReport()
*/ */
@Test @Test
public void testGetApplicationNotExists() public void testGetApplicationNotExists()
throws YarnException, IOException, InterruptedException { throws Exception {
LOG.info( LOG.info("Test ApplicationClientProtocol: Get Application Report - Not Exists.");
"Test ApplicationClientProtocol: Get Application Report - Not Exists");
ApplicationId appId = ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(System.currentTimeMillis(), 1); GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId);
GetApplicationReportRequest requestGet = LambdaTestUtils.intercept(YarnException.class,
GetApplicationReportRequest.newInstance(appId); "Application " + appId + " does not exist in FederationStateStore.",
try { () -> interceptor.getApplicationReport(requestGet));
interceptor.getApplicationReport(requestGet);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().equals(
"Application " + appId + " does not exist in FederationStateStore"));
}
} }
/** /**
@ -461,31 +428,23 @@ public void testGetApplicationNotExists()
*/ */
@Test @Test
public void testGetApplicationEmptyRequest() public void testGetApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException { throws Exception {
LOG.info( LOG.info("Test FederationClientInterceptor: Get Application Report - Empty.");
"Test FederationClientInterceptor: Get Application Report - Empty");
try { // null request1
interceptor.getApplicationReport(null); LambdaTestUtils.intercept(YarnException.class,
Assert.fail(); "Missing getApplicationReport request or applicationId information.",
} catch (YarnException e) { () -> interceptor.getApplicationReport(null));
Assert.assertTrue(
e.getMessage().startsWith("Missing getApplicationReport request or " // null request2
+ "applicationId information.")); GetApplicationReportRequest reportRequest = GetApplicationReportRequest.newInstance(null);
} LambdaTestUtils.intercept(YarnException.class,
try { "Missing getApplicationReport request or applicationId information.",
interceptor () -> interceptor.getApplicationReport(reportRequest));
.getApplicationReport(GetApplicationReportRequest.newInstance(null));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing getApplicationReport request or "
+ "applicationId information."));
}
} }
/** /**
* This test validates the correctness of * This test validates the correctness of GetApplicationAttemptReport in case the
* GetApplicationAttemptReport in case the
* application exists in the cluster. * application exists in the cluster.
*/ */
@Test @Test
@ -529,16 +488,13 @@ public void testGetApplicationAttemptReport()
} }
/** /**
* This test validates the correctness of * This test validates the correctness of GetApplicationAttemptReport in case the
* GetApplicationAttemptReport in case the
* application does not exist in StateStore. * application does not exist in StateStore.
*/ */
@Test @Test
public void testGetApplicationAttemptNotExists() public void testGetApplicationAttemptNotExists() throws Exception {
throws Exception { LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Not Exists.");
LOG.info(
"Test ApplicationClientProtocol: " +
"Get ApplicationAttempt Report - Not Exists");
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId appAttemptID = ApplicationAttemptId appAttemptID =
@ -553,53 +509,47 @@ public void testGetApplicationAttemptNotExists()
} }
/** /**
* This test validates * This test validates the correctness of GetApplicationAttemptReport in case of
* the correctness of GetApplicationAttemptReport in case of
* empty request. * empty request.
*/ */
@Test @Test
public void testGetApplicationAttemptEmptyRequest() public void testGetApplicationAttemptEmptyRequest()
throws Exception { throws Exception {
LOG.info("Test FederationClientInterceptor: " + LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Empty.");
"Get ApplicationAttempt Report - Empty");
// null request1
LambdaTestUtils.intercept(YarnException.class, LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport " + "Missing getApplicationAttemptReport request or applicationId " +
"request or applicationId " +
"or applicationAttemptId information.", "or applicationAttemptId information.",
() -> interceptor.getApplicationAttemptReport(null)); () -> interceptor.getApplicationAttemptReport(null));
// null request2
LambdaTestUtils.intercept(YarnException.class, LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport " + "Missing getApplicationAttemptReport request or applicationId " +
"request or applicationId " +
"or applicationAttemptId information.", "or applicationAttemptId information.",
() -> interceptor () -> interceptor.getApplicationAttemptReport(
.getApplicationAttemptReport( GetApplicationAttemptReportRequest.newInstance(null)));
GetApplicationAttemptReportRequest
.newInstance(null)));
// null request3
LambdaTestUtils.intercept(YarnException.class, LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport " + "Missing getApplicationAttemptReport request or applicationId " +
"request or applicationId " +
"or applicationAttemptId information.", "or applicationAttemptId information.",
() -> interceptor () -> interceptor.getApplicationAttemptReport(
.getApplicationAttemptReport(
GetApplicationAttemptReportRequest.newInstance( GetApplicationAttemptReportRequest.newInstance(
ApplicationAttemptId ApplicationAttemptId.newInstance(null, 1))));
.newInstance(null, 1)
)));
} }
@Test @Test
public void testGetClusterMetricsRequest() throws YarnException, IOException { public void testGetClusterMetricsRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request"); LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request.");
// null request // null request
GetClusterMetricsResponse response = interceptor.getClusterMetrics(null); LambdaTestUtils.intercept(YarnException.class, "Missing getClusterMetrics request.",
Assert.assertEquals(subClusters.size(), () -> interceptor.getClusterMetrics(null));
response.getClusterMetrics().getNumNodeManagers());
// normal request. // normal request.
response = GetClusterMetricsResponse response =
interceptor.getClusterMetrics(GetClusterMetricsRequest.newInstance()); interceptor.getClusterMetrics(GetClusterMetricsRequest.newInstance());
Assert.assertEquals(subClusters.size(), Assert.assertEquals(subClusters.size(),
response.getClusterMetrics().getNumNodeManagers()); response.getClusterMetrics().getNumNodeManagers());
@ -607,23 +557,20 @@ public void testGetClusterMetricsRequest() throws YarnException, IOException {
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics", ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class}, new Class[] {GetClusterMetricsRequest.class},
new Object[] {GetClusterMetricsRequest.newInstance()}); new Object[] {GetClusterMetricsRequest.newInstance()});
Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics =interceptor. Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
invokeConcurrent(new ArrayList<>(), remoteMethod, invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class);
GetClusterMetricsResponse.class); Assert.assertTrue(clusterMetrics.isEmpty());
Assert.assertEquals(true, clusterMetrics.isEmpty());
} }
/** /**
* This test validates the correctness of * This test validates the correctness of GetApplicationsResponse in case the
* GetApplicationsResponse in case the
* application exists in the cluster. * application exists in the cluster.
*/ */
@Test @Test
public void testGetApplicationsResponse() public void testGetApplicationsResponse()
throws YarnException, IOException, InterruptedException { throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: Get Applications Response"); LOG.info("Test FederationClientInterceptor: Get Applications Response.");
ApplicationId appId = ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request); SubmitApplicationResponse response = interceptor.submitApplication(request);
@ -632,40 +579,32 @@ public void testGetApplicationsResponse()
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
Set<String> appTypes = Collections.singleton("MockApp"); Set<String> appTypes = Collections.singleton("MockApp");
GetApplicationsRequest requestGet = GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
GetApplicationsRequest.newInstance(appTypes); GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
GetApplicationsResponse responseGet =
interceptor.getApplications(requestGet);
Assert.assertNotNull(responseGet); Assert.assertNotNull(responseGet);
} }
/** /**
* This test validates * This test validates the correctness of GetApplicationsResponse in case of
* the correctness of GetApplicationsResponse in case of
* empty request. * empty request.
*/ */
@Test @Test
public void testGetApplicationsNullRequest() throws Exception { public void testGetApplicationsNullRequest() throws Exception {
LOG.info("Test FederationClientInterceptor: Get Applications request"); LOG.info("Test FederationClientInterceptor: Get Applications request.");
LambdaTestUtils.intercept(YarnException.class, LambdaTestUtils.intercept(YarnException.class, "Missing getApplications request.",
"Missing getApplications request.",
() -> interceptor.getApplications(null)); () -> interceptor.getApplications(null));
} }
/** /**
* This test validates * This test validates the correctness of GetApplicationsResponse in case applications
* the correctness of GetApplicationsResponse in case applications
* with given type does not exist. * with given type does not exist.
*/ */
@Test @Test
public void testGetApplicationsApplicationTypeNotExists() throws Exception{ public void testGetApplicationsApplicationTypeNotExists() throws Exception{
LOG.info("Test FederationClientInterceptor: Application with type does " LOG.info("Test FederationClientInterceptor: Application with type does not exist.");
+ "not exist");
ApplicationId appId = ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request); SubmitApplicationResponse response = interceptor.submitApplication(request);
@ -675,25 +614,20 @@ public void testGetApplicationsApplicationTypeNotExists() throws Exception{
Set<String> appTypes = Collections.singleton("SPARK"); Set<String> appTypes = Collections.singleton("SPARK");
GetApplicationsRequest requestGet = GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
GetApplicationsRequest.newInstance(appTypes); GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
GetApplicationsResponse responseGet =
interceptor.getApplications(requestGet);
Assert.assertNotNull(responseGet); Assert.assertNotNull(responseGet);
Assert.assertTrue(responseGet.getApplicationList().isEmpty()); Assert.assertTrue(responseGet.getApplicationList().isEmpty());
} }
/** /**
* This test validates * This test validates the correctness of GetApplicationsResponse in case applications
* the correctness of GetApplicationsResponse in case applications
* with given YarnApplicationState does not exist. * with given YarnApplicationState does not exist.
*/ */
@Test @Test
public void testGetApplicationsApplicationStateNotExists() throws Exception { public void testGetApplicationsApplicationStateNotExists() throws Exception {
LOG.info("Test FederationClientInterceptor:" + LOG.info("Test FederationClientInterceptor: Application with state does not exist.");
" Application with state does not exist");
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
@ -711,8 +645,7 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception {
GetApplicationsRequest requestGet = GetApplicationsRequest requestGet =
GetApplicationsRequest.newInstance(applicationStates); GetApplicationsRequest.newInstance(applicationStates);
GetApplicationsResponse responseGet = GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
interceptor.getApplications(requestGet);
Assert.assertNotNull(responseGet); Assert.assertNotNull(responseGet);
Assert.assertTrue(responseGet.getApplicationList().isEmpty()); Assert.assertTrue(responseGet.getApplicationList().isEmpty());
@ -720,7 +653,7 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception {
@Test @Test
public void testGetClusterNodesRequest() throws Exception { public void testGetClusterNodesRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster Nodeds request"); LOG.info("Test FederationClientInterceptor : Get Cluster Nodes request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.", LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
() -> interceptor.getClusterNodes(null)); () -> interceptor.getClusterNodes(null));
@ -732,7 +665,7 @@ public void testGetClusterNodesRequest() throws Exception {
@Test @Test
public void testGetNodeToLabelsRequest() throws Exception { public void testGetNodeToLabelsRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Node To Labels request"); LOG.info("Test FederationClientInterceptor : Get Node To Labels request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToLabels request.", LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToLabels request.",
() -> interceptor.getNodeToLabels(null)); () -> interceptor.getNodeToLabels(null));
@ -744,7 +677,7 @@ public void testGetNodeToLabelsRequest() throws Exception {
@Test @Test
public void testGetLabelsToNodesRequest() throws Exception { public void testGetLabelsToNodesRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Labels To Node request"); LOG.info("Test FederationClientInterceptor : Get Labels To Node request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing getLabelsToNodes request.", LambdaTestUtils.intercept(YarnException.class, "Missing getLabelsToNodes request.",
() -> interceptor.getLabelsToNodes(null)); () -> interceptor.getLabelsToNodes(null));
@ -756,7 +689,7 @@ public void testGetLabelsToNodesRequest() throws Exception {
@Test @Test
public void testClusterNodeLabelsRequest() throws Exception { public void testClusterNodeLabelsRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request"); LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeLabels request.", LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeLabels request.",
() -> interceptor.getClusterNodeLabels(null)); () -> interceptor.getClusterNodeLabels(null));
@ -768,13 +701,13 @@ public void testClusterNodeLabelsRequest() throws Exception {
@Test @Test
public void testGetQueueUserAcls() throws Exception { public void testGetQueueUserAcls() throws Exception {
LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request"); LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.", LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.",
() -> interceptor.getQueueUserAcls(null)); () -> interceptor.getQueueUserAcls(null));
// noraml request // normal request
GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls( GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls(
GetQueueUserAclsInfoRequest.newInstance()); GetQueueUserAclsInfoRequest.newInstance());
@ -796,7 +729,7 @@ public void testGetQueueUserAcls() throws Exception {
@Test @Test
public void testListReservations() throws Exception { public void testListReservations() throws Exception {
LOG.info("Test FederationClientInterceptor : Get ListReservations request"); LOG.info("Test FederationClientInterceptor : Get ListReservations request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.", LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.",
@ -812,7 +745,7 @@ public void testListReservations() throws Exception {
@Test @Test
public void testGetContainersRequest() throws Exception { public void testGetContainersRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Containers request"); LOG.info("Test FederationClientInterceptor : Get Containers request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " + LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " +
@ -928,7 +861,7 @@ public void getApplicationAttempts() throws Exception {
@Test @Test
public void testGetResourceTypeInfoRequest() throws Exception { public void testGetResourceTypeInfoRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request"); LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request.");
// null request // null request
LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.", LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.",
() -> interceptor.getResourceTypeInfo(null)); () -> interceptor.getResourceTypeInfo(null));
@ -1109,7 +1042,7 @@ public void testSignalContainer() throws Exception {
RMAppAttemptState.SCHEDULED); RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId); MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId()); mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId(); ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();
@ -1150,7 +1083,7 @@ public void testMoveApplicationAcrossQueues() throws Exception {
RMAppAttemptState.SCHEDULED); RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId); MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId()); mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
MoveApplicationAcrossQueuesRequest acrossQueuesRequest = MoveApplicationAcrossQueuesRequest acrossQueuesRequest =