diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 03902e384d4..1f213717616 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -51,6 +51,16 @@ public interface MutableConfigurationProvider { void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws Exception; + /** + * Apply the changes on top of the actual configuration. + * @param oldConfiguration actual configuration + * @param confUpdate changelist + * @return new configuration with the applied changed + * @throws IOException if the merge failed + */ + Configuration applyChanges(Configuration oldConfiguration, + SchedConfUpdateInfo confUpdate) throws IOException; + /** * Confirm last logged mutation. * @param isValid if the last logged mutation is applied to scheduler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 75342176168..bb3d7ca1cf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -209,40 +209,9 @@ public class CapacityScheduler extends private void validateConf(Configuration conf) { // validate scheduler memory allocation setting - int minMem = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - int maxMem = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - - if (minMem <= 0 || minMem > maxMem) { - throw new YarnRuntimeException("Invalid resource scheduler memory" - + " allocation configuration" - + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB - + "=" + minMem - + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB - + "=" + maxMem + ", min and max should be greater than 0" - + ", max should be no smaller than min."); - } - + CapacitySchedulerConfigValidator.validateMemoryAllocation(conf); // validate scheduler vcores allocation setting - int minVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - int maxVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - - if (minVcores <= 0 || minVcores > maxVcores) { - throw new YarnRuntimeException("Invalid resource scheduler vcores" - + " allocation configuration" - + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES - + "=" + minVcores - + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - + "=" + maxVcores + ", min and max should be greater than 0" - + ", max should be no smaller than min."); - } + CapacitySchedulerConfigValidator.validateVCores(conf); } @Override @@ -482,14 +451,18 @@ public class CapacityScheduler extends super.serviceStop(); } - @Override - public void reinitialize(Configuration newConf, RMContext rmContext) - throws IOException { + public void reinitialize(Configuration newConf, RMContext rmContext, + boolean validation) throws IOException { + writeLock.lock(); try { writeLock.lock(); Configuration configuration = new Configuration(newConf); CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = csConfProvider.loadConfiguration(configuration); + if (validation) { + this.conf = new CapacitySchedulerConfiguration(newConf, false); + } else { + this.conf = csConfProvider.loadConfiguration(configuration); + } validateConf(this.conf); try { LOG.info("Re-initializing queues..."); @@ -503,17 +476,26 @@ public class CapacityScheduler extends throw new IOException("Failed to re-init queues : " + t.getMessage(), t); } + if (!validation) { - // update lazy preemption - this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); + // update lazy preemption + this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled(); - // Setup how many containers we can allocate for each round - offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + // Setup how many containers we can allocate for each round + offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); - super.reinitialize(newConf, rmContext); + super.reinitialize(newConf, rmContext); + } } finally { writeLock.unlock(); } + + } + + @Override + public void reinitialize(Configuration newConf, RMContext rmContext) + throws IOException { + reinitialize(newConf, rmContext, false); } long getAsyncScheduleInterval() { @@ -717,19 +699,13 @@ public class CapacityScheduler extends Collection placementRuleStrs = conf.getStringCollection( YarnConfiguration.QUEUE_PLACEMENT_RULES); List placementRules = new ArrayList<>(); - Set distingushRuleSet = new HashSet<>(); - // fail the case if we get duplicate placementRule add in - for (String pls : placementRuleStrs) { - if (!distingushRuleSet.add(pls)) { - throw new IOException("Invalid PlacementRule inputs which " - + "contains duplicate rule strings"); - } - } + Set distinguishRuleSet = CapacitySchedulerConfigValidator + .validatePlacementRules(placementRuleStrs); // add UserGroupMappingPlacementRule if absent - distingushRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + distinguishRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); - placementRuleStrs = new ArrayList<>(distingushRuleSet); + placementRuleStrs = new ArrayList<>(distinguishRuleSet); for (String placementRuleStr : placementRuleStrs) { switch (placementRuleStr) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java new file mode 100644 index 00000000000..525ea43ba66 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public final class CapacitySchedulerConfigValidator { + private static final Logger LOG = LoggerFactory.getLogger( + CapacitySchedulerConfigValidator.class); + + private CapacitySchedulerConfigValidator() { + throw new IllegalStateException("Utility class"); + } + + public static boolean validateCSConfiguration( + final Configuration oldConf, final Configuration newConf, + final RMContext rmContext) throws IOException { + //TODO: extract all the validation steps and replace reinitialize with + //the specific validation steps + CapacityScheduler newCs = new CapacityScheduler(); + newCs.setConf(oldConf); + newCs.setRMContext(rmContext); + newCs.init(oldConf); + newCs.reinitialize(newConf, rmContext, true); + return true; + } + + public static Set validatePlacementRules( + Collection placementRuleStrs) throws IOException { + Set distinguishRuleSet = new HashSet<>(); + // fail the case if we get duplicate placementRule add in + for (String pls : placementRuleStrs) { + if (!distinguishRuleSet.add(pls)) { + throw new IOException("Invalid PlacementRule inputs which " + + "contains duplicate rule strings"); + } + } + return distinguishRuleSet; + } + + public static void validateMemoryAllocation(Configuration conf) { + int minMem = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + int maxMem = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + + if (minMem <= 0 || minMem > maxMem) { + throw new YarnRuntimeException("Invalid resource scheduler memory" + + " allocation configuration" + + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + + "=" + minMem + + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + + "=" + maxMem + ", min and max should be greater than 0" + + ", max should be no smaller than min."); + } + } + public static void validateVCores(Configuration conf) { + int minVcores = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + int maxVcores = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + if (minVcores <= 0 || minVcores > maxVcores) { + throw new YarnRuntimeException("Invalid resource scheduler vcores" + + " allocation configuration" + + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES + + "=" + minVcores + + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + + "=" + maxVcores + ", min and max should be greater than 0" + + ", max should be no smaller than min."); + } + } + + /** + * Ensure all existing queues are present. Queues cannot be deleted if its not + * in Stopped state, Queue's cannot be moved from one hierarchy to other also. + * Previous child queue could be converted into parent queue if it is in + * STOPPED state. + * + * @param queues existing queues + * @param newQueues new queues + */ + public static void validateQueueHierarchy(Map queues, + Map newQueues, CapacitySchedulerConfiguration newConf) + throws IOException { + // check that all static queues are included in the newQueues list + for (Map.Entry e : queues.entrySet()) { + if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue() + .getClass()))) { + String queueName = e.getKey(); + CSQueue oldQueue = e.getValue(); + CSQueue newQueue = newQueues.get(queueName); + if (null == newQueue) { + // old queue doesn't exist in the new XML + String configPrefix = newConf.getQueuePrefix( + oldQueue.getQueuePath()); + QueueState newQueueState = null; + try { + newQueueState = QueueState.valueOf( + newConf.get(configPrefix + "state")); + } catch (Exception ex) { + LOG.warn("Not a valid queue state for queue " + + oldQueue.getQueuePath()); + } + if (oldQueue.getState() == QueueState.STOPPED || + newQueueState == QueueState.STOPPED) { + LOG.info("Deleting Queue " + queueName + ", as it is not" + + " present in the modified capacity configuration xml"); + } else{ + throw new IOException(oldQueue.getQueuePath() + " cannot be" + + " deleted from the capacity scheduler configuration, " + + "as the queue is not yet in stopped state. " + + "Current State : " + oldQueue.getState()); + } + } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { + //Queue's cannot be moved from one hierarchy to other + throw new IOException( + queueName + " is moved from:" + oldQueue.getQueuePath() + + " to:" + newQueue.getQueuePath() + + " after refresh, which is not allowed."); + } else if (oldQueue instanceof ParentQueue + && !(oldQueue instanceof ManagedParentQueue) + && newQueue instanceof ManagedParentQueue) { + throw new IOException( + "Can not convert parent queue: " + oldQueue.getQueuePath() + + " to auto create enabled parent queue since " + + "it could have other pre-configured queues which is" + + " not supported"); + } else if (oldQueue instanceof ManagedParentQueue + && !(newQueue instanceof ManagedParentQueue)) { + throw new IOException( + "Cannot convert auto create enabled parent queue: " + oldQueue + .getQueuePath() + " to leaf queue. Please check " + + " parent queue's configuration " + + CapacitySchedulerConfiguration + .AUTO_CREATE_CHILD_QUEUE_ENABLED + + " is set to true"); + } else if (oldQueue instanceof LeafQueue + && newQueue instanceof ParentQueue) { + if (oldQueue.getState() == QueueState.STOPPED) { + LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() + + " to parent queue."); + } else{ + throw new IOException( + "Can not convert the leaf queue: " + oldQueue.getQueuePath() + + " to parent queue since " + + "it is not yet in stopped state. Current State : " + + oldQueue.getState()); + } + } else if (oldQueue instanceof ParentQueue + && newQueue instanceof LeafQueue) { + LOG.info("Converting the parent queue: " + oldQueue.getQueuePath() + + " to leaf queue."); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index cdde25d23e6..1bbc7ca891e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.Permission; @@ -177,7 +176,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< csContext.getRMContext().getHAServiceState() != HAServiceProtocol.HAServiceState.STANDBY) { // Ensure queue hierarchy in the new XML file is proper. - validateQueueHierarchy(queues, newQueues); + CapacitySchedulerConfigValidator + .validateQueueHierarchy(queues, newQueues, newConf); } // Add new queues and delete OldQeueus only after validation. @@ -299,78 +299,6 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< return queue; } - /** - * Ensure all existing queues are present. Queues cannot be deleted if its not - * in Stopped state, Queue's cannot be moved from one hierarchy to other also. - * Previous child queue could be converted into parent queue if it is in - * STOPPED state. - * - * @param queues existing queues - * @param newQueues new queues - */ - private void validateQueueHierarchy(Map queues, - Map newQueues) throws IOException { - // check that all static queues are included in the newQueues list - for (Map.Entry e : queues.entrySet()) { - if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue() - .getClass()))) { - String queueName = e.getKey(); - CSQueue oldQueue = e.getValue(); - CSQueue newQueue = newQueues.get(queueName); - if (null == newQueue) { - // old queue doesn't exist in the new XML - if (oldQueue.getState() == QueueState.STOPPED) { - LOG.info("Deleting Queue " + queueName + ", as it is not" - + " present in the modified capacity configuration xml"); - } else{ - throw new IOException(oldQueue.getQueuePath() + " is deleted from" - + " the new capacity scheduler configuration, but the" - + " queue is not yet in stopped state. " + "Current State : " - + oldQueue.getState()); - } - } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { - //Queue's cannot be moved from one hierarchy to other - throw new IOException( - queueName + " is moved from:" + oldQueue.getQueuePath() + " to:" - + newQueue.getQueuePath() - + " after refresh, which is not allowed."); - } else if (oldQueue instanceof ParentQueue - && !(oldQueue instanceof ManagedParentQueue) - && newQueue instanceof ManagedParentQueue) { - throw new IOException( - "Can not convert parent queue: " + oldQueue.getQueuePath() - + " to auto create enabled parent queue since " - + "it could have other pre-configured queues which is not " - + "supported"); - } else if (oldQueue instanceof ManagedParentQueue - && !(newQueue instanceof ManagedParentQueue)) { - throw new IOException( - "Cannot convert auto create enabled parent queue: " + oldQueue - .getQueuePath() + " to leaf queue. Please check " - + " parent queue's configuration " - + CapacitySchedulerConfiguration - .AUTO_CREATE_CHILD_QUEUE_ENABLED - + " is set to true"); - } else if (oldQueue instanceof LeafQueue - && newQueue instanceof ParentQueue) { - if (oldQueue.getState() == QueueState.STOPPED) { - LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() - + " to parent queue."); - } else{ - throw new IOException( - "Can not convert the leaf queue: " + oldQueue.getQueuePath() - + " to parent queue since " - + "it is not yet in stopped state. Current State : " - + oldQueue.getState()); - } - } else if (oldQueue instanceof ParentQueue - && newQueue instanceof LeafQueue) { - LOG.info("Converting the parent queue: " + oldQueue.getQueuePath() - + " to leaf queue."); - } - } - } - } /** * Updates to our list of queues: Adds the new queues and deletes the removed diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index c8d0a0c6eaa..77e0924cfb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -131,14 +131,32 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, public void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws Exception { oldConf = new Configuration(schedConf); - Map kvUpdate = constructKeyValueConfUpdate(confUpdate); + CapacitySchedulerConfiguration proposedConf = + new CapacitySchedulerConfiguration(schedConf, false); + Map kvUpdate + = constructKeyValueConfUpdate(proposedConf, confUpdate); LogMutation log = new LogMutation(kvUpdate, user.getShortUserName()); confStore.logMutation(log); + applyMutation(proposedConf, kvUpdate); + schedConf = proposedConf; + } + + public Configuration applyChanges(Configuration oldConfiguration, + SchedConfUpdateInfo confUpdate) throws IOException { + CapacitySchedulerConfiguration proposedConf = + new CapacitySchedulerConfiguration(oldConfiguration, false); + Map kvUpdate + = constructKeyValueConfUpdate(proposedConf, confUpdate); + applyMutation(proposedConf, kvUpdate); + return proposedConf; + } + + private void applyMutation(Configuration conf, Map kvUpdate) { for (Map.Entry kv : kvUpdate.entrySet()) { if (kv.getValue() == null) { - schedConf.unset(kv.getKey()); + conf.unset(kv.getKey()); } else { - schedConf.set(kv.getKey(), kv.getValue()); + conf.set(kv.getKey(), kv.getValue()); } } } @@ -215,9 +233,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, } private Map constructKeyValueConfUpdate( + CapacitySchedulerConfiguration proposedConf, SchedConfUpdateInfo mutationInfo) throws IOException { - CapacitySchedulerConfiguration proposedConf = - new CapacitySchedulerConfiguration(schedConf, false); + Map confUpdate = new HashMap<>(); for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { removeQueue(queueToRemove, proposedConf, confUpdate); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java index c02b3f46e71..51d105e5b21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java @@ -57,6 +57,12 @@ public final class RMWSConsts { /** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */ public static final String SCHEDULER_LOGS = "/scheduler/logs"; + /** + * Path for {@code RMWebServiceProtocol#validateAndGetSchedulerConfiguration}. + */ + public static final String SCHEDULER_CONF_VALIDATE + = "/scheduler-conf/validate"; + /** Path for {@code RMWebServiceProtocol#getNodes}. */ public static final String NODES = "/nodes"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 240aacafe60..9f8d0b9bd7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -141,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigValidator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -2370,6 +2371,53 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } } + @POST + @Path(RMWSConsts.SCHEDULER_CONF_VALIDATE) + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public synchronized Response validateAndGetSchedulerConfiguration( + SchedConfUpdateInfo mutationInfo, + @Context HttpServletRequest hsr) throws AuthorizationException { + // Only admin user is allowed to read scheduler conf, + // in order to avoid leaking sensitive info, such as ACLs + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + initForWritableEndpoints(callerUGI, true); + ResourceScheduler scheduler = rm.getResourceScheduler(); + if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler) + scheduler).isConfigurationMutable()) { + try { + MutableConfigurationProvider mutableConfigurationProvider = + ((MutableConfScheduler) scheduler).getMutableConfProvider(); + Configuration schedulerConf = mutableConfigurationProvider + .getConfiguration(); + Configuration newConfig = mutableConfigurationProvider + .applyChanges(schedulerConf, mutationInfo); + Configuration yarnConf = ((CapacityScheduler) scheduler).getConf(); + CapacitySchedulerConfigValidator.validateCSConfiguration(yarnConf, + newConfig, rm.getRMContext()); + + return Response.status(Status.OK) + .entity(new ConfInfo(newConfig)) + .build(); + } catch (Exception e) { + String errorMsg = "CapacityScheduler configuration validation failed:" + + e.toString(); + LOG.warn(errorMsg); + return Response.status(Status.BAD_REQUEST) + .entity(errorMsg) + .build(); + } + } else { + String errorMsg = "Configuration change validation only supported by " + + "MutableConfScheduler."; + LOG.warn(errorMsg); + return Response.status(Status.BAD_REQUEST) + .entity(errorMsg) + .build(); + } + } + @PUT @Path(RMWSConsts.SCHEDULER_CONF) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java new file mode 100644 index 00000000000..1477a336763 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.conf.Configuration; + +import java.util.HashMap; +import java.util.Map; + +public final class CapacitySchedulerConfigGeneratorForTest { + + private CapacitySchedulerConfigGeneratorForTest() { + throw new IllegalStateException("Utility class"); + } + + public static Configuration createConfiguration(Map configs) { + Configuration config = new Configuration(); + for (Map.Entry entry: configs.entrySet()) { + config.set((String)entry.getKey(), (String)entry.getValue()); + } + return config; + } + + public static Configuration createBasicCSConfiguration() { + Map conf = new HashMap<>(); + conf.put("yarn.scheduler.capacity.root.queues", "test1, test2"); + conf.put("yarn.scheduler.capacity.root.test1.capacity", "50"); + conf.put("yarn.scheduler.capacity.root.test2.capacity", "50"); + conf.put("yarn.scheduler.capacity.root.test1.maximum-capacity", "100"); + conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); + conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); + conf.put("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1,u:test2:test2"); + return createConfiguration(conf); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java new file mode 100644 index 00000000000..2b35a7c8f08 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.LocalConfigurationProvider; +import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.fail; + +public class TestCapacitySchedulerConfigValidator { + + /** + * Test for the case when the scheduler.minimum-allocation-mb == 0. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateMemoryAllocationInvalidMinMem() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "0"); + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createConfiguration(configs); + CapacitySchedulerConfigValidator.validateMemoryAllocation(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + + " should be > 0"); + } + + /** + * Test for the case when the scheduler.minimum-allocation-mb is greater than + * scheduler.maximum-allocation-mb. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateMemoryAllocationHIgherMinThanMaxMem() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "8192"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "1024"); + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createConfiguration(configs); + CapacitySchedulerConfigValidator.validateMemoryAllocation(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + " should be > " + + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + + } + + + @Test + public void testValidateMemoryAllocation() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "1024"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "8192"); + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createConfiguration(configs); + // there is no need for assertion, since there is no further method call + // inside the tested code and in case of a valid configuration no exception + // is thrown + CapacitySchedulerConfigValidator.validateMemoryAllocation(config); + } + + /** + * Test for the case when the scheduler.minimum-allocation-vcores == 0. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateVCoresInvalidMinVCore() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "0"); + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createConfiguration(configs); + CapacitySchedulerConfigValidator.validateVCores(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES + + " should be > 0"); + } + + /** + * Test for the case when the scheduler.minimum-allocation-vcores is greater + * than scheduler.maximum-allocation-vcores. + */ + @Test (expected = YarnRuntimeException.class) + public void testValidateVCoresHigherMinThanMaxVCore() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "1"); + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createConfiguration(configs); + CapacitySchedulerConfigValidator.validateVCores(config); + fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES + + " should be > " + + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + + } + + + @Test + public void testValidateVCores() { + Map configs = new HashMap(); + configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "1"); + configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "4"); + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createConfiguration(configs); + // there is no need for assertion, since there is no further method call + // inside the tested code and in case of a valid configuration no exception + // is thrown + CapacitySchedulerConfigValidator.validateVCores(config); + } + + @Test + public void testValidateCSConfigInvalidCapacity() { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.test1.capacity", "500"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .startsWith("Illegal capacity")); + } + } + + @Test + public void testValidateCSConfigStopALeafQueue() throws IOException { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); + RMContext rmContext = prepareRMContext(); + Boolean isValidConfig = CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + Assert.assertTrue(isValidConfig); + } + + /** + * Stop the root queue if there are running child queues. + */ + @Test + public void testValidateCSConfigStopANonLeafQueueInvalid() { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.state", "STOPPED"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("There are child queues in running state"); + } catch (IOException e) { + // TODO: change the assertion message in case of backporting YARN-8678 + Assert.assertTrue(e.getCause().getMessage() + .contains("The parent queue:root state is STOPPED," + + " child queue:test1 state cannot be RUNNING")); + } + } + + @Test + public void testValidateCSConfigStopANonLeafQueue() throws IOException { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.state", "STOPPED"); + newConfig + .set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); + newConfig + .set("yarn.scheduler.capacity.root.test2.state", "STOPPED"); + RMContext rmContext = prepareRMContext(); + Boolean isValidConfig = CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + Assert.assertTrue(isValidConfig); + + } + + /** + * Add a leaf queue without modifying the capacity of other leaf queues + * so the total capacity != 100. + */ + @Test + public void testValidateCSConfigAddALeafQueueInvalid() { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3"); + newConfig + .set("yarn.scheduler.capacity.root.test3.state", "RUNNING"); + newConfig + .set("yarn.scheduler.capacity.root.test3.capacity", "30"); + + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity for children of queue root"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .startsWith("Illegal capacity")); + } + } + + /** + * Add a leaf queue by modifying the capacity of other leaf queues + * and adjust the capacities of other leaf queues, so total capacity = 100. + */ + @Test + public void testValidateCSConfigAddALeafQueueValid() throws IOException { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig + .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3"); + newConfig + .set("yarn.scheduler.capacity.root.test3.state", "RUNNING"); + newConfig + .set("yarn.scheduler.capacity.root.test3.capacity", "30"); + newConfig + .set("yarn.scheduler.capacity.root.test1.capacity", "20"); + + RMContext rmContext = prepareRMContext(); + Boolean isValidConfig = CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + Assert.assertTrue(isValidConfig); + } + + /** + * Delete a running queue. + */ + @Test + public void testValidateCSConfigInvalidQueueDeletion() { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.queues", "test1"); + newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100"); + newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity"); + newConfig.unset("yarn.scheduler.capacity.root.test2.state"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity for children of queue root"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .contains("root.test2 cannot be deleted")); + Assert.assertTrue(e.getCause().getMessage() + .contains("the queue is not yet in stopped state")); + } + } + + /** + * Delete a queue and not adjust capacities. + */ + @Test + public void testValidateCSConfigInvalidQueueDeletion2() { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.queues", "test1"); + newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity"); + newConfig.unset("yarn.scheduler.capacity.root.test2.state"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1"); + RMContext rmContext = prepareRMContext(); + try { + CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + fail("Invalid capacity for children of queue root"); + } catch (IOException e) { + Assert.assertTrue(e.getCause().getMessage() + .contains("Illegal capacity")); + } + } + + /** + * Delete a queue and adjust capacities to have total capacity = 100. + */ + @Test + public void testValidateCSConfigValidQueueDeletion() throws IOException { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + oldConfig.set("yarn.scheduler.capacity.root.test2.state", "STOPPED"); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.queues", "test1"); + newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100"); + newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity"); + newConfig.unset("yarn.scheduler.capacity.root.test2.state"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test1"); + RMContext rmContext = prepareRMContext(); + boolean isValidConfig = CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + Assert.assertTrue(isValidConfig); + + } + + @Test + public void testAddQueueToALeafQueue() throws IOException { + Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + oldConfig.set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); + Configuration newConfig = new Configuration(oldConfig); + newConfig.set("yarn.scheduler.capacity.root.test1.queues", "newQueue"); + newConfig + .set("yarn.scheduler.capacity.root.test1.newQueue.capacity", "100"); + newConfig.set("yarn.scheduler.capacity.queue-mappings", + "u:test1:test2"); + RMContext rmContext = prepareRMContext(); + boolean isValidConfig = CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + Assert.assertTrue(isValidConfig); + } + + + public static RMContext prepareRMContext() { + RMContext rmContext = Mockito.mock(RMContext.class); + LocalConfigurationProvider configProvider = Mockito + .mock(LocalConfigurationProvider.class); + Mockito.when(rmContext.getConfigurationProvider()) + .thenReturn(configProvider); + RMNodeLabelsManager nodeLabelsManager = Mockito + .mock(RMNodeLabelsManager.class); + Mockito.when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager); + LightWeightResource partitionResource = Mockito + .mock(LightWeightResource.class); + Mockito.when(nodeLabelsManager + .getResourceByLabel(Mockito.any(), Mockito.any())) + .thenReturn(partitionResource); + PlacementManager queuePlacementManager = Mockito + .mock(PlacementManager.class); + Mockito.when(rmContext.getQueuePlacementManager()) + .thenReturn(queuePlacementManager); + return rmContext; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java index f20ace073a4..a9a6cc3be86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java @@ -22,15 +22,20 @@ import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseS import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; +import java.io.IOException; import java.io.StringReader; import java.security.Principal; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import javax.servlet.http.HttpServletRequest; @@ -45,6 +50,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -59,6 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerConfigValidator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo; @@ -73,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GuiceServletConfig; import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.WebServicesTestUtils; +import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; @@ -884,4 +895,112 @@ public class TestRMWebServices extends JerseyTestBase { assertEquals("requestedUser doesn't match: ", requestedUser, userInfo.getRequestedUser()); } + + @Test + public void testValidateAndGetSchedulerConfigurationInvalidScheduler() + throws AuthorizationException { + ResourceScheduler scheduler = new CapacityScheduler(); + RMWebServices webService = prepareWebServiceForValidation(scheduler); + SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo(); + HttpServletRequest mockHsr = prepareServletRequestForValidation(); + Response response = webService + .validateAndGetSchedulerConfiguration(mutationInfo, mockHsr); + Assert.assertEquals(Status.BAD_REQUEST + .getStatusCode(), response.getStatus()); + Assert.assertTrue(response.getEntity().toString() + .contains("Configuration change validation only supported by" + +" MutableConfScheduler.")); + } + + @Test + public void testValidateAndGetSchedulerConfigurationInvalidConfig() + throws IOException { + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + ResourceScheduler scheduler = prepareCSForValidation(config); + + SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo(); + ArrayList queuesToRemove = new ArrayList(); + queuesToRemove.add("root.test1"); + mutationInfo.setRemoveQueueInfo(queuesToRemove); + + RMWebServices webService = prepareWebServiceForValidation(scheduler); + HttpServletRequest mockHsr = prepareServletRequestForValidation(); + + Response response = webService + .validateAndGetSchedulerConfiguration(mutationInfo, mockHsr); + Assert.assertEquals(Status.BAD_REQUEST + .getStatusCode(), response.getStatus()); + Assert.assertTrue(response.getEntity().toString() + .contains("Illegal capacity of 0.5 for children of queue")); + } + + @Test + public void testValidateAndGetSchedulerConfigurationValidScheduler() + throws IOException { + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration(); + config.set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); + config.set("yarn.scheduler.capacity.queue-mappings", + "u:test2:test2"); + ResourceScheduler scheduler = prepareCSForValidation(config); + + SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo(); + ArrayList queuesToRemove = new ArrayList(); + queuesToRemove.add("root.test1"); + mutationInfo.setRemoveQueueInfo(queuesToRemove); + ArrayList updateQueueInfo = new ArrayList<>(); + String queueToUpdate = "root.test2"; + Map propertiesToUpdate = new HashMap<>(); + propertiesToUpdate.put("capacity", "100"); + updateQueueInfo.add(new QueueConfigInfo(queueToUpdate, propertiesToUpdate)); + mutationInfo.setUpdateQueueInfo(updateQueueInfo); + + RMWebServices webService = prepareWebServiceForValidation(scheduler); + HttpServletRequest mockHsr = prepareServletRequestForValidation(); + + Response response = webService + .validateAndGetSchedulerConfiguration(mutationInfo, mockHsr); + Assert.assertEquals(Status.OK + .getStatusCode(), response.getStatus()); + } + + private CapacityScheduler prepareCSForValidation(Configuration config) + throws IOException { + CapacityScheduler scheduler = mock(CapacityScheduler.class); + when(scheduler.isConfigurationMutable()) + .thenReturn(true); + MutableCSConfigurationProvider configurationProvider = + mock(MutableCSConfigurationProvider.class); + when(scheduler.getMutableConfProvider()) + .thenReturn(configurationProvider); + + when(configurationProvider.getConfiguration()).thenReturn(config); + when(scheduler.getConf()).thenReturn(config); + when(configurationProvider + .applyChanges(any(), any())).thenCallRealMethod(); + return scheduler; + } + + private HttpServletRequest prepareServletRequestForValidation() { + HttpServletRequest mockHsr = mock(HttpServletRequest.class); + when(mockHsr.getUserPrincipal()).thenReturn(() -> "yarn"); + return mockHsr; + } + + private RMWebServices prepareWebServiceForValidation( + ResourceScheduler scheduler) { + ResourceManager mockRM = mock(ResourceManager.class); + ApplicationACLsManager acLsManager = mock(ApplicationACLsManager.class); + RMWebServices webService = new RMWebServices(mockRM, new Configuration(), + mock(HttpServletResponse.class)); + when(mockRM.getResourceScheduler()).thenReturn(scheduler); + when(acLsManager.areACLsEnabled()).thenReturn(false); + when(mockRM.getApplicationACLsManager()).thenReturn(acLsManager); + RMContext context = TestCapacitySchedulerConfigValidator.prepareRMContext(); + when(mockRM.getRMContext()).thenReturn(context); + + return webService; + } + }