YARN-10487. Support getQueueUserAcls, listReservations, getApplicationAttempts, getContainerReport, getContainers, getResourceTypeInfo API's for Federation (#4341)

This commit is contained in:
slfan1989 2022-06-02 12:54:29 -07:00 committed by GitHub
parent 7ac271307c
commit 1cc2416f68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1110 additions and 54 deletions

View File

@ -51,7 +51,7 @@ public final class RouterMetrics {
private MutableGaugeInt numAppsFailedRetrieved; private MutableGaugeInt numAppsFailedRetrieved;
@Metric("# of multiple applications reports failed to be retrieved") @Metric("# of multiple applications reports failed to be retrieved")
private MutableGaugeInt numMultipleAppsFailedRetrieved; private MutableGaugeInt numMultipleAppsFailedRetrieved;
@Metric("# of applicationAttempt reports failed to be retrieved") @Metric("# of getApplicationAttempts failed to be retrieved")
private MutableGaugeInt numAppAttemptsFailedRetrieved; private MutableGaugeInt numAppAttemptsFailedRetrieved;
@Metric("# of getClusterMetrics failed to be retrieved") @Metric("# of getClusterMetrics failed to be retrieved")
private MutableGaugeInt numGetClusterMetricsFailedRetrieved; private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
@ -63,6 +63,18 @@ public final class RouterMetrics {
private MutableGaugeInt numGetLabelsToNodesFailedRetrieved; private MutableGaugeInt numGetLabelsToNodesFailedRetrieved;
@Metric("# of getClusterNodeLabels failed to be retrieved") @Metric("# of getClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numGetClusterNodeLabelsFailedRetrieved; private MutableGaugeInt numGetClusterNodeLabelsFailedRetrieved;
// Failure gauges for the Router client APIs; each one is bumped by the
// matching incr*FailedRetrieved() method of this class.
@Metric("# of getApplicationAttemptReports failed to be retrieved")
private MutableGaugeInt numAppAttemptReportFailedRetrieved;
@Metric("# of getQueueUserAcls failed to be retrieved")
private MutableGaugeInt numGetQueueUserAclsFailedRetrieved;
@Metric("# of getContainerReport failed to be retrieved")
private MutableGaugeInt numGetContainerReportFailedRetrieved;
@Metric("# of getContainers failed to be retrieved")
private MutableGaugeInt numGetContainersFailedRetrieved;
// The description previously duplicated the getContainers text.
@Metric("# of listReservations failed to be retrieved")
private MutableGaugeInt numListReservationsFailedRetrieved;
// Counts getResourceTypeInfo failures even though the field name carries no "Failed" suffix.
@Metric("# of getResourceTypeInfo failed to be retrieved")
private MutableGaugeInt numGetResourceTypeInfo;
// Aggregate metrics are shared, and don't have to be looked up per call // Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)") @Metric("Total number of successful Submitted apps and latency(ms)")
@ -90,6 +102,18 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetLabelsToNodesRetrieved; private MutableRate totalSucceededGetLabelsToNodesRetrieved;
@Metric("Total number of successful Retrieved getClusterNodeLabels and latency(ms)") @Metric("Total number of successful Retrieved getClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededGetClusterNodeLabelsRetrieved; private MutableRate totalSucceededGetClusterNodeLabelsRetrieved;
// Success rates for the Router client APIs; each one receives the call duration
// from the matching succeeded*Retrieved(long) method of this class.
@Metric("Total number of successful Retrieved getApplicationAttemptReport and latency(ms)")
private MutableRate totalSucceededAppAttemptReportRetrieved;
@Metric("Total number of successful Retrieved getQueueUserAcls and latency(ms)")
private MutableRate totalSucceededGetQueueUserAclsRetrieved;
@Metric("Total number of successful Retrieved getContainerReport and latency(ms)")
private MutableRate totalSucceededGetContainerReportRetrieved;
@Metric("Total number of successful Retrieved getContainers and latency(ms)")
private MutableRate totalSucceededGetContainersRetrieved;
@Metric("Total number of successful Retrieved listReservations and latency(ms)")
private MutableRate totalSucceededListReservationsRetrieved;
@Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)")
private MutableRate totalSucceededGetResourceTypeInfoRetrieved;
/** /**
* Provide quantile counters for all latencies. * Provide quantile counters for all latencies.
@ -105,6 +129,12 @@ public final class RouterMetrics {
private MutableQuantiles getNodeToLabelsLatency; private MutableQuantiles getNodeToLabelsLatency;
private MutableQuantiles getLabelToNodesLatency; private MutableQuantiles getLabelToNodesLatency;
private MutableQuantiles getClusterNodeLabelsLatency; private MutableQuantiles getClusterNodeLabelsLatency;
// Latency quantiles for the Router client APIs. Two names do not match the API
// they track: getContainerLatency is fed by succeededGetContainersRetrieved, and
// listResourceTypeInfoLatency is fed by succeededGetResourceTypeInfoRetrieved and
// registered under the name "getResourceTypeInfoLatency".
private MutableQuantiles getApplicationAttemptsLatency;
private MutableQuantiles getQueueUserAclsLatency;
private MutableQuantiles getContainerReportLatency;
private MutableQuantiles getContainerLatency;
private MutableQuantiles listReservationsLatency;
private MutableQuantiles listResourceTypeInfoLatency;
private static volatile RouterMetrics INSTANCE = null; private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry; private static MetricsRegistry registry;
@ -147,6 +177,30 @@ public final class RouterMetrics {
getClusterNodeLabelsLatency = getClusterNodeLabelsLatency =
registry.newQuantiles("getClusterNodeLabelsLatency", registry.newQuantiles("getClusterNodeLabelsLatency",
"latency of get cluster node labels", "ops", "latency", 10); "latency of get cluster node labels", "ops", "latency", 10);
getApplicationAttemptsLatency =
registry.newQuantiles("getApplicationAttemptsLatency",
"latency of get application attempts", "ops", "latency", 10);
getQueueUserAclsLatency =
registry.newQuantiles("getQueueUserAclsLatency",
"latency of get queue user acls", "ops", "latency", 10);
getContainerReportLatency =
registry.newQuantiles("getContainerReportLatency",
"latency of get container report", "ops", "latency", 10);
getContainerLatency =
registry.newQuantiles("getContainerLatency",
"latency of get container", "ops", "latency", 10);
listReservationsLatency =
registry.newQuantiles("listReservationsLatency",
"latency of list reservations", "ops", "latency", 10);
listResourceTypeInfoLatency =
registry.newQuantiles("getResourceTypeInfoLatency",
"latency of get resource type info", "ops", "latency", 10);
} }
public static RouterMetrics getMetrics() { public static RouterMetrics getMetrics() {
@ -223,6 +277,36 @@ public final class RouterMetrics {
return totalSucceededGetClusterNodeLabelsRetrieved.lastStat().numSamples(); return totalSucceededGetClusterNodeLabelsRetrieved.lastStat().numSamples();
} }
/**
 * Test accessor for the getApplicationAttemptReport success rate.
 *
 * @return the sample count reported by
 *         {@code totalSucceededAppAttemptReportRetrieved.lastStat()}.
 */
@VisibleForTesting
public long getNumSucceededAppAttemptReportRetrieved(){
return totalSucceededAppAttemptReportRetrieved.lastStat().numSamples();
}
/**
 * Test accessor for the getQueueUserAcls success rate.
 *
 * @return the sample count reported by
 *         {@code totalSucceededGetQueueUserAclsRetrieved.lastStat()}.
 */
@VisibleForTesting
public long getNumSucceededGetQueueUserAclsRetrieved(){
return totalSucceededGetQueueUserAclsRetrieved.lastStat().numSamples();
}
/**
 * Test accessor for the getContainerReport success rate.
 *
 * @return the sample count reported by
 *         {@code totalSucceededGetContainerReportRetrieved.lastStat()}.
 */
@VisibleForTesting
public long getNumSucceededGetContainerReportRetrieved() {
return totalSucceededGetContainerReportRetrieved.lastStat().numSamples();
}
/**
 * Test accessor for the getContainers success rate.
 *
 * @return the sample count reported by
 *         {@code totalSucceededGetContainersRetrieved.lastStat()}.
 */
@VisibleForTesting
public long getNumSucceededGetContainersRetrieved() {
return totalSucceededGetContainersRetrieved.lastStat().numSamples();
}
/**
 * Test accessor for the listReservations success rate.
 *
 * @return the sample count reported by
 *         {@code totalSucceededListReservationsRetrieved.lastStat()}.
 */
@VisibleForTesting
public long getNumSucceededListReservationsRetrieved() {
return totalSucceededListReservationsRetrieved.lastStat().numSamples();
}
/**
 * Test accessor for the getResourceTypeInfo success rate.
 *
 * @return the sample count reported by
 *         {@code totalSucceededGetResourceTypeInfoRetrieved.lastStat()}.
 */
@VisibleForTesting
public long getNumSucceededGetResourceTypeInfoRetrieved() {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples();
}
@VisibleForTesting @VisibleForTesting
public double getLatencySucceededAppsCreated() { public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean(); return totalSucceededAppsCreated.lastStat().mean();
@ -240,7 +324,7 @@ public final class RouterMetrics {
@VisibleForTesting @VisibleForTesting
public double getLatencySucceededGetAppAttemptReport() { public double getLatencySucceededGetAppAttemptReport() {
return totalSucceededAppAttemptsRetrieved.lastStat().mean(); return totalSucceededAppAttemptReportRetrieved.lastStat().mean();
} }
@VisibleForTesting @VisibleForTesting
@ -278,6 +362,36 @@ public final class RouterMetrics {
return totalSucceededGetClusterNodeLabelsRetrieved.lastStat().mean(); return totalSucceededGetClusterNodeLabelsRetrieved.lastStat().mean();
} }
/**
 * Test accessor for the latency recorded by
 * {@code succeededAppAttemptsRetrieved} (the getApplicationAttempts path).
 *
 * @return the mean reported by
 *         {@code totalSucceededAppAttemptsRetrieved.lastStat()}.
 */
@VisibleForTesting
public double getLatencySucceededAppAttemptRetrieved() {
return totalSucceededAppAttemptsRetrieved.lastStat().mean();
}
/**
 * Test accessor for the getQueueUserAcls latency.
 *
 * @return the mean reported by
 *         {@code totalSucceededGetQueueUserAclsRetrieved.lastStat()}.
 */
@VisibleForTesting
public double getLatencySucceededGetQueueUserAclsRetrieved() {
return totalSucceededGetQueueUserAclsRetrieved.lastStat().mean();
}
/**
 * Test accessor for the getContainerReport latency.
 *
 * @return the mean reported by
 *         {@code totalSucceededGetContainerReportRetrieved.lastStat()}.
 */
@VisibleForTesting
public double getLatencySucceededGetContainerReportRetrieved() {
return totalSucceededGetContainerReportRetrieved.lastStat().mean();
}
/**
 * Test accessor for the getContainers latency.
 *
 * @return the mean reported by
 *         {@code totalSucceededGetContainersRetrieved.lastStat()}.
 */
@VisibleForTesting
public double getLatencySucceededGetContainersRetrieved() {
return totalSucceededGetContainersRetrieved.lastStat().mean();
}
/**
 * Test accessor for the listReservations latency.
 *
 * @return the mean reported by
 *         {@code totalSucceededListReservationsRetrieved.lastStat()}.
 */
@VisibleForTesting
public double getLatencySucceededListReservationsRetrieved() {
return totalSucceededListReservationsRetrieved.lastStat().mean();
}
/**
 * Test accessor for the getResourceTypeInfo latency.
 *
 * @return the mean reported by
 *         {@code totalSucceededGetResourceTypeInfoRetrieved.lastStat()}.
 */
@VisibleForTesting
public double getLatencySucceededGetResourceTypeInfoRetrieved() {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean();
}
@VisibleForTesting @VisibleForTesting
public int getAppsFailedCreated() { public int getAppsFailedCreated() {
return numAppsFailedCreated.value(); return numAppsFailedCreated.value();
@ -300,7 +414,7 @@ public final class RouterMetrics {
@VisibleForTesting @VisibleForTesting
public int getAppAttemptsFailedRetrieved() { public int getAppAttemptsFailedRetrieved() {
return numAppsFailedRetrieved.value(); return numAppAttemptsFailedRetrieved.value();
} }
@VisibleForTesting @VisibleForTesting
@ -333,6 +447,36 @@ public final class RouterMetrics {
return numGetClusterNodeLabelsFailedRetrieved.value(); return numGetClusterNodeLabelsFailedRetrieved.value();
} }
/**
 * Test accessor for the getApplicationAttemptReport failure gauge.
 *
 * @return the current value of {@code numAppAttemptReportFailedRetrieved}.
 */
@VisibleForTesting
public int getAppAttemptReportFailedRetrieved() {
return numAppAttemptReportFailedRetrieved.value();
}
/**
 * Test accessor for the getQueueUserAcls failure gauge.
 *
 * @return the current value of {@code numGetQueueUserAclsFailedRetrieved}.
 */
@VisibleForTesting
public int getQueueUserAclsFailedRetrieved() {
return numGetQueueUserAclsFailedRetrieved.value();
}
/**
 * Test accessor for the getContainerReport failure gauge.
 *
 * @return the current value of {@code numGetContainerReportFailedRetrieved}.
 */
@VisibleForTesting
public int getContainerReportFailedRetrieved() {
return numGetContainerReportFailedRetrieved.value();
}
/**
 * Test accessor for the getContainers failure gauge.
 *
 * @return the current value of {@code numGetContainersFailedRetrieved}.
 */
@VisibleForTesting
public int getContainersFailedRetrieved() {
return numGetContainersFailedRetrieved.value();
}
/**
 * Test accessor for the listReservations failure gauge.
 *
 * @return the current value of {@code numListReservationsFailedRetrieved}.
 */
@VisibleForTesting
public int getListReservationsFailedRetrieved() {
return numListReservationsFailedRetrieved.value();
}
/**
 * Test accessor for the getResourceTypeInfo failure gauge. Despite the method
 * name, this reads {@code numGetResourceTypeInfo}, which is the gauge that
 * {@code incrResourceTypeInfoFailedRetrieved()} increments.
 *
 * @return the current value of {@code numGetResourceTypeInfo}.
 */
@VisibleForTesting
public int getGetResourceTypeInfoRetrieved() {
return numGetResourceTypeInfo.value();
}
public void succeededAppsCreated(long duration) { public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration); totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration); getNewApplicationLatency.add(duration);
@ -360,7 +504,7 @@ public final class RouterMetrics {
public void succeededAppAttemptsRetrieved(long duration) { public void succeededAppAttemptsRetrieved(long duration) {
totalSucceededAppAttemptsRetrieved.add(duration); totalSucceededAppAttemptsRetrieved.add(duration);
getApplicationAttemptReportLatency.add(duration); getApplicationAttemptsLatency.add(duration);
} }
public void succeededGetClusterMetricsRetrieved(long duration) { public void succeededGetClusterMetricsRetrieved(long duration) {
@ -388,6 +532,36 @@ public final class RouterMetrics {
getClusterNodeLabelsLatency.add(duration); getClusterNodeLabelsLatency.add(duration);
} }
/**
 * Records one successful getApplicationAttemptReport call in both the rate
 * metric and the latency quantiles.
 *
 * @param duration elapsed time of the call (ms, per the metric description).
 */
public void succeededAppAttemptReportRetrieved(long duration) {
totalSucceededAppAttemptReportRetrieved.add(duration);
getApplicationAttemptReportLatency.add(duration);
}
/**
 * Records one successful getQueueUserAcls call in both the rate metric and
 * the latency quantiles.
 *
 * @param duration elapsed time of the call (ms, per the metric description).
 */
public void succeededGetQueueUserAclsRetrieved(long duration) {
totalSucceededGetQueueUserAclsRetrieved.add(duration);
getQueueUserAclsLatency.add(duration);
}
/**
 * Records one successful getContainerReport call in both the rate metric and
 * the latency quantiles.
 *
 * @param duration elapsed time of the call (ms, per the metric description).
 */
public void succeededGetContainerReportRetrieved(long duration) {
totalSucceededGetContainerReportRetrieved.add(duration);
getContainerReportLatency.add(duration);
}
/**
 * Records one successful getContainers call in both the rate metric and the
 * latency quantiles (held in the field named {@code getContainerLatency}).
 *
 * @param duration elapsed time of the call (ms, per the metric description).
 */
public void succeededGetContainersRetrieved(long duration) {
totalSucceededGetContainersRetrieved.add(duration);
getContainerLatency.add(duration);
}
/**
 * Records one successful listReservations call in both the rate metric and
 * the latency quantiles.
 *
 * @param duration elapsed time of the call (ms, per the metric description).
 */
public void succeededListReservationsRetrieved(long duration) {
totalSucceededListReservationsRetrieved.add(duration);
listReservationsLatency.add(duration);
}
/**
 * Records one successful getResourceTypeInfo call in both the rate metric and
 * the latency quantiles (held in the field named
 * {@code listResourceTypeInfoLatency}).
 *
 * @param duration elapsed time of the call (ms, per the metric description).
 */
public void succeededGetResourceTypeInfoRetrieved(long duration) {
totalSucceededGetResourceTypeInfoRetrieved.add(duration);
listResourceTypeInfoLatency.add(duration);
}
public void incrAppsFailedCreated() { public void incrAppsFailedCreated() {
numAppsFailedCreated.incr(); numAppsFailedCreated.incr();
} }
@ -431,4 +605,28 @@ public final class RouterMetrics {
public void incrClusterNodeLabelsFailedRetrieved() { public void incrClusterNodeLabelsFailedRetrieved() {
numGetClusterNodeLabelsFailedRetrieved.incr(); numGetClusterNodeLabelsFailedRetrieved.incr();
} }
/** Increments the getApplicationAttemptReport failure gauge by one. */
public void incrAppAttemptReportFailedRetrieved() {
numAppAttemptReportFailedRetrieved.incr();
}
/** Increments the getQueueUserAcls failure gauge by one. */
public void incrQueueUserAclsFailedRetrieved() {
numGetQueueUserAclsFailedRetrieved.incr();
}
/** Increments the getContainerReport failure gauge by one. */
public void incrContainerReportFailedRetrieved() {
numGetContainerReportFailedRetrieved.incr();
}
/**
 * Increments the getContainers failure gauge
 * ({@code numGetContainersFailedRetrieved}) by one.
 */
public void incrContainerFailedRetrieved() {
numGetContainersFailedRetrieved.incr();
}
/** Increments the listReservations failure gauge by one. */
public void incrListReservationsFailedRetrieved() {
numListReservationsFailedRetrieved.incr();
}
/**
 * Increments the getResourceTypeInfo failure gauge
 * ({@code numGetResourceTypeInfo}) by one.
 */
public void incrResourceTypeInfoFailedRetrieved() {
numGetResourceTypeInfo.incr();
}
} }

View File

@ -828,7 +828,26 @@ public class FederationClientInterceptor
@Override @Override
public GetQueueUserAclsInfoResponse getQueueUserAcls( public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request) throws YarnException, IOException { GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if(request == null){
routerMetrics.incrQueueUserAclsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getQueueUserAcls request.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getQueueUserAcls",
new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request});
Collection<GetQueueUserAclsInfoResponse> queueUserAcls;
try {
queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod,
GetQueueUserAclsInfoResponse.class);
} catch (Exception ex) {
routerMetrics.incrQueueUserAclsFailedRetrieved();
LOG.error("Unable to get queue user Acls due to exception.", ex);
throw ex;
}
long stopTime = clock.getTime();
routerMetrics.succeededGetQueueUserAclsRetrieved(stopTime - startTime);
// Merge the QueueUserAclsInfoResponse
return RouterYarnClientUtils.mergeQueueUserAcls(queueUserAcls);
} }
@Override @Override
@ -853,7 +872,26 @@ public class FederationClientInterceptor
@Override @Override
public ReservationListResponse listReservations( public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException { ReservationListRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if (request == null || request.getReservationId() == null) {
routerMetrics.incrListReservationsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing listReservations request.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("listReservations",
new Class[] {ReservationListRequest.class}, new Object[] {request});
Collection<ReservationListResponse> listResponses;
try {
listResponses = invokeAppClientProtocolMethod(true, remoteMethod,
ReservationListResponse.class);
} catch (Exception ex) {
routerMetrics.incrListReservationsFailedRetrieved();
LOG.error("Unable to list reservations node due to exception.", ex);
throw ex;
}
long stopTime = clock.getTime();
routerMetrics.succeededListReservationsRetrieved(stopTime - startTime);
// Merge the ReservationListResponse
return RouterYarnClientUtils.mergeReservationsList(listResponses);
} }
@Override @Override
@ -982,38 +1020,31 @@ public class FederationClientInterceptor
GetApplicationAttemptReportRequest request) GetApplicationAttemptReportRequest request)
throws YarnException, IOException { throws YarnException, IOException {
long startTime = clock.getTime();
if (request == null || request.getApplicationAttemptId() == null if (request == null || request.getApplicationAttemptId() == null
|| request.getApplicationAttemptId().getApplicationId() == null) { || request.getApplicationAttemptId().getApplicationId() == null) {
routerMetrics.incrAppAttemptsFailedRetrieved(); routerMetrics.incrAppAttemptReportFailedRetrieved();
RouterServerUtil.logAndThrowException( RouterServerUtil.logAndThrowException(
"Missing getApplicationAttemptReport " + "Missing getApplicationAttemptReport request or applicationId " +
"request or applicationId " + "or applicationAttemptId information.", null);
"or applicationAttemptId information.",
null);
} }
long startTime = clock.getTime();
SubClusterId subClusterId = null; SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId();
try { try {
subClusterId = federationFacade subClusterId = getApplicationHomeSubCluster(applicationId);
.getApplicationHomeSubCluster(
request.getApplicationAttemptId().getApplicationId());
} catch (YarnException e) { } catch (YarnException e) {
routerMetrics.incrAppAttemptsFailedRetrieved(); routerMetrics.incrAppAttemptReportFailedRetrieved();
RouterServerUtil RouterServerUtil.logAndThrowException("ApplicationAttempt " +
.logAndThrowException("ApplicationAttempt " + request.getApplicationAttemptId() + " belongs to Application " +
request.getApplicationAttemptId() +
"belongs to Application " +
request.getApplicationAttemptId().getApplicationId() + request.getApplicationAttemptId().getApplicationId() +
" does not exist in FederationStateStore", e); " does not exist in FederationStateStore.", e);
} }
ApplicationClientProtocol clientRMProxy = ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId); getClientRMProxyForSubCluster(subClusterId);
GetApplicationAttemptReportResponse response = null; GetApplicationAttemptReportResponse response;
try { try {
response = clientRMProxy.getApplicationAttemptReport(request); response = clientRMProxy.getApplicationAttemptReport(request);
} catch (Exception e) { } catch (Exception e) {
@ -1031,26 +1062,134 @@ public class FederationClientInterceptor
} }
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime); routerMetrics.succeededAppAttemptReportRetrieved(stopTime - startTime);
return response; return response;
} }
@Override @Override
public GetApplicationAttemptsResponse getApplicationAttempts( public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException { GetApplicationAttemptsRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if (request == null || request.getApplicationId() == null) {
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getApplicationAttempts " +
"request or application id.", null);
}
long startTime = clock.getTime();
ApplicationId applicationId = request.getApplicationId();
SubClusterId subClusterId = null;
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException ex) {
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " + applicationId +
" does not exist in FederationStateStore.", ex);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
GetApplicationAttemptsResponse response = null;
try {
response = clientRMProxy.getApplicationAttempts(request);
} catch (Exception ex) {
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get the application attempts for " +
applicationId + " from SubCluster " + subClusterId.getId(), ex);
}
if (response == null) {
LOG.error("No response when attempting to retrieve the attempts list of " +
"the application = {} to SubCluster = {}.", applicationId,
subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime);
return response;
} }
@Override @Override
public GetContainerReportResponse getContainerReport( public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException { GetContainerReportRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if(request == null || request.getContainerId() == null){
routerMetrics.incrContainerReportFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getContainerReport request " +
"or containerId", null);
}
long startTime = clock.getTime();
ApplicationId applicationId = request.getContainerId().
getApplicationAttemptId().getApplicationId();
SubClusterId subClusterId = null;
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException ex) {
routerMetrics.incrContainerReportFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " + applicationId +
" does not exist in FederationStateStore.", ex);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
GetContainerReportResponse response = null;
try {
response = clientRMProxy.getContainerReport(request);
} catch (Exception ex) {
routerMetrics.incrContainerReportFailedRetrieved();
LOG.error("Unable to get the container report for {} from SubCluster {}.",
applicationId, subClusterId.getId(), ex);
}
if (response == null) {
LOG.error("No response when attempting to retrieve the container report of " +
"the ContainerId = {} From SubCluster = {}.", request.getContainerId(),
subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededGetContainerReportRetrieved(stopTime - startTime);
return response;
} }
@Override @Override
public GetContainersResponse getContainers(GetContainersRequest request) public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException { throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if (request == null || request.getApplicationAttemptId() == null) {
routerMetrics.incrContainerFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getContainers request or ApplicationAttemptId.", null);
}
long startTime = clock.getTime();
ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId();
SubClusterId subClusterId = null;
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException ex) {
routerMetrics.incrContainerFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " + applicationId +
" does not exist in FederationStateStore.", ex);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
GetContainersResponse response = null;
try {
response = clientRMProxy.getContainers(request);
} catch (Exception ex) {
routerMetrics.incrContainerFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get the containers for " +
applicationId + " from SubCluster " + subClusterId.getId(), ex);
}
if (response == null) {
LOG.error("No response when attempting to retrieve the container report of " +
"the ApplicationId = {} From SubCluster = {}.", applicationId,
subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededGetContainersRetrieved(stopTime - startTime);
return response;
} }
@Override @Override
@ -1112,7 +1251,26 @@ public class FederationClientInterceptor
@Override @Override
public GetAllResourceTypeInfoResponse getResourceTypeInfo( public GetAllResourceTypeInfoResponse getResourceTypeInfo(
GetAllResourceTypeInfoRequest request) throws YarnException, IOException { GetAllResourceTypeInfoRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if (request == null) {
routerMetrics.incrResourceTypeInfoFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getResourceTypeInfo request.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getResourceTypeInfo",
new Class[] {GetAllResourceTypeInfoRequest.class}, new Object[] {request});
Collection<GetAllResourceTypeInfoResponse> listResourceTypeInfo;
try {
listResourceTypeInfo = invokeAppClientProtocolMethod(true, remoteMethod,
GetAllResourceTypeInfoResponse.class);
} catch (Exception ex) {
routerMetrics.incrResourceTypeInfoFailedRetrieved();
LOG.error("Unable to get all resource type info node due to exception.", ex);
throw ex;
}
long stopTime = clock.getTime();
routerMetrics.succeededGetResourceTypeInfoRetrieved(stopTime - startTime);
// Merge the GetAllResourceTypeInfoResponse
return RouterYarnClientUtils.mergeResourceTypes(listResourceTypeInfo);
} }
@Override @Override
@ -1139,4 +1297,61 @@ public class FederationClientInterceptor
GetNodesToAttributesRequest request) throws YarnException, IOException { GetNodesToAttributesRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); throw new NotImplementedException("Code is not implemented");
} }
/**
 * Resolves the sub-cluster that owns an application.
 *
 * <p>The federation facade is asked first. If it has no mapping (or throws),
 * every sub-cluster returned by {@code federationFacade.getSubClusters(true)}
 * is probed with a getApplicationReport call. The first sub-cluster that
 * reports the application is registered via
 * {@code federationFacade.addApplicationHomeSubCluster} and returned.
 *
 * <p>Probe failures are best-effort: they are logged at debug level with the
 * sub-cluster and the cause, and the scan moves on to the next sub-cluster.
 *
 * @param applicationId the application to locate.
 * @return the owning sub-cluster id, or null when {@code applicationId} is
 *         null.
 * @throws YarnException if no sub-cluster knows the application, or if the
 *         sub-cluster list cannot be obtained.
 */
protected SubClusterId getApplicationHomeSubCluster(
    ApplicationId applicationId) throws YarnException {
  if (applicationId == null) {
    LOG.error("ApplicationId is Null, Can't find in SubCluster.");
    return null;
  }

  SubClusterId resultSubClusterId = null;

  // try looking for applicationId in Home SubCluster
  try {
    resultSubClusterId = federationFacade.
        getApplicationHomeSubCluster(applicationId);
  } catch (YarnException ex) {
    // Parameterized logging needs no isDebugEnabled guard; keep the cause visible.
    LOG.debug("can't find applicationId = {} in home sub cluster, " +
        "try foreach sub clusters.", applicationId, ex);
  }
  if (resultSubClusterId != null) {
    return resultSubClusterId;
  }

  // if applicationId not found in Home SubCluster,
  // foreach Clusters
  Map<SubClusterId, SubClusterInfo> subClusters =
      federationFacade.getSubClusters(true);
  for (SubClusterId subClusterId : subClusters.keySet()) {
    try {
      ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
      if (clientRMProxy == null) {
        continue;
      }
      GetApplicationReportRequest appReportRequest =
          GetApplicationReportRequest.newInstance(applicationId);
      GetApplicationReportResponse appReportResponse =
          clientRMProxy.getApplicationReport(appReportRequest);
      // Check the report for null explicitly instead of relying on an NPE
      // being absorbed by the catch-all below.
      if (appReportResponse == null || appReportResponse.getApplicationReport() == null) {
        continue;
      }
      if (applicationId.equals(
          appReportResponse.getApplicationReport().getApplicationId())) {
        resultSubClusterId = federationFacade.addApplicationHomeSubCluster(
            ApplicationHomeSubCluster.newInstance(applicationId, subClusterId));
        return resultSubClusterId;
      }
    } catch (Exception ex) {
      LOG.debug("Can't Find ApplicationId = {} in Sub Cluster {}!",
          applicationId, subClusterId, ex);
    }
  }

  String errorMsg =
      String.format("Can't Found applicationId = %s in any sub clusters", applicationId);
  throw new YarnException(errorMsg);
}
} }

View File

@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -38,6 +41,9 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -295,5 +301,69 @@ public final class RouterYarnClientUtils {
nodeLabelsResponse.setNodeLabelList(new ArrayList<>(nodeLabelsList)); nodeLabelsResponse.setNodeLabelList(new ArrayList<>(nodeLabelsList));
return nodeLabelsResponse; return nodeLabelsResponse;
} }
/**
 * Combines the queue user ACL entries of several responses into one response.
 * Duplicate entries are dropped by collecting them in a hash set, so the
 * order of the merged list is unspecified. Null responses and responses
 * without an ACL list are skipped.
 *
 * @param responses the GetQueueUserAclsInfoResponse instances to merge.
 * @return a new GetQueueUserAclsInfoResponse holding the distinct entries.
 */
public static GetQueueUserAclsInfoResponse mergeQueueUserAcls(
    Collection<GetQueueUserAclsInfoResponse> responses) {
  Set<QueueUserACLInfo> distinctAcls = new HashSet<>();
  for (GetQueueUserAclsInfoResponse current : responses) {
    if (current == null) {
      continue;
    }
    List<QueueUserACLInfo> acls = current.getUserAclsInfoList();
    if (acls != null) {
      distinctAcls.addAll(acls);
    }
  }

  GetQueueUserAclsInfoResponse merged =
      Records.newRecord(GetQueueUserAclsInfoResponse.class);
  merged.setUserAclsInfoList(new ArrayList<>(distinctAcls));
  return merged;
}
/**
 * Concatenates the reservation allocation states of several responses into
 * one response, in iteration order and without de-duplication. Null
 * responses and responses without an allocation-state list are skipped.
 *
 * @param responses the ReservationListResponse instances to merge.
 * @return a new ReservationListResponse holding every collected state.
 */
public static ReservationListResponse mergeReservationsList(
    Collection<ReservationListResponse> responses) {
  List<ReservationAllocationState> allStates = new ArrayList<>();
  for (ReservationListResponse current : responses) {
    if (current == null) {
      continue;
    }
    List<ReservationAllocationState> states = current.getReservationAllocationState();
    if (states != null) {
      allStates.addAll(states);
    }
  }

  ReservationListResponse merged = Records.newRecord(ReservationListResponse.class);
  merged.setReservationAllocationState(allStates);
  return merged;
}
/**
 * Combines the resource type infos of several responses into one response.
 * Duplicate entries are dropped by collecting them in a hash set, so the
 * order of the merged list is unspecified. Null responses and responses
 * without a resource type list are skipped.
 *
 * @param responses the GetAllResourceTypeInfoResponse instances to merge.
 * @return a new GetAllResourceTypeInfoResponse holding the distinct entries.
 */
public static GetAllResourceTypeInfoResponse mergeResourceTypes(
    Collection<GetAllResourceTypeInfoResponse> responses) {
  Set<ResourceTypeInfo> distinctTypes = new HashSet<>();
  for (GetAllResourceTypeInfoResponse current : responses) {
    if (current == null) {
      continue;
    }
    List<ResourceTypeInfo> types = current.getResourceTypeInfo();
    if (types != null) {
      distinctTypes.addAll(types);
    }
  }

  GetAllResourceTypeInfoResponse merged =
      Records.newRecord(GetAllResourceTypeInfoResponse.class);
  merged.setResourceTypeInfo(new ArrayList<>(distinctTypes));
  return merged;
}
} }

View File

