YARN-10022. RM Rest API to validate the CapacityScheduler Configuration change

Contributed by Kinga Marton.
This commit is contained in:
Prabhu Joseph 2020-01-28 23:08:35 +05:30 committed by Prabhu Joseph
parent 87c198468b
commit 1ab9c692fa
10 changed files with 840 additions and 144 deletions

View File

@ -53,6 +53,16 @@ public interface MutableConfigurationProvider {
LogMutation logAndApplyMutation(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws Exception;
/**
* Apply the changes on top of the actual configuration.
* @param oldConfiguration actual configuration
* @param confUpdate changelist
* @return new configuration with the applied changed
* @throws IOException if the merge failed
*/
Configuration applyChanges(Configuration oldConfiguration,
SchedConfUpdateInfo confUpdate) throws IOException;
/**
* Confirm last logged mutation.
* @param pendingMutation the log mutation to apply

View File

@ -207,40 +207,9 @@ public void setConf(Configuration conf) {
private void validateConf(Configuration conf) {
// validate scheduler memory allocation setting
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem <= 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
CapacitySchedulerConfigValidator.validateMemoryAllocation(conf);
// validate scheduler vcores allocation setting
int minVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
if (minVcores <= 0 || minVcores > maxVcores) {
throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ "=" + minVcores
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ "=" + maxVcores + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
CapacitySchedulerConfigValidator.validateVCores(conf);
}
@Override
@ -480,14 +449,17 @@ public void serviceStop() throws Exception {
super.serviceStop();
}
@Override
public void reinitialize(Configuration newConf, RMContext rmContext)
throws IOException {
public void reinitialize(Configuration newConf, RMContext rmContext,
boolean validation) throws IOException {
writeLock.lock();
try {
Configuration configuration = new Configuration(newConf);
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = csConfProvider.loadConfiguration(configuration);
if (validation) {
this.conf = new CapacitySchedulerConfiguration(newConf, false);
} else {
this.conf = csConfProvider.loadConfiguration(configuration);
}
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
@ -501,17 +473,26 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
throw new IOException("Failed to re-init queues : " + t.getMessage(),
t);
}
if (!validation) {
// update lazy preemption
this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
// update lazy preemption
this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
// Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
// Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
super.reinitialize(newConf, rmContext);
super.reinitialize(newConf, rmContext);
}
} finally {
writeLock.unlock();
}
}
@Override
public void reinitialize(Configuration newConf, RMContext rmContext)
throws IOException {
reinitialize(newConf, rmContext, false);
}
long getAsyncScheduleInterval() {
@ -714,19 +695,13 @@ public void updatePlacementRules() throws IOException {
Collection<String> placementRuleStrs = conf.getStringCollection(
YarnConfiguration.QUEUE_PLACEMENT_RULES);
List<PlacementRule> placementRules = new ArrayList<>();
Set<String> distingushRuleSet = new HashSet<>();
// fail the case if we get duplicate placementRule add in
for (String pls : placementRuleStrs) {
if (!distingushRuleSet.add(pls)) {
throw new IOException("Invalid PlacementRule inputs which "
+ "contains duplicate rule strings");
}
}
Set<String> distinguishRuleSet = CapacitySchedulerConfigValidator
.validatePlacementRules(placementRuleStrs);
// add UserGroupMappingPlacementRule if absent
distingushRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
distinguishRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
placementRuleStrs = new ArrayList<>(distingushRuleSet);
placementRuleStrs = new ArrayList<>(distinguishRuleSet);
for (String placementRuleStr : placementRuleStrs) {
switch (placementRuleStr) {

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public final class CapacitySchedulerConfigValidator {
private static final Logger LOG = LoggerFactory.getLogger(
CapacitySchedulerConfigValidator.class);
private CapacitySchedulerConfigValidator() {
throw new IllegalStateException("Utility class");
}
public static boolean validateCSConfiguration(
final Configuration oldConf, final Configuration newConf,
final RMContext rmContext) throws IOException {
//TODO: extract all the validation steps and replace reinitialize with
//the specific validation steps
CapacityScheduler newCs = new CapacityScheduler();
newCs.setConf(oldConf);
newCs.setRMContext(rmContext);
newCs.init(oldConf);
newCs.reinitialize(newConf, rmContext, true);
return true;
}
public static Set<String> validatePlacementRules(
Collection<String> placementRuleStrs) throws IOException {
Set<String> distinguishRuleSet = new HashSet<>();
// fail the case if we get duplicate placementRule add in
for (String pls : placementRuleStrs) {
if (!distinguishRuleSet.add(pls)) {
throw new IOException("Invalid PlacementRule inputs which "
+ "contains duplicate rule strings");
}
}
return distinguishRuleSet;
}
public static void validateMemoryAllocation(Configuration conf) {
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem <= 0 || minMem > maxMem) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}
public static void validateVCores(Configuration conf) {
int minVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
if (minVcores <= 0 || minVcores > maxVcores) {
throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ "=" + minVcores
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ "=" + maxVcores + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}
/**
* Ensure all existing queues are present. Queues cannot be deleted if its not
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
* Previous child queue could be converted into parent queue if it is in
* STOPPED state.
*
* @param queues existing queues
* @param newQueues new queues
*/
public static void validateQueueHierarchy(Map<String, CSQueue> queues,
Map<String, CSQueue> newQueues, CapacitySchedulerConfiguration newConf)
throws IOException {
// check that all static queues are included in the newQueues list
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
.getClass()))) {
String queueName = e.getKey();
CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName);
if (null == newQueue) {
// old queue doesn't exist in the new XML
String configPrefix = newConf.getQueuePrefix(
oldQueue.getQueuePath());
QueueState newQueueState = null;
try {
newQueueState = QueueState.valueOf(
newConf.get(configPrefix + "state"));
} catch (Exception ex) {
LOG.warn("Not a valid queue state for queue "
+ oldQueue.getQueuePath());
}
if (oldQueue.getState() == QueueState.STOPPED ||
newQueueState == QueueState.STOPPED) {
LOG.info("Deleting Queue " + queueName + ", as it is not"
+ " present in the modified capacity configuration xml");
} else{
throw new IOException(oldQueue.getQueuePath() + " cannot be"
+ " deleted from the capacity scheduler configuration, "
+ "as the queue is not yet in stopped state. "
+ "Current State : " + oldQueue.getState());
}
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
//Queue's cannot be moved from one hierarchy to other
throw new IOException(
queueName + " is moved from:" + oldQueue.getQueuePath()
+ " to:" + newQueue.getQueuePath()
+ " after refresh, which is not allowed.");
} else if (oldQueue instanceof ParentQueue
&& !(oldQueue instanceof ManagedParentQueue)
&& newQueue instanceof ManagedParentQueue) {
throw new IOException(
"Can not convert parent queue: " + oldQueue.getQueuePath()
+ " to auto create enabled parent queue since "
+ "it could have other pre-configured queues which is"
+ " not supported");
} else if (oldQueue instanceof ManagedParentQueue
&& !(newQueue instanceof ManagedParentQueue)) {
throw new IOException(
"Cannot convert auto create enabled parent queue: " + oldQueue
.getQueuePath() + " to leaf queue. Please check "
+ " parent queue's configuration "
+ CapacitySchedulerConfiguration
.AUTO_CREATE_CHILD_QUEUE_ENABLED
+ " is set to true");
} else if (oldQueue instanceof LeafQueue
&& newQueue instanceof ParentQueue) {
if (oldQueue.getState() == QueueState.STOPPED) {
LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue.");
} else{
throw new IOException(
"Can not convert the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue since "
+ "it is not yet in stopped state. Current State : "
+ oldQueue.getState());
}
} else if (oldQueue instanceof ParentQueue
&& newQueue instanceof LeafQueue) {
LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
+ " to leaf queue.");
}
}
}
}
}

View File

@ -36,7 +36,6 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.Permission;
@ -177,7 +176,8 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
csContext.getRMContext().getHAServiceState()
!= HAServiceProtocol.HAServiceState.STANDBY) {
// Ensure queue hierarchy in the new XML file is proper.
validateQueueHierarchy(queues, newQueues, newConf);
CapacitySchedulerConfigValidator
.validateQueueHierarchy(queues, newQueues, newConf);
}
// Add new queues and delete OldQeueus only after validation.
@ -299,90 +299,6 @@ static CSQueue parseQueue(
return queue;
}
/**
* Ensure all existing queues are present. Queues cannot be deleted if its not
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
* Previous child queue could be converted into parent queue if it is in
* STOPPED state.
*
* @param queues existing queues
* @param newQueues new queues
*/
private void validateQueueHierarchy(Map<String, CSQueue> queues,
Map<String, CSQueue> newQueues, CapacitySchedulerConfiguration newConf)
throws IOException {
// check that all static queues are included in the newQueues list
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
.getClass()))) {
String queueName = e.getKey();
CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName);
if (null == newQueue) {
// old queue doesn't exist in the new XML
String configPrefix = newConf.getQueuePrefix(
oldQueue.getQueuePath());
QueueState newQueueState = null;
try {
newQueueState = QueueState.valueOf(
newConf.get(configPrefix + "state"));
} catch (Exception ex) {
LOG.warn("Not a valid queue state for queue "
+ oldQueue.getQueuePath());
}
if (oldQueue.getState() == QueueState.STOPPED ||
newQueueState == QueueState.STOPPED) {
LOG.info("Deleting Queue " + queueName + ", as it is not"
+ " present in the modified capacity configuration xml");
} else{
throw new IOException(oldQueue.getQueuePath() + " cannot be"
+ " deleted from the capacity scheduler configuration, as the"
+ " queue is not yet in stopped state. Current State : "
+ oldQueue.getState());
}
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
//Queue's cannot be moved from one hierarchy to other
throw new IOException(
queueName + " is moved from:" + oldQueue.getQueuePath() + " to:"
+ newQueue.getQueuePath()
+ " after refresh, which is not allowed.");
} else if (oldQueue instanceof ParentQueue
&& !(oldQueue instanceof ManagedParentQueue)
&& newQueue instanceof ManagedParentQueue) {
throw new IOException(
"Can not convert parent queue: " + oldQueue.getQueuePath()
+ " to auto create enabled parent queue since "
+ "it could have other pre-configured queues which is not "
+ "supported");
} else if (oldQueue instanceof ManagedParentQueue
&& !(newQueue instanceof ManagedParentQueue)) {
throw new IOException(
"Cannot convert auto create enabled parent queue: " + oldQueue
.getQueuePath() + " to leaf queue. Please check "
+ " parent queue's configuration "
+ CapacitySchedulerConfiguration
.AUTO_CREATE_CHILD_QUEUE_ENABLED
+ " is set to true");
} else if (oldQueue instanceof LeafQueue
&& newQueue instanceof ParentQueue) {
if (oldQueue.getState() == QueueState.STOPPED) {
LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue.");
} else{
throw new IOException(
"Can not convert the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue since "
+ "it is not yet in stopped state. Current State : "
+ oldQueue.getState());
}
} else if (oldQueue instanceof ParentQueue
&& newQueue instanceof LeafQueue) {
LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
+ " to leaf queue.");
}
}
}
}
/**
* Updates to our list of queues: Adds the new queues and deletes the removed

View File

@ -131,17 +131,35 @@ public ConfigurationMutationACLPolicy getAclMutationPolicy() {
public LogMutation logAndApplyMutation(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws Exception {
oldConf = new Configuration(schedConf);
Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(schedConf, false);
Map<String, String> kvUpdate
= constructKeyValueConfUpdate(proposedConf, confUpdate);
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
confStore.logMutation(log);
applyMutation(proposedConf, kvUpdate);
schedConf = proposedConf;
return log;
}
public Configuration applyChanges(Configuration oldConfiguration,
SchedConfUpdateInfo confUpdate) throws IOException {
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(oldConfiguration, false);
Map<String, String> kvUpdate
= constructKeyValueConfUpdate(proposedConf, confUpdate);
applyMutation(proposedConf, kvUpdate);
return proposedConf;
}
private void applyMutation(Configuration conf, Map<String, String> kvUpdate) {
for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
if (kv.getValue() == null) {
schedConf.unset(kv.getKey());
conf.unset(kv.getKey());
} else {
schedConf.set(kv.getKey(), kv.getValue());
conf.set(kv.getKey(), kv.getValue());
}
}
return log;
}
@Override
@ -217,9 +235,9 @@ private List<String> getSiblingQueues(String queuePath, Configuration conf) {
}
private Map<String, String> constructKeyValueConfUpdate(
CapacitySchedulerConfiguration proposedConf,
SchedConfUpdateInfo mutationInfo) throws IOException {
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(schedConf, false);
Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate);

View File

@ -57,6 +57,12 @@ public final class RMWSConsts {
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
public static final String SCHEDULER_LOGS = "/scheduler/logs";
/**
* Path for {@code RMWebServiceProtocol#validateAndGetSchedulerConfiguration}.
*/
public static final String SCHEDULER_CONF_VALIDATE
= "/scheduler-conf/validate";
/** Path for {@code RMWebServiceProtocol#getNodes}. */
public static final String NODES = "/nodes";

View File

@ -148,6 +148,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigValidator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@ -2618,6 +2619,53 @@ public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr)
}
}
@POST
@Path(RMWSConsts.SCHEDULER_CONF_VALIDATE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public synchronized Response validateAndGetSchedulerConfiguration(
SchedConfUpdateInfo mutationInfo,
@Context HttpServletRequest hsr) throws AuthorizationException {
// Only admin user is allowed to read scheduler conf,
// in order to avoid leaking sensitive info, such as ACLs
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
scheduler).isConfigurationMutable()) {
try {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
Configuration schedulerConf = mutableConfigurationProvider
.getConfiguration();
Configuration newConfig = mutableConfigurationProvider
.applyChanges(schedulerConf, mutationInfo);
Configuration yarnConf = ((CapacityScheduler) scheduler).getConf();
CapacitySchedulerConfigValidator.validateCSConfiguration(yarnConf,
newConfig, rm.getRMContext());
return Response.status(Status.OK)
.entity(new ConfInfo(newConfig))
.build();
} catch (Exception e) {
String errorMsg = "CapacityScheduler configuration validation failed:"
+ e.toString();
LOG.warn(errorMsg);
return Response.status(Status.BAD_REQUEST)
.entity(errorMsg)
.build();
}
} else {
String errorMsg = "Configuration change validation only supported by " +
"MutableConfScheduler.";
LOG.warn(errorMsg);
return Response.status(Status.BAD_REQUEST)
.entity(errorMsg)
.build();
}
}
@PUT
@Path(RMWSConsts.SCHEDULER_CONF)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.Map;
public final class CapacitySchedulerConfigGeneratorForTest {
private CapacitySchedulerConfigGeneratorForTest() {
throw new IllegalStateException("Utility class");
}
public static Configuration createConfiguration(Map<String, String> configs) {
Configuration config = new Configuration();
for (Map.Entry entry: configs.entrySet()) {
config.set((String)entry.getKey(), (String)entry.getValue());
}
return config;
}
public static Configuration createBasicCSConfiguration() {
Map<String, String> conf = new HashMap<>();
conf.put("yarn.scheduler.capacity.root.queues", "test1, test2");
conf.put("yarn.scheduler.capacity.root.test1.capacity", "50");
conf.put("yarn.scheduler.capacity.root.test2.capacity", "50");
conf.put("yarn.scheduler.capacity.root.test1.maximum-capacity", "100");
conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING");
conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING");
conf.put("yarn.scheduler.capacity.queue-mappings",
"u:test1:test1,u:test2:test2");
return createConfiguration(conf);
}
}

View File

@ -0,0 +1,364 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.fail;
public class TestCapacitySchedulerConfigValidator {
/**
* Test for the case when the scheduler.minimum-allocation-mb == 0.
*/
@Test (expected = YarnRuntimeException.class)
public void testValidateMemoryAllocationInvalidMinMem() {
Map<String, String> configs = new HashMap();
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "0");
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createConfiguration(configs);
CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB +
" should be > 0");
}
/**
* Test for the case when the scheduler.minimum-allocation-mb is greater than
* scheduler.maximum-allocation-mb.
*/
@Test (expected = YarnRuntimeException.class)
public void testValidateMemoryAllocationHIgherMinThanMaxMem() {
Map<String, String> configs = new HashMap();
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "8192");
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "1024");
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createConfiguration(configs);
CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + " should be > "
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
}
@Test
public void testValidateMemoryAllocation() {
Map<String, String> configs = new HashMap();
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "1024");
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "8192");
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createConfiguration(configs);
// there is no need for assertion, since there is no further method call
// inside the tested code and in case of a valid configuration no exception
// is thrown
CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
}
/**
* Test for the case when the scheduler.minimum-allocation-vcores == 0.
*/
@Test (expected = YarnRuntimeException.class)
public void testValidateVCoresInvalidMinVCore() {
Map<String, String> configs = new HashMap();
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "0");
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createConfiguration(configs);
CapacitySchedulerConfigValidator.validateVCores(config);
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ " should be > 0");
}
/**
* Test for the case when the scheduler.minimum-allocation-vcores is greater
* than scheduler.maximum-allocation-vcores.
*/
@Test (expected = YarnRuntimeException.class)
public void testValidateVCoresHigherMinThanMaxVCore() {
Map<String, String> configs = new HashMap();
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4");
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "1");
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createConfiguration(configs);
CapacitySchedulerConfigValidator.validateVCores(config);
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES +
" should be > "
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
}
@Test
public void testValidateVCores() {
Map<String, String> configs = new HashMap();
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "1");
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "4");
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createConfiguration(configs);
// there is no need for assertion, since there is no further method call
// inside the tested code and in case of a valid configuration no exception
// is thrown
CapacitySchedulerConfigValidator.validateVCores(config);
}
@Test
public void testValidateCSConfigInvalidCapacity() {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig
.set("yarn.scheduler.capacity.root.test1.capacity", "500");
RMContext rmContext = prepareRMContext();
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
fail("Invalid capacity");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.startsWith("Illegal capacity"));
}
}
@Test
public void testValidateCSConfigStopALeafQueue() throws IOException {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig
.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
RMContext rmContext = prepareRMContext();
Boolean isValidConfig = CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
Assert.assertTrue(isValidConfig);
}
/**
* Stop the root queue if there are running child queues.
*/
@Test
public void testValidateCSConfigStopANonLeafQueueInvalid() {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig
.set("yarn.scheduler.capacity.root.state", "STOPPED");
RMContext rmContext = prepareRMContext();
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
fail("There are child queues in running state");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.contains("The parent queue:root cannot be STOPPED"));
}
}
@Test
public void testValidateCSConfigStopANonLeafQueue() throws IOException {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig
.set("yarn.scheduler.capacity.root.state", "STOPPED");
newConfig
.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
newConfig
.set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
RMContext rmContext = prepareRMContext();
Boolean isValidConfig = CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
Assert.assertTrue(isValidConfig);
}
/**
* Add a leaf queue without modifying the capacity of other leaf queues
* so the total capacity != 100.
*/
@Test
public void testValidateCSConfigAddALeafQueueInvalid() {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig
.set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
newConfig
.set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
newConfig
.set("yarn.scheduler.capacity.root.test3.capacity", "30");
RMContext rmContext = prepareRMContext();
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
fail("Invalid capacity for children of queue root");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.startsWith("Illegal capacity"));
}
}
/**
* Add a leaf queue by modifying the capacity of other leaf queues
* and adjust the capacities of other leaf queues, so total capacity = 100.
*/
@Test
public void testValidateCSConfigAddALeafQueueValid() throws IOException {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig
.set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
newConfig
.set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
newConfig
.set("yarn.scheduler.capacity.root.test3.capacity", "30");
newConfig
.set("yarn.scheduler.capacity.root.test1.capacity", "20");
RMContext rmContext = prepareRMContext();
Boolean isValidConfig = CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
Assert.assertTrue(isValidConfig);
}
/**
* Delete a running queue.
*/
@Test
public void testValidateCSConfigInvalidQueueDeletion() {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
newConfig.unset("yarn.scheduler.capacity.root.test2.state");
newConfig.set("yarn.scheduler.capacity.queue-mappings",
"u:test1:test1");
RMContext rmContext = prepareRMContext();
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
fail("Invalid capacity for children of queue root");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.contains("root.test2 cannot be deleted"));
Assert.assertTrue(e.getCause().getMessage()
.contains("the queue is not yet in stopped state"));
}
}
/**
* Delete a queue and not adjust capacities.
*/
@Test
public void testValidateCSConfigInvalidQueueDeletion2() {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
Configuration newConfig = new Configuration(oldConfig);
newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
newConfig.unset("yarn.scheduler.capacity.root.test2.state");
newConfig.set("yarn.scheduler.capacity.queue-mappings",
"u:test1:test1");
RMContext rmContext = prepareRMContext();
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
fail("Invalid capacity for children of queue root");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.contains("Illegal capacity"));
}
}
/**
* Delete a queue and adjust capacities to have total capacity = 100.
*/
@Test
public void testValidateCSConfigValidQueueDeletion() throws IOException {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
oldConfig.set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
Configuration newConfig = new Configuration(oldConfig);
newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
newConfig.unset("yarn.scheduler.capacity.root.test2.state");
newConfig.set("yarn.scheduler.capacity.queue-mappings",
"u:test1:test1");
RMContext rmContext = prepareRMContext();
boolean isValidConfig = CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
Assert.assertTrue(isValidConfig);
}
@Test
public void testAddQueueToALeafQueue() throws IOException {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
oldConfig.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
Configuration newConfig = new Configuration(oldConfig);
newConfig.set("yarn.scheduler.capacity.root.test1.queues", "newQueue");
newConfig
.set("yarn.scheduler.capacity.root.test1.newQueue.capacity", "100");
newConfig.set("yarn.scheduler.capacity.queue-mappings",
"u:test1:test2");
RMContext rmContext = prepareRMContext();
boolean isValidConfig = CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
Assert.assertTrue(isValidConfig);
}
public static RMContext prepareRMContext() {
RMContext rmContext = Mockito.mock(RMContext.class);
LocalConfigurationProvider configProvider = Mockito
.mock(LocalConfigurationProvider.class);
Mockito.when(rmContext.getConfigurationProvider())
.thenReturn(configProvider);
RMNodeLabelsManager nodeLabelsManager = Mockito
.mock(RMNodeLabelsManager.class);
Mockito.when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager);
LightWeightResource partitionResource = Mockito
.mock(LightWeightResource.class);
Mockito.when(nodeLabelsManager
.getResourceByLabel(Mockito.any(), Mockito.any()))
.thenReturn(partitionResource);
PlacementManager queuePlacementManager = Mockito
.mock(PlacementManager.class);
Mockito.when(rmContext.getQueuePlacementManager())
.thenReturn(queuePlacementManager);
return rmContext;
}
}

View File

@ -28,12 +28,14 @@
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.security.Principal;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@ -52,6 +54,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@ -75,6 +78,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerConfigValidator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
@ -89,6 +95,8 @@
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
@ -982,4 +990,112 @@ public void verifyClusterUserInfo(ClusterUserInfo userInfo,
assertEquals("requestedUser doesn't match: ",
requestedUser, userInfo.getRequestedUser());
}
@Test
public void testValidateAndGetSchedulerConfigurationInvalidScheduler()
throws AuthorizationException {
ResourceScheduler scheduler = new CapacityScheduler();
RMWebServices webService = prepareWebServiceForValidation(scheduler);
SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
HttpServletRequest mockHsr = prepareServletRequestForValidation();
Response response = webService
.validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
Assert.assertEquals(Status.BAD_REQUEST
.getStatusCode(), response.getStatus());
Assert.assertTrue(response.getEntity().toString()
.contains("Configuration change validation only supported by"
+" MutableConfScheduler."));
}
@Test
public void testValidateAndGetSchedulerConfigurationInvalidConfig()
throws IOException {
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
ResourceScheduler scheduler = prepareCSForValidation(config);
SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
ArrayList<String> queuesToRemove = new ArrayList();
queuesToRemove.add("root.test1");
mutationInfo.setRemoveQueueInfo(queuesToRemove);
RMWebServices webService = prepareWebServiceForValidation(scheduler);
HttpServletRequest mockHsr = prepareServletRequestForValidation();
Response response = webService
.validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
Assert.assertEquals(Status.BAD_REQUEST
.getStatusCode(), response.getStatus());
Assert.assertTrue(response.getEntity().toString()
.contains("Illegal capacity of 0.5 for children of queue"));
}
@Test
public void testValidateAndGetSchedulerConfigurationValidScheduler()
throws IOException {
Configuration config = CapacitySchedulerConfigGeneratorForTest
.createBasicCSConfiguration();
config.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
config.set("yarn.scheduler.capacity.queue-mappings",
"u:test2:test2");
ResourceScheduler scheduler = prepareCSForValidation(config);
SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
ArrayList<String> queuesToRemove = new ArrayList();
queuesToRemove.add("root.test1");
mutationInfo.setRemoveQueueInfo(queuesToRemove);
ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
String queueToUpdate = "root.test2";
Map<String, String> propertiesToUpdate = new HashMap<>();
propertiesToUpdate.put("capacity", "100");
updateQueueInfo.add(new QueueConfigInfo(queueToUpdate, propertiesToUpdate));
mutationInfo.setUpdateQueueInfo(updateQueueInfo);
RMWebServices webService = prepareWebServiceForValidation(scheduler);
HttpServletRequest mockHsr = prepareServletRequestForValidation();
Response response = webService
.validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
Assert.assertEquals(Status.OK
.getStatusCode(), response.getStatus());
}
private CapacityScheduler prepareCSForValidation(Configuration config)
throws IOException {
CapacityScheduler scheduler = mock(CapacityScheduler.class);
when(scheduler.isConfigurationMutable())
.thenReturn(true);
MutableCSConfigurationProvider configurationProvider =
mock(MutableCSConfigurationProvider.class);
when(scheduler.getMutableConfProvider())
.thenReturn(configurationProvider);
when(configurationProvider.getConfiguration()).thenReturn(config);
when(scheduler.getConf()).thenReturn(config);
when(configurationProvider
.applyChanges(any(), any())).thenCallRealMethod();
return scheduler;
}
private HttpServletRequest prepareServletRequestForValidation() {
HttpServletRequest mockHsr = mock(HttpServletRequest.class);
when(mockHsr.getUserPrincipal()).thenReturn(() -> "yarn");
return mockHsr;
}
private RMWebServices prepareWebServiceForValidation(
ResourceScheduler scheduler) {
ResourceManager mockRM = mock(ResourceManager.class);
ApplicationACLsManager acLsManager = mock(ApplicationACLsManager.class);
RMWebServices webService = new RMWebServices(mockRM, new Configuration(),
mock(HttpServletResponse.class));
when(mockRM.getResourceScheduler()).thenReturn(scheduler);
when(acLsManager.areACLsEnabled()).thenReturn(false);
when(mockRM.getApplicationACLsManager()).thenReturn(acLsManager);
RMContext context = TestCapacitySchedulerConfigValidator.prepareRMContext();
when(mockRM.getRMContext()).thenReturn(context);
return webService;
}
}