YARN-11320. [Federation] Add getSchedulerInfo REST APIs for Router. (#5217)

This commit is contained in:
slfan1989 2023-01-18 01:36:19 +08:00 committed by GitHub
parent 4de31123ce
commit 442a5fb285
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 217 additions and 3 deletions

View File

@ -121,6 +121,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
.getAutoCreationEligibility(parent);
defaultNodeLabelExpression = parent.getDefaultNodeLabelExpression();
schedulerName = "Capacity Scheduler";
}
public float getCapacity() {

View File

@ -45,6 +45,7 @@ public class FairSchedulerInfo extends SchedulerInfo {
scheduler = fs;
rootQueue = new FairSchedulerQueueInfo(scheduler.getQueueManager().
getRootQueue(), scheduler);
schedulerName = "Fair Scheduler";
}
/**

View File

@ -84,6 +84,8 @@ public class FifoSchedulerInfo extends SchedulerInfo {
this.totalNodeCapacity += ni.getTotalCapability().getMemorySize();
this.numContainers += fs.getNodeReport(ni.getNodeID()).getNumContainers();
}
this.schedulerName = "Fifo Scheduler";
}
public int getNumNodes() {

View File

@ -25,7 +25,8 @@ import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "scheduler")
@XmlAccessorType(XmlAccessType.FIELD)
public class SchedulerTypeInfo {
protected SchedulerInfo schedulerInfo;
private SchedulerInfo schedulerInfo;
private String subClusterId;
public SchedulerTypeInfo() {
} // JAXB needs this
@ -37,4 +38,12 @@ public class SchedulerTypeInfo {
public SchedulerInfo getSchedulerInfo() {
return schedulerInfo;
}
public String getSubClusterId() {
return subClusterId;
}
public void setSubClusterId(String subClusterId) {
this.subClusterId = subClusterId;
}
}

View File

@ -172,7 +172,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
}
}
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =

View File

@ -135,6 +135,8 @@ public final class RouterMetrics {
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
@Metric("# of renewDelegationToken failed to be retrieved")
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
@Metric("# of getSchedulerInfo failed to be retrieved")
private MutableGaugeInt numGetSchedulerInfoFailedRetrieved;
@Metric("# of refreshSuperUserGroupsConfiguration failed to be retrieved")
private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved;
@Metric("# of refreshUserToGroupsMappings failed to be retrieved")
@ -240,6 +242,9 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)")
private MutableRate totalSucceededRefreshUserToGroupsMappingsRetrieved;
@Metric("Total number of successful Retrieved GetSchedulerInfo and latency(ms)")
private MutableRate totalSucceededGetSchedulerInfoRetrieved;
/**
* Provide quantile counters for all latencies.
*/
@ -290,6 +295,7 @@ public final class RouterMetrics {
private MutableQuantiles getDelegationTokenLatency;
private MutableQuantiles renewDelegationTokenLatency;
private MutableQuantiles cancelDelegationTokenLatency;
private MutableQuantiles getSchedulerInfoRetrievedLatency;
private MutableQuantiles refreshSuperUserGroupsConfLatency;
private MutableQuantiles refreshUserToGroupsMappingsLatency;
@ -466,6 +472,9 @@ public final class RouterMetrics {
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
"latency of cancel delegation token timeouts", "ops", "latency", 10);
getSchedulerInfoRetrievedLatency = registry.newQuantiles("getSchedulerInfoRetrievedLatency",
"latency of get scheduler info timeouts", "ops", "latency", 10);
refreshSuperUserGroupsConfLatency = registry.newQuantiles("refreshSuperUserGroupsConfLatency",
"latency of refresh superuser groups configuration timeouts", "ops", "latency", 10);
@ -727,6 +736,11 @@ public final class RouterMetrics {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
@ -967,6 +981,11 @@ public final class RouterMetrics {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
@ -1190,6 +1209,10 @@ public final class RouterMetrics {
return numCancelDelegationTokenFailedRetrieved.value();
}
public int getSchedulerInfoFailedRetrieved() {
return numGetSchedulerInfoFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -1425,6 +1448,11 @@ public final class RouterMetrics {
cancelDelegationTokenLatency.add(duration);
}
public void succeededGetSchedulerInfoRetrieved(long duration) {
totalSucceededGetSchedulerInfoRetrieved.add(duration);
getSchedulerInfoRetrievedLatency.add(duration);
}
public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
refreshSuperUserGroupsConfLatency.add(duration);
@ -1630,4 +1658,8 @@ public final class RouterMetrics {
public void incrCancelDelegationTokenFailedRetrieved() {
numCancelDelegationTokenFailedRetrieved.incr();
}
public void incrGetSchedulerInfoFailedRetrieved() {
numGetSchedulerInfoFailedRetrieved.incr();
}
}

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
@ -122,6 +123,7 @@ import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -1140,9 +1142,43 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new NotImplementedException("Code is not implemented");
}
/**
* This method retrieves the current scheduler status, and it is reachable by
* using {@link RMWSConsts#SCHEDULER}.
*
* For the federation mode, the SchedulerType information of the cluster
* cannot be integrated and displayed, and the specific cluster information needs to be marked.
*
* @return the current scheduler status
*/
@Override
public SchedulerTypeInfo getSchedulerInfo() {
throw new NotImplementedException("Code is not implemented");
try {
long startTime = Time.now();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
Class[] argsClasses = new Class[]{};
Object[] args = new Object[]{};
ClientMethod remoteMethod = new ClientMethod("getSchedulerInfo", argsClasses, args);
Map<SubClusterInfo, SchedulerTypeInfo> subClusterInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, SchedulerTypeInfo.class);
FederationSchedulerTypeInfo federationSchedulerTypeInfo = new FederationSchedulerTypeInfo();
subClusterInfoMap.forEach((subClusterInfo, schedulerTypeInfo) -> {
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
schedulerTypeInfo.setSubClusterId(subClusterId.getId());
federationSchedulerTypeInfo.getList().add(schedulerTypeInfo);
});
long stopTime = Time.now();
routerMetrics.succeededGetSchedulerInfoRetrieved(stopTime - startTime);
return federationSchedulerTypeInfo;
} catch (NotFoundException e) {
routerMetrics.incrGetSchedulerInfoFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
} catch (YarnException | IOException e) {
routerMetrics.incrGetSchedulerInfoFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getSchedulerInfo error.", e);
}
routerMetrics.incrGetSchedulerInfoFailedRetrieved();
throw new RuntimeException("getSchedulerInfo error.");
}
@Override

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp.dao;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class FederationSchedulerTypeInfo extends SchedulerTypeInfo {
@XmlElement(name = "subCluster")
private List<SchedulerTypeInfo> list = new ArrayList<>();
public FederationSchedulerTypeInfo() {
} // JAXB needs this
public FederationSchedulerTypeInfo(ArrayList<SchedulerTypeInfo> list) {
this.list = list;
}
public List<SchedulerTypeInfo> getList() {
return list;
}
public void setList(List<SchedulerTypeInfo> list) {
this.list = list;
}
}

View File

@ -100,6 +100,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
@ -138,6 +140,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDelet
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -1206,4 +1211,17 @@ public class MockDefaultRequestInterceptorREST
return new RMQueueAclInfo(true, user.getUserName(), "");
}
}
@Override
public SchedulerTypeInfo getSchedulerInfo() {
try {
ResourceManager resourceManager = CapacitySchedulerTestUtilities.createResourceManager();
CapacityScheduler cs = (CapacityScheduler) resourceManager.getResourceScheduler();
CSQueue root = cs.getRootQueue();
SchedulerInfo schedulerInfo = new CapacitySchedulerInfo(root, cs);
return new SchedulerTypeInfo(schedulerInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
@ -37,6 +38,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -105,12 +107,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInf
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.Times;
@ -1526,6 +1534,64 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress());
}
@Test
public void testGetSchedulerInfo() {
// In this test case, we will get the return results of 4 sub-clusters.
SchedulerTypeInfo typeInfo = interceptor.getSchedulerInfo();
Assert.assertNotNull(typeInfo);
Assert.assertTrue(typeInfo instanceof FederationSchedulerTypeInfo);
FederationSchedulerTypeInfo federationSchedulerTypeInfo =
FederationSchedulerTypeInfo.class.cast(typeInfo);
Assert.assertNotNull(federationSchedulerTypeInfo);
List<SchedulerTypeInfo> schedulerTypeInfos = federationSchedulerTypeInfo.getList();
Assert.assertNotNull(schedulerTypeInfos);
Assert.assertEquals(4, schedulerTypeInfos.size());
List<String> subClusterIds =
subClusters.stream().map(subClusterId -> subClusterId.getId()).
collect(Collectors.toList());
for (SchedulerTypeInfo schedulerTypeInfo : schedulerTypeInfos) {
Assert.assertNotNull(schedulerTypeInfo);
// 1. Whether the returned subClusterId is in the subCluster list
String subClusterId = schedulerTypeInfo.getSubClusterId();
Assert.assertTrue(subClusterIds.contains(subClusterId));
// 2. We test CapacityScheduler, the returned type should be CapacityScheduler.
SchedulerInfo schedulerInfo = schedulerTypeInfo.getSchedulerInfo();
Assert.assertNotNull(schedulerInfo);
Assert.assertTrue(schedulerInfo instanceof CapacitySchedulerInfo);
CapacitySchedulerInfo capacitySchedulerInfo =
CapacitySchedulerInfo.class.cast(schedulerInfo);
Assert.assertNotNull(capacitySchedulerInfo);
// 3. The parent queue name should be root
String queueName = capacitySchedulerInfo.getQueueName();
Assert.assertEquals("root", queueName);
// 4. schedulerType should be CapacityScheduler
String schedulerType = capacitySchedulerInfo.getSchedulerType();
Assert.assertEquals("Capacity Scheduler", schedulerType);
// 5. queue path should be root
String queuePath = capacitySchedulerInfo.getQueuePath();
Assert.assertEquals("root", queuePath);
// 6. mockRM has 2 test queues, [root.a, root.b]
List<String> queues = Lists.newArrayList("root.a", "root.b");
CapacitySchedulerQueueInfoList csSchedulerQueueInfoList = capacitySchedulerInfo.getQueues();
Assert.assertNotNull(csSchedulerQueueInfoList);
List<CapacitySchedulerQueueInfo> csQueueInfoList =
csSchedulerQueueInfoList.getQueueInfoList();
Assert.assertEquals(2, csQueueInfoList.size());
for (CapacitySchedulerQueueInfo csQueueInfo : csQueueInfoList) {
Assert.assertNotNull(csQueueInfo);
Assert.assertTrue(queues.contains(csQueueInfo.getQueuePath()));
}
}
}
@Test
public void testPostDelegationTokenErrorHsr() throws Exception {
// Prepare delegationToken data