@ -210,21 +210,21 @@ public class TestRouterMetrics {
@Test @Test
public void testSucceededAppAttemptReport() { public void testSucceededAppAttemptReport() {
long totalGoodBefore = metrics.getNumSucceededAppAttemptsRetrieved(); long totalGoodBefore = metrics.getNumSucceededAppAttemptReportRetrieved();
goodSubCluster.getApplicationAttemptReport(100); goodSubCluster.getApplicationAttemptReport(100);
Assert.assertEquals(totalGoodBefore + 1, Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededAppAttemptsRetrieved()); metrics.getNumSucceededAppAttemptReportRetrieved());
Assert.assertEquals(100, Assert.assertEquals(100,
metrics.getLatencySucceededGetAppAttemptReport(), 0); metrics.getLatencySucceededGetAppAttemptReport(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getApplicationAttemptReport(200); goodSubCluster.getApplicationAttemptReport(200);
Assert.assertEquals(totalGoodBefore + 2, Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededAppAttemptsRetrieved()); metrics.getNumSucceededAppAttemptReportRetrieved());
Assert.assertEquals(150, Assert.assertEquals(150,
metrics.getLatencySucceededGetAppAttemptReport(), 0); metrics.getLatencySucceededGetAppAttemptReport(), ASSERT_DOUBLE_DELTA);
} }
/** /**
@ -234,12 +234,12 @@ public class TestRouterMetrics {
@Test @Test
public void testAppAttemptReportFailed() { public void testAppAttemptReportFailed() {
long totalBadbefore = metrics.getAppAttemptsFailedRetrieved(); long totalBadBefore = metrics.getAppAttemptReportFailedRetrieved();
badSubCluster.getApplicationAttemptReport(); badSubCluster.getApplicationAttemptReport();
Assert.assertEquals(totalBadbefore + 1, Assert.assertEquals(totalBadBefore + 1,
metrics.getAppAttemptsFailedRetrieved()); metrics.getAppAttemptReportFailedRetrieved());
} }
/** /**
@ -336,7 +336,7 @@ public class TestRouterMetrics {
public void getApplicationAttemptReport() { public void getApplicationAttemptReport() {
LOG.info("Mocked: failed getApplicationAttemptReport call"); LOG.info("Mocked: failed getApplicationAttemptReport call");
metrics.incrAppsFailedRetrieved(); metrics.incrAppAttemptReportFailedRetrieved();
} }
public void getApplicationsReport() { public void getApplicationsReport() {
@ -368,6 +368,36 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getClusterNodeLabels call"); LOG.info("Mocked: failed getClusterNodeLabels call");
metrics.incrClusterNodeLabelsFailedRetrieved(); metrics.incrClusterNodeLabelsFailedRetrieved();
} }
// Mock of a failed getQueueUserAcls call: logs it and records the failure
// through metrics.incrQueueUserAclsFailedRetrieved().
public void getQueueUserAcls() {
LOG.info("Mocked: failed getQueueUserAcls call");
metrics.incrQueueUserAclsFailedRetrieved();
}
// Mock of a failed listReservations call: logs it and records the failure
// through metrics.incrListReservationsFailedRetrieved().
public void getListReservations() {
LOG.info("Mocked: failed listReservations call");
metrics.incrListReservationsFailedRetrieved();
}
// Mock of a failed getApplicationAttempts call: logs it and records the
// failure through metrics.incrAppAttemptsFailedRetrieved().
public void getApplicationAttempts() {
LOG.info("Mocked: failed getApplicationAttempts call");
metrics.incrAppAttemptsFailedRetrieved();
}
// Mock of a failed getContainerReport call: logs it and records the failure
// through metrics.incrContainerReportFailedRetrieved().
public void getContainerReport() {
LOG.info("Mocked: failed getContainerReport call");
metrics.incrContainerReportFailedRetrieved();
}
// Mock of a failed getContainer call: logs it and records the failure
// through metrics.incrContainerFailedRetrieved().
public void getContainer() {
LOG.info("Mocked: failed getContainer call");
metrics.incrContainerFailedRetrieved();
}
// Mock of a failed getResourceTypeInfo call: logs it and records the failure
// through metrics.incrResourceTypeInfoFailedRetrieved().
public void getResourceTypeInfo() {
LOG.info("Mocked: failed getResourceTypeInfo call");
metrics.incrResourceTypeInfoFailedRetrieved();
}
} }
// Records successes for all calls // Records successes for all calls
@ -397,10 +427,9 @@ public class TestRouterMetrics {
} }
public void getApplicationAttemptReport(long duration) { public void getApplicationAttemptReport(long duration) {
LOG.info("Mocked: successful " + LOG.info("Mocked: successful getApplicationAttemptReport call " +
"getApplicationAttemptReport call with duration {}", "with duration {}", duration);
duration); metrics.succeededAppAttemptReportRetrieved(duration);
metrics.succeededAppAttemptsRetrieved(duration);
} }
public void getApplicationsReport(long duration) { public void getApplicationsReport(long duration) {
@ -434,6 +463,36 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful getClusterNodeLabels call with duration {}", duration); LOG.info("Mocked: successful getClusterNodeLabels call with duration {}", duration);
metrics.succeededGetClusterNodeLabelsRetrieved(duration); metrics.succeededGetClusterNodeLabelsRetrieved(duration);
} }
// Mock of a successful getQueueUserAcls call: logs it and passes the given
// duration to metrics.succeededGetQueueUserAclsRetrieved().
public void getQueueUserAcls(long duration) {
LOG.info("Mocked: successful getQueueUserAcls call with duration {}", duration);
metrics.succeededGetQueueUserAclsRetrieved(duration);
}
// Mock of a successful listReservations call: logs it and passes the given
// duration to metrics.succeededListReservationsRetrieved().
public void getListReservations(long duration) {
LOG.info("Mocked: successful listReservations call with duration {}", duration);
metrics.succeededListReservationsRetrieved(duration);
}
// Mock of a successful getApplicationAttempts call: logs it and passes the
// given duration to metrics.succeededAppAttemptsRetrieved().
public void getApplicationAttempts(long duration) {
LOG.info("Mocked: successful getApplicationAttempts call with duration {}", duration);
metrics.succeededAppAttemptsRetrieved(duration);
}
// Mock of a successful getContainerReport call: logs it and passes the given
// duration to metrics.succeededGetContainerReportRetrieved().
public void getContainerReport(long duration) {
LOG.info("Mocked: successful getContainerReport call with duration {}", duration);
metrics.succeededGetContainerReportRetrieved(duration);
}
// Mock of a successful getContainer call: logs it and passes the given
// duration to metrics.succeededGetContainersRetrieved().
public void getContainer(long duration) {
LOG.info("Mocked: successful getContainer call with duration {}", duration);
metrics.succeededGetContainersRetrieved(duration);
}
// Mock of a successful getResourceTypeInfo call: logs it and passes the
// given duration to metrics.succeededGetResourceTypeInfoRetrieved().
public void getResourceTypeInfo(long duration) {
LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration);
metrics.succeededGetResourceTypeInfoRetrieved(duration);
}
} }
@Test @Test
@ -517,4 +576,136 @@ public class TestRouterMetrics {
badSubCluster.getClusterNodeLabels(); badSubCluster.getClusterNodeLabels();
Assert.assertEquals(totalBadBefore + 1, metrics.getGetClusterNodeLabelsFailedRetrieved()); Assert.assertEquals(totalBadBefore + 1, metrics.getGetClusterNodeLabelsFailedRetrieved());
} }
/**
 * Two successful getQueueUserAcls calls (durations 150 and 300) must each
 * raise the success counter by one; the reported latency is expected to be
 * 150 after the first call and 225 after the second.
 */
@Test
public void testSucceededQueueUserAcls() {
  final long baseline = metrics.getNumSucceededGetQueueUserAclsRetrieved();

  // First sample.
  goodSubCluster.getQueueUserAcls(150);
  long successes = metrics.getNumSucceededGetQueueUserAclsRetrieved();
  Assert.assertEquals(baseline + 1, successes);
  double latency = metrics.getLatencySucceededGetQueueUserAclsRetrieved();
  Assert.assertEquals(150, latency, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.getQueueUserAcls(300);
  successes = metrics.getNumSucceededGetQueueUserAclsRetrieved();
  Assert.assertEquals(baseline + 2, successes);
  latency = metrics.getLatencySucceededGetQueueUserAclsRetrieved();
  Assert.assertEquals(225, latency, ASSERT_DOUBLE_DELTA);
}
/** A failed getQueueUserAcls call must raise the failure counter by one. */
@Test
public void testQueueUserAclsFailed() {
  final long failuresBefore = metrics.getQueueUserAclsFailedRetrieved();
  badSubCluster.getQueueUserAcls();
  final long failuresAfter = metrics.getQueueUserAclsFailedRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}
/**
 * Two successful listReservations calls (durations 150 and 300) must each
 * raise the success counter by one; the reported latency is expected to be
 * 150 after the first call and 225 after the second.
 */
@Test
public void testSucceededListReservations() {
  final long baseline = metrics.getNumSucceededListReservationsRetrieved();

  // First sample.
  goodSubCluster.getListReservations(150);
  long successes = metrics.getNumSucceededListReservationsRetrieved();
  Assert.assertEquals(baseline + 1, successes);
  double latency = metrics.getLatencySucceededListReservationsRetrieved();
  Assert.assertEquals(150, latency, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.getListReservations(300);
  successes = metrics.getNumSucceededListReservationsRetrieved();
  Assert.assertEquals(baseline + 2, successes);
  latency = metrics.getLatencySucceededListReservationsRetrieved();
  Assert.assertEquals(225, latency, ASSERT_DOUBLE_DELTA);
}
/** A failed listReservations call must raise the failure counter by one. */
@Test
public void testListReservationsFailed() {
  final long failuresBefore = metrics.getListReservationsFailedRetrieved();
  badSubCluster.getListReservations();
  final long failuresAfter = metrics.getListReservationsFailedRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}
/**
 * Two successful getApplicationAttempts calls (durations 150 and 300) must
 * each raise the success counter by one; the reported latency is expected
 * to be 150 after the first call and 225 after the second.
 */
@Test
public void testSucceededGetApplicationAttempts() {
  final long baseline = metrics.getNumSucceededAppAttemptsRetrieved();

  // First sample.
  goodSubCluster.getApplicationAttempts(150);
  long successes = metrics.getNumSucceededAppAttemptsRetrieved();
  Assert.assertEquals(baseline + 1, successes);
  double latency = metrics.getLatencySucceededAppAttemptRetrieved();
  Assert.assertEquals(150, latency, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.getApplicationAttempts(300);
  successes = metrics.getNumSucceededAppAttemptsRetrieved();
  Assert.assertEquals(baseline + 2, successes);
  latency = metrics.getLatencySucceededAppAttemptRetrieved();
  Assert.assertEquals(225, latency, ASSERT_DOUBLE_DELTA);
}
/** A failed getApplicationAttempts call must raise the failure counter by one. */
@Test
public void testGetApplicationAttemptsFailed() {
  final long failuresBefore = metrics.getAppAttemptsFailedRetrieved();
  badSubCluster.getApplicationAttempts();
  final long failuresAfter = metrics.getAppAttemptsFailedRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}
/**
 * Two successful getContainerReport calls (durations 150 and 300) must each
 * raise the success counter by one; the reported latency is expected to be
 * 150 after the first call and 225 after the second.
 */
@Test
public void testSucceededGetContainerReport() {
  final long baseline = metrics.getNumSucceededGetContainerReportRetrieved();

  // First sample.
  goodSubCluster.getContainerReport(150);
  long successes = metrics.getNumSucceededGetContainerReportRetrieved();
  Assert.assertEquals(baseline + 1, successes);
  double latency = metrics.getLatencySucceededGetContainerReportRetrieved();
  Assert.assertEquals(150, latency, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.getContainerReport(300);
  successes = metrics.getNumSucceededGetContainerReportRetrieved();
  Assert.assertEquals(baseline + 2, successes);
  latency = metrics.getLatencySucceededGetContainerReportRetrieved();
  Assert.assertEquals(225, latency, ASSERT_DOUBLE_DELTA);
}
/** A failed getContainerReport call must raise the failure counter by one. */
@Test
public void testGetContainerReportFailed() {
  final long failuresBefore = metrics.getContainerReportFailedRetrieved();
  badSubCluster.getContainerReport();
  final long failuresAfter = metrics.getContainerReportFailedRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}
/**
 * Two successful getContainer mock calls (durations 150 and 300) must each
 * raise the getContainers success counter by one; the reported latency is
 * expected to be 150 after the first call and 225 after the second.
 */
@Test
public void testSucceededGetContainers() {
  final long baseline = metrics.getNumSucceededGetContainersRetrieved();

  // First sample.
  goodSubCluster.getContainer(150);
  long successes = metrics.getNumSucceededGetContainersRetrieved();
  Assert.assertEquals(baseline + 1, successes);
  double latency = metrics.getLatencySucceededGetContainersRetrieved();
  Assert.assertEquals(150, latency, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.getContainer(300);
  successes = metrics.getNumSucceededGetContainersRetrieved();
  Assert.assertEquals(baseline + 2, successes);
  latency = metrics.getLatencySucceededGetContainersRetrieved();
  Assert.assertEquals(225, latency, ASSERT_DOUBLE_DELTA);
}
/** A failed getContainer mock call must raise the failure counter by one. */
@Test
public void testGetContainerFailed() {
  final long failuresBefore = metrics.getContainersFailedRetrieved();
  badSubCluster.getContainer();
  final long failuresAfter = metrics.getContainersFailedRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}
/**
 * Two successful getResourceTypeInfo calls (durations 150 and 300) must
 * each raise the success counter by one; the reported latency is expected
 * to be 150 after the first call and 225 after the second.
 */
@Test
public void testSucceededGetResourceTypeInfo() {
  final long baseline = metrics.getNumSucceededGetResourceTypeInfoRetrieved();

  // First sample.
  goodSubCluster.getResourceTypeInfo(150);
  long successes = metrics.getNumSucceededGetResourceTypeInfoRetrieved();
  Assert.assertEquals(baseline + 1, successes);
  double latency = metrics.getLatencySucceededGetResourceTypeInfoRetrieved();
  Assert.assertEquals(150, latency, ASSERT_DOUBLE_DELTA);

  // Second sample.
  goodSubCluster.getResourceTypeInfo(300);
  successes = metrics.getNumSucceededGetResourceTypeInfoRetrieved();
  Assert.assertEquals(baseline + 2, successes);
  latency = metrics.getLatencySucceededGetResourceTypeInfoRetrieved();
  Assert.assertEquals(225, latency, ASSERT_DOUBLE_DELTA);
}
/**
 * A failed getResourceTypeInfo call must raise the value reported by
 * metrics.getGetResourceTypeInfoRetrieved() by one.
 */
@Test
public void testGetResourceTypeInfoFailed() {
  final long failuresBefore = metrics.getGetResourceTypeInfoRetrieved();
  badSubCluster.getResourceTypeInfo();
  final long failuresAfter = metrics.getGetResourceTypeInfoRetrieved();
  Assert.assertEquals(failuresBefore + 1, failuresAfter);
}
} }

View File

@ -27,6 +27,7 @@ import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
@ -52,12 +53,28 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@ -435,13 +452,10 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
@Test @Test
public void testGetApplicationAttemptReport() public void testGetApplicationAttemptReport()
throws YarnException, IOException, InterruptedException { throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: " + LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report.");
"Get ApplicationAttempt Report");
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
@ -451,8 +465,23 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
Assert.assertNotNull(response); Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
// Call GetApplicationAttempts Get ApplicationAttemptId
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
// Wait for app to start
while(attemptsResponse.getApplicationAttemptList().size() == 0) {
attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
}
Assert.assertNotNull(attemptsResponse);
GetApplicationAttemptReportRequest requestGet = GetApplicationAttemptReportRequest requestGet =
GetApplicationAttemptReportRequest.newInstance(appAttemptId); GetApplicationAttemptReportRequest.newInstance(
attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId());
GetApplicationAttemptReportResponse responseGet = GetApplicationAttemptReportResponse responseGet =
interceptor.getApplicationAttemptReport(requestGet); interceptor.getApplicationAttemptReport(requestGet);
@ -479,8 +508,8 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
GetApplicationAttemptReportRequest.newInstance(appAttemptID); GetApplicationAttemptReportRequest.newInstance(appAttemptID);
LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " + LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " +
appAttemptID + "belongs to Application " + appAttemptID + " belongs to Application " +
appId + " does not exist in FederationStateStore", appId + " does not exist in FederationStateStore.",
() -> interceptor.getApplicationAttemptReport(requestGet)); () -> interceptor.getApplicationAttemptReport(requestGet));
} }
@ -697,4 +726,176 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
interceptor.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); interceptor.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
Assert.assertEquals(0, response.getNodeLabelList().size()); Assert.assertEquals(0, response.getNodeLabelList().size());
} }
/**
 * Verifies getQueueUserAcls on the interceptor: a null request is rejected
 * with a YarnException, and a normal request returns an entry for the
 * "root" queue carrying the SUBMIT_APPLICATIONS and ADMINISTER_QUEUE ACLs.
 */
@Test
public void testGetQueueUserAcls() throws Exception {
  LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request");

  // null request
  LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.",
      () -> interceptor.getQueueUserAcls(null));

  // normal request
  GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls(
      GetQueueUserAclsInfoRequest.newInstance());
  Assert.assertNotNull(response);

  List<QueueACL> submitAndAdministerAcl = new ArrayList<>();
  submitAndAdministerAcl.add(QueueACL.SUBMIT_APPLICATIONS);
  submitAndAdministerAcl.add(QueueACL.ADMINISTER_QUEUE);
  QueueUserACLInfo expectedRootQueueACLInfo =
      QueueUserACLInfo.newInstance("root", submitAndAdministerAcl);

  // Constant-first equals keeps the filter safe if a queue name is null.
  List<QueueUserACLInfo> rootQueueACLInfos = response.getUserAclsInfoList().stream()
      .filter(acl -> "root".equals(acl.getQueueName()))
      .collect(Collectors.toList());

  // Fail with a readable message rather than an IndexOutOfBoundsException
  // when the root queue is missing from the response.
  Assert.assertFalse("No ACL entry returned for the root queue",
      rootQueueACLInfos.isEmpty());
  Assert.assertEquals(expectedRootQueueACLInfo, rootQueueACLInfos.get(0));
}
/**
 * Verifies listReservations on the interceptor: a null request is rejected
 * with a YarnException, and a request against queue "root.decided" returns
 * a non-null response holding no reservations.
 */
