From 5bc8f2532746453cb0b57a4c3552b02fae984ffd Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 29 Mar 2023 00:33:19 +0800 Subject: [PATCH] YARN-11446. [Federation] Add updateSchedulerConfiguration, getSchedulerConfiguration REST APIs for Router. (#5476) --- .../hadoop/yarn/webapp/dao/ConfInfo.java | 11 ++ .../yarn/webapp/dao/SchedConfUpdateInfo.java | 11 ++ .../yarn/server/router/RouterMetrics.java | 62 ++++++++ .../webapp/FederationInterceptorREST.java | 134 +++++++++++++++++- .../router/webapp/dao/FederationConfInfo.java | 55 +++++++ .../yarn/server/router/TestRouterMetrics.java | 66 +++++++++ .../MockDefaultRequestInterceptorREST.java | 28 +++- .../webapp/TestFederationInterceptorREST.java | 69 +++++++++ 8 files changed, 430 insertions(+), 6 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationConfInfo.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/ConfInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/ConfInfo.java index 1971efa5684..7ca396f49d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/ConfInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/ConfInfo.java @@ -33,6 +33,8 @@ public class ConfInfo { private ArrayList property = new ArrayList<>(); + private String subClusterId; + public ConfInfo() { } // JAXB needs this @@ -74,5 +76,14 @@ public String getKey() { public String getValue() { return value; } + + } + + public String getSubClusterId() { + return subClusterId; + } + + public void setSubClusterId(String subClusterId) { + this.subClusterId = subClusterId; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/SchedConfUpdateInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/SchedConfUpdateInfo.java index 45462919ed1..8f3ad5d66e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/SchedConfUpdateInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/dao/SchedConfUpdateInfo.java @@ -44,6 +44,9 @@ public class SchedConfUpdateInfo { @XmlElement(name = "update-queue") private ArrayList updateQueueInfo = new ArrayList<>(); + @XmlElement(name = "subClusterId") + private String subClusterId = ""; + private HashMap global = new HashMap<>(); public SchedConfUpdateInfo() { @@ -82,4 +85,12 @@ public HashMap getGlobalParams() { public void setGlobalParams(HashMap globalInfo) { this.global = globalInfo; } + + public String getSubClusterId() { + return subClusterId; + } + + public void setSubClusterId(String subClusterId) { + this.subClusterId = subClusterId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 3a581dfbd1f..a84a315b93c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -159,6 +159,10 @@ public final class RouterMetrics { private MutableGaugeInt numAddToClusterNodeLabelsFailedRetrieved; @Metric("# of removeFromClusterNodeLabels failed to be retrieved") private MutableGaugeInt numRemoveFromClusterNodeLabelsFailedRetrieved; + @Metric("# of numUpdateSchedulerConfiguration failed to be retrieved") + private MutableGaugeInt numUpdateSchedulerConfigurationFailedRetrieved; + @Metric("# of numGetSchedulerConfiguration failed to be retrieved") + private MutableGaugeInt numGetSchedulerConfigurationFailedRetrieved; @Metric("# of getClusterInfo failed to be retrieved") private MutableGaugeInt numGetClusterInfoFailedRetrieved; @Metric("# of getClusterUserInfo failed to be retrieved") @@ -287,6 +291,10 @@ public final class RouterMetrics { private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved; @Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)") private MutableRate totalSucceededRemoveFromClusterNodeLabelsRetrieved; + @Metric("Total number of successful Retrieved updateSchedulerConfiguration and latency(ms)") + private MutableRate totalSucceededUpdateSchedulerConfigurationRetrieved; + @Metric("Total number of successful Retrieved getSchedulerConfiguration and latency(ms)") + private MutableRate totalSucceededGetSchedulerConfigurationRetrieved; @Metric("Total number of successful Retrieved GetClusterInfoRetrieved and latency(ms)") private MutableRate totalSucceededGetClusterInfoRetrieved; @Metric("Total number of successful Retrieved GetClusterUserInfoRetrieved and latency(ms)") @@ -358,6 +366,8 @@ public final class RouterMetrics { private MutableQuantiles replaceLabelsOnNodeLatency; private MutableQuantiles addToClusterNodeLabelsLatency; private MutableQuantiles removeFromClusterNodeLabelsLatency; + private MutableQuantiles updateSchedulerConfigLatency; + private MutableQuantiles getSchedulerConfigurationLatency; private MutableQuantiles getClusterInfoLatency; private MutableQuantiles getClusterUserInfoLatency; private MutableQuantiles updateNodeResourceLatency; @@ -572,6 +582,12 @@ private RouterMetrics() { removeFromClusterNodeLabelsLatency = registry.newQuantiles("removeFromClusterNodeLabelsLatency", "latency of remove cluster nodelabels timeouts", "ops", "latency", 10); + updateSchedulerConfigLatency = registry.newQuantiles("updateSchedulerConfigurationLatency", + "latency of update scheduler configuration timeouts", "ops", "latency", 10); + + getSchedulerConfigurationLatency = registry.newQuantiles("getSchedulerConfigurationLatency", + "latency of get scheduler configuration timeouts", "ops", "latency", 10); + getClusterInfoLatency = registry.newQuantiles("getClusterInfoLatency", "latency of get cluster info timeouts", "ops", "latency", 10); @@ -879,6 +895,16 @@ public long getNumSucceededRemoveFromClusterNodeLabelsRetrieved() { return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededUpdateSchedulerConfigurationRetrieved() { + return totalSucceededUpdateSchedulerConfigurationRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetSchedulerConfigurationRetrieved() { + return totalSucceededGetSchedulerConfigurationRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededGetClusterInfoRetrieved() { return totalSucceededGetClusterInfoRetrieved.lastStat().numSamples(); @@ -1189,6 +1215,16 @@ public double getLatencySucceededRemoveFromClusterNodeLabelsRetrieved() { return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededUpdateSchedulerConfigurationRetrieved() { + return totalSucceededUpdateSchedulerConfigurationRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetSchedulerConfigurationRetrieved() { + return totalSucceededGetSchedulerConfigurationRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededGetClusterInfoRetrieved() { return totalSucceededGetClusterInfoRetrieved.lastStat().mean(); @@ -1454,6 +1490,14 @@ public int getNumRemoveFromClusterNodeLabelsFailedRetrieved() { return numRemoveFromClusterNodeLabelsFailedRetrieved.value(); } + public int getUpdateSchedulerConfigurationFailedRetrieved() { + return numUpdateSchedulerConfigurationFailedRetrieved.value(); + } + + public int getSchedulerConfigurationFailedRetrieved() { + return numGetSchedulerConfigurationFailedRetrieved.value(); + } + public int getClusterInfoFailedRetrieved() { return numGetClusterInfoFailedRetrieved.value(); } @@ -1773,6 +1817,16 @@ public void succeededRemoveFromClusterNodeLabelsRetrieved(long duration) { removeFromClusterNodeLabelsLatency.add(duration); } + public void succeededUpdateSchedulerConfigurationRetrieved(long duration) { + totalSucceededUpdateSchedulerConfigurationRetrieved.add(duration); + updateSchedulerConfigLatency.add(duration); + } + + public void succeededGetSchedulerConfigurationRetrieved(long duration) { + totalSucceededGetSchedulerConfigurationRetrieved.add(duration); + getSchedulerConfigurationLatency.add(duration); + } + public void succeededGetClusterInfoRetrieved(long duration) { totalSucceededGetClusterInfoRetrieved.add(duration); getClusterInfoLatency.add(duration); @@ -2013,6 +2067,14 @@ public void incrRemoveFromClusterNodeLabelsFailedRetrieved() { numRemoveFromClusterNodeLabelsFailedRetrieved.incr(); } + public void incrUpdateSchedulerConfigurationFailedRetrieved() { + numUpdateSchedulerConfigurationFailedRetrieved.incr(); + } + + public void incrGetSchedulerConfigurationFailedRetrieved() { + numGetSchedulerConfigurationFailedRetrieved.incr(); + } + public void incrGetClusterInfoFailedRetrieved() { numGetClusterInfoFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 857e4c52c6f..9975823ec2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -44,7 +44,6 @@ import javax.ws.rs.core.Response.Status; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.prefetch.Validate; @@ -129,6 +128,7 @@ import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationConfInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -136,6 +136,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.util.LRUCacheHashMap; +import org.apache.hadoop.yarn.webapp.dao.ConfInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; @@ -848,6 +849,29 @@ private Map getActiveSubclusters() } } + /** + * Get the active subcluster in the federation. + * + * @param subClusterId subClusterId. + * @return subClusterInfo. + * @throws NotFoundException If the subclusters cannot be found. + */ + private SubClusterInfo getActiveSubCluster(String subClusterId) + throws NotFoundException { + try { + SubClusterId pSubClusterId = SubClusterId.newInstance(subClusterId); + Map subClusterInfoMap = + federationFacade.getSubClusters(true); + SubClusterInfo subClusterInfo = subClusterInfoMap.get(pSubClusterId); + if (subClusterInfo == null) { + throw new NotFoundException(subClusterId + " not found."); + } + return subClusterInfo; + } catch (YarnException e) { + throw new NotFoundException(e.getMessage()); + } + } + /** * The YARN Router will forward to the request to all the SubClusters to find * where the node is running. @@ -2906,17 +2930,117 @@ public ContainerInfo getContainer(HttpServletRequest req, throw new RuntimeException("getContainer Failed."); } + /** + * This method updates the Scheduler configuration, and it is reachable by + * using {@link RMWSConsts#SCHEDULER_CONF}. + * + * @param mutationInfo th information for making scheduler configuration + * changes (supports adding, removing, or updating a queue, as well + * as global scheduler conf changes) + * @param hsr the servlet request + * @return Response containing the status code + * @throws AuthorizationException if the user is not authorized to invoke this + * method + * @throws InterruptedException if interrupted + */ @Override public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo, - HttpServletRequest hsr) - throws AuthorizationException, InterruptedException { - throw new NotImplementedException("Code is not implemented"); + HttpServletRequest hsr) throws AuthorizationException, InterruptedException { + + // Make Sure mutationInfo is not null. + if (mutationInfo == null) { + routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved(); + throw new IllegalArgumentException( + "Parameter error, the schedConfUpdateInfo is empty or null."); + } + + // In federated mode, we may have a mix of multiple schedulers. + // In order to ensure accurate update scheduler configuration, + // we need users to explicitly set subClusterId. + String pSubClusterId = mutationInfo.getSubClusterId(); + if (StringUtils.isBlank(pSubClusterId)) { + routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved(); + throw new IllegalArgumentException("Parameter error, " + + "the subClusterId is empty or null."); + } + + // Get the subClusterInfo , then update the scheduler configuration. + try { + long startTime = clock.getTime(); + SubClusterInfo subClusterInfo = getActiveSubCluster(pSubClusterId); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + Response response = interceptor.updateSchedulerConfiguration(mutationInfo, hsr); + if (response != null) { + long endTime = clock.getTime(); + routerMetrics.succeededUpdateSchedulerConfigurationRetrieved(endTime - startTime); + return Response.status(response.getStatus()).entity(response.getEntity()).build(); + } + } catch (NotFoundException e) { + routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException(e, + "Get subCluster error. subClusterId = %s", pSubClusterId); + } catch (Exception e) { + routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException(e, + "UpdateSchedulerConfiguration error. subClusterId = %s", pSubClusterId); + } + + routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved(); + throw new RuntimeException("UpdateSchedulerConfiguration error. subClusterId = " + + pSubClusterId); } + /** + * This method retrieves all the Scheduler configuration, and it is reachable + * by using {@link RMWSConsts#SCHEDULER_CONF}. + * + * @param hsr the servlet request + * @return Response containing the status code + * @throws AuthorizationException if the user is not authorized to invoke this + * method. + */ @Override public Response getSchedulerConfiguration(HttpServletRequest hsr) throws AuthorizationException { - throw new NotImplementedException("Code is not implemented"); + try { + long startTime = clock.getTime(); + FederationConfInfo federationConfInfo = new FederationConfInfo(); + Map subClustersActive = getActiveSubclusters(); + final HttpServletRequest hsrCopy = clone(hsr); + Class[] argsClasses = new Class[]{HttpServletRequest.class}; + Object[] args = new Object[]{hsrCopy}; + ClientMethod remoteMethod = new ClientMethod("getSchedulerConfiguration", argsClasses, args); + Map responseMap = + invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class); + responseMap.forEach((subClusterInfo, response) -> { + SubClusterId subClusterId = subClusterInfo.getSubClusterId(); + if (response == null) { + String errorMsg = subClusterId + " Can't getSchedulerConfiguration."; + federationConfInfo.getErrorMsgs().add(errorMsg); + } else if (response.getStatus() == Status.BAD_REQUEST.getStatusCode()) { + String errorMsg = String.valueOf(response.getEntity()); + federationConfInfo.getErrorMsgs().add(errorMsg); + } else if (response.getStatus() == Status.OK.getStatusCode()) { + ConfInfo fedConfInfo = ConfInfo.class.cast(response.getEntity()); + fedConfInfo.setSubClusterId(subClusterId.getId()); + federationConfInfo.getList().add(fedConfInfo); + } + }); + long endTime = clock.getTime(); + routerMetrics.succeededGetSchedulerConfigurationRetrieved(endTime - startTime); + return Response.status(Status.OK).entity(federationConfInfo).build(); + } catch (NotFoundException e) { + RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e); + routerMetrics.incrGetSchedulerConfigurationFailedRetrieved(); + } catch (Exception e) { + routerMetrics.incrGetSchedulerConfigurationFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException("getSchedulerConfiguration error.", e); + return Response.status(Status.BAD_REQUEST).entity("getSchedulerConfiguration error.").build(); + } + + routerMetrics.incrGetSchedulerConfigurationFailedRetrieved(); + throw new RuntimeException("getSchedulerConfiguration error."); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationConfInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationConfInfo.java new file mode 100644 index 00000000000..6a5e611a4f8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationConfInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.webapp.dao; + +import org.apache.hadoop.yarn.webapp.dao.ConfInfo; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class FederationConfInfo extends ConfInfo { + @XmlElement(name = "subCluster") + private List list = new ArrayList<>(); + + @XmlElement(name = "errorMsgs") + private List errorMsgs = new ArrayList<>(); + public FederationConfInfo() { + } // JAXB needs this + + public List getList() { + return list; + } + + public void setList(List list) { + this.list = list; + } + + public List getErrorMsgs() { + return errorMsgs; + } + + public void setErrorMsgs(List errorMsgs) { + this.errorMsgs = errorMsgs; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 4af7e8c7f5a..f8dc03a04c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -569,6 +569,16 @@ public void getBulkActivitiesFailed() { metrics.incrGetBulkActivitiesFailedRetrieved(); } + public void getSchedulerConfigurationFailed() { + LOG.info("Mocked: failed getSchedulerConfiguration call"); + metrics.incrGetSchedulerConfigurationFailedRetrieved(); + } + + public void updateSchedulerConfigurationFailedRetrieved() { + LOG.info("Mocked: failed updateSchedulerConfiguration call"); + metrics.incrUpdateSchedulerConfigurationFailedRetrieved(); + } + public void getClusterInfoFailed() { LOG.info("Mocked: failed getClusterInfo call"); metrics.incrGetClusterInfoFailedRetrieved(); @@ -859,6 +869,16 @@ public void addToClusterNodeLabelsRetrieved(long duration) { metrics.succeededAddToClusterNodeLabelsRetrieved(duration); } + public void getSchedulerConfigurationRetrieved(long duration) { + LOG.info("Mocked: successful GetSchedulerConfiguration call with duration {}", duration); + metrics.succeededGetSchedulerConfigurationRetrieved(duration); + } + + public void getUpdateSchedulerConfigurationRetrieved(long duration) { + LOG.info("Mocked: successful UpdateSchedulerConfiguration call with duration {}", duration); + metrics.succeededUpdateSchedulerConfigurationRetrieved(duration); + } + public void getClusterInfoRetrieved(long duration) { LOG.info("Mocked: successful GetClusterInfoRetrieved call with duration {}", duration); metrics.succeededGetClusterInfoRetrieved(duration); @@ -1889,6 +1909,52 @@ public void testAddToClusterNodeLabelsRetrieved() { metrics.getLatencySucceededAddToClusterNodeLabelsRetrieved(), ASSERT_DOUBLE_DELTA); } + @Test + public void testGetSchedulerConfigurationRetrievedFailed() { + long totalBadBefore = metrics.getSchedulerConfigurationFailedRetrieved(); + badSubCluster.getSchedulerConfigurationFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getSchedulerConfigurationFailedRetrieved()); + } + + @Test + public void testGetSchedulerConfigurationRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetSchedulerConfigurationRetrieved(); + goodSubCluster.getSchedulerConfigurationRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetSchedulerConfigurationRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetSchedulerConfigurationRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getSchedulerConfigurationRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetSchedulerConfigurationRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetSchedulerConfigurationRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testUpdateSchedulerConfigurationRetrievedFailed() { + long totalBadBefore = metrics.getUpdateSchedulerConfigurationFailedRetrieved(); + badSubCluster.updateSchedulerConfigurationFailedRetrieved(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getUpdateSchedulerConfigurationFailedRetrieved()); + } + + @Test + public void testUpdateSchedulerConfigurationRetrieved() { + long totalGoodBefore = metrics.getNumSucceededUpdateSchedulerConfigurationRetrieved(); + goodSubCluster.getUpdateSchedulerConfigurationRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededUpdateSchedulerConfigurationRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededUpdateSchedulerConfigurationRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getUpdateSchedulerConfigurationRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededUpdateSchedulerConfigurationRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededUpdateSchedulerConfigurationRetrieved(), ASSERT_DOUBLE_DELTA); + } + @Test public void testGetClusterInfoRetrievedFailed() { long totalBadBefore = metrics.getClusterInfoFailedRetrieved(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index c34167f9219..d4e1b5145cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -102,6 +102,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; @@ -159,6 +160,8 @@ import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.apache.hadoop.yarn.webapp.dao.ConfInfo; +import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1007,7 +1010,7 @@ private void updateReservation(ReservationUpdateRequestInfo resContext) throws I } if (resContext.getReservationId() == null) { - throw new BadRequestException("Update operations must specify an existing ReservaitonId"); + throw new BadRequestException("Update operations must specify an existing ReservationId"); } ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values(); @@ -1366,6 +1369,29 @@ public Response removeFromClusterNodeLabels(Set oldNodeLabels, HttpServl } @Override + public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo, + HttpServletRequest req) throws AuthorizationException, InterruptedException { + RMContext rmContext = mockRM.getRMContext(); + MutableCSConfigurationProvider provider = new MutableCSConfigurationProvider(rmContext); + try { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + provider.init(conf); + provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(), mutationInfo); + } catch (Exception e) { + throw new RuntimeException(e); + } + return Response.status(Status.OK). + entity("Configuration change successfully applied.").build(); + } + + @Override + public Response getSchedulerConfiguration(HttpServletRequest req) throws AuthorizationException { + return Response.status(Status.OK).entity(new ConfInfo(mockRM.getConfig())) + .build(); + } + public ClusterInfo getClusterInfo() { ClusterInfo clusterInfo = new ClusterInfo(mockRM); return clusterInfo; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 784fbd15ce1..19bba51e270 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -126,6 +126,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationConfInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; @@ -138,6 +139,9 @@ import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.webapp.BadRequestException; +import org.apache.hadoop.yarn.webapp.dao.ConfInfo; +import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.junit.Assert; import org.junit.Test; @@ -171,6 +175,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { private final static int NUM_SUBCLUSTER = 4; private static final int BAD_REQUEST = 400; private static final int ACCEPTED = 202; + private static final int OK = 200; private static String user = "test-user"; private TestableFederationInterceptorREST interceptor; private MemoryFederationStateStore stateStore; @@ -2134,6 +2139,35 @@ public void testRemoveFromClusterNodeLabelsError() throws Exception { () -> interceptor.removeFromClusterNodeLabels(oldNodeLabels1, null)); } + @Test + public void testGetSchedulerConfiguration() throws Exception { + Response response = interceptor.getSchedulerConfiguration(null); + Assert.assertNotNull(response); + Assert.assertEquals(OK, response.getStatus()); + + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof FederationConfInfo); + + FederationConfInfo federationConfInfo = FederationConfInfo.class.cast(entity); + List confInfos = federationConfInfo.getList(); + Assert.assertNotNull(confInfos); + Assert.assertEquals(4, confInfos.size()); + + List errors = federationConfInfo.getErrorMsgs(); + Assert.assertEquals(0, errors.size()); + + Set subClusterSet = subClusters.stream() + .map(subClusterId -> subClusterId.getId()).collect(Collectors.toSet()); + + for (ConfInfo confInfo : confInfos) { + List confItems = confInfo.getItems(); + Assert.assertNotNull(confItems); + Assert.assertTrue(confItems.size() > 0); + Assert.assertTrue(subClusterSet.contains(confInfo.getSubClusterId())); + } + } + @Test public void testGetClusterUserInfo() { String requestUserName = "test-user"; @@ -2173,6 +2207,41 @@ public void testGetClusterUserInfo() { } } + @Test + public void testUpdateSchedulerConfigurationErrorMsg() throws Exception { + SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo(); + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Parameter error, the subClusterId is empty or null.", + () -> interceptor.updateSchedulerConfiguration(mutationInfo, null)); + + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Parameter error, the schedConfUpdateInfo is empty or null.", + () -> interceptor.updateSchedulerConfiguration(null, null)); + } + + @Test + public void testUpdateSchedulerConfiguration() + throws AuthorizationException, InterruptedException { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + updateInfo.setSubClusterId("1"); + Map goodUpdateMap = new HashMap<>(); + goodUpdateMap.put("goodKey", "goodVal"); + QueueConfigInfo goodUpdateInfo = new + QueueConfigInfo("root.default", goodUpdateMap); + updateInfo.getUpdateQueueInfo().add(goodUpdateInfo); + Response response = interceptor.updateSchedulerConfiguration(updateInfo, null); + + Assert.assertNotNull(response); + Assert.assertEquals(OK, response.getStatus()); + + String expectMsg = "Configuration change successfully applied."; + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + + String entityMsg = String.valueOf(entity); + Assert.assertEquals(expectMsg, entityMsg); + } + @Test public void testGetClusterInfo() { ClusterInfo clusterInfos = interceptor.getClusterInfo();