YARN-7813. Capacity Scheduler Intra-queue Preemption should be configurable for each queue. Contributed by Eric Payne

This commit is contained in:
Jason Lowe 2018-02-19 14:38:49 -06:00
parent 313c38b502
commit 85c611ad7d
18 changed files with 254 additions and 26 deletions

View File

@ -94,6 +94,26 @@ public abstract class QueueInfo {
return queueInfo;
}
/**
 * Build a {@link QueueInfo} that also carries the intra-queue preemption
 * flag.
 * <p>
 * All arguments except {@code intraQueuePreemptionDisabled} are forwarded
 * unchanged to the 12-argument {@code newInstance} overload; the
 * intra-queue flag is then applied to the record that overload returns.
 *
 * @param intraQueuePreemptionDisabled value handed to
 *          {@link #setIntraQueuePreemptionDisabled(boolean)} on the new record
 * @return the populated {@code QueueInfo}
 */
@Private
@Unstable
public static QueueInfo newInstance(String queueName, float capacity,
    float maximumCapacity, float currentCapacity,
    List<QueueInfo> childQueues, List<ApplicationReport> applications,
    QueueState queueState, Set<String> accessibleNodeLabels,
    String defaultNodeLabelExpression, QueueStatistics queueStatistics,
    boolean preemptionDisabled,
    Map<String, QueueConfigurations> queueConfigurations,
    boolean intraQueuePreemptionDisabled) {
  // Reuse the existing factory for every pre-existing field.
  final QueueInfo info = newInstance(
      queueName, capacity, maximumCapacity, currentCapacity,
      childQueues, applications, queueState, accessibleNodeLabels,
      defaultNodeLabelExpression, queueStatistics, preemptionDisabled,
      queueConfigurations);
  info.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
  return info;
}
/**
* Get the <em>name</em> of the queue.
* @return <em>name</em> of the queue
@ -261,4 +281,19 @@ public abstract class QueueInfo {
@Unstable
public abstract void setQueueConfigurations(
Map<String, QueueConfigurations> queueConfigurations);
/**
 * Get the intra-queue preemption status of the queue.
 * <p>
 * The return type is a nullable {@code Boolean} so that callers can tell
 * "not reported" apart from "enabled": check for {@code null} before
 * unboxing.
 *
 * @return {@code null} if the property is not set (for a protobuf-backed
 *         record: the field is not present in the proto); otherwise
 *         {@code true} if intra-queue preemption is disabled for the queue
 *         and {@code false} if it is enabled
 */
@Public
@Stable
public abstract Boolean getIntraQueuePreemptionDisabled();
/**
 * Set the intra-queue preemption status of the queue.
 *
 * @param intraQueuePreemptionDisabled {@code true} if intra-queue
 *          preemption is disabled for the queue, {@code false} if it is
 *          enabled
 */
@Private
@Unstable
public abstract void setIntraQueuePreemptionDisabled(
boolean intraQueuePreemptionDisabled);
}

View File

@ -528,6 +528,7 @@ message QueueInfoProto {
optional QueueStatisticsProto queueStatistics = 10;
optional bool preemptionDisabled = 11;
repeated QueueConfigurationsMapProto queueConfigurationsMap = 12;
optional bool intraQueuePreemptionDisabled = 13;
}
message QueueConfigurationsProto {

View File

@ -158,5 +158,11 @@ public class QueueCLI extends YarnCLI {
writer.print("\tPreemption : ");
writer.println(preemptStatus ? "disabled" : "enabled");
}
Boolean intraQueuePreemption = queueInfo.getIntraQueuePreemptionDisabled();
if (intraQueuePreemption != null) {
writer.print("\tIntra-queue Preemption : ");
writer.println(intraQueuePreemption ? "disabled" : "enabled");
}
}
}

View File

@ -665,7 +665,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
createFakeAppReports(), QueueState.RUNNING, null, null, null, false);
createFakeAppReports(), QueueState.RUNNING, null, null, null, false,
null, false);
}
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {

View File

@ -1712,7 +1712,8 @@ public class TestYarnCLI {
nodeLabels.add("GPU");
nodeLabels.add("JDK_7");
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null);
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null,
false);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1728,11 +1729,82 @@ public class TestYarnCLI {
pw.println("\tDefault Node Label expression : " + "GPU");
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
pw.println("\tPreemption : " + "enabled");
pw.println("\tIntra-queue Preemption : " + "enabled");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
}
/**
 * Checks that the intra-queue preemption state printed by
 * {@code queue -status} follows the per-queue override: the switch is
 * turned on cluster-wide, disabled at {@code root}, and re-enabled for
 * {@code root.a.a1}. Queue {@code a} must therefore report "disabled"
 * while {@code a1} reports "enabled"; cross-queue preemption stays enabled
 * for both.
 */
@Test
public void testGetQueueInfoOverrideIntraQueuePreemption() throws Exception {
  CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
  ReservationSystemTestUtil.setupQueueConfiguration(conf);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
  conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
      "org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
      + "ProportionalCapacityPreemptionPolicy");
  // Cluster-wide switch: intra-queue preemption on.
  conf.setBoolean(
      CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
  // Root level: intra-queue preemption off, inherited by every queue ...
  conf.setBoolean(CapacitySchedulerConfiguration.PREFIX
      + "root.intra-queue-preemption.disable_preemption", true);
  // ... except a1, which explicitly turns it back on.
  conf.setBoolean(CapacitySchedulerConfiguration.PREFIX
      + "root.a.a1.intra-queue-preemption.disable_preemption", false);

  // Each row: queue passed to "-status", expected intra-queue status line.
  // "a" (root.a) inherits the root-level disable; "a1" (root.a.a1)
  // overrides it.
  final String[][] expectations = {
      {"a", "Intra-queue Preemption : disabled"},
      {"a1", "Intra-queue Preemption : enabled"},
  };

  MiniYARNCluster cluster =
      new MiniYARNCluster("testGetQueueInfoOverrideIntraQueuePreemption",
          2, 1, 1);
  YarnClient yarnClient = null;
  try {
    cluster.init(conf);
    cluster.start();
    final Configuration yarnConf = cluster.getConfig();
    yarnClient = YarnClient.createYarnClient();
    yarnClient.init(yarnConf);
    yarnClient.start();

    for (String[] expectation : expectations) {
      // A fresh CLI per query, writing into the shared (reset) capture
      // stream.
      QueueCLI cli = new QueueCLI();
      cli.setClient(yarnClient);
      cli.setSysOutPrintStream(sysOut);
      cli.setSysErrPrintStream(sysErr);
      sysOutStream.reset();

      int result = cli.run(new String[] {"-status", expectation[0]});
      assertEquals(0, result);

      String queueStatusOut = sysOutStream.toString();
      // Cross-queue preemption is untouched by the intra-queue settings.
      Assert.assertTrue(queueStatusOut
          .contains("\tPreemption : enabled"));
      Assert.assertTrue(queueStatusOut
          .contains(expectation[1]));
    }
  } finally {
    // clean-up
    if (yarnClient != null) {
      yarnClient.stop();
    }
    cluster.stop();
    cluster.close();
  }
}
@Test
public void testGetQueueInfoPreemptionEnabled() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
@ -1743,9 +1815,10 @@ public class TestYarnCLI {
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
+ "ProportionalCapacityPreemptionPolicy");
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
new MiniYARNCluster("testGetQueueInfoPreemptionEnabled", 2, 1, 1);
YarnClient yarnClient = null;
try {
@ -1763,8 +1836,11 @@ public class TestYarnCLI {
sysOutStream.reset();
int result = cli.run(new String[] { "-status", "a1" });
assertEquals(0, result);
Assert.assertTrue(sysOutStream.toString()
.contains("Preemption : enabled"));
String queueStatusOut = sysOutStream.toString();
Assert.assertTrue(queueStatusOut
.contains("\tPreemption : enabled"));
Assert.assertTrue(queueStatusOut
.contains("Intra-queue Preemption : enabled"));
} finally {
// clean-up
if (yarnClient != null) {
@ -1804,8 +1880,11 @@ public class TestYarnCLI {
sysOutStream.reset();
int result = cli.run(new String[] { "-status", "a1" });
assertEquals(0, result);
Assert.assertTrue(sysOutStream.toString()
.contains("Preemption : disabled"));
String queueStatusOut = sysOutStream.toString();
Assert.assertTrue(queueStatusOut
.contains("\tPreemption : disabled"));
Assert.assertTrue(queueStatusOut
.contains("Intra-queue Preemption : disabled"));
}
}
@ -1813,7 +1892,7 @@ public class TestYarnCLI {
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
QueueCLI cli = createAndGetQueueCLI();
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, null, null, null, true, null);
null, null, QueueState.RUNNING, null, null, null, true, null, true);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1830,6 +1909,7 @@ public class TestYarnCLI {
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
pw.println("\tAccessible Node Labels : ");
pw.println("\tPreemption : " + "disabled");
pw.println("\tIntra-queue Preemption : " + "disabled");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());

View File

@ -500,4 +500,17 @@ public class QueueInfoPBImpl extends QueueInfo {
this.queueConfigurations.putAll(queueConfigurations);
}
/**
 * Reads the intra-queue preemption flag from whichever of the proto or the
 * builder is currently selected by {@code viaProto}.
 *
 * @return {@code null} when the field is absent from the message,
 *         otherwise the stored flag value
 */
@Override
public Boolean getIntraQueuePreemptionDisabled() {
  QueueInfoProtoOrBuilder source = viaProto ? proto : builder;
  if (!source.hasIntraQueuePreemptionDisabled()) {
    // Field never set: report "unknown" rather than a default.
    return null;
  }
  return source.getIntraQueuePreemptionDisabled();
}
/**
 * Stores the intra-queue preemption flag in the protobuf builder.
 *
 * @param intraQueuePreemptionDisabled flag value to record on the builder
 */
@Override
public void setIntraQueuePreemptionDisabled(
boolean intraQueuePreemptionDisabled) {
// Must run before touching the builder; presumably makes the builder the
// active backing store (see maybeInitBuilder) - TODO confirm.
maybeInitBuilder();
builder.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
}
}

View File

@ -397,7 +397,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
// it is recursive(has sub queues)
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
"x && y", null, false));
"x && y", null, false, null, false));
generateByNewInstance(QueueStatistics.class);
generateByNewInstance(QueueUserACLInfo.class);
generateByNewInstance(YarnClusterMetrics.class);

View File

@ -114,8 +114,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
continue;
}
// Don't preempt if disabled for this queue.
if (leafQueue.getPreemptionDisabled()) {
// Don't preempt if intra-queue preemption is disabled for this queue.
if (leafQueue.getIntraQueuePreemptionDisabled()) {
continue;
}

View File

@ -94,6 +94,9 @@ public abstract class AbstractCSQueue implements CSQueue {
new HashMap<AccessType, AccessControlList>();
volatile boolean reservationsContinueLooking;
private volatile boolean preemptionDisabled;
// Indicates if the in-queue preemption setting is ever disabled within the
// hierarchy of this queue.
private boolean intraQueuePreemptionDisabledInHierarchy;
// Track resource usage-by-label like used-resource/pending-resource, etc.
volatile ResourceUsage queueUsage;
@ -339,6 +342,8 @@ public abstract class AbstractCSQueue implements CSQueue {
csContext.getConfiguration().getReservationContinueLook();
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
this.intraQueuePreemptionDisabledInHierarchy =
isIntraQueueHierarchyPreemptionDisabled(this);
this.priority = csContext.getConfiguration().getQueuePriority(
getQueuePath());
@ -429,6 +434,8 @@ public abstract class AbstractCSQueue implements CSQueue {
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
queueInfo.setPreemptionDisabled(preemptionDisabled);
queueInfo.setIntraQueuePreemptionDisabled(
getIntraQueuePreemptionDisabled());
queueInfo.setQueueConfigurations(getQueueConfigurations());
return queueInfo;
}
@ -544,6 +551,16 @@ public abstract class AbstractCSQueue implements CSQueue {
return preemptionDisabled;
}
/**
 * Effective intra-queue preemption state for this queue.
 *
 * @return true if the intra-queue switch resolves to disabled for this
 *         queue, or if (cross-queue) preemption is disabled for it; false
 *         only when neither is disabled
 */
@Private
public boolean getIntraQueuePreemptionDisabled() {
  if (intraQueuePreemptionDisabledInHierarchy) {
    return true;
  }
  // Disabling preemption for the queue also disables intra-queue
  // preemption.
  return preemptionDisabled;
}
/**
 * Returns the cached intra-queue preemption "disabled" switch for this
 * queue, without taking the cross-queue {@code preemptionDisabled} flag
 * into account.
 *
 * @return the current value of
 *         {@code intraQueuePreemptionDisabledInHierarchy}
 */
@Private
public boolean getIntraQueuePreemptionDisabledInHierarchy() {
return intraQueuePreemptionDisabledInHierarchy;
}
@Private
public QueueCapacities getQueueCapacities() {
return queueCapacities;
@ -560,12 +577,13 @@ public abstract class AbstractCSQueue implements CSQueue {
}
/**
* The specified queue is preemptable if system-wide preemption is turned on
* unless any queue in the <em>qPath</em> hierarchy has explicitly turned
* preemption off.
* NOTE: Preemptability is inherited from a queue's parent.
* The specified queue is cross-queue preemptable if system-wide cross-queue
* preemption is turned on unless any queue in the <em>qPath</em> hierarchy
* has explicitly turned cross-queue preemption off.
* NOTE: Cross-queue preemptability is inherited from a queue's parent.
*
* @return true if queue has preemption disabled, false otherwise
* @param q queue to check preemption state
* @return true if queue has cross-queue preemption disabled, false otherwise
*/
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) {
CapacitySchedulerConfiguration csConf = csContext.getConfiguration();
@ -593,6 +611,41 @@ public abstract class AbstractCSQueue implements CSQueue {
parentQ.getPreemptionDisabled());
}
/**
 * Resolves whether intra-queue preemption is disabled for the given queue.
 * <ol>
 * <li>If system-wide intra-queue preemption is off, every queue is
 * reported as disabled.</li>
 * <li>Otherwise the queue's own intra-queue disable property decides. When
 * that property is not configured, the root queue defaults to "not
 * disabled" and any other queue inherits its parent's resolved value.</li>
 * </ol>
 *
 * @param q queue to check intra-queue preemption state
 * @return true if queue has intra-queue preemption disabled, false otherwise
 */
private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q) {
  final CapacitySchedulerConfiguration schedConf =
      csContext.getConfiguration();
  final boolean enabledSystemWide = schedConf.getBoolean(
      CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
      CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);

  // The system-wide flag overrides any per-queue setting.
  if (!enabledSystemWide) {
    return true;
  }

  final CSQueue parent = q.getParent();
  final String queuePath = q.getQueuePath();

  // Fallback used when this queue does not set the property itself: the
  // root queue (no parent) falls back to "not disabled"; every other queue
  // falls back to whatever its parent resolved to.
  final boolean inheritedDefault =
      parent != null && parent.getIntraQueuePreemptionDisabledInHierarchy();

  return schedConf.getIntraQueuePreemptionDisabled(queuePath,
      inheritedDefault);
}
private Resource getCurrentLimitResource(String nodePartition,
Resource clusterResource, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {

View File

@ -274,6 +274,20 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
*/
public boolean getPreemptionDisabled();
/**
 * Check whether intra-queue preemption is effectively disabled for this
 * queue.
 *
 * @return true if either intra-queue preemption or inter-queue
 *         (cross-queue) preemption is disabled for this queue, false if
 *         neither is disabled.
 */
public boolean getIntraQueuePreemptionDisabled();
/**
 * Returns the intra-queue preemption disabled switch as resolved for this
 * queue through its hierarchy, independent of the inter-queue preemption
 * setting.
 * NOTE(review): in the AbstractCSQueue implementation the value is the
 * queue's own configured setting when present, otherwise the value
 * inherited from its parent; it is also true whenever system-wide
 * intra-queue preemption is turned off.
 *
 * @return true if the intra-queue preemption disabled switch resolves to
 *         "disabled" at this queue level, false otherwise
 */
public boolean getIntraQueuePreemptionDisabledInHierarchy();
/**
* Get QueueCapacities of this queue
* @return queueCapacities

View File

@ -1116,6 +1116,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return preemptionDisabled;
}
/**
 * Looks up the per-queue switch that disables intra-queue preemption.
 * <p>
 * The property name is the queue's prefix followed by
 * {@code INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX} and
 * {@code QUEUE_PREEMPTION_DISABLED}.
 *
 * @param queue queue path to query
 * @param defaultVal value returned when the property is not set in the
 *          configuration (callers typically pass the parent's setting)
 * @return true if intra-queue preemption is disabled on the queue, false
 *         otherwise
 */
public boolean getIntraQueuePreemptionDisabled(String queue,
    boolean defaultVal) {
  final String propertyName = getQueuePrefix(queue)
      + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + QUEUE_PREEMPTION_DISABLED;
  return getBoolean(propertyName, defaultVal);
}
/**
* Get configured node labels in a given queuePath
*/

View File

@ -183,7 +183,10 @@ class CapacitySchedulerPage extends RmView {
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
__("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
__("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
__("Preemption:",
lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
__("Intra-queue Preemption:", lqinfo.getIntraQueuePreemptionDisabled()
? "disabled" : "enabled").
__("Default Node Label Expression:",
lqinfo.getDefaultNodeLabelExpression() == null
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION

View File

@ -46,6 +46,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
protected ResourceInfo usedAMResource;
protected ResourceInfo userAMResourceLimit;
protected boolean preemptionDisabled;
protected boolean intraQueuePreemptionDisabled;
protected String defaultNodeLabelExpression;
protected int defaultPriority;
@ -68,6 +69,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
preemptionDisabled = q.getPreemptionDisabled();
intraQueuePreemptionDisabled = q.getIntraQueuePreemptionDisabled();
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
defaultNodeLabelExpression = q.getDefaultNodeLabelExpression();
defaultPriority = q.getDefaultApplicationPriority().getPriority();
@ -142,6 +144,10 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
return preemptionDisabled;
}
/**
 * @return the intra-queue preemption disabled flag held by this info
 *         object; true means intra-queue preemption is disabled for the
 *         leaf queue (rendered as "disabled" by the scheduler page)
 */
public boolean getIntraQueuePreemptionDisabled() {
return intraQueuePreemptionDisabled;
}
public String getOrderingPolicyInfo() {
return orderingPolicyInfo;
}

View File

@ -67,7 +67,7 @@ public class TestConfigurationMutationACLPolicies {
private void mockQueue(String queueName, MutableConfScheduler scheduler)
throws IOException {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null,
null, null, null, null, false);
null, null, null, null, false, null, false);
when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
.thenReturn(queueInfo);
Queue queue = mock(Queue.class);

View File

@ -165,7 +165,7 @@ public class TestSchedulerApplicationAttempt {
private Queue createQueue(String name, Queue parent, float capacity) {
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false);
null, QueueState.RUNNING, null, "", null, false, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);

View File

@ -3972,7 +3972,7 @@ public class TestLeafQueue {
float absCap) {
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false);
null, QueueState.RUNNING, null, "", null, false, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics);

View File

@ -357,7 +357,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
int numExpectedElements = 18;
boolean isParentQueue = true;
if (!info.has("queues")) {
numExpectedElements = 31;
numExpectedElements = 32;
isParentQueue = false;
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());

View File

@ -228,6 +228,7 @@ The following configuration parameters can be configured in yarn-site.xml to con
| Property | Description |
|:---- |:---- |
| `yarn.scheduler.capacity.<queue-path>.disable_preemption` | This configuration can be set to `true` to selectively disable preemption of application containers submitted to a given queue. This property applies only when system wide preemption is enabled by configuring `yarn.resourcemanager.scheduler.monitor.enable` to *true* and `yarn.resourcemanager.scheduler.monitor.policies` to *ProportionalCapacityPreemptionPolicy*. If this property is not set for a queue, then the property value is inherited from the queue's parent. Default value is false.
| `yarn.scheduler.capacity.<queue-path>.intra-queue-preemption.disable_preemption` | This configuration can be set to *true* to selectively disable intra-queue preemption of application containers submitted to a given queue. This property applies only when system-wide preemption is enabled by configuring `yarn.resourcemanager.scheduler.monitor.enable` to *true*, `yarn.resourcemanager.scheduler.monitor.policies` to *ProportionalCapacityPreemptionPolicy*, and `yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.enabled` to *true*. If this property is not set for a queue, then the property value is inherited from the queue's parent. Default value is *false*.
###Reservation Properties