YARN-4225. Add preemption status to yarn queue -status for capacity scheduler. (Eric Payne via wangda)

This commit is contained in:
Wangda Tan 2015-12-16 13:19:40 -08:00
parent 79c41b1d83
commit 7faa406f27
11 changed files with 145 additions and 6 deletions

View File

@ -1145,6 +1145,9 @@ Release 2.8.0 - UNRELEASED
YARN-4452. NPE when submit Unmanaged application. (Naganarasimha G R
via junping_du)
YARN-4225. Add preemption status to yarn queue -status for capacity scheduler.
(Eric Payne via wangda)
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -500,4 +500,9 @@
</Or>
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
</Match>
<Match>
<Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
</Match>
</FindBugsFilter>

View File

@ -55,7 +55,8 @@ public abstract class QueueInfo {
float maximumCapacity, float currentCapacity,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics) {
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled) {
QueueInfo queueInfo = Records.newRecord(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setCapacity(capacity);
@ -67,6 +68,7 @@ public abstract class QueueInfo {
queueInfo.setAccessibleNodeLabels(accessibleNodeLabels);
queueInfo.setDefaultNodeLabelExpression(defaultNodeLabelExpression);
queueInfo.setQueueStatistics(queueStatistics);
queueInfo.setPreemptionDisabled(preemptionDisabled);
return queueInfo;
}
@ -205,4 +207,16 @@ public abstract class QueueInfo {
@Unstable
public abstract void setQueueStatistics(QueueStatistics queueStatistics);
/**
* Get the <em>preemption status</em> of the queue.
* @return if property is not in proto, return null;
* otherwise, return <em>preemption status</em> of the queue
*/
@Public
@Stable
public abstract Boolean getPreemptionDisabled();
@Private
@Unstable
public abstract void setPreemptionDisabled(boolean preemptionDisabled);
}

View File

@ -416,6 +416,7 @@ message QueueInfoProto {
repeated string accessibleNodeLabels = 8;
optional string defaultNodeLabelExpression = 9;
optional QueueStatisticsProto queueStatistics = 10;
optional bool preemptionDisabled = 11;
}
enum QueueACLProto {

View File

@ -152,5 +152,11 @@ public class QueueCLI extends YarnCLI {
labelList.append(nodeLabel);
}
writer.println(labelList.toString());
Boolean preemptStatus = queueInfo.getPreemptionDisabled();
if (preemptStatus != null) {
writer.print("\tPreemption : ");
writer.println(preemptStatus ? "disabled" : "enabled");
}
}
}

View File

@ -665,7 +665,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
createFakeAppReports(), QueueState.RUNNING, null, null, null);
createFakeAppReports(), QueueState.RUNNING, null, null, null, false);
}
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {

View File

@ -46,6 +46,7 @@ import java.util.regex.Pattern;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -69,9 +70,15 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Before;
@ -1324,7 +1331,7 @@ public class TestYarnCLI {
nodeLabels.add("GPU");
nodeLabels.add("JDK_7");
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null);
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1339,16 +1346,103 @@ public class TestYarnCLI {
pw.println("\tMaximum Capacity : " + "80.0%");
pw.println("\tDefault Node Label expression : " + "GPU");
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
pw.println("\tPreemption : " + "enabled");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
}
@Test
public void testGetQueueInfoPreemptionEnabled() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
+ "ProportionalCapacityPreemptionPolicy");
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
YarnClient yarnClient = null;
try {
cluster.init(conf);
cluster.start();
final Configuration yarnConf = cluster.getConfig();
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
QueueCLI cli = new QueueCLI();
cli.setClient(yarnClient);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
sysOutStream.reset();
int result = cli.run(new String[] { "-status", "a1" });
assertEquals(0, result);
Assert.assertTrue(sysOutStream.toString()
.contains("Preemption : enabled"));
} finally {
// clean-up
if (yarnClient != null) {
yarnClient.stop();
}
cluster.stop();
cluster.close();
}
}
@Test
public void testGetQueueInfoPreemptionDisabled() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
"org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity."
+ "ProportionalCapacityPreemptionPolicy");
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setBoolean(
"yarn.scheduler.capacity.root.a.a1.disable_preemption", true);
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
YarnClient yarnClient = null;
try {
cluster.init(conf);
cluster.start();
final Configuration yarnConf = cluster.getConfig();
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
QueueCLI cli = new QueueCLI();
cli.setClient(yarnClient);
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
sysOutStream.reset();
int result = cli.run(new String[] { "-status", "a1" });
assertEquals(0, result);
Assert.assertTrue(sysOutStream.toString()
.contains("Preemption : disabled"));
} finally {
// clean-up
if (yarnClient != null) {
yarnClient.stop();
}
cluster.stop();
cluster.close();
}
}
@Test
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
QueueCLI cli = createAndGetQueueCLI();
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, null, null, null);
null, null, QueueState.RUNNING, null, null, null, true);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1364,6 +1458,7 @@ public class TestYarnCLI {
pw.println("\tDefault Node Label expression : "
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
pw.println("\tAccessible Node Labels : ");
pw.println("\tPreemption : " + "disabled");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());

View File

@ -395,4 +395,17 @@ public class QueueInfoPBImpl extends QueueInfo {
}
builder.setQueueStatistics(convertToProtoFormat(queueStatistics));
}
@Override
public Boolean getPreemptionDisabled() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasPreemptionDisabled()) ? p
.getPreemptionDisabled() : null;
}
@Override
public void setPreemptionDisabled(boolean preemptionDisabled) {
maybeInitBuilder();
builder.setPreemptionDisabled(preemptionDisabled);
}
}

View File

@ -488,7 +488,7 @@ public class TestPBImplRecords {
// it is recursive(has sub queues)
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
"x && y", null));
"x && y", null, false));
generateByNewInstance(QueueStatistics.class);
generateByNewInstance(QueueUserACLInfo.class);
generateByNewInstance(YarnClusterMetrics.class);

View File

@ -303,6 +303,7 @@ public abstract class AbstractCSQueue implements CSQueue {
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
queueInfo.setPreemptionDisabled(preemptionDisabled);
return queueInfo;
}

View File

@ -149,7 +149,8 @@ public class TestSchedulerApplicationAttempt {
private Queue createQueue(String name, Queue parent, float capacity) {
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null, null, QueueState.RUNNING, null, "", null);
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);