YARN-10891. Extend QueueInfo with max-parallel-apps in CS. (#3314)

Co-authored-by: Tamas Domok <tdomok@cloudera.com>
This commit is contained in:
Tamas Domok 2021-08-27 23:09:54 +02:00 committed by GitHub
parent 4c94831364
commit 16e6030e25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 88 additions and 16 deletions

View File

@ -60,7 +60,8 @@ public abstract class QueueInfo {
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled, float weight) {
boolean preemptionDisabled, float weight,
int maxParallelApps) {
QueueInfo queueInfo = Records.newRecord(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setQueuePath(queuePath);
@ -75,6 +76,7 @@ public abstract class QueueInfo {
queueInfo.setQueueStatistics(queueStatistics);
queueInfo.setPreemptionDisabled(preemptionDisabled);
queueInfo.setWeight(weight);
queueInfo.setMaxParallelApps(maxParallelApps);
return queueInfo;
}
@ -86,14 +88,14 @@ public abstract class QueueInfo {
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled, float weight,
boolean preemptionDisabled, float weight, int maxParallelApps,
Map<String, QueueConfigurations> queueConfigurations) {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, queuePath, capacity,
maximumCapacity, currentCapacity,
childQueues, applications,
queueState, accessibleNodeLabels,
defaultNodeLabelExpression, queueStatistics,
preemptionDisabled, weight);
preemptionDisabled, weight, maxParallelApps);
queueInfo.setQueueConfigurations(queueConfigurations);
return queueInfo;
}
@ -106,7 +108,7 @@ public abstract class QueueInfo {
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled, float weight,
boolean preemptionDisabled, float weight, int maxParallelApps,
Map<String, QueueConfigurations> queueConfigurations,
boolean intraQueuePreemptionDisabled) {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, queuePath, capacity,
@ -114,7 +116,7 @@ public abstract class QueueInfo {
childQueues, applications,
queueState, accessibleNodeLabels,
defaultNodeLabelExpression, queueStatistics,
preemptionDisabled, weight, queueConfigurations);
preemptionDisabled, weight, maxParallelApps, queueConfigurations);
queueInfo.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
return queueInfo;
}
@ -166,6 +168,18 @@ public abstract class QueueInfo {
@Private
@Unstable
public abstract void setWeight(float weight);
/**
* Get the <em>configured max parallel apps</em> of the queue.
* @return <em>configured max parallel apps</em> of the queue
*/
@Public
@Stable
public abstract int getMaxParallelApps();
/**
* Set the <em>configured max parallel apps</em> of the queue.
* @param maxParallelApps <em>configured max parallel apps</em> of the queue
*/
@Private
@Unstable
public abstract void setMaxParallelApps(int maxParallelApps);
/**
* Get the <em>maximum capacity</em> of the queue.

View File

@ -617,6 +617,7 @@ message QueueInfoProto {
optional bool intraQueuePreemptionDisabled = 13;
optional float weight = 14;
optional string queuePath = 15;
optional int32 maxParallelApps = 16;
}
message QueueConfigurationsProto {

View File

@ -139,6 +139,8 @@ public class QueueCLI extends YarnCLI {
writer.println(df.format(queueInfo.getMaximumCapacity() * 100) + "%");
writer.print("\tWeight : ");
writer.println(df.format(queueInfo.getWeight()));
writer.print("\tMaximum Parallel Apps : ");
writer.println(queueInfo.getMaxParallelApps());
writer.print("\tDefault Node Label expression : ");
String nodeLabelExpression = queueInfo.getDefaultNodeLabelExpression();
nodeLabelExpression =

View File

@ -670,7 +670,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", "root", 100f, 100f, 50f, null,
createFakeAppReports(), QueueState.RUNNING, null,
null, null, false, -1.0f,
null, null, false, -1.0f, 10,
null, false);
}

View File

@ -1722,8 +1722,8 @@ public class TestYarnCLI {
newInstance("queueA", "root.queueA",
0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels,
"GPU", null, false, -1.0f, null,
false);
"GPU", null, false, -1.0f, 10,
null, false);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1738,6 +1738,7 @@ public class TestYarnCLI {
pw.println("\tCurrent Capacity : " + "50.00%");
pw.println("\tMaximum Capacity : " + "80.00%");
pw.println("\tWeight : " + "-1.00");
pw.println("\tMaximum Parallel Apps : " + "10");
pw.println("\tDefault Node Label expression : " + "GPU");
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
pw.println("\tPreemption : " + "enabled");
@ -1895,7 +1896,7 @@ public class TestYarnCLI {
newInstance("queueA", "root.queueA",
0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, null, null, null,
true, -1.0f, null, true);
true, -1.0f, 10, null, true);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1910,6 +1911,7 @@ public class TestYarnCLI {
pw.println("\tCurrent Capacity : " + "50.00%");
pw.println("\tMaximum Capacity : " + "80.00%");
pw.println("\tWeight : " + "-1.00");
pw.println("\tMaximum Parallel Apps : " + "10");
pw.println("\tDefault Node Label expression : "
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
pw.println("\tAccessible Node Labels : ");

View File

@ -142,6 +142,18 @@ public class QueueInfoPBImpl extends QueueInfo {
builder.setWeight(weight);
}
/**
 * Reads the max-parallel-apps value from the current protobuf view
 * (the built proto when {@code viaProto} is set, otherwise the builder).
 *
 * @return the stored value, or {@code -1} when the field is not present
 */
@Override
public int getMaxParallelApps() {
  final QueueInfoProtoOrBuilder source = viaProto ? proto : builder;
  if (!source.hasMaxParallelApps()) {
    // Field absent in the message: report the "unset" sentinel.
    return -1;
  }
  return source.getMaxParallelApps();
}
/**
 * Writes the max-parallel-apps value into the protobuf builder.
 *
 * @param maxParallelApps value to store in the builder's
 *                        maxParallelApps field
 */
@Override
public void setMaxParallelApps(int maxParallelApps) {
  // Make sure a mutable builder exists before mutating the record.
  maybeInitBuilder();
  builder.setMaxParallelApps(maxParallelApps);
}
@Override
public void setChildQueues(List<QueueInfo> childQueues) {
if (childQueues == null) {

View File

@ -435,7 +435,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
typeValueCache.put(QueueInfo.class, QueueInfo.
newInstance("root", "root", 1.0f,
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
"x && y", null, false, -1.0f, null, false));
"x && y", null, false, -1.0f, 10, null, false));
generateByNewInstance(QueueStatistics.class);
generateByNewInstance(QueueUserACLInfo.class);
generateByNewInstance(YarnClusterMetrics.class);

View File

@ -774,6 +774,7 @@ public abstract class AbstractCSQueue implements CSQueue {
getIntraQueuePreemptionDisabled());
queueInfo.setQueueConfigurations(getQueueConfigurations());
queueInfo.setWeight(queueCapacities.getWeight());
queueInfo.setMaxParallelApps(maxParallelApps);
return queueInfo;
}
@ -1494,6 +1495,7 @@ public abstract class AbstractCSQueue implements CSQueue {
this.maxParallelApps = maxParallelApps;
}
/**
 * Returns the value of this queue's {@code maxParallelApps} field.
 *
 * @return the max-parallel-apps value held by this queue
 */
@Override
public int getMaxParallelApps() {
  return this.maxParallelApps;
}

View File

@ -151,6 +151,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* @return current run-state
*/
public QueueState getState();
/**
* Get the max-parallel-applications property of the queue.
* @return the max-parallel-applications value of this queue
*/
public int getMaxParallelApps();
/**
* Get child queues

View File

@ -65,6 +65,7 @@ public class CapacitySchedulerQueueInfo {
protected float weight;
protected float normalizedWeight;
protected int numApplications;
protected int maxParallelApps;
protected String queueName;
protected boolean isAbsoluteResource;
protected QueueState state;
@ -120,6 +121,7 @@ public class CapacitySchedulerQueueInfo {
weight = q.getQueueCapacities().getWeight();
normalizedWeight = q.getQueueCapacities().getNormalizedWeight();
numApplications = q.getNumApplications();
maxParallelApps = q.getMaxParallelApps();
allocatedContainers = q.getMetrics().getAllocatedContainers();
pendingContainers = q.getMetrics().getPendingContainers();
reservedContainers = q.getMetrics().getReservedContainers();
@ -352,6 +354,10 @@ public class CapacitySchedulerQueueInfo {
return normalizedWeight;
}
/**
 * Returns the max-parallel-apps value captured in this queue info object.
 *
 * @return the stored {@code maxParallelApps} value
 */
public int getMaxParallelApps() {
  return this.maxParallelApps;
}
/**
 * Returns the default node label expression held by this queue info object.
 *
 * @return the stored {@code defaultNodeLabelExpression} value
 */
public String getDefaultNodeLabelExpression() {
  return this.defaultNodeLabelExpression;
}

View File

@ -70,7 +70,7 @@ public class TestConfigurationMutationACLPolicies {
QueueInfo queueInfo = QueueInfo.
newInstance(queueName, queuePath, 0, 0,
0, null, null,
null, null, null, null, false, -1.0f, null, false);
null, null, null, null, false, -1.0f, 10, null, false);
when(confScheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
.thenReturn(queueInfo);
Queue queue = mock(Queue.class);

View File

@ -219,7 +219,7 @@ public class TestSchedulerApplicationAttempt {
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
QueueInfo queueInfo = QueueInfo.newInstance(name,
"root." + name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
null, QueueState.RUNNING, null, "", null, false, -1.0f, 10, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);

View File

@ -5101,7 +5101,7 @@ public class TestLeafQueue {
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
QueueInfo queueInfo = QueueInfo.
newInstance(name, path, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
null, QueueState.RUNNING, null, "", null, false, -1.0f, 10, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertFalse;
@ -83,6 +84,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
float absoluteMaxCapacity;
float absoluteUsedCapacity;
int numApplications;
int maxParallelApps;
String queueName;
private String queuePath;
String state;
@ -140,6 +142,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
config.setCapacity(A, 10.5f);
config.setMaximumCapacity(A, 50);
config.setInt(CapacitySchedulerConfiguration.getQueuePrefix(A) + MAX_PARALLEL_APPLICATIONS, 42);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
config.setCapacity(B, 89.5f);
@ -311,6 +314,8 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
WebServicesTestUtils.getXmlFloat(qElem, "absoluteUsedCapacity");
qi.numApplications =
WebServicesTestUtils.getXmlInt(qElem, "numApplications");
qi.maxParallelApps =
WebServicesTestUtils.getXmlInt(qElem, "maxParallelApps");
qi.queueName = WebServicesTestUtils.getXmlString(qElem, "queueName");
qi.queuePath = WebServicesTestUtils.getXmlString(qElem, "queuePath");
qi.state = WebServicesTestUtils.getXmlString(qElem, "state");
@ -424,10 +429,10 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
private void verifySubQueue(JSONObject info, String q,
float parentAbsCapacity, float parentAbsMaxCapacity)
throws JSONException, Exception {
int numExpectedElements = 37;
int numExpectedElements = 38;
boolean isParentQueue = true;
if (!info.has("queues")) {
numExpectedElements = 55;
numExpectedElements = 56;
isParentQueue = false;
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());
@ -440,6 +445,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
qi.absoluteMaxCapacity = (float) info.getDouble("absoluteMaxCapacity");
qi.absoluteUsedCapacity = (float) info.getDouble("absoluteUsedCapacity");
qi.numApplications = info.getInt("numApplications");
qi.maxParallelApps = info.getInt("maxParallelApps");
qi.queueName = info.getString("queueName");
qi.queuePath = info.getString("queuePath");
qi.state = info.getString("state");
@ -526,7 +532,9 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
+ " queue is not configured in Absolute resource",
info.isAbsoluteResource);
}
assertEquals("maxParallelApps doesn't match " + q,
(q.equals("root.a") ? 42 : Integer.MAX_VALUE),
info.maxParallelApps);
}
private void verifyLeafQueueGeneric(String q, LeafQueueInfo info)

View File

@ -324,6 +324,7 @@ The capacity scheduler supports hierarchical queues. This one request will print
| absoluteMaxCapacity | float | Absolute maximum capacity percentage this queue can use of the entire cluster |
| absoluteUsedCapacity | float | Absolute used capacity percentage this queue is using of the entire cluster |
| numApplications | int | The number of applications currently in the queue |
| maxParallelApps | int | Maximum number of applications that can run at the same time |
| usedResources | string | A string describing the current resources used by the queue |
| queueName | string | The name of the queue |
| state | string of QueueState | The state of the queue |
@ -423,6 +424,7 @@ Response Body:
"capacity": 10.5,
"maxCapacity": 50.0,
"numApplications": 0,
"maxParallelApps": 2147483647,
"queueName": "a",
"queues": {
"queue": [
@ -433,6 +435,7 @@ Response Body:
"capacity": 30.000002,
"maxCapacity": 50.0,
"numApplications": 0,
"maxParallelApps": 2147483647,
"queueName": "a1",
"queues": {
"queue": [
@ -448,6 +451,7 @@ Response Body:
"maxCapacity": 100.0,
"numActiveApplications": 0,
"numApplications": 0,
"maxParallelApps": 2147483647,
"numContainers": 0,
"numPendingApplications": 0,
"queueName": "a1a",
@ -475,6 +479,7 @@ Response Body:
"maxCapacity": 100.0,
"numActiveApplications": 0,
"numApplications": 0,
"maxParallelApps": 2147483647,
"numContainers": 0,
"numPendingApplications": 0,
"queueName": "a1b",
@ -512,6 +517,7 @@ Response Body:
"maxCapacity": 100.0,
"numActiveApplications": 0,
"numApplications": 0,
"maxParallelApps": 2147483647,
"numContainers": 0,
"numPendingApplications": 0,
"queueName": "a2",
@ -544,6 +550,7 @@ Response Body:
"capacity": 89.5,
"maxCapacity": 100.0,
"numApplications": 2,
"maxParallelApps": 2147483647,
"queueName": "b",
"queues": {
"queue": [
@ -559,6 +566,7 @@ Response Body:
"maxCapacity": 100.0,
"numActiveApplications": 1,
"numApplications": 2,
"maxParallelApps": 2147483647,
"numContainers": 0,
"numPendingApplications": 1,
"queueName": "b1",
@ -607,6 +615,7 @@ Response Body:
"maxCapacity": 100.0,
"numActiveApplications": 0,
"numApplications": 0,
"maxParallelApps": 2147483647,
"numContainers": 0,
"numPendingApplications": 0,
"queueName": "b2",
@ -634,6 +643,7 @@ Response Body:
"maxCapacity": 100.0,
"numActiveApplications": 0,
"numApplications": 0,
"maxParallelApps": 2147483647,
"numContainers": 0,
"numPendingApplications": 0,
"queueName": "b3",
@ -756,6 +766,7 @@ Response Body:
<absoluteMaxCapacity>50.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>0</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>a</queueName>
<state>RUNNING</state>
@ -768,6 +779,7 @@ Response Body:
<absoluteMaxCapacity>25.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>0</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>a1</queueName>
<state>RUNNING</state>
@ -780,6 +792,7 @@ Response Body:
<absoluteMaxCapacity>25.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>0</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>a1a</queueName>
<state>RUNNING</state>
@ -806,6 +819,7 @@ Response Body:
<absoluteMaxCapacity>25.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>0</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>a1b</queueName>
<state>RUNNING</state>
@ -838,6 +852,7 @@ Response Body:
<absoluteMaxCapacity>50.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>0</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>a2</queueName>
<state>RUNNING</state>
@ -870,6 +885,7 @@ Response Body:
<absoluteMaxCapacity>100.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>2</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>b</queueName>
<state>RUNNING</state>
@ -882,6 +898,7 @@ Response Body:
<absoluteMaxCapacity>100.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>2</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>b1</queueName>
<state>RUNNING</state>
@ -927,6 +944,7 @@ Response Body:
<absoluteMaxCapacity>100.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>0</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>b2</queueName>
<state>RUNNING</state>
@ -953,6 +971,7 @@ Response Body:
<absoluteMaxCapacity>100.0</absoluteMaxCapacity>
<absoluteUsedCapacity>0.0</absoluteUsedCapacity>
<numApplications>0</numApplications>
<maxParallelApps>2147483647</maxParallelApps>
<usedResources>&lt;memory:0, vCores:0&gt;</usedResources>
<queueName>b3</queueName>
<state>RUNNING</state>