YARN-11445. [Federation] Add getClusterInfo, getClusterUserInfo REST APIs for Router. (#5472)
This commit is contained in:
parent
759ddebb13
commit
fa723ae839
|
@ -44,6 +44,8 @@ public class ClusterInfo {
|
|||
protected String hadoopVersionBuiltOn;
|
||||
protected String haZooKeeperConnectionState;
|
||||
|
||||
private String subClusterId;
|
||||
|
||||
public ClusterInfo() {
|
||||
} // JAXB needs this
|
||||
|
||||
|
@ -113,4 +115,12 @@ public class ClusterInfo {
|
|||
public String getHAZookeeperConnectionState() {
|
||||
return this.haZooKeeperConnectionState;
|
||||
}
|
||||
|
||||
public String getSubClusterId() {
|
||||
return subClusterId;
|
||||
}
|
||||
|
||||
public void setSubClusterId(String subClusterId) {
|
||||
this.subClusterId = subClusterId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,8 @@ public class ClusterUserInfo {
|
|||
// User who has placed the request
|
||||
protected String requestedUser;
|
||||
|
||||
private String subClusterId;
|
||||
|
||||
public ClusterUserInfo() {
|
||||
}
|
||||
|
||||
|
@ -61,4 +63,12 @@ public class ClusterUserInfo {
|
|||
public String getRequestedUser() {
|
||||
return requestedUser;
|
||||
}
|
||||
|
||||
public String getSubClusterId() {
|
||||
return subClusterId;
|
||||
}
|
||||
|
||||
public void setSubClusterId(String subClusterId) {
|
||||
this.subClusterId = subClusterId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -159,6 +159,10 @@ public final class RouterMetrics {
|
|||
private MutableGaugeInt numAddToClusterNodeLabelsFailedRetrieved;
|
||||
@Metric("# of removeFromClusterNodeLabels failed to be retrieved")
|
||||
private MutableGaugeInt numRemoveFromClusterNodeLabelsFailedRetrieved;
|
||||
@Metric("# of getClusterInfo failed to be retrieved")
|
||||
private MutableGaugeInt numGetClusterInfoFailedRetrieved;
|
||||
@Metric("# of getClusterUserInfo failed to be retrieved")
|
||||
private MutableGaugeInt numGetClusterUserInfoFailedRetrieved;
|
||||
|
||||
// Aggregate metrics are shared, and don't have to be looked up per call
|
||||
@Metric("Total number of successful Submitted apps and latency(ms)")
|
||||
|
@ -279,6 +283,10 @@ public final class RouterMetrics {
|
|||
private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved;
|
||||
@Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)")
|
||||
private MutableRate totalSucceededRemoveFromClusterNodeLabelsRetrieved;
|
||||
@Metric("Total number of successful Retrieved GetClusterInfoRetrieved and latency(ms)")
|
||||
private MutableRate totalSucceededGetClusterInfoRetrieved;
|
||||
@Metric("Total number of successful Retrieved GetClusterUserInfoRetrieved and latency(ms)")
|
||||
private MutableRate totalSucceededGetClusterUserInfoRetrieved;
|
||||
|
||||
/**
|
||||
* Provide quantile counters for all latencies.
|
||||
|
@ -342,6 +350,8 @@ public final class RouterMetrics {
|
|||
private MutableQuantiles replaceLabelsOnNodeLatency;
|
||||
private MutableQuantiles addToClusterNodeLabelsLatency;
|
||||
private MutableQuantiles removeFromClusterNodeLabelsLatency;
|
||||
private MutableQuantiles getClusterInfoLatency;
|
||||
private MutableQuantiles getClusterUserInfoLatency;
|
||||
|
||||
private static volatile RouterMetrics instance = null;
|
||||
private static MetricsRegistry registry;
|
||||
|
@ -551,6 +561,12 @@ public final class RouterMetrics {
|
|||
|
||||
removeFromClusterNodeLabelsLatency = registry.newQuantiles("removeFromClusterNodeLabelsLatency",
|
||||
"latency of remove cluster nodelabels timeouts", "ops", "latency", 10);
|
||||
|
||||
getClusterInfoLatency = registry.newQuantiles("getClusterInfoLatency",
|
||||
"latency of get cluster info timeouts", "ops", "latency", 10);
|
||||
|
||||
getClusterUserInfoLatency = registry.newQuantiles("getClusterUserInfoLatency",
|
||||
"latency of get cluster user info timeouts", "ops", "latency", 10);
|
||||
}
|
||||
|
||||
public static RouterMetrics getMetrics() {
|
||||
|
@ -847,6 +863,16 @@ public final class RouterMetrics {
|
|||
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededGetClusterInfoRetrieved() {
|
||||
return totalSucceededGetClusterInfoRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededGetClusterUserInfoRetrieved() {
|
||||
return totalSucceededGetClusterUserInfoRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
|
||||
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
|
||||
|
@ -1137,6 +1163,16 @@ public final class RouterMetrics {
|
|||
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededGetClusterInfoRetrieved() {
|
||||
return totalSucceededGetClusterInfoRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededGetClusterUserInfoRetrieved() {
|
||||
return totalSucceededGetClusterUserInfoRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
|
||||
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
|
||||
|
@ -1382,6 +1418,14 @@ public final class RouterMetrics {
|
|||
return numRemoveFromClusterNodeLabelsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getClusterInfoFailedRetrieved() {
|
||||
return numGetClusterInfoFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getClusterUserInfoFailedRetrieved() {
|
||||
return numGetClusterUserInfoFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getDelegationTokenFailedRetrieved() {
|
||||
return numGetDelegationTokenFailedRetrieved.value();
|
||||
}
|
||||
|
@ -1685,6 +1729,16 @@ public final class RouterMetrics {
|
|||
removeFromClusterNodeLabelsLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededGetClusterInfoRetrieved(long duration) {
|
||||
totalSucceededGetClusterInfoRetrieved.add(duration);
|
||||
getClusterInfoLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededGetClusterUserInfoRetrieved(long duration) {
|
||||
totalSucceededGetClusterUserInfoRetrieved.add(duration);
|
||||
getClusterUserInfoLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
|
||||
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
|
||||
refreshSuperUserGroupsConfLatency.add(duration);
|
||||
|
@ -1905,6 +1959,14 @@ public final class RouterMetrics {
|
|||
numRemoveFromClusterNodeLabelsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrGetClusterInfoFailedRetrieved() {
|
||||
numGetClusterInfoFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrGetClusterUserInfoFailedRetrieved() {
|
||||
numGetClusterUserInfoFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrGetDelegationTokenFailedRetrieved() {
|
||||
numGetDelegationTokenFailedRetrieved.incr();
|
||||
}
|
||||
|
|
|
@ -129,6 +129,8 @@ import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesI
|
|||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
|
@ -1137,14 +1139,84 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
return getClusterInfo();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method retrieves the cluster information, and it is reachable by using
|
||||
* {@link RMWSConsts#INFO}.
|
||||
*
|
||||
* In Federation mode, we will return a FederationClusterInfo object,
|
||||
* which contains a set of ClusterInfo.
|
||||
*
|
||||
* @return the cluster information.
|
||||
*/
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
try {
|
||||
long startTime = Time.now();
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
||||
Class[] argsClasses = new Class[]{};
|
||||
Object[] args = new Object[]{};
|
||||
ClientMethod remoteMethod = new ClientMethod("getClusterInfo", argsClasses, args);
|
||||
Map<SubClusterInfo, ClusterInfo> subClusterInfoMap =
|
||||
invokeConcurrent(subClustersActive.values(), remoteMethod, ClusterInfo.class);
|
||||
FederationClusterInfo federationClusterInfo = new FederationClusterInfo();
|
||||
subClusterInfoMap.forEach((subClusterInfo, clusterInfo) -> {
|
||||
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
|
||||
clusterInfo.setSubClusterId(subClusterId.getId());
|
||||
federationClusterInfo.getList().add(clusterInfo);
|
||||
});
|
||||
long stopTime = Time.now();
|
||||
routerMetrics.succeededGetClusterInfoRetrieved(stopTime - startTime);
|
||||
return federationClusterInfo;
|
||||
} catch (NotFoundException e) {
|
||||
routerMetrics.incrGetClusterInfoFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
|
||||
} catch (YarnException | IOException e) {
|
||||
routerMetrics.incrGetClusterInfoFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException("getClusterInfo error.", e);
|
||||
}
|
||||
routerMetrics.incrGetClusterInfoFailedRetrieved();
|
||||
throw new RuntimeException("getClusterInfo error.");
|
||||
}
|
||||
|
||||
/**
|
||||
* This method retrieves the cluster user information, and it is reachable by using
|
||||
* {@link RMWSConsts#CLUSTER_USER_INFO}.
|
||||
*
|
||||
* In Federation mode, we will return a ClusterUserInfo object,
|
||||
* which contains a set of ClusterUserInfo.
|
||||
*
|
||||
* @param hsr the servlet request
|
||||
* @return the cluster user information
|
||||
*/
|
||||
@Override
|
||||
public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
try {
|
||||
long startTime = Time.now();
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
|
||||
final HttpServletRequest hsrCopy = clone(hsr);
|
||||
Class[] argsClasses = new Class[]{HttpServletRequest.class};
|
||||
Object[] args = new Object[]{hsrCopy};
|
||||
ClientMethod remoteMethod = new ClientMethod("getClusterUserInfo", argsClasses, args);
|
||||
Map<SubClusterInfo, ClusterUserInfo> subClusterInfoMap =
|
||||
invokeConcurrent(subClustersActive.values(), remoteMethod, ClusterUserInfo.class);
|
||||
FederationClusterUserInfo federationClusterUserInfo = new FederationClusterUserInfo();
|
||||
subClusterInfoMap.forEach((subClusterInfo, clusterUserInfo) -> {
|
||||
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
|
||||
clusterUserInfo.setSubClusterId(subClusterId.getId());
|
||||
federationClusterUserInfo.getList().add(clusterUserInfo);
|
||||
});
|
||||
long stopTime = Time.now();
|
||||
routerMetrics.succeededGetClusterUserInfoRetrieved(stopTime - startTime);
|
||||
return federationClusterUserInfo;
|
||||
} catch (NotFoundException e) {
|
||||
routerMetrics.incrGetClusterUserInfoFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
|
||||
} catch (YarnException | IOException e) {
|
||||
routerMetrics.incrGetClusterUserInfoFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowRunTimeException("getClusterUserInfo error.", e);
|
||||
}
|
||||
routerMetrics.incrGetClusterUserInfoFailedRetrieved();
|
||||
throw new RuntimeException("getClusterUserInfo error.");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router.webapp.dao;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class FederationClusterInfo extends ClusterInfo {
|
||||
|
||||
@XmlElement(name = "subCluster")
|
||||
private List<ClusterInfo> list = new ArrayList<>();
|
||||
|
||||
public FederationClusterInfo() {
|
||||
} // JAXB needs this
|
||||
|
||||
public FederationClusterInfo(ArrayList<ClusterInfo> list) {
|
||||
this.list = list;
|
||||
}
|
||||
|
||||
public List<ClusterInfo> getList() {
|
||||
return list;
|
||||
}
|
||||
|
||||
public void setList(List<ClusterInfo> list) {
|
||||
this.list = list;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router.webapp.dao;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class FederationClusterUserInfo extends ClusterUserInfo {
|
||||
@XmlElement(name = "subCluster")
|
||||
private List<ClusterUserInfo> list = new ArrayList<>();
|
||||
|
||||
public FederationClusterUserInfo() {
|
||||
} // JAXB needs this
|
||||
|
||||
public FederationClusterUserInfo(ArrayList<ClusterUserInfo> list) {
|
||||
this.list = list;
|
||||
}
|
||||
|
||||
public List<ClusterUserInfo> getList() {
|
||||
return list;
|
||||
}
|
||||
|
||||
public void setList(List<ClusterUserInfo> list) {
|
||||
this.list = list;
|
||||
}
|
||||
}
|
|
@ -568,6 +568,16 @@ public class TestRouterMetrics {
|
|||
LOG.info("Mocked: failed getBulkActivitie call");
|
||||
metrics.incrGetBulkActivitiesFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getClusterInfoFailed() {
|
||||
LOG.info("Mocked: failed getClusterInfo call");
|
||||
metrics.incrGetClusterInfoFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getClusterUserInfoFailed() {
|
||||
LOG.info("Mocked: failed getClusterUserInfo call");
|
||||
metrics.incrGetClusterUserInfoFailedRetrieved();
|
||||
}
|
||||
}
|
||||
|
||||
// Records successes for all calls
|
||||
|
@ -838,6 +848,16 @@ public class TestRouterMetrics {
|
|||
LOG.info("Mocked: successful AddToClusterNodeLabels call with duration {}", duration);
|
||||
metrics.succeededAddToClusterNodeLabelsRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getClusterInfoRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful GetClusterInfoRetrieved call with duration {}", duration);
|
||||
metrics.succeededGetClusterInfoRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getClusterUserInfoRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful GetClusterUserInfoRetrieved call with duration {}", duration);
|
||||
metrics.succeededGetClusterUserInfoRetrieved(duration);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1848,4 +1868,48 @@ public class TestRouterMetrics {
|
|||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededAddToClusterNodeLabelsRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterInfoRetrievedFailed() {
|
||||
long totalBadBefore = metrics.getClusterInfoFailedRetrieved();
|
||||
badSubCluster.getClusterInfoFailed();
|
||||
Assert.assertEquals(totalBadBefore + 1, metrics.getClusterInfoFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterInfoRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededGetClusterInfoRetrieved();
|
||||
goodSubCluster.getClusterInfoRetrieved(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededGetClusterInfoRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededGetClusterInfoRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getClusterInfoRetrieved(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededGetClusterInfoRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededGetClusterInfoRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterUserInfoRetrievedFailed() {
|
||||
long totalBadBefore = metrics.getClusterUserInfoFailedRetrieved();
|
||||
badSubCluster.getClusterUserInfoFailed();
|
||||
Assert.assertEquals(totalBadBefore + 1, metrics.getClusterUserInfoFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterUserInfoRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededGetClusterUserInfoRetrieved();
|
||||
goodSubCluster.getClusterUserInfoRetrieved(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededGetClusterUserInfoRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededGetClusterUserInfoRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getClusterUserInfoRetrieved(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededGetClusterUserInfoRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededGetClusterUserInfoRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
}
|
|
@ -111,7 +111,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||
|
@ -161,7 +163,6 @@ import org.mockito.Mockito;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
|
||||
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL;
|
||||
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
|
||||
|
@ -1363,4 +1364,17 @@ public class MockDefaultRequestInterceptorREST
|
|||
}
|
||||
throw new YarnException("removeFromClusterNodeLabels Error");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
ClusterInfo clusterInfo = new ClusterInfo(mockRM);
|
||||
return clusterInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) {
|
||||
String remoteUser = hsr.getRemoteUser();
|
||||
UserGroupInformation callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
return new ClusterUserInfo(mockRM, callerUGI);
|
||||
}
|
||||
}
|
|
@ -73,10 +73,13 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHome
|
|||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
|
||||
|
@ -128,9 +131,12 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
|||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.junit.Assert;
|
||||
|
@ -2127,4 +2133,86 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||
LambdaTestUtils.intercept(YarnRuntimeException.class, "removeFromClusterNodeLabels Error",
|
||||
() -> interceptor.removeFromClusterNodeLabels(oldNodeLabels1, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterUserInfo() {
|
||||
String requestUserName = "test-user";
|
||||
HttpServletRequest hsr = mock(HttpServletRequest.class);
|
||||
when(hsr.getRemoteUser()).thenReturn(requestUserName);
|
||||
ClusterUserInfo clusterUserInfo = interceptor.getClusterUserInfo(hsr);
|
||||
|
||||
Assert.assertNotNull(clusterUserInfo);
|
||||
Assert.assertTrue(clusterUserInfo instanceof FederationClusterUserInfo);
|
||||
|
||||
FederationClusterUserInfo federationClusterUserInfo =
|
||||
(FederationClusterUserInfo) clusterUserInfo;
|
||||
|
||||
List<ClusterUserInfo> fedClusterUserInfoList = federationClusterUserInfo.getList();
|
||||
Assert.assertNotNull(fedClusterUserInfoList);
|
||||
Assert.assertEquals(4, fedClusterUserInfoList.size());
|
||||
|
||||
List<String> subClusterIds = subClusters.stream().map(
|
||||
subClusterId -> subClusterId.getId()).collect(Collectors.toList());
|
||||
MockRM mockRM = interceptor.getMockRM();
|
||||
|
||||
for (ClusterUserInfo fedClusterUserInfo : fedClusterUserInfoList) {
|
||||
// Check subClusterId
|
||||
String subClusterId = fedClusterUserInfo.getSubClusterId();
|
||||
Assert.assertNotNull(subClusterId);
|
||||
Assert.assertTrue(subClusterIds.contains(subClusterId));
|
||||
|
||||
// Check requestedUser
|
||||
String requestedUser = fedClusterUserInfo.getRequestedUser();
|
||||
Assert.assertNotNull(requestedUser);
|
||||
Assert.assertEquals(requestUserName, requestedUser);
|
||||
|
||||
// Check rmLoginUser
|
||||
String rmLoginUser = fedClusterUserInfo.getRmLoginUser();
|
||||
Assert.assertNotNull(rmLoginUser);
|
||||
Assert.assertEquals(mockRM.getRMLoginUser(), rmLoginUser);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterInfo() {
|
||||
ClusterInfo clusterInfos = interceptor.getClusterInfo();
|
||||
Assert.assertNotNull(clusterInfos);
|
||||
Assert.assertTrue(clusterInfos instanceof FederationClusterInfo);
|
||||
|
||||
FederationClusterInfo federationClusterInfos =
|
||||
(FederationClusterInfo) (clusterInfos);
|
||||
|
||||
List<ClusterInfo> fedClusterInfosList = federationClusterInfos.getList();
|
||||
Assert.assertNotNull(fedClusterInfosList);
|
||||
Assert.assertEquals(4, fedClusterInfosList.size());
|
||||
|
||||
List<String> subClusterIds = subClusters.stream().map(
|
||||
subClusterId -> subClusterId.getId()).collect(Collectors.toList());
|
||||
|
||||
MockRM mockRM = interceptor.getMockRM();
|
||||
String yarnVersion = YarnVersionInfo.getVersion();
|
||||
|
||||
for (ClusterInfo clusterInfo : fedClusterInfosList) {
|
||||
String subClusterId = clusterInfo.getSubClusterId();
|
||||
// Check subClusterId
|
||||
Assert.assertTrue(subClusterIds.contains(subClusterId));
|
||||
|
||||
// Check state
|
||||
String clusterState = mockRM.getServiceState().toString();
|
||||
Assert.assertEquals(clusterState, clusterInfo.getState());
|
||||
|
||||
// Check rmStateStoreName
|
||||
String rmStateStoreName =
|
||||
mockRM.getRMContext().getStateStore().getClass().getName();
|
||||
Assert.assertEquals(rmStateStoreName, clusterInfo.getRMStateStore());
|
||||
|
||||
// Check RM Version
|
||||
Assert.assertEquals(yarnVersion, clusterInfo.getRMVersion());
|
||||
|
||||
// Check haZooKeeperConnectionState
|
||||
String rmHAZookeeperConnectionState = mockRM.getRMContext().getHAZookeeperConnectionState();
|
||||
Assert.assertEquals(rmHAZookeeperConnectionState,
|
||||
clusterInfo.getHAZookeeperConnectionState());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -117,4 +117,8 @@ public class TestableFederationInterceptorREST
|
|||
}
|
||||
super.shutdown();
|
||||
}
|
||||
|
||||
public MockRM getMockRM() {
|
||||
return mockRM;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue