YARN-10022. Add RM Rest API to validate a CapacityScheduler Config with delta change
Contributed by Kinga Marton.
(cherry-picked from commit 1ab9c692fa
)
This commit is contained in:
parent
e89436b183
commit
7136ebbb7a
|
@ -51,6 +51,16 @@ public interface MutableConfigurationProvider {
|
||||||
void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
|
void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
|
||||||
confUpdate) throws Exception;
|
confUpdate) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply the changes on top of the actual configuration.
|
||||||
|
* @param oldConfiguration actual configuration
|
||||||
|
* @param confUpdate changelist
|
||||||
|
* @return new configuration with the applied changed
|
||||||
|
* @throws IOException if the merge failed
|
||||||
|
*/
|
||||||
|
Configuration applyChanges(Configuration oldConfiguration,
|
||||||
|
SchedConfUpdateInfo confUpdate) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Confirm last logged mutation.
|
* Confirm last logged mutation.
|
||||||
* @param isValid if the last logged mutation is applied to scheduler
|
* @param isValid if the last logged mutation is applied to scheduler
|
||||||
|
|
|
@ -209,40 +209,9 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
private void validateConf(Configuration conf) {
|
private void validateConf(Configuration conf) {
|
||||||
// validate scheduler memory allocation setting
|
// validate scheduler memory allocation setting
|
||||||
int minMem = conf.getInt(
|
CapacitySchedulerConfigValidator.validateMemoryAllocation(conf);
|
||||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
|
||||||
int maxMem = conf.getInt(
|
|
||||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
|
||||||
|
|
||||||
if (minMem <= 0 || minMem > maxMem) {
|
|
||||||
throw new YarnRuntimeException("Invalid resource scheduler memory"
|
|
||||||
+ " allocation configuration"
|
|
||||||
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
|
|
||||||
+ "=" + minMem
|
|
||||||
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
|
|
||||||
+ "=" + maxMem + ", min and max should be greater than 0"
|
|
||||||
+ ", max should be no smaller than min.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// validate scheduler vcores allocation setting
|
// validate scheduler vcores allocation setting
|
||||||
int minVcores = conf.getInt(
|
CapacitySchedulerConfigValidator.validateVCores(conf);
|
||||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
|
||||||
int maxVcores = conf.getInt(
|
|
||||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
|
||||||
|
|
||||||
if (minVcores <= 0 || minVcores > maxVcores) {
|
|
||||||
throw new YarnRuntimeException("Invalid resource scheduler vcores"
|
|
||||||
+ " allocation configuration"
|
|
||||||
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
|
|
||||||
+ "=" + minVcores
|
|
||||||
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
|
|
||||||
+ "=" + maxVcores + ", min and max should be greater than 0"
|
|
||||||
+ ", max should be no smaller than min.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -482,14 +451,18 @@ public class CapacityScheduler extends
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public void reinitialize(Configuration newConf, RMContext rmContext,
|
||||||
public void reinitialize(Configuration newConf, RMContext rmContext)
|
boolean validation) throws IOException {
|
||||||
throws IOException {
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
Configuration configuration = new Configuration(newConf);
|
Configuration configuration = new Configuration(newConf);
|
||||||
CapacitySchedulerConfiguration oldConf = this.conf;
|
CapacitySchedulerConfiguration oldConf = this.conf;
|
||||||
this.conf = csConfProvider.loadConfiguration(configuration);
|
if (validation) {
|
||||||
|
this.conf = new CapacitySchedulerConfiguration(newConf, false);
|
||||||
|
} else {
|
||||||
|
this.conf = csConfProvider.loadConfiguration(configuration);
|
||||||
|
}
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
try {
|
try {
|
||||||
LOG.info("Re-initializing queues...");
|
LOG.info("Re-initializing queues...");
|
||||||
|
@ -503,17 +476,26 @@ public class CapacityScheduler extends
|
||||||
throw new IOException("Failed to re-init queues : " + t.getMessage(),
|
throw new IOException("Failed to re-init queues : " + t.getMessage(),
|
||||||
t);
|
t);
|
||||||
}
|
}
|
||||||
|
if (!validation) {
|
||||||
|
|
||||||
// update lazy preemption
|
// update lazy preemption
|
||||||
this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
|
this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
|
||||||
|
|
||||||
// Setup how many containers we can allocate for each round
|
// Setup how many containers we can allocate for each round
|
||||||
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
|
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
|
||||||
|
|
||||||
super.reinitialize(newConf, rmContext);
|
super.reinitialize(newConf, rmContext);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reinitialize(Configuration newConf, RMContext rmContext)
|
||||||
|
throws IOException {
|
||||||
|
reinitialize(newConf, rmContext, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
long getAsyncScheduleInterval() {
|
long getAsyncScheduleInterval() {
|
||||||
|
@ -717,19 +699,13 @@ public class CapacityScheduler extends
|
||||||
Collection<String> placementRuleStrs = conf.getStringCollection(
|
Collection<String> placementRuleStrs = conf.getStringCollection(
|
||||||
YarnConfiguration.QUEUE_PLACEMENT_RULES);
|
YarnConfiguration.QUEUE_PLACEMENT_RULES);
|
||||||
List<PlacementRule> placementRules = new ArrayList<>();
|
List<PlacementRule> placementRules = new ArrayList<>();
|
||||||
Set<String> distingushRuleSet = new HashSet<>();
|
Set<String> distinguishRuleSet = CapacitySchedulerConfigValidator
|
||||||
// fail the case if we get duplicate placementRule add in
|
.validatePlacementRules(placementRuleStrs);
|
||||||
for (String pls : placementRuleStrs) {
|
|
||||||
if (!distingushRuleSet.add(pls)) {
|
|
||||||
throw new IOException("Invalid PlacementRule inputs which "
|
|
||||||
+ "contains duplicate rule strings");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// add UserGroupMappingPlacementRule if absent
|
// add UserGroupMappingPlacementRule if absent
|
||||||
distingushRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
|
distinguishRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
|
||||||
|
|
||||||
placementRuleStrs = new ArrayList<>(distingushRuleSet);
|
placementRuleStrs = new ArrayList<>(distinguishRuleSet);
|
||||||
|
|
||||||
for (String placementRuleStr : placementRuleStrs) {
|
for (String placementRuleStr : placementRuleStrs) {
|
||||||
switch (placementRuleStr) {
|
switch (placementRuleStr) {
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public final class CapacitySchedulerConfigValidator {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
CapacitySchedulerConfigValidator.class);
|
||||||
|
|
||||||
|
private CapacitySchedulerConfigValidator() {
|
||||||
|
throw new IllegalStateException("Utility class");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean validateCSConfiguration(
|
||||||
|
final Configuration oldConf, final Configuration newConf,
|
||||||
|
final RMContext rmContext) throws IOException {
|
||||||
|
//TODO: extract all the validation steps and replace reinitialize with
|
||||||
|
//the specific validation steps
|
||||||
|
CapacityScheduler newCs = new CapacityScheduler();
|
||||||
|
newCs.setConf(oldConf);
|
||||||
|
newCs.setRMContext(rmContext);
|
||||||
|
newCs.init(oldConf);
|
||||||
|
newCs.reinitialize(newConf, rmContext, true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Set<String> validatePlacementRules(
|
||||||
|
Collection<String> placementRuleStrs) throws IOException {
|
||||||
|
Set<String> distinguishRuleSet = new HashSet<>();
|
||||||
|
// fail the case if we get duplicate placementRule add in
|
||||||
|
for (String pls : placementRuleStrs) {
|
||||||
|
if (!distinguishRuleSet.add(pls)) {
|
||||||
|
throw new IOException("Invalid PlacementRule inputs which "
|
||||||
|
+ "contains duplicate rule strings");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return distinguishRuleSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void validateMemoryAllocation(Configuration conf) {
|
||||||
|
int minMem = conf.getInt(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
||||||
|
int maxMem = conf.getInt(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
|
||||||
|
if (minMem <= 0 || minMem > maxMem) {
|
||||||
|
throw new YarnRuntimeException("Invalid resource scheduler memory"
|
||||||
|
+ " allocation configuration"
|
||||||
|
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
|
||||||
|
+ "=" + minMem
|
||||||
|
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
|
||||||
|
+ "=" + maxMem + ", min and max should be greater than 0"
|
||||||
|
+ ", max should be no smaller than min.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static void validateVCores(Configuration conf) {
|
||||||
|
int minVcores = conf.getInt(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||||
|
int maxVcores = conf.getInt(
|
||||||
|
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||||
|
|
||||||
|
if (minVcores <= 0 || minVcores > maxVcores) {
|
||||||
|
throw new YarnRuntimeException("Invalid resource scheduler vcores"
|
||||||
|
+ " allocation configuration"
|
||||||
|
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
|
||||||
|
+ "=" + minVcores
|
||||||
|
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
|
||||||
|
+ "=" + maxVcores + ", min and max should be greater than 0"
|
||||||
|
+ ", max should be no smaller than min.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure all existing queues are present. Queues cannot be deleted if its not
|
||||||
|
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
|
||||||
|
* Previous child queue could be converted into parent queue if it is in
|
||||||
|
* STOPPED state.
|
||||||
|
*
|
||||||
|
* @param queues existing queues
|
||||||
|
* @param newQueues new queues
|
||||||
|
*/
|
||||||
|
public static void validateQueueHierarchy(Map<String, CSQueue> queues,
|
||||||
|
Map<String, CSQueue> newQueues, CapacitySchedulerConfiguration newConf)
|
||||||
|
throws IOException {
|
||||||
|
// check that all static queues are included in the newQueues list
|
||||||
|
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
|
||||||
|
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
|
||||||
|
.getClass()))) {
|
||||||
|
String queueName = e.getKey();
|
||||||
|
CSQueue oldQueue = e.getValue();
|
||||||
|
CSQueue newQueue = newQueues.get(queueName);
|
||||||
|
if (null == newQueue) {
|
||||||
|
// old queue doesn't exist in the new XML
|
||||||
|
String configPrefix = newConf.getQueuePrefix(
|
||||||
|
oldQueue.getQueuePath());
|
||||||
|
QueueState newQueueState = null;
|
||||||
|
try {
|
||||||
|
newQueueState = QueueState.valueOf(
|
||||||
|
newConf.get(configPrefix + "state"));
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn("Not a valid queue state for queue "
|
||||||
|
+ oldQueue.getQueuePath());
|
||||||
|
}
|
||||||
|
if (oldQueue.getState() == QueueState.STOPPED ||
|
||||||
|
newQueueState == QueueState.STOPPED) {
|
||||||
|
LOG.info("Deleting Queue " + queueName + ", as it is not"
|
||||||
|
+ " present in the modified capacity configuration xml");
|
||||||
|
} else{
|
||||||
|
throw new IOException(oldQueue.getQueuePath() + " cannot be"
|
||||||
|
+ " deleted from the capacity scheduler configuration, "
|
||||||
|
+ "as the queue is not yet in stopped state. "
|
||||||
|
+ "Current State : " + oldQueue.getState());
|
||||||
|
}
|
||||||
|
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
|
||||||
|
//Queue's cannot be moved from one hierarchy to other
|
||||||
|
throw new IOException(
|
||||||
|
queueName + " is moved from:" + oldQueue.getQueuePath()
|
||||||
|
+ " to:" + newQueue.getQueuePath()
|
||||||
|
+ " after refresh, which is not allowed.");
|
||||||
|
} else if (oldQueue instanceof ParentQueue
|
||||||
|
&& !(oldQueue instanceof ManagedParentQueue)
|
||||||
|
&& newQueue instanceof ManagedParentQueue) {
|
||||||
|
throw new IOException(
|
||||||
|
"Can not convert parent queue: " + oldQueue.getQueuePath()
|
||||||
|
+ " to auto create enabled parent queue since "
|
||||||
|
+ "it could have other pre-configured queues which is"
|
||||||
|
+ " not supported");
|
||||||
|
} else if (oldQueue instanceof ManagedParentQueue
|
||||||
|
&& !(newQueue instanceof ManagedParentQueue)) {
|
||||||
|
throw new IOException(
|
||||||
|
"Cannot convert auto create enabled parent queue: " + oldQueue
|
||||||
|
.getQueuePath() + " to leaf queue. Please check "
|
||||||
|
+ " parent queue's configuration "
|
||||||
|
+ CapacitySchedulerConfiguration
|
||||||
|
.AUTO_CREATE_CHILD_QUEUE_ENABLED
|
||||||
|
+ " is set to true");
|
||||||
|
} else if (oldQueue instanceof LeafQueue
|
||||||
|
&& newQueue instanceof ParentQueue) {
|
||||||
|
if (oldQueue.getState() == QueueState.STOPPED) {
|
||||||
|
LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
|
||||||
|
+ " to parent queue.");
|
||||||
|
} else{
|
||||||
|
throw new IOException(
|
||||||
|
"Can not convert the leaf queue: " + oldQueue.getQueuePath()
|
||||||
|
+ " to parent queue since "
|
||||||
|
+ "it is not yet in stopped state. Current State : "
|
||||||
|
+ oldQueue.getState());
|
||||||
|
}
|
||||||
|
} else if (oldQueue instanceof ParentQueue
|
||||||
|
&& newQueue instanceof LeafQueue) {
|
||||||
|
LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
|
||||||
|
+ " to leaf queue.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.Permission;
|
import org.apache.hadoop.yarn.security.Permission;
|
||||||
|
@ -177,7 +176,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
csContext.getRMContext().getHAServiceState()
|
csContext.getRMContext().getHAServiceState()
|
||||||
!= HAServiceProtocol.HAServiceState.STANDBY) {
|
!= HAServiceProtocol.HAServiceState.STANDBY) {
|
||||||
// Ensure queue hierarchy in the new XML file is proper.
|
// Ensure queue hierarchy in the new XML file is proper.
|
||||||
validateQueueHierarchy(queues, newQueues);
|
CapacitySchedulerConfigValidator
|
||||||
|
.validateQueueHierarchy(queues, newQueues, newConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add new queues and delete OldQeueus only after validation.
|
// Add new queues and delete OldQeueus only after validation.
|
||||||
|
@ -299,78 +299,6 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Ensure all existing queues are present. Queues cannot be deleted if its not
|
|
||||||
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
|
|
||||||
* Previous child queue could be converted into parent queue if it is in
|
|
||||||
* STOPPED state.
|
|
||||||
*
|
|
||||||
* @param queues existing queues
|
|
||||||
* @param newQueues new queues
|
|
||||||
*/
|
|
||||||
private void validateQueueHierarchy(Map<String, CSQueue> queues,
|
|
||||||
Map<String, CSQueue> newQueues) throws IOException {
|
|
||||||
// check that all static queues are included in the newQueues list
|
|
||||||
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
|
|
||||||
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
|
|
||||||
.getClass()))) {
|
|
||||||
String queueName = e.getKey();
|
|
||||||
CSQueue oldQueue = e.getValue();
|
|
||||||
CSQueue newQueue = newQueues.get(queueName);
|
|
||||||
if (null == newQueue) {
|
|
||||||
// old queue doesn't exist in the new XML
|
|
||||||
if (oldQueue.getState() == QueueState.STOPPED) {
|
|
||||||
LOG.info("Deleting Queue " + queueName + ", as it is not"
|
|
||||||
+ " present in the modified capacity configuration xml");
|
|
||||||
} else{
|
|
||||||
throw new IOException(oldQueue.getQueuePath() + " is deleted from"
|
|
||||||
+ " the new capacity scheduler configuration, but the"
|
|
||||||
+ " queue is not yet in stopped state. " + "Current State : "
|
|
||||||
+ oldQueue.getState());
|
|
||||||
}
|
|
||||||
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
|
|
||||||
//Queue's cannot be moved from one hierarchy to other
|
|
||||||
throw new IOException(
|
|
||||||
queueName + " is moved from:" + oldQueue.getQueuePath() + " to:"
|
|
||||||
+ newQueue.getQueuePath()
|
|
||||||
+ " after refresh, which is not allowed.");
|
|
||||||
} else if (oldQueue instanceof ParentQueue
|
|
||||||
&& !(oldQueue instanceof ManagedParentQueue)
|
|
||||||
&& newQueue instanceof ManagedParentQueue) {
|
|
||||||
throw new IOException(
|
|
||||||
"Can not convert parent queue: " + oldQueue.getQueuePath()
|
|
||||||
+ " to auto create enabled parent queue since "
|
|
||||||
+ "it could have other pre-configured queues which is not "
|
|
||||||
+ "supported");
|
|
||||||
} else if (oldQueue instanceof ManagedParentQueue
|
|
||||||
&& !(newQueue instanceof ManagedParentQueue)) {
|
|
||||||
throw new IOException(
|
|
||||||
"Cannot convert auto create enabled parent queue: " + oldQueue
|
|
||||||
.getQueuePath() + " to leaf queue. Please check "
|
|
||||||
+ " parent queue's configuration "
|
|
||||||
+ CapacitySchedulerConfiguration
|
|
||||||
.AUTO_CREATE_CHILD_QUEUE_ENABLED
|
|
||||||
+ " is set to true");
|
|
||||||
} else if (oldQueue instanceof LeafQueue
|
|
||||||
&& newQueue instanceof ParentQueue) {
|
|
||||||
if (oldQueue.getState() == QueueState.STOPPED) {
|
|
||||||
LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
|
|
||||||
+ " to parent queue.");
|
|
||||||
} else{
|
|
||||||
throw new IOException(
|
|
||||||
"Can not convert the leaf queue: " + oldQueue.getQueuePath()
|
|
||||||
+ " to parent queue since "
|
|
||||||
+ "it is not yet in stopped state. Current State : "
|
|
||||||
+ oldQueue.getState());
|
|
||||||
}
|
|
||||||
} else if (oldQueue instanceof ParentQueue
|
|
||||||
&& newQueue instanceof LeafQueue) {
|
|
||||||
LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
|
|
||||||
+ " to leaf queue.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates to our list of queues: Adds the new queues and deletes the removed
|
* Updates to our list of queues: Adds the new queues and deletes the removed
|
||||||
|
|
|
@ -131,14 +131,32 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
public void logAndApplyMutation(UserGroupInformation user,
|
public void logAndApplyMutation(UserGroupInformation user,
|
||||||
SchedConfUpdateInfo confUpdate) throws Exception {
|
SchedConfUpdateInfo confUpdate) throws Exception {
|
||||||
oldConf = new Configuration(schedConf);
|
oldConf = new Configuration(schedConf);
|
||||||
Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
|
CapacitySchedulerConfiguration proposedConf =
|
||||||
|
new CapacitySchedulerConfiguration(schedConf, false);
|
||||||
|
Map<String, String> kvUpdate
|
||||||
|
= constructKeyValueConfUpdate(proposedConf, confUpdate);
|
||||||
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
|
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
|
||||||
confStore.logMutation(log);
|
confStore.logMutation(log);
|
||||||
|
applyMutation(proposedConf, kvUpdate);
|
||||||
|
schedConf = proposedConf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Configuration applyChanges(Configuration oldConfiguration,
|
||||||
|
SchedConfUpdateInfo confUpdate) throws IOException {
|
||||||
|
CapacitySchedulerConfiguration proposedConf =
|
||||||
|
new CapacitySchedulerConfiguration(oldConfiguration, false);
|
||||||
|
Map<String, String> kvUpdate
|
||||||
|
= constructKeyValueConfUpdate(proposedConf, confUpdate);
|
||||||
|
applyMutation(proposedConf, kvUpdate);
|
||||||
|
return proposedConf;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void applyMutation(Configuration conf, Map<String, String> kvUpdate) {
|
||||||
for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
|
for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
|
||||||
if (kv.getValue() == null) {
|
if (kv.getValue() == null) {
|
||||||
schedConf.unset(kv.getKey());
|
conf.unset(kv.getKey());
|
||||||
} else {
|
} else {
|
||||||
schedConf.set(kv.getKey(), kv.getValue());
|
conf.set(kv.getKey(), kv.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -215,9 +233,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, String> constructKeyValueConfUpdate(
|
private Map<String, String> constructKeyValueConfUpdate(
|
||||||
|
CapacitySchedulerConfiguration proposedConf,
|
||||||
SchedConfUpdateInfo mutationInfo) throws IOException {
|
SchedConfUpdateInfo mutationInfo) throws IOException {
|
||||||
CapacitySchedulerConfiguration proposedConf =
|
|
||||||
new CapacitySchedulerConfiguration(schedConf, false);
|
|
||||||
Map<String, String> confUpdate = new HashMap<>();
|
Map<String, String> confUpdate = new HashMap<>();
|
||||||
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
|
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
|
||||||
removeQueue(queueToRemove, proposedConf, confUpdate);
|
removeQueue(queueToRemove, proposedConf, confUpdate);
|
||||||
|
|
|
@ -57,6 +57,12 @@ public final class RMWSConsts {
|
||||||
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
||||||
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Path for {@code RMWebServiceProtocol#validateAndGetSchedulerConfiguration}.
|
||||||
|
*/
|
||||||
|
public static final String SCHEDULER_CONF_VALIDATE
|
||||||
|
= "/scheduler-conf/validate";
|
||||||
|
|
||||||
/** Path for {@code RMWebServiceProtocol#getNodes}. */
|
/** Path for {@code RMWebServiceProtocol#getNodes}. */
|
||||||
public static final String NODES = "/nodes";
|
public static final String NODES = "/nodes";
|
||||||
|
|
||||||
|
|
|
@ -141,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigValidator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
@ -2370,6 +2371,53 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@POST
|
||||||
|
@Path(RMWSConsts.SCHEDULER_CONF_VALIDATE)
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
|
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
|
public synchronized Response validateAndGetSchedulerConfiguration(
|
||||||
|
SchedConfUpdateInfo mutationInfo,
|
||||||
|
@Context HttpServletRequest hsr) throws AuthorizationException {
|
||||||
|
// Only admin user is allowed to read scheduler conf,
|
||||||
|
// in order to avoid leaking sensitive info, such as ACLs
|
||||||
|
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||||
|
initForWritableEndpoints(callerUGI, true);
|
||||||
|
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||||
|
if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
|
||||||
|
scheduler).isConfigurationMutable()) {
|
||||||
|
try {
|
||||||
|
MutableConfigurationProvider mutableConfigurationProvider =
|
||||||
|
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||||
|
Configuration schedulerConf = mutableConfigurationProvider
|
||||||
|
.getConfiguration();
|
||||||
|
Configuration newConfig = mutableConfigurationProvider
|
||||||
|
.applyChanges(schedulerConf, mutationInfo);
|
||||||
|
Configuration yarnConf = ((CapacityScheduler) scheduler).getConf();
|
||||||
|
CapacitySchedulerConfigValidator.validateCSConfiguration(yarnConf,
|
||||||
|
newConfig, rm.getRMContext());
|
||||||
|
|
||||||
|
return Response.status(Status.OK)
|
||||||
|
.entity(new ConfInfo(newConfig))
|
||||||
|
.build();
|
||||||
|
} catch (Exception e) {
|
||||||
|
String errorMsg = "CapacityScheduler configuration validation failed:"
|
||||||
|
+ e.toString();
|
||||||
|
LOG.warn(errorMsg);
|
||||||
|
return Response.status(Status.BAD_REQUEST)
|
||||||
|
.entity(errorMsg)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
String errorMsg = "Configuration change validation only supported by " +
|
||||||
|
"MutableConfScheduler.";
|
||||||
|
LOG.warn(errorMsg);
|
||||||
|
return Response.status(Status.BAD_REQUEST)
|
||||||
|
.entity(errorMsg)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@PUT
|
@PUT
|
||||||
@Path(RMWSConsts.SCHEDULER_CONF)
|
@Path(RMWSConsts.SCHEDULER_CONF)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public final class CapacitySchedulerConfigGeneratorForTest {
|
||||||
|
|
||||||
|
private CapacitySchedulerConfigGeneratorForTest() {
|
||||||
|
throw new IllegalStateException("Utility class");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Configuration createConfiguration(Map<String, String> configs) {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
for (Map.Entry entry: configs.entrySet()) {
|
||||||
|
config.set((String)entry.getKey(), (String)entry.getValue());
|
||||||
|
}
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Configuration createBasicCSConfiguration() {
|
||||||
|
Map<String, String> conf = new HashMap<>();
|
||||||
|
conf.put("yarn.scheduler.capacity.root.queues", "test1, test2");
|
||||||
|
conf.put("yarn.scheduler.capacity.root.test1.capacity", "50");
|
||||||
|
conf.put("yarn.scheduler.capacity.root.test2.capacity", "50");
|
||||||
|
conf.put("yarn.scheduler.capacity.root.test1.maximum-capacity", "100");
|
||||||
|
conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING");
|
||||||
|
conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING");
|
||||||
|
conf.put("yarn.scheduler.capacity.queue-mappings",
|
||||||
|
"u:test1:test1,u:test2:test2");
|
||||||
|
return createConfiguration(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,366 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
public class TestCapacitySchedulerConfigValidator {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the case when the scheduler.minimum-allocation-mb == 0.
|
||||||
|
*/
|
||||||
|
@Test (expected = YarnRuntimeException.class)
|
||||||
|
public void testValidateMemoryAllocationInvalidMinMem() {
|
||||||
|
Map<String, String> configs = new HashMap();
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "0");
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createConfiguration(configs);
|
||||||
|
CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
|
||||||
|
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB +
|
||||||
|
" should be > 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the case when the scheduler.minimum-allocation-mb is greater than
|
||||||
|
* scheduler.maximum-allocation-mb.
|
||||||
|
*/
|
||||||
|
@Test (expected = YarnRuntimeException.class)
|
||||||
|
public void testValidateMemoryAllocationHIgherMinThanMaxMem() {
|
||||||
|
Map<String, String> configs = new HashMap();
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "8192");
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "1024");
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createConfiguration(configs);
|
||||||
|
CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
|
||||||
|
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + " should be > "
|
||||||
|
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateMemoryAllocation() {
|
||||||
|
Map<String, String> configs = new HashMap();
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "1024");
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "8192");
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createConfiguration(configs);
|
||||||
|
// there is no need for assertion, since there is no further method call
|
||||||
|
// inside the tested code and in case of a valid configuration no exception
|
||||||
|
// is thrown
|
||||||
|
CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the case when the scheduler.minimum-allocation-vcores == 0.
|
||||||
|
*/
|
||||||
|
@Test (expected = YarnRuntimeException.class)
|
||||||
|
public void testValidateVCoresInvalidMinVCore() {
|
||||||
|
Map<String, String> configs = new HashMap();
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "0");
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createConfiguration(configs);
|
||||||
|
CapacitySchedulerConfigValidator.validateVCores(config);
|
||||||
|
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
|
||||||
|
+ " should be > 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the case when the scheduler.minimum-allocation-vcores is greater
|
||||||
|
* than scheduler.maximum-allocation-vcores.
|
||||||
|
*/
|
||||||
|
@Test (expected = YarnRuntimeException.class)
|
||||||
|
public void testValidateVCoresHigherMinThanMaxVCore() {
|
||||||
|
Map<String, String> configs = new HashMap();
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4");
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "1");
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createConfiguration(configs);
|
||||||
|
CapacitySchedulerConfigValidator.validateVCores(config);
|
||||||
|
fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES +
|
||||||
|
" should be > "
|
||||||
|
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateVCores() {
|
||||||
|
Map<String, String> configs = new HashMap();
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "1");
|
||||||
|
configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "4");
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createConfiguration(configs);
|
||||||
|
// there is no need for assertion, since there is no further method call
|
||||||
|
// inside the tested code and in case of a valid configuration no exception
|
||||||
|
// is thrown
|
||||||
|
CapacitySchedulerConfigValidator.validateVCores(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigInvalidCapacity() {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test1.capacity", "500");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
try {
|
||||||
|
CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
fail("Invalid capacity");
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.assertTrue(e.getCause().getMessage()
|
||||||
|
.startsWith("Illegal capacity"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigStopALeafQueue() throws IOException {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
Boolean isValidConfig = CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
Assert.assertTrue(isValidConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the root queue if there are running child queues.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigStopANonLeafQueueInvalid() {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.state", "STOPPED");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
try {
|
||||||
|
CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
fail("There are child queues in running state");
|
||||||
|
} catch (IOException e) {
|
||||||
|
// TODO: change the assertion message in case of backporting YARN-8678
|
||||||
|
Assert.assertTrue(e.getCause().getMessage()
|
||||||
|
.contains("The parent queue:root state is STOPPED,"
|
||||||
|
+ " child queue:test1 state cannot be RUNNING"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigStopANonLeafQueue() throws IOException {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.state", "STOPPED");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
Boolean isValidConfig = CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
Assert.assertTrue(isValidConfig);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a leaf queue without modifying the capacity of other leaf queues
|
||||||
|
* so the total capacity != 100.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigAddALeafQueueInvalid() {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test3.capacity", "30");
|
||||||
|
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
try {
|
||||||
|
CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
fail("Invalid capacity for children of queue root");
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.assertTrue(e.getCause().getMessage()
|
||||||
|
.startsWith("Illegal capacity"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a leaf queue by modifying the capacity of other leaf queues
|
||||||
|
* and adjust the capacities of other leaf queues, so total capacity = 100.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigAddALeafQueueValid() throws IOException {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test3.capacity", "30");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test1.capacity", "20");
|
||||||
|
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
Boolean isValidConfig = CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
Assert.assertTrue(isValidConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete a running queue.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigInvalidQueueDeletion() {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
|
||||||
|
newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
|
||||||
|
newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
|
||||||
|
newConfig.unset("yarn.scheduler.capacity.root.test2.state");
|
||||||
|
newConfig.set("yarn.scheduler.capacity.queue-mappings",
|
||||||
|
"u:test1:test1");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
try {
|
||||||
|
CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
fail("Invalid capacity for children of queue root");
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.assertTrue(e.getCause().getMessage()
|
||||||
|
.contains("root.test2 cannot be deleted"));
|
||||||
|
Assert.assertTrue(e.getCause().getMessage()
|
||||||
|
.contains("the queue is not yet in stopped state"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete a queue and not adjust capacities.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigInvalidQueueDeletion2() {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
|
||||||
|
newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
|
||||||
|
newConfig.unset("yarn.scheduler.capacity.root.test2.state");
|
||||||
|
newConfig.set("yarn.scheduler.capacity.queue-mappings",
|
||||||
|
"u:test1:test1");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
try {
|
||||||
|
CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
fail("Invalid capacity for children of queue root");
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.assertTrue(e.getCause().getMessage()
|
||||||
|
.contains("Illegal capacity"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete a queue and adjust capacities to have total capacity = 100.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testValidateCSConfigValidQueueDeletion() throws IOException {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
oldConfig.set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
|
||||||
|
newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
|
||||||
|
newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
|
||||||
|
newConfig.unset("yarn.scheduler.capacity.root.test2.state");
|
||||||
|
newConfig.set("yarn.scheduler.capacity.queue-mappings",
|
||||||
|
"u:test1:test1");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
boolean isValidConfig = CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
Assert.assertTrue(isValidConfig);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddQueueToALeafQueue() throws IOException {
|
||||||
|
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
oldConfig.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
|
||||||
|
Configuration newConfig = new Configuration(oldConfig);
|
||||||
|
newConfig.set("yarn.scheduler.capacity.root.test1.queues", "newQueue");
|
||||||
|
newConfig
|
||||||
|
.set("yarn.scheduler.capacity.root.test1.newQueue.capacity", "100");
|
||||||
|
newConfig.set("yarn.scheduler.capacity.queue-mappings",
|
||||||
|
"u:test1:test2");
|
||||||
|
RMContext rmContext = prepareRMContext();
|
||||||
|
boolean isValidConfig = CapacitySchedulerConfigValidator
|
||||||
|
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||||
|
Assert.assertTrue(isValidConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static RMContext prepareRMContext() {
|
||||||
|
RMContext rmContext = Mockito.mock(RMContext.class);
|
||||||
|
LocalConfigurationProvider configProvider = Mockito
|
||||||
|
.mock(LocalConfigurationProvider.class);
|
||||||
|
Mockito.when(rmContext.getConfigurationProvider())
|
||||||
|
.thenReturn(configProvider);
|
||||||
|
RMNodeLabelsManager nodeLabelsManager = Mockito
|
||||||
|
.mock(RMNodeLabelsManager.class);
|
||||||
|
Mockito.when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager);
|
||||||
|
LightWeightResource partitionResource = Mockito
|
||||||
|
.mock(LightWeightResource.class);
|
||||||
|
Mockito.when(nodeLabelsManager
|
||||||
|
.getResourceByLabel(Mockito.any(), Mockito.any()))
|
||||||
|
.thenReturn(partitionResource);
|
||||||
|
PlacementManager queuePlacementManager = Mockito
|
||||||
|
.mock(PlacementManager.class);
|
||||||
|
Mockito.when(rmContext.getQueuePlacementManager())
|
||||||
|
.thenReturn(queuePlacementManager);
|
||||||
|
return rmContext;
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,15 +22,20 @@ import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseS
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
import java.security.Principal;
|
import java.security.Principal;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
@ -45,6 +50,7 @@ import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.http.JettyUtils;
|
import org.apache.hadoop.http.JettyUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||||
|
@ -59,6 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerConfigValidator;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
|
||||||
|
@ -73,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||||
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
|
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.codehaus.jettison.json.JSONException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -884,4 +895,112 @@ public class TestRMWebServices extends JerseyTestBase {
|
||||||
assertEquals("requestedUser doesn't match: ",
|
assertEquals("requestedUser doesn't match: ",
|
||||||
requestedUser, userInfo.getRequestedUser());
|
requestedUser, userInfo.getRequestedUser());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateAndGetSchedulerConfigurationInvalidScheduler()
|
||||||
|
throws AuthorizationException {
|
||||||
|
ResourceScheduler scheduler = new CapacityScheduler();
|
||||||
|
RMWebServices webService = prepareWebServiceForValidation(scheduler);
|
||||||
|
SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
|
||||||
|
HttpServletRequest mockHsr = prepareServletRequestForValidation();
|
||||||
|
Response response = webService
|
||||||
|
.validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
|
||||||
|
Assert.assertEquals(Status.BAD_REQUEST
|
||||||
|
.getStatusCode(), response.getStatus());
|
||||||
|
Assert.assertTrue(response.getEntity().toString()
|
||||||
|
.contains("Configuration change validation only supported by"
|
||||||
|
+" MutableConfScheduler."));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateAndGetSchedulerConfigurationInvalidConfig()
|
||||||
|
throws IOException {
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
ResourceScheduler scheduler = prepareCSForValidation(config);
|
||||||
|
|
||||||
|
SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
|
||||||
|
ArrayList<String> queuesToRemove = new ArrayList();
|
||||||
|
queuesToRemove.add("root.test1");
|
||||||
|
mutationInfo.setRemoveQueueInfo(queuesToRemove);
|
||||||
|
|
||||||
|
RMWebServices webService = prepareWebServiceForValidation(scheduler);
|
||||||
|
HttpServletRequest mockHsr = prepareServletRequestForValidation();
|
||||||
|
|
||||||
|
Response response = webService
|
||||||
|
.validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
|
||||||
|
Assert.assertEquals(Status.BAD_REQUEST
|
||||||
|
.getStatusCode(), response.getStatus());
|
||||||
|
Assert.assertTrue(response.getEntity().toString()
|
||||||
|
.contains("Illegal capacity of 0.5 for children of queue"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateAndGetSchedulerConfigurationValidScheduler()
|
||||||
|
throws IOException {
|
||||||
|
Configuration config = CapacitySchedulerConfigGeneratorForTest
|
||||||
|
.createBasicCSConfiguration();
|
||||||
|
config.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
|
||||||
|
config.set("yarn.scheduler.capacity.queue-mappings",
|
||||||
|
"u:test2:test2");
|
||||||
|
ResourceScheduler scheduler = prepareCSForValidation(config);
|
||||||
|
|
||||||
|
SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
|
||||||
|
ArrayList<String> queuesToRemove = new ArrayList();
|
||||||
|
queuesToRemove.add("root.test1");
|
||||||
|
mutationInfo.setRemoveQueueInfo(queuesToRemove);
|
||||||
|
ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
|
||||||
|
String queueToUpdate = "root.test2";
|
||||||
|
Map<String, String> propertiesToUpdate = new HashMap<>();
|
||||||
|
propertiesToUpdate.put("capacity", "100");
|
||||||
|
updateQueueInfo.add(new QueueConfigInfo(queueToUpdate, propertiesToUpdate));
|
||||||
|
mutationInfo.setUpdateQueueInfo(updateQueueInfo);
|
||||||
|
|
||||||
|
RMWebServices webService = prepareWebServiceForValidation(scheduler);
|
||||||
|
HttpServletRequest mockHsr = prepareServletRequestForValidation();
|
||||||
|
|
||||||
|
Response response = webService
|
||||||
|
.validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
|
||||||
|
Assert.assertEquals(Status.OK
|
||||||
|
.getStatusCode(), response.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
private CapacityScheduler prepareCSForValidation(Configuration config)
|
||||||
|
throws IOException {
|
||||||
|
CapacityScheduler scheduler = mock(CapacityScheduler.class);
|
||||||
|
when(scheduler.isConfigurationMutable())
|
||||||
|
.thenReturn(true);
|
||||||
|
MutableCSConfigurationProvider configurationProvider =
|
||||||
|
mock(MutableCSConfigurationProvider.class);
|
||||||
|
when(scheduler.getMutableConfProvider())
|
||||||
|
.thenReturn(configurationProvider);
|
||||||
|
|
||||||
|
when(configurationProvider.getConfiguration()).thenReturn(config);
|
||||||
|
when(scheduler.getConf()).thenReturn(config);
|
||||||
|
when(configurationProvider
|
||||||
|
.applyChanges(any(), any())).thenCallRealMethod();
|
||||||
|
return scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
private HttpServletRequest prepareServletRequestForValidation() {
|
||||||
|
HttpServletRequest mockHsr = mock(HttpServletRequest.class);
|
||||||
|
when(mockHsr.getUserPrincipal()).thenReturn(() -> "yarn");
|
||||||
|
return mockHsr;
|
||||||
|
}
|
||||||
|
|
||||||
|
private RMWebServices prepareWebServiceForValidation(
|
||||||
|
ResourceScheduler scheduler) {
|
||||||
|
ResourceManager mockRM = mock(ResourceManager.class);
|
||||||
|
ApplicationACLsManager acLsManager = mock(ApplicationACLsManager.class);
|
||||||
|
RMWebServices webService = new RMWebServices(mockRM, new Configuration(),
|
||||||
|
mock(HttpServletResponse.class));
|
||||||
|
when(mockRM.getResourceScheduler()).thenReturn(scheduler);
|
||||||
|
when(acLsManager.areACLsEnabled()).thenReturn(false);
|
||||||
|
when(mockRM.getApplicationACLsManager()).thenReturn(acLsManager);
|
||||||
|
RMContext context = TestCapacitySchedulerConfigValidator.prepareRMContext();
|
||||||
|
when(mockRM.getRMContext()).thenReturn(context);
|
||||||
|
|
||||||
|
return webService;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue