YARN-2932. Add entry for preemptable status (enabled/disabled) to scheduler web UI and queue initialize/refresh logging. (Eric Payne via wangda)
This commit is contained in:
parent
fd93e5387b
commit
18741adf97
|
@ -215,6 +215,10 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-3028. Better syntax for replaceLabelsOnNode in RMAdmin CLI
|
||||
(Rohith Sharmaks via wangda)
|
||||
|
||||
YARN-2932. Add entry for "preemptable" status (enabled/disabled) to
|
||||
scheduler web UI and queue initialize/refresh logging.
|
||||
(Eric Payne via wangda)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -116,9 +116,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
public static final String NATURAL_TERMINATION_FACTOR =
|
||||
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
|
||||
|
||||
public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
|
||||
public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
|
||||
|
||||
// the dispatcher to send preempt and kill events
|
||||
public EventHandler<ContainerPreemptEvent> dispatcher;
|
||||
|
||||
|
@ -227,7 +224,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
// extract a summary of the queues from scheduler
|
||||
TempQueue tRoot;
|
||||
synchronized (scheduler) {
|
||||
tRoot = cloneQueues(root, clusterResources, false);
|
||||
tRoot = cloneQueues(root, clusterResources);
|
||||
}
|
||||
|
||||
// compute the ideal distribution of resources among queues
|
||||
|
@ -728,11 +725,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
*
|
||||
* @param root the root of the CapacityScheduler queue hierarchy
|
||||
* @param clusterResources the total amount of resources in the cluster
|
||||
* @param parentDisablePreempt true if disable preemption is set for parent
|
||||
* @return the root of the cloned queue hierarchy
|
||||
*/
|
||||
private TempQueue cloneQueues(CSQueue root, Resource clusterResources,
|
||||
boolean parentDisablePreempt) {
|
||||
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
|
||||
TempQueue ret;
|
||||
synchronized (root) {
|
||||
String queueName = root.getQueueName();
|
||||
|
@ -744,12 +739,6 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
Resource guaranteed = Resources.multiply(clusterResources, absCap);
|
||||
Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
|
||||
|
||||
boolean queueDisablePreemption = false;
|
||||
String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
|
||||
+ SUFFIX_DISABLE_PREEMPTION;
|
||||
queueDisablePreemption = scheduler.getConfiguration()
|
||||
.getBoolean(queuePropName, parentDisablePreempt);
|
||||
|
||||
Resource extra = Resource.newInstance(0, 0);
|
||||
if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
|
||||
extra = Resources.subtract(current, guaranteed);
|
||||
|
@ -759,7 +748,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
Resource pending = l.getTotalResourcePending();
|
||||
ret = new TempQueue(queueName, current, pending, guaranteed,
|
||||
maxCapacity);
|
||||
if (queueDisablePreemption) {
|
||||
if (root.getPreemptionDisabled()) {
|
||||
ret.untouchableExtra = extra;
|
||||
} else {
|
||||
ret.preemptableExtra = extra;
|
||||
|
@ -771,8 +760,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
maxCapacity);
|
||||
Resource childrensPreemptable = Resource.newInstance(0, 0);
|
||||
for (CSQueue c : root.getChildQueues()) {
|
||||
TempQueue subq =
|
||||
cloneQueues(c, clusterResources, queueDisablePreemption);
|
||||
TempQueue subq = cloneQueues(c, clusterResources);
|
||||
Resources.addTo(childrensPreemptable, subq.preemptableExtra);
|
||||
ret.addChild(subq);
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
|
|||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
|
@ -38,14 +39,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public abstract class AbstractCSQueue implements CSQueue {
|
||||
|
||||
CSQueue parent;
|
||||
final String queueName;
|
||||
|
||||
float capacity;
|
||||
float maximumCapacity;
|
||||
float absoluteCapacity;
|
||||
|
@ -74,10 +73,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
Map<QueueACL, AccessControlList> acls =
|
||||
new HashMap<QueueACL, AccessControlList>();
|
||||
boolean reservationsContinueLooking;
|
||||
|
||||
private boolean preemptionDisabled;
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
private CapacitySchedulerContext csContext;
|
||||
|
||||
public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
this.minimumAllocation = cs.getMinimumResourceCapability();
|
||||
|
@ -120,6 +121,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
maxCapacityByNodeLabels =
|
||||
cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
|
||||
accessibleLabels, labelManager);
|
||||
|
||||
this.csContext = cs;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -318,6 +321,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
absoluteCapacityByNodeLabels, absoluteCapacityByNodeLabels);
|
||||
|
||||
this.reservationsContinueLooking = reservationContinueLooking;
|
||||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
||||
}
|
||||
|
||||
protected QueueInfo getQueueInfo() {
|
||||
|
@ -454,4 +459,43 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
public Resource getUsedResourceByLabel(String nodeLabel) {
|
||||
return usedResourcesByNodeLabels.get(nodeLabel);
|
||||
}
|
||||
|
||||
@Private
|
||||
public boolean getPreemptionDisabled() {
|
||||
return preemptionDisabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* The specified queue is preemptable if system-wide preemption is turned on
|
||||
* unless any queue in the <em>qPath</em> hierarchy has explicitly turned
|
||||
* preemption off.
|
||||
* NOTE: Preemptability is inherited from a queue's parent.
|
||||
*
|
||||
* @return true if queue has preemption disabled, false otherwise
|
||||
*/
|
||||
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) {
|
||||
CapacitySchedulerConfiguration csConf = csContext.getConfiguration();
|
||||
boolean systemWidePreemption =
|
||||
csConf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
|
||||
CSQueue parentQ = q.getParent();
|
||||
|
||||
// If the system-wide preemption switch is turned off, all of the queues in
|
||||
// the qPath hierarchy have preemption disabled, so return true.
|
||||
if (!systemWidePreemption) return true;
|
||||
|
||||
// If q is the root queue and the system-wide preemption switch is turned
|
||||
// on, then q does not have preemption disabled (default=false, below)
|
||||
// unless the preemption_disabled property is explicitly set.
|
||||
if (parentQ == null) {
|
||||
return csConf.getPreemptionDisabled(q.getQueuePath(), false);
|
||||
}
|
||||
|
||||
// If this is not the root queue, inherit the default value for the
|
||||
// preemption_disabled property from the parent. Preemptability will be
|
||||
// inherited from the parent's hierarchy unless explicitly overridden at
|
||||
// this level.
|
||||
return csConf.getPreemptionDisabled(q.getQueuePath(),
|
||||
parentQ.getPreemptionDisabled());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -300,4 +300,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
|
|||
* @return capacity by node label
|
||||
*/
|
||||
public float getCapacityByNodeLabel(String nodeLabel);
|
||||
|
||||
/**
|
||||
* Check whether <em>disable_preemption</em> property is set for this queue
|
||||
* @return true if <em>disable_preemption</em> is set, false if not
|
||||
*/
|
||||
public boolean getPreemptionDisabled();
|
||||
}
|
||||
|
|
|
@ -180,6 +180,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
@Private
|
||||
public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
|
||||
|
||||
@Private
|
||||
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";
|
||||
|
||||
@Private
|
||||
public static class QueueMapping {
|
||||
|
||||
|
@ -802,4 +805,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
|
||||
return enforcementWindow;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the <em>disable_preemption</em> property in order to indicate
|
||||
* whether or not container preemption will be disabled for the specified
|
||||
* queue.
|
||||
*
|
||||
* @param queue queue path
|
||||
* @param preemptionDisabled true if preemption is disabled on queue
|
||||
*/
|
||||
public void setPreemptionDisabled(String queue, boolean preemptionDisabled) {
|
||||
setBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED,
|
||||
preemptionDisabled);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether preemption is disabled on the specified queue.
|
||||
*
|
||||
* @param queue queue path to query
|
||||
* @param defaultVal used as default if the <em>disable_preemption</em>
|
||||
* is not set in the configuration
|
||||
* @return true if preemption is disabled on <em>queue</em>, false otherwise
|
||||
*/
|
||||
public boolean getPreemptionDisabled(String queue, boolean defaultVal) {
|
||||
boolean preemptionDisabled =
|
||||
getBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED,
|
||||
defaultVal);
|
||||
return preemptionDisabled;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -296,7 +296,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
"labels=" + labelStrBuilder.toString() + "\n" +
|
||||
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
|
||||
"reservationsContinueLooking = " +
|
||||
reservationsContinueLooking + "\n");
|
||||
reservationsContinueLooking + "\n" +
|
||||
"preemptionDisabled = " + getPreemptionDisabled() + "\n");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -121,7 +121,8 @@ class CapacitySchedulerPage extends RmView {
|
|||
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
|
||||
_("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
|
||||
_r("Active Users: ", activeUserList.toString()).
|
||||
_("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels()));
|
||||
_("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
|
||||
_("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled");
|
||||
|
||||
html._(InfoBlock.class);
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
protected float userLimitFactor;
|
||||
protected ResourceInfo aMResourceLimit;
|
||||
protected ResourceInfo userAMResourceLimit;
|
||||
protected boolean preemptionDisabled;
|
||||
|
||||
CapacitySchedulerLeafQueueInfo() {
|
||||
};
|
||||
|
@ -53,6 +54,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
userLimitFactor = q.getUserLimitFactor();
|
||||
aMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
|
||||
userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
|
||||
preemptionDisabled = q.getPreemptionDisabled();
|
||||
}
|
||||
|
||||
public int getNumActiveApplications() {
|
||||
|
@ -95,4 +97,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
public ResourceInfo getUserAMResourceLimit() {
|
||||
return userAMResourceLimit;
|
||||
}
|
||||
|
||||
public boolean getPreemptionDisabled() {
|
||||
return preemptionDisabled;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,12 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
|
||||
|
@ -52,6 +50,7 @@ import java.util.Map;
|
|||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.collections.map.HashedMap;
|
||||
|
@ -322,24 +321,22 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
{ 3, 0, 0, 0 }, // subqueues
|
||||
};
|
||||
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
schedConf.setPreemptionDisabled("root.queueB", true);
|
||||
|
||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||
policy.editSchedule();
|
||||
// With PREEMPTION_DISABLED set for queueB, get resources from queueC
|
||||
// Since queueB is not preemptable, get resources from queueC
|
||||
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
||||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
|
||||
|
||||
// With no PREEMPTION_DISABLED set for queueB, resources will be preempted
|
||||
// from both queueB and queueC. Test must be reset for so that the mDisp
|
||||
// Since queueB is preemptable, resources will be preempted
|
||||
// from both queueB and queueC. Test must be reset so that the mDisp
|
||||
// event handler will count only events from the following test and not the
|
||||
// previous one.
|
||||
setup();
|
||||
schedConf.setPreemptionDisabled("root.queueB", false);
|
||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
|
||||
|
||||
policy2.editSchedule();
|
||||
|
||||
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
||||
|
@ -375,9 +372,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
|
||||
// Need to call setup() again to reset mDisp
|
||||
setup();
|
||||
// Disable preemption for queueB and it's children
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
// Turn off preemption for queueB and it's children
|
||||
schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||
policy2.editSchedule();
|
||||
ApplicationAttemptId expectedAttemptOnQueueC =
|
||||
|
@ -423,9 +419,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
|
||||
// Need to call setup() again to reset mDisp
|
||||
setup();
|
||||
// Disable preemption for queueB(appA)
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
// Turn off preemption for queueB(appA)
|
||||
schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||
policy2.editSchedule();
|
||||
// Now that queueB(appA) is not preemptable, verify that resources come
|
||||
|
@ -434,11 +429,9 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||
|
||||
setup();
|
||||
// Disable preemption for two of the 3 queues with over-capacity.
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
// Turn off preemption for two of the 3 queues with over-capacity.
|
||||
schedConf.setPreemptionDisabled("root.queueD.queueE", true);
|
||||
schedConf.setPreemptionDisabled("root.queueA.queueB", true);
|
||||
ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
|
||||
policy3.editSchedule();
|
||||
|
||||
|
@ -476,11 +469,10 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||
verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
|
||||
|
||||
// Disable preemption for queueA and it's children. queueF(appC)'s request
|
||||
// Turn off preemption for queueA and it's children. queueF(appC)'s request
|
||||
// should starve.
|
||||
setup(); // Call setup() to reset mDisp
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
schedConf.setPreemptionDisabled("root.queueA", true);
|
||||
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
|
||||
policy2.editSchedule();
|
||||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
|
||||
|
@ -504,8 +496,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
|
||||
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
|
||||
};
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
schedConf.setPreemptionDisabled("root.queueA.queueC", true);
|
||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||
policy.editSchedule();
|
||||
// Although queueC(appB) is way over capacity and is untouchable,
|
||||
|
@ -529,9 +520,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
||||
};
|
||||
|
||||
schedConf.setPreemptionDisabled("root", true);
|
||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
|
||||
+ "root" + SUFFIX_DISABLE_PREEMPTION, true);
|
||||
policy.editSchedule();
|
||||
// All queues should be non-preemptable, so request should starve.
|
||||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
|
||||
|
@ -893,7 +883,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||
setAMContainer = false;
|
||||
}
|
||||
|
||||
|
||||
static class IsPreemptionRequestFor
|
||||
extends ArgumentMatcher<ContainerPreemptEvent> {
|
||||
private final ApplicationAttemptId appAttId;
|
||||
|
@ -952,6 +942,8 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
|
||||
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
|
||||
when(root.getQueuePath()).thenReturn("root");
|
||||
boolean preemptionDisabled = mockPreemptionStatus("root");
|
||||
when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
|
||||
|
||||
for (int i = 1; i < queues.length; ++i) {
|
||||
final CSQueue q;
|
||||
|
@ -971,11 +963,29 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
parentPathName = (parentPathName == null) ? "root" : parentPathName;
|
||||
String queuePathName = (parentPathName+"."+queueName).replace("/","root");
|
||||
when(q.getQueuePath()).thenReturn(queuePathName);
|
||||
preemptionDisabled = mockPreemptionStatus(queuePathName);
|
||||
when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled);
|
||||
}
|
||||
assert 0 == pqs.size();
|
||||
return root;
|
||||
}
|
||||
|
||||
// Determine if any of the elements in the queupath have preemption disabled.
|
||||
// Also must handle the case where preemption disabled property is explicitly
|
||||
// set to something other than the default. Assumes system-wide preemption
|
||||
// property is true.
|
||||
private boolean mockPreemptionStatus(String queuePathName) {
|
||||
boolean preemptionDisabled = false;
|
||||
StringTokenizer tokenizer = new StringTokenizer(queuePathName, ".");
|
||||
String qName = "";
|
||||
while(tokenizer.hasMoreTokens()) {
|
||||
qName += tokenizer.nextToken();
|
||||
preemptionDisabled = schedConf.getPreemptionDisabled(qName, preemptionDisabled);
|
||||
qName += ".";
|
||||
}
|
||||
return preemptionDisabled;
|
||||
}
|
||||
|
||||
ParentQueue mockParentQueue(ParentQueue p, int subqueues,
|
||||
Deque<ParentQueue> pqs) {
|
||||
ParentQueue pq = mock(ParentQueue.class);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -2071,4 +2072,56 @@ public class TestCapacityScheduler {
|
|||
Assert.assertEquals(0, report.getNumReservedContainers());
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreemptionDisabled() throws Exception {
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
||||
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null);
|
||||
setupQueueConfiguration(conf);
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
cs.reinitialize(conf, rmContext);
|
||||
|
||||
CSQueue rootQueue = cs.getRootQueue();
|
||||
CSQueue queueB = findQueue(rootQueue, B);
|
||||
CSQueue queueB2 = findQueue(queueB, B2);
|
||||
|
||||
// When preemption turned on for the whole system
|
||||
// (yarn.resourcemanager.scheduler.monitor.enable=true), and with no other
|
||||
// preemption properties set, queue root.b.b2 should be preemptable.
|
||||
assertFalse("queue " + B2 + " should default to preemptable",
|
||||
queueB2.getPreemptionDisabled());
|
||||
|
||||
// Disable preemption at the root queue level.
|
||||
// The preemption property should be inherited from root all the
|
||||
// way down so that root.b.b2 should NOT be preemptable.
|
||||
conf.setPreemptionDisabled(rootQueue.getQueuePath(), true);
|
||||
cs.reinitialize(conf, rmContext);
|
||||
assertTrue(
|
||||
"queue " + B2 + " should have inherited non-preemptability from root",
|
||||
queueB2.getPreemptionDisabled());
|
||||
|
||||
// Enable preemption for root (grandparent) but disable for root.b (parent).
|
||||
// root.b.b2 should inherit property from parent and NOT be preemptable
|
||||
conf.setPreemptionDisabled(rootQueue.getQueuePath(), false);
|
||||
conf.setPreemptionDisabled(queueB.getQueuePath(), true);
|
||||
cs.reinitialize(conf, rmContext);
|
||||
assertTrue(
|
||||
"queue " + B2 + " should have inherited non-preemptability from parent",
|
||||
queueB2.getPreemptionDisabled());
|
||||
|
||||
// When preemption is turned on for root.b.b2, it should be preemptable
|
||||
// even though preemption is disabled on root.b (parent).
|
||||
conf.setPreemptionDisabled(queueB2.getQueuePath(), false);
|
||||
cs.reinitialize(conf, rmContext);
|
||||
assertFalse("queue " + B2 + " should have been preemptable",
|
||||
queueB2.getPreemptionDisabled());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -347,7 +347,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
int numExpectedElements = 13;
|
||||
boolean isParentQueue = true;
|
||||
if (!info.has("queues")) {
|
||||
numExpectedElements = 23;
|
||||
numExpectedElements = 24;
|
||||
isParentQueue = false;
|
||||
}
|
||||
assertEquals("incorrect number of elements", numExpectedElements, info.length());
|
||||
|
|
Loading…
Reference in New Issue