YARN-7813: Capacity Scheduler Intra-queue Preemption should be configurable for each queue
This commit is contained in:
parent
0c5d7d71a8
commit
c5e6e3de1c
|
@ -94,6 +94,26 @@ public abstract class QueueInfo {
|
|||
return queueInfo;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static QueueInfo newInstance(String queueName, float capacity,
|
||||
float maximumCapacity, float currentCapacity,
|
||||
List<QueueInfo> childQueues, List<ApplicationReport> applications,
|
||||
QueueState queueState, Set<String> accessibleNodeLabels,
|
||||
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
|
||||
boolean preemptionDisabled,
|
||||
Map<String, QueueConfigurations> queueConfigurations,
|
||||
boolean intraQueuePreemptionDisabled) {
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, capacity,
|
||||
maximumCapacity, currentCapacity,
|
||||
childQueues, applications,
|
||||
queueState, accessibleNodeLabels,
|
||||
defaultNodeLabelExpression, queueStatistics,
|
||||
preemptionDisabled, queueConfigurations);
|
||||
queueInfo.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
|
||||
return queueInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the <em>name</em> of the queue.
|
||||
* @return <em>name</em> of the queue
|
||||
|
@ -261,4 +281,19 @@ public abstract class QueueInfo {
|
|||
@Unstable
|
||||
public abstract void setQueueConfigurations(
|
||||
Map<String, QueueConfigurations> queueConfigurations);
|
||||
|
||||
|
||||
/**
|
||||
* Get the intra-queue preemption status of the queue.
|
||||
* @return if property is not in proto, return null;
|
||||
* otherwise, return intra-queue preemption status of the queue
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Boolean getIntraQueuePreemptionDisabled();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setIntraQueuePreemptionDisabled(
|
||||
boolean intraQueuePreemptionDisabled);
|
||||
}
|
||||
|
|
|
@ -569,6 +569,7 @@ message QueueInfoProto {
|
|||
optional QueueStatisticsProto queueStatistics = 10;
|
||||
optional bool preemptionDisabled = 11;
|
||||
repeated QueueConfigurationsMapProto queueConfigurationsMap = 12;
|
||||
optional bool intraQueuePreemptionDisabled = 13;
|
||||
}
|
||||
|
||||
message QueueConfigurationsProto {
|
||||
|
|
|
@ -158,5 +158,11 @@ public class QueueCLI extends YarnCLI {
|
|||
writer.print("\tPreemption : ");
|
||||
writer.println(preemptStatus ? "disabled" : "enabled");
|
||||
}
|
||||
|
||||
Boolean intraQueuePreemption = queueInfo.getIntraQueuePreemptionDisabled();
|
||||
if (intraQueuePreemption != null) {
|
||||
writer.print("\tIntra-queue Preemption : ");
|
||||
writer.println(intraQueuePreemption ? "disabled" : "enabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -665,7 +665,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
|||
|
||||
public QueueInfo createFakeQueueInfo() {
|
||||
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
|
||||
createFakeAppReports(), QueueState.RUNNING, null, null, null, false);
|
||||
createFakeAppReports(), QueueState.RUNNING, null, null, null, false,
|
||||
null, false);
|
||||
}
|
||||
|
||||
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
|
||||
|
|
|
@ -1712,7 +1712,8 @@ public class TestYarnCLI {
|
|||
nodeLabels.add("GPU");
|
||||
nodeLabels.add("JDK_7");
|
||||
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
||||
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null);
|
||||
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null,
|
||||
false);
|
||||
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
|
||||
int result = cli.run(new String[] { "-status", "queueA" });
|
||||
assertEquals(0, result);
|
||||
|
@ -1728,11 +1729,82 @@ public class TestYarnCLI {
|
|||
pw.println("\tDefault Node Label expression : " + "GPU");
|
||||
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
|
||||
pw.println("\tPreemption : " + "enabled");
|
||||
pw.println("\tIntra-queue Preemption : " + "enabled");
|
||||
pw.close();
|
||||
String queueInfoStr = baos.toString("UTF-8");
|
||||
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetQueueInfoOverrideIntraQueuePreemption() throws Exception {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
ReservationSystemTestUtil.setupQueueConfiguration(conf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
|
||||
+ "ProportionalCapacityPreemptionPolicy");
|
||||
// Turn on cluster-wide intra-queue preemption
|
||||
conf.setBoolean(
|
||||
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
|
||||
// Disable intra-queue preemption for all queues
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.PREFIX
|
||||
+ "root.intra-queue-preemption.disable_preemption", true);
|
||||
// Enable intra-queue preemption for the a1 queue
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.PREFIX
|
||||
+ "root.a.a1.intra-queue-preemption.disable_preemption", false);
|
||||
MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testGetQueueInfoOverrideIntraQueuePreemption",
|
||||
2, 1, 1);
|
||||
|
||||
YarnClient yarnClient = null;
|
||||
try {
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
final Configuration yarnConf = cluster.getConfig();
|
||||
yarnClient = YarnClient.createYarnClient();
|
||||
yarnClient.init(yarnConf);
|
||||
yarnClient.start();
|
||||
|
||||
QueueCLI cli = new QueueCLI();
|
||||
cli.setClient(yarnClient);
|
||||
cli.setSysOutPrintStream(sysOut);
|
||||
cli.setSysErrPrintStream(sysErr);
|
||||
sysOutStream.reset();
|
||||
// Get status for the root.a queue
|
||||
int result = cli.run(new String[] { "-status", "a" });
|
||||
assertEquals(0, result);
|
||||
String queueStatusOut = sysOutStream.toString();
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("\tPreemption : enabled"));
|
||||
// In-queue preemption is disabled at the "root.a" queue level
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("Intra-queue Preemption : disabled"));
|
||||
cli = new QueueCLI();
|
||||
cli.setClient(yarnClient);
|
||||
cli.setSysOutPrintStream(sysOut);
|
||||
cli.setSysErrPrintStream(sysErr);
|
||||
sysOutStream.reset();
|
||||
// Get status for the root.a.a1 queue
|
||||
result = cli.run(new String[] { "-status", "a1" });
|
||||
assertEquals(0, result);
|
||||
queueStatusOut = sysOutStream.toString();
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("\tPreemption : enabled"));
|
||||
// In-queue preemption is enabled at the "root.a.a1" queue level
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("Intra-queue Preemption : enabled"));
|
||||
} finally {
|
||||
// clean-up
|
||||
if (yarnClient != null) {
|
||||
yarnClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
cluster.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetQueueInfoPreemptionEnabled() throws Exception {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
|
@ -1743,9 +1815,10 @@ public class TestYarnCLI {
|
|||
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
|
||||
+ "ProportionalCapacityPreemptionPolicy");
|
||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
||||
conf.setBoolean(
|
||||
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
|
||||
MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
|
||||
new MiniYARNCluster("testGetQueueInfoPreemptionEnabled", 2, 1, 1);
|
||||
|
||||
YarnClient yarnClient = null;
|
||||
try {
|
||||
|
@ -1763,8 +1836,11 @@ public class TestYarnCLI {
|
|||
sysOutStream.reset();
|
||||
int result = cli.run(new String[] { "-status", "a1" });
|
||||
assertEquals(0, result);
|
||||
Assert.assertTrue(sysOutStream.toString()
|
||||
.contains("Preemption : enabled"));
|
||||
String queueStatusOut = sysOutStream.toString();
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("\tPreemption : enabled"));
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("Intra-queue Preemption : enabled"));
|
||||
} finally {
|
||||
// clean-up
|
||||
if (yarnClient != null) {
|
||||
|
@ -1804,8 +1880,11 @@ public class TestYarnCLI {
|
|||
sysOutStream.reset();
|
||||
int result = cli.run(new String[] { "-status", "a1" });
|
||||
assertEquals(0, result);
|
||||
Assert.assertTrue(sysOutStream.toString()
|
||||
.contains("Preemption : disabled"));
|
||||
String queueStatusOut = sysOutStream.toString();
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("\tPreemption : disabled"));
|
||||
Assert.assertTrue(queueStatusOut
|
||||
.contains("Intra-queue Preemption : disabled"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1813,7 +1892,7 @@ public class TestYarnCLI {
|
|||
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
|
||||
QueueCLI cli = createAndGetQueueCLI();
|
||||
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
||||
null, null, QueueState.RUNNING, null, null, null, true, null);
|
||||
null, null, QueueState.RUNNING, null, null, null, true, null, true);
|
||||
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
|
||||
int result = cli.run(new String[] { "-status", "queueA" });
|
||||
assertEquals(0, result);
|
||||
|
@ -1830,6 +1909,7 @@ public class TestYarnCLI {
|
|||
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
|
||||
pw.println("\tAccessible Node Labels : ");
|
||||
pw.println("\tPreemption : " + "disabled");
|
||||
pw.println("\tIntra-queue Preemption : " + "disabled");
|
||||
pw.close();
|
||||
String queueInfoStr = baos.toString("UTF-8");
|
||||
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
|
||||
|
|
|
@ -500,4 +500,17 @@ public class QueueInfoPBImpl extends QueueInfo {
|
|||
this.queueConfigurations.putAll(queueConfigurations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean getIntraQueuePreemptionDisabled() {
|
||||
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.hasIntraQueuePreemptionDisabled()) ? p
|
||||
.getIntraQueuePreemptionDisabled() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIntraQueuePreemptionDisabled(
|
||||
boolean intraQueuePreemptionDisabled) {
|
||||
maybeInitBuilder();
|
||||
builder.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -414,7 +414,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
|
|||
// it is recursive(has sub queues)
|
||||
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
|
||||
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
|
||||
"x && y", null, false));
|
||||
"x && y", null, false, null, false));
|
||||
generateByNewInstance(QueueStatistics.class);
|
||||
generateByNewInstance(QueueUserACLInfo.class);
|
||||
generateByNewInstance(YarnClusterMetrics.class);
|
||||
|
|
|
@ -114,8 +114,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
|
|||
continue;
|
||||
}
|
||||
|
||||
// Don't preempt if disabled for this queue.
|
||||
if (leafQueue.getPreemptionDisabled()) {
|
||||
// Don't preempt if intra-queue preemption is disabled for this queue.
|
||||
if (leafQueue.getIntraQueuePreemptionDisabled()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
new HashMap<AccessType, AccessControlList>();
|
||||
volatile boolean reservationsContinueLooking;
|
||||
private volatile boolean preemptionDisabled;
|
||||
// Indicates if the in-queue preemption setting is ever disabled within the
|
||||
// hierarchy of this queue.
|
||||
private boolean intraQueuePreemptionDisabledInHierarchy;
|
||||
|
||||
// Track resource usage-by-label like used-resource/pending-resource, etc.
|
||||
volatile ResourceUsage queueUsage;
|
||||
|
@ -405,6 +408,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
|
||||
configuration);
|
||||
this.intraQueuePreemptionDisabledInHierarchy =
|
||||
isIntraQueueHierarchyPreemptionDisabled(this, configuration);
|
||||
|
||||
this.priority = configuration.getQueuePriority(
|
||||
getQueuePath());
|
||||
|
@ -613,6 +618,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
queueInfo.setCurrentCapacity(getUsedCapacity());
|
||||
queueInfo.setQueueStatistics(getQueueStatistics());
|
||||
queueInfo.setPreemptionDisabled(preemptionDisabled);
|
||||
queueInfo.setIntraQueuePreemptionDisabled(
|
||||
getIntraQueuePreemptionDisabled());
|
||||
queueInfo.setQueueConfigurations(getQueueConfigurations());
|
||||
return queueInfo;
|
||||
}
|
||||
|
@ -736,6 +743,16 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
return preemptionDisabled;
|
||||
}
|
||||
|
||||
@Private
|
||||
public boolean getIntraQueuePreemptionDisabled() {
|
||||
return intraQueuePreemptionDisabledInHierarchy || preemptionDisabled;
|
||||
}
|
||||
|
||||
@Private
|
||||
public boolean getIntraQueuePreemptionDisabledInHierarchy() {
|
||||
return intraQueuePreemptionDisabledInHierarchy;
|
||||
}
|
||||
|
||||
@Private
|
||||
public QueueCapacities getQueueCapacities() {
|
||||
return queueCapacities;
|
||||
|
@ -757,17 +774,19 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
}
|
||||
|
||||
/**
|
||||
* The specified queue is preemptable if system-wide preemption is turned on
|
||||
* unless any queue in the <em>qPath</em> hierarchy has explicitly turned
|
||||
* preemption off.
|
||||
* NOTE: Preemptability is inherited from a queue's parent.
|
||||
* The specified queue is cross-queue preemptable if system-wide cross-queue
|
||||
* preemption is turned on unless any queue in the <em>qPath</em> hierarchy
|
||||
* has explicitly turned cross-queue preemption off.
|
||||
* NOTE: Cross-queue preemptability is inherited from a queue's parent.
|
||||
*
|
||||
* @return true if queue has preemption disabled, false otherwise
|
||||
* @param q queue to check preemption state
|
||||
* @param configuration capacity scheduler config
|
||||
* @return true if queue has cross-queue preemption disabled, false otherwise
|
||||
*/
|
||||
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
|
||||
CapacitySchedulerConfiguration configuration) {
|
||||
boolean systemWidePreemption =
|
||||
csContext.getConfiguration()
|
||||
configuration
|
||||
.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
|
||||
CSQueue parentQ = q.getParent();
|
||||
|
@ -791,6 +810,43 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
parentQ.getPreemptionDisabled());
|
||||
}
|
||||
|
||||
/**
|
||||
* The specified queue is intra-queue preemptable if
|
||||
* 1) system-wide intra-queue preemption is turned on
|
||||
* 2) no queue in the <em>qPath</em> hierarchy has explicitly turned off intra
|
||||
* queue preemption.
|
||||
* NOTE: Intra-queue preemptability is inherited from a queue's parent.
|
||||
*
|
||||
* @param q queue to check intra-queue preemption state
|
||||
* @param configuration capacity scheduler config
|
||||
* @return true if queue has intra-queue preemption disabled, false otherwise
|
||||
*/
|
||||
private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
|
||||
CapacitySchedulerConfiguration configuration) {
|
||||
boolean systemWideIntraQueuePreemption =
|
||||
configuration.getBoolean(
|
||||
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
|
||||
CapacitySchedulerConfiguration
|
||||
.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
|
||||
// Intra-queue preemption is disabled for this queue if the system-wide
|
||||
// intra-queue preemption flag is false
|
||||
if (!systemWideIntraQueuePreemption) return true;
|
||||
|
||||
// Check if this is the root queue and the root queue's intra-queue
|
||||
// preemption disable switch is set
|
||||
CSQueue parentQ = q.getParent();
|
||||
if (parentQ == null) {
|
||||
return configuration
|
||||
.getIntraQueuePreemptionDisabled(q.getQueuePath(), false);
|
||||
}
|
||||
|
||||
// At this point, the master preemption switch is enabled down to this
|
||||
// queue's level. Determine whether or not intra-queue preemption is enabled
|
||||
// down to this queu's level and return that value.
|
||||
return configuration.getIntraQueuePreemptionDisabled(q.getQueuePath(),
|
||||
parentQ.getIntraQueuePreemptionDisabledInHierarchy());
|
||||
}
|
||||
|
||||
private Resource getCurrentLimitResource(String nodePartition,
|
||||
Resource clusterResource, ResourceLimits currentResourceLimits,
|
||||
SchedulingMode schedulingMode) {
|
||||
|
|
|
@ -277,6 +277,20 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
|
|||
*/
|
||||
public boolean getPreemptionDisabled();
|
||||
|
||||
/**
|
||||
* Check whether intra-queue preemption is disabled for this queue
|
||||
* @return true if either intra-queue preemption or inter-queue preemption
|
||||
* is disabled for this queue, false if neither is disabled.
|
||||
*/
|
||||
public boolean getIntraQueuePreemptionDisabled();
|
||||
|
||||
/**
|
||||
* Determines whether or not the intra-queue preemption disabled switch is set
|
||||
* at any level in this queue's hierarchy.
|
||||
* @return state of the intra-queue preemption switch at this queue level
|
||||
*/
|
||||
public boolean getIntraQueuePreemptionDisabledInHierarchy();
|
||||
|
||||
/**
|
||||
* Get QueueCapacities of this queue
|
||||
* @return queueCapacities
|
||||
|
|
|
@ -1215,6 +1215,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
return preemptionDisabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether intra-queue preemption is disabled on the specified queue
|
||||
*
|
||||
* @param queue queue path to query
|
||||
* @param defaultVal used as default if the property is not set in the
|
||||
* configuration
|
||||
* @return true if preemption is disabled on queue, false otherwise
|
||||
*/
|
||||
public boolean getIntraQueuePreemptionDisabled(String queue,
|
||||
boolean defaultVal) {
|
||||
return
|
||||
getBoolean(getQueuePrefix(queue) + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX
|
||||
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get configured node labels in a given queuePath
|
||||
*/
|
||||
|
|
|
@ -200,7 +200,10 @@ class CapacitySchedulerPage extends RmView {
|
|||
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
|
||||
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
|
||||
__("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
|
||||
__("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
|
||||
__("Preemption:",
|
||||
lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
|
||||
__("Intra-queue Preemption:", lqinfo.getIntraQueuePreemptionDisabled()
|
||||
? "disabled" : "enabled").
|
||||
__("Default Node Label Expression:",
|
||||
lqinfo.getDefaultNodeLabelExpression() == null
|
||||
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION
|
||||
|
|
|
@ -49,6 +49,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
protected ResourceInfo usedAMResource;
|
||||
protected ResourceInfo userAMResourceLimit;
|
||||
protected boolean preemptionDisabled;
|
||||
protected boolean intraQueuePreemptionDisabled;
|
||||
protected String defaultNodeLabelExpression;
|
||||
protected int defaultPriority;
|
||||
protected boolean isAutoCreatedLeafQueue;
|
||||
|
@ -72,6 +73,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
|
||||
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
|
||||
preemptionDisabled = q.getPreemptionDisabled();
|
||||
intraQueuePreemptionDisabled = q.getIntraQueuePreemptionDisabled();
|
||||
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
|
||||
defaultNodeLabelExpression = q.getDefaultNodeLabelExpression();
|
||||
defaultPriority = q.getDefaultApplicationPriority().getPriority();
|
||||
|
@ -151,6 +153,10 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
return preemptionDisabled;
|
||||
}
|
||||
|
||||
public boolean getIntraQueuePreemptionDisabled() {
|
||||
return intraQueuePreemptionDisabled;
|
||||
}
|
||||
|
||||
public String getOrderingPolicyInfo() {
|
||||
return orderingPolicyInfo;
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TestConfigurationMutationACLPolicies {
|
|||
private void mockQueue(String queueName, MutableConfScheduler scheduler)
|
||||
throws IOException {
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null,
|
||||
null, null, null, null, false);
|
||||
null, null, null, null, false, null, false);
|
||||
when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(queueInfo);
|
||||
Queue queue = mock(Queue.class);
|
||||
|
|
|
@ -165,7 +165,7 @@ public class TestSchedulerApplicationAttempt {
|
|||
private Queue createQueue(String name, Queue parent, float capacity) {
|
||||
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
||||
null, QueueState.RUNNING, null, "", null, false);
|
||||
null, QueueState.RUNNING, null, "", null, false, null, false);
|
||||
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||
Queue queue = mock(Queue.class);
|
||||
when(queue.getMetrics()).thenReturn(metrics);
|
||||
|
|
|
@ -4103,7 +4103,7 @@ public class TestLeafQueue {
|
|||
float absCap, Resource res) {
|
||||
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
||||
null, QueueState.RUNNING, null, "", null, false);
|
||||
null, QueueState.RUNNING, null, "", null, false, null, false);
|
||||
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||
AbstractCSQueue queue = mock(AbstractCSQueue.class);
|
||||
when(queue.getMetrics()).thenReturn(metrics);
|
||||
|
|
|
@ -236,6 +236,7 @@ The following configuration parameters can be configured in yarn-site.xml to con
|
|||
| Property | Description |
|
||||
|:---- |:---- |
|
||||
| `yarn.scheduler.capacity.<queue-path>.disable_preemption` | This configuration can be set to `true` to selectively disable preemption of application containers submitted to a given queue. This property applies only when system wide preemption is enabled by configuring `yarn.resourcemanager.scheduler.monitor.enable` to *true* and `yarn.resourcemanager.scheduler.monitor.policies` to *ProportionalCapacityPreemptionPolicy*. If this property is not set for a queue, then the property value is inherited from the queue's parent. Default value is false.
|
||||
| `yarn.scheduler.capacity.<queue-path>.intra-queue-preemption.disable_preemption` | This configuration can be set to *true* to selectively disable intra-queue preemption of application containers submitted to a given queue. This property applies only when system wide preemption is enabled by configuring `yarn.resourcemanager.scheduler.monitor.enable` to *true*, `yarn.resourcemanager.scheduler.monitor.policies` to *ProportionalCapacityPreemptionPolicy*, and `yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.enabled` to *true*. If this property is not set for a queue, then the property value is inherited from the queue's parent. Default value is *false*.
|
||||
|
||||
###Reservation Properties
|
||||
|
||||
|
|
Loading…
Reference in New Issue