@Test
public void testListReservations() throws Exception {
  LOG.info("Test FederationClientInterceptor : Get ListReservations request");

  // A missing request must be refused.
  LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.",
      () -> interceptor.listReservations(null));

  // A well-formed request.
  String reservationIdText = ReservationId.newInstance(1653487680L, 1L).toString();
  ReservationListRequest listRequest =
      ReservationListRequest.newInstance("root.decided", reservationIdText);
  ReservationListResponse listResponse = interceptor.listReservations(listRequest);
  Assert.assertNotNull(listResponse);
  int reservationCount = listResponse.getReservationAllocationState().size();
  Assert.assertEquals(0, reservationCount);
}
/**
 * Verifies getContainers on the interceptor: a null request is rejected
 * with a YarnException, and a request for the first attempt of a freshly
 * submitted application returns a non-null response.
 */
@Test
public void testGetContainersRequest() throws Exception {
  LOG.info("Test FederationClientInterceptor : Get Containers request");

  // null request
  LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " +
      "or ApplicationAttemptId.", () -> interceptor.getContainers(null));

  // normal request
  ApplicationId appId =
      ApplicationId.newInstance(System.currentTimeMillis(), 1);
  SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);

  // Submit the application
  SubmitApplicationResponse response = interceptor.submitApplication(request);
  Assert.assertNotNull(response);
  Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

  // Call GetApplicationAttempts
  GetApplicationAttemptsRequest attemptsRequest =
      GetApplicationAttemptsRequest.newInstance(appId);
  GetApplicationAttemptsResponse attemptsResponse =
      interceptor.getApplicationAttempts(attemptsRequest);
  Assert.assertNotNull(attemptsResponse);

  // Wait for the app to start. The poll is bounded and paced so that a
  // missing attempt fails the test instead of spinning forever.
  final long deadline = System.currentTimeMillis() + 30_000L;
  while (attemptsResponse.getApplicationAttemptList().isEmpty()) {
    if (System.currentTimeMillis() > deadline) {
      Assert.fail("Timed out waiting for an application attempt of " + appId);
    }
    Thread.sleep(100);
    attemptsResponse = interceptor.getApplicationAttempts(attemptsRequest);
    Assert.assertNotNull(attemptsResponse);
  }

  // Call GetContainers
  GetContainersRequest containersRequest =
      GetContainersRequest.newInstance(
      attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId());
  GetContainersResponse containersResponse =
      interceptor.getContainers(containersRequest);
  Assert.assertNotNull(containersResponse);
}
/**
 * Verifies getContainerReport on the interceptor: a null request is
 * rejected with a YarnException, and a request for a container id built
 * from the first attempt of a freshly submitted application is expected
 * to return null.
 */
@Test
public void testGetContainerReportRequest() throws Exception {
  LOG.info("Test FederationClientInterceptor : Get Container Report request.");

  // null request
  LambdaTestUtils.intercept(YarnException.class, "Missing getContainerReport request " +
      "or containerId", () -> interceptor.getContainerReport(null));

  // normal request
  ApplicationId appId =
      ApplicationId.newInstance(System.currentTimeMillis(), 1);
  SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);

  // Submit the application
  SubmitApplicationResponse response = interceptor.submitApplication(request);
  Assert.assertNotNull(response);
  Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

  // Call GetApplicationAttempts
  GetApplicationAttemptsRequest attemptsRequest =
      GetApplicationAttemptsRequest.newInstance(appId);
  GetApplicationAttemptsResponse attemptsResponse =
      interceptor.getApplicationAttempts(attemptsRequest);
  Assert.assertNotNull(attemptsResponse);

  // Wait for the app to start. The poll is bounded and paced so that a
  // missing attempt fails the test instead of spinning forever.
  final long deadline = System.currentTimeMillis() + 30_000L;
  while (attemptsResponse.getApplicationAttemptList().isEmpty()) {
    if (System.currentTimeMillis() > deadline) {
      Assert.fail("Timed out waiting for an application attempt of " + appId);
    }
    Thread.sleep(100);
    attemptsResponse = interceptor.getApplicationAttempts(attemptsRequest);
    Assert.assertNotNull(attemptsResponse);
  }

  ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList().
      get(0).getApplicationAttemptId();
  ContainerId containerId = ContainerId.newContainerId(attemptId, 1);

  // Call GetContainerReport. The RM has not allocated this container, so
  // the test expects a null response.
  GetContainerReportRequest containerReportRequest =
      GetContainerReportRequest.newInstance(containerId);
  GetContainerReportResponse containerReportResponse =
      interceptor.getContainerReport(containerReportRequest);
  Assert.assertNull(containerReportResponse);
}
/**
 * Verifies getApplicationAttempts on the interceptor: a null request is
 * rejected with a YarnException, and a request for a freshly submitted
 * application returns a non-null response.
 */
@Test
public void getApplicationAttempts() throws Exception {
  LOG.info("Test FederationClientInterceptor : Get Application Attempts request.");

  // A missing request must be refused.
  LambdaTestUtils.intercept(YarnException.class, "Missing getApplicationAttempts " +
      "request or application id.", () -> interceptor.getApplicationAttempts(null));

  // Register an application through the interceptor.
  ApplicationId applicationId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
  SubmitApplicationRequest submitRequest = mockSubmitApplicationRequest(applicationId);
  SubmitApplicationResponse submitResponse = interceptor.submitApplication(submitRequest);
  Assert.assertNotNull(submitResponse);
  Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(applicationId));

  // Query its attempts.
  GetApplicationAttemptsResponse attempts = interceptor.getApplicationAttempts(
      GetApplicationAttemptsRequest.newInstance(applicationId));
  Assert.assertNotNull(attempts);
}
/**
 * Verifies getResourceTypeInfo on the interceptor: a null request is
 * rejected with a YarnException, and a normal request reports two
 * resource types.
 */
@Test
public void testGetResourceTypeInfoRequest() throws Exception {
  LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request");

  // A missing request must be refused.
  LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.",
      () -> interceptor.getResourceTypeInfo(null));

  // A well-formed request.
  GetAllResourceTypeInfoRequest typeInfoRequest =
      GetAllResourceTypeInfoRequest.newInstance();
  GetAllResourceTypeInfoResponse typeInfoResponse =
      interceptor.getResourceTypeInfo(typeInfoRequest);
  int typeCount = typeInfoResponse.getResourceTypeInfo().size();
  Assert.assertEquals(2, typeCount);
}
} }

View File

@ -32,6 +32,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -42,6 +45,12 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert; import org.junit.Assert;
@ -367,4 +376,167 @@ public class TestRouterYarnClientUtils {
Assert.assertEquals(expectedResponse, response.getLabelsToNodes()); Assert.assertEquals(expectedResponse, response.getLabelsToNodes());
} }
/**
 * Merges two populated responses (sharing an equal "root" entry), one
 * empty response and one null response, and expects the merged ACL list to
 * hold the "root", "default" and "yarn" entries exactly once each, in any
 * order.
 */
@Test
public void testMergeQueueUserAclsResponse() {
  // ACL combinations used by the queue entries below.
  List<QueueACL> submitOnlyAcl = new ArrayList<>();
  submitOnlyAcl.add(QueueACL.SUBMIT_APPLICATIONS);
  List<QueueACL> administerOnlyAcl = new ArrayList<>();
  administerOnlyAcl.add(QueueACL.ADMINISTER_QUEUE);
  List<QueueACL> submitAndAdministerAcl = new ArrayList<>();
  submitAndAdministerAcl.add(QueueACL.ADMINISTER_QUEUE);
  submitAndAdministerAcl.add(QueueACL.SUBMIT_APPLICATIONS);

  // "root" is built twice so that both responses carry an equal entry.
  QueueUserACLInfo rootInfo =
      QueueUserACLInfo.newInstance("root", submitAndAdministerAcl);
  QueueUserACLInfo defaultInfo =
      QueueUserACLInfo.newInstance("default", submitOnlyAcl);
  QueueUserACLInfo rootInfoAgain =
      QueueUserACLInfo.newInstance("root", submitAndAdministerAcl);
  QueueUserACLInfo yarnInfo =
      QueueUserACLInfo.newInstance("yarn", administerOnlyAcl);

  List<QueueUserACLInfo> firstAcls = new ArrayList<>();
  List<QueueUserACLInfo> secondAcls = new ArrayList<>();
  firstAcls.add(rootInfo);
  firstAcls.add(defaultInfo);
  secondAcls.add(rootInfoAgain);
  secondAcls.add(yarnInfo);

  // Populated responses.
  GetQueueUserAclsInfoResponse populatedFirst =
      Records.newRecord(GetQueueUserAclsInfoResponse.class);
  populatedFirst.setUserAclsInfoList(firstAcls);
  GetQueueUserAclsInfoResponse populatedSecond =
      Records.newRecord(GetQueueUserAclsInfoResponse.class);
  populatedSecond.setUserAclsInfoList(secondAcls);

  // Empty response.
  GetQueueUserAclsInfoResponse emptyResponse =
      Records.newRecord(GetQueueUserAclsInfoResponse.class);

  // Null response.
  GetQueueUserAclsInfoResponse nullResponse = null;

  List<GetQueueUserAclsInfoResponse> toMerge = new ArrayList<>();
  toMerge.add(populatedFirst);
  toMerge.add(populatedSecond);
  toMerge.add(emptyResponse);
  toMerge.add(nullResponse);

  // Expected user ACLs after merging.
  List<QueueUserACLInfo> expectedAcls = new ArrayList<>();
  expectedAcls.add(rootInfo);
  expectedAcls.add(defaultInfo);
  expectedAcls.add(yarnInfo);

  GetQueueUserAclsInfoResponse merged = RouterYarnClientUtils.mergeQueueUserAcls(toMerge);
  List<QueueUserACLInfo> mergedAcls = merged.getUserAclsInfoList();
  Assert.assertTrue(CollectionUtils.isEqualCollection(expectedAcls, mergedAcls));
}
/**
 * Merges two populated responses, one empty response and one null response,
 * and expects the merged list to be the reservations of the first response
 * followed by those of the second.
 */
@Test
public void testMergeReservationsList() {
  // Populated responses.
  ReservationListResponse populatedFirst = createReservationListResponse(
      165348678000L, 165348690000L, 165348678000L, 1L);
  ReservationListResponse populatedSecond = createReservationListResponse(
      165348750000L, 165348768000L, 165348750000L, 1L);

  // Empty response.
  ReservationListResponse emptyResponse =
      ReservationListResponse.newInstance(new ArrayList<>());

  // Null response.
  ReservationListResponse nullResponse = null;

  List<ReservationListResponse> toMerge = new ArrayList<>();
  toMerge.add(populatedFirst);
  toMerge.add(populatedSecond);
  toMerge.add(emptyResponse);
  toMerge.add(nullResponse);

  // Expected reservations, in input order.
  List<ReservationAllocationState> expectedStates = new ArrayList<>();
  expectedStates.addAll(populatedFirst.getReservationAllocationState());
  expectedStates.addAll(populatedSecond.getReservationAllocationState());

  ReservationListResponse merged = RouterYarnClientUtils.mergeReservationsList(toMerge);
  List<ReservationAllocationState> mergedStates = merged.getReservationAllocationState();
  Assert.assertEquals(expectedStates, mergedStates);
}
/**
 * Builds a ReservationListResponse holding exactly one reservation.
 *
 * @param startTime value stored as the reservation definition's arrival.
 * @param endTime value stored as the reservation definition's deadline.
 * @param reservationTime first argument passed to ReservationId.newInstance.
 * @param reservationNumber second argument passed to ReservationId.newInstance.
 * @return a response wrapping the single reservation allocation state.
 */
private ReservationListResponse createReservationListResponse(long startTime,
    long endTime, long reservationTime, long reservationNumber) {
  ReservationDefinition definition = Records.newRecord(ReservationDefinition.class);
  definition.setArrival(startTime);
  definition.setDeadline(endTime);

  ReservationAllocationState state = Records.newRecord(ReservationAllocationState.class);
  ReservationId id = ReservationId.newInstance(reservationTime, reservationNumber);
  state.setReservationDefinition(definition);
  state.setReservationId(id);

  List<ReservationAllocationState> singleReservation = new ArrayList<>();
  singleReservation.add(state);
  return ReservationListResponse.newInstance(singleReservation);
}
/**
 * Merges two populated responses (both containing "memory-mb"), one empty
 * response and one null response, and expects the merged list to hold
 * "vcores", "gpu" and "memory-mb" exactly once each, in any order.
 */
@Test
public void testMergeResourceTypes() {
  ResourceTypeInfo vcores = ResourceTypeInfo.newInstance("vcores");
  ResourceTypeInfo gpu = ResourceTypeInfo.newInstance("gpu");
  ResourceTypeInfo memory = ResourceTypeInfo.newInstance("memory-mb");

  // "memory-mb" is deliberately present in both lists.
  List<ResourceTypeInfo> firstTypes = new ArrayList<>();
  firstTypes.add(vcores);
  firstTypes.add(memory);
  List<ResourceTypeInfo> secondTypes = new ArrayList<>();
  secondTypes.add(memory);
  secondTypes.add(gpu);

  // Populated responses.
  GetAllResourceTypeInfoResponse populatedFirst =
      Records.newRecord(GetAllResourceTypeInfoResponse.class);
  populatedFirst.setResourceTypeInfo(firstTypes);
  GetAllResourceTypeInfoResponse populatedSecond =
      Records.newRecord(GetAllResourceTypeInfoResponse.class);
  populatedSecond.setResourceTypeInfo(secondTypes);

  // Empty response.
  GetAllResourceTypeInfoResponse emptyResponse =
      Records.newRecord(GetAllResourceTypeInfoResponse.class);

  // Null response.
  GetAllResourceTypeInfoResponse nullResponse = null;

  List<GetAllResourceTypeInfoResponse> toMerge = new ArrayList<>();
  toMerge.add(populatedFirst);
  toMerge.add(populatedSecond);
  toMerge.add(emptyResponse);
  toMerge.add(nullResponse);

  // Expected resource types after merging.
  List<ResourceTypeInfo> expectedTypes = new ArrayList<>();
  expectedTypes.add(vcores);
  expectedTypes.add(gpu);
  expectedTypes.add(memory);

  GetAllResourceTypeInfoResponse merged = RouterYarnClientUtils.mergeResourceTypes(toMerge);
  List<ResourceTypeInfo> mergedTypes = merged.getResourceTypeInfo();
  Assert.assertTrue(CollectionUtils.isEqualCollection(expectedTypes, mergedTypes));
}
} }

View File

@ -45,7 +45,7 @@
<property> <property>
<name>yarn.scheduler.capacity.root.queues</name> <name>yarn.scheduler.capacity.root.queues</name>
<value>default</value> <value>default,decided</value>
<description> <description>
The queues at the this level (root is the root queue). The queues at the this level (root is the root queue).
</description> </description>
@ -97,6 +97,15 @@
</description> </description>
</property> </property>
<property>
<name>yarn.scheduler.capacity.root.decided.reservable</name>
<value>true</value>
<description>
Indicates to the ReservationSystem that the queue's resources
are available for users to reserve.
</description>
</property>
<property> <property>
<name>yarn.scheduler.capacity.node-locality-delay</name> <name>yarn.scheduler.capacity.node-locality-delay</name>
<value>-1</value> <value>-1</value>