YARN-9949. Add missing queue configs for root queue in RMWebService#CapacitySchedulerInfo.
Contributed by Prabhu Joseph.
This commit is contained in:
parent
1172ebd6e8
commit
57c499ef19
|
@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.P
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
|
||||
|
@ -161,6 +162,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
|
||||
= "fifo-with-partitions";
|
||||
|
||||
public static final String FIFO_FOR_PENDING_APPS
|
||||
= "fifo-for-pending-apps";
|
||||
|
||||
public static final String DEFAULT_APP_ORDERING_POLICY =
|
||||
FIFO_APP_ORDERING_POLICY;
|
||||
|
||||
|
@ -571,6 +575,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
|
||||
policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
|
||||
}
|
||||
if (policyType.trim().equals(FIFO_FOR_PENDING_APPS)) {
|
||||
policyType = FifoOrderingPolicyForPendingApps.class.getName();
|
||||
}
|
||||
try {
|
||||
orderingPolicy = (OrderingPolicy<S>)
|
||||
Class.forName(policyType).newInstance();
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
|
||||
|
@ -119,4 +120,9 @@ public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
|
|||
return "FairOrderingPolicy" + sbw;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConfigName() {
|
||||
return CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
|||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
/**
|
||||
|
@ -61,5 +62,10 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
|
|||
public String getInfo() {
|
||||
return "FifoOrderingPolicy";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConfigName() {
|
||||
return CapacitySchedulerConfiguration.FIFO_APP_ORDERING_POLICY;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
|||
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
|
@ -55,6 +56,11 @@ public class FifoOrderingPolicyForPendingApps<S extends SchedulableEntity>
|
|||
return "FifoOrderingPolicyForPendingApps";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConfigName() {
|
||||
return CapacitySchedulerConfiguration.FIFO_FOR_PENDING_APPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, String> conf) {
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
|
||||
|
@ -136,6 +137,12 @@ public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEnti
|
|||
return "FifoOrderingPolicyWithExclusivePartitions";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConfigName() {
|
||||
return CapacitySchedulerConfiguration
|
||||
.FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY;
|
||||
}
|
||||
|
||||
private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
|
||||
String keyPartition = orderingPolicies.containsKey(partition) ?
|
||||
partition : DEFAULT_PARTITION;
|
||||
|
|
|
@ -129,4 +129,10 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
|
|||
*/
|
||||
public String getInfo();
|
||||
|
||||
/**
|
||||
* Return configuration name (which will be used to set ordering policy).
|
||||
* @return configuration name
|
||||
*/
|
||||
String getConfigName();
|
||||
|
||||
}
|
||||
|
|
|
@ -201,7 +201,7 @@ class CapacitySchedulerPage extends RmView {
|
|||
__("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
|
||||
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
|
||||
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
|
||||
__("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
|
||||
__("Ordering Policy: ", lqinfo.getOrderingPolicyDisplayName()).
|
||||
__("Preemption:",
|
||||
lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
|
||||
__("Intra-queue Preemption:", lqinfo.getIntraQueuePreemptionDisabled()
|
||||
|
|
|
@ -24,11 +24,17 @@ import javax.xml.bind.annotation.XmlRootElement;
|
|||
import javax.xml.bind.annotation.XmlTransient;
|
||||
import javax.xml.bind.annotation.XmlType;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
|
||||
@XmlRootElement(name = "capacityScheduler")
|
||||
|
@ -43,6 +49,10 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
|
|||
protected CapacitySchedulerQueueInfoList queues;
|
||||
protected QueueCapacitiesInfo capacities;
|
||||
protected CapacitySchedulerHealthInfo health;
|
||||
protected ResourceInfo maximumAllocation;
|
||||
protected QueueAclsInfo queueAcls;
|
||||
protected int queuePriority;
|
||||
protected String orderingPolicyInfo;
|
||||
|
||||
@XmlTransient
|
||||
static final float EPSILON = 1e-8f;
|
||||
|
@ -63,6 +73,31 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
|
|||
parent.getQueueResourceQuotas(), false);
|
||||
queues = getQueues(cs, parent);
|
||||
health = new CapacitySchedulerHealthInfo(cs);
|
||||
maximumAllocation = new ResourceInfo(parent.getMaximumAllocation());
|
||||
|
||||
CapacitySchedulerConfiguration conf = cs.getConfiguration();
|
||||
queueAcls = new QueueAclsInfo();
|
||||
for (Map.Entry<AccessType, AccessControlList> e : conf
|
||||
.getAcls(queueName).entrySet()) {
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(e.getKey().toString(),
|
||||
e.getValue().getAclString());
|
||||
queueAcls.add(queueAcl);
|
||||
}
|
||||
|
||||
String aclApplicationMaxPriority = "acl_" +
|
||||
StringUtils.toLowerCase(AccessType.APPLICATION_MAX_PRIORITY.toString());
|
||||
String priorityAcls = conf.get(parent.getQueuePath()
|
||||
+ aclApplicationMaxPriority, conf.ALL_ACL);
|
||||
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(
|
||||
AccessType.APPLICATION_MAX_PRIORITY.toString(), priorityAcls);
|
||||
queueAcls.add(queueAcl);
|
||||
|
||||
queuePriority = parent.getPriority().getPriority();
|
||||
if (parent instanceof ParentQueue) {
|
||||
orderingPolicyInfo = ((ParentQueue) parent).getQueueOrderingPolicy()
|
||||
.getConfigName();
|
||||
}
|
||||
}
|
||||
|
||||
public float getCapacity() {
|
||||
|
@ -85,6 +120,22 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
|
|||
return this.queueName;
|
||||
}
|
||||
|
||||
public ResourceInfo getMaximumAllocation() {
|
||||
return maximumAllocation;
|
||||
}
|
||||
|
||||
public QueueAclsInfo getQueueAcls() {
|
||||
return queueAcls;
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
return queuePriority;
|
||||
}
|
||||
|
||||
public String getOrderingPolicyInfo() {
|
||||
return orderingPolicyInfo;
|
||||
}
|
||||
|
||||
public CapacitySchedulerQueueInfoList getQueues() {
|
||||
return this.queues;
|
||||
}
|
||||
|
|
|
@ -58,6 +58,9 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
protected long maxApplicationLifetime;
|
||||
protected long defaultApplicationLifetime;
|
||||
|
||||
@XmlTransient
|
||||
protected String orderingPolicyDisplayName;
|
||||
|
||||
CapacitySchedulerLeafQueueInfo() {
|
||||
};
|
||||
|
||||
|
@ -76,7 +79,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
|
||||
preemptionDisabled = q.getPreemptionDisabled();
|
||||
intraQueuePreemptionDisabled = q.getIntraQueuePreemptionDisabled();
|
||||
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
|
||||
orderingPolicyDisplayName = q.getOrderingPolicy().getInfo();
|
||||
orderingPolicyInfo = q.getOrderingPolicy().getConfigName();
|
||||
defaultNodeLabelExpression = q.getDefaultNodeLabelExpression();
|
||||
defaultPriority = q.getDefaultApplicationPriority().getPriority();
|
||||
ArrayList<UserInfo> usersList = users.getUsersList();
|
||||
|
@ -165,6 +169,10 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
return intraQueuePreemptionDisabled;
|
||||
}
|
||||
|
||||
public String getOrderingPolicyDisplayName() {
|
||||
return orderingPolicyDisplayName;
|
||||
}
|
||||
|
||||
public String getDefaultNodeLabelExpression() {
|
||||
return defaultNodeLabelExpression;
|
||||
}
|
||||
|
|
|
@ -87,6 +87,8 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
int maxApplicationsPerUser;
|
||||
int userLimit;
|
||||
float userLimitFactor;
|
||||
long defaultApplicationLifetime;
|
||||
long maxApplicationLifetime;
|
||||
}
|
||||
|
||||
private static class WebServletModule extends ServletModule {
|
||||
|
@ -131,6 +133,8 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
config.setQueues(A, new String[] {"a1", "a2"});
|
||||
config.setCapacity(A1, 30);
|
||||
config.setMaximumCapacity(A1, 50);
|
||||
config.setMaximumLifetimePerQueue(A2, 100);
|
||||
config.setDefaultLifetimePerQueue(A2, 50);
|
||||
|
||||
config.setUserLimitFactor(A1, 100.0f);
|
||||
config.setCapacity(A2, 70);
|
||||
|
@ -311,6 +315,10 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
|
||||
lqi.userLimitFactor =
|
||||
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
|
||||
lqi.defaultApplicationLifetime =
|
||||
WebServicesTestUtils.getXmlLong(qElem, "defaultApplicationLifetime");
|
||||
lqi.maxApplicationLifetime =
|
||||
WebServicesTestUtils.getXmlLong(qElem, "maxApplicationLifetime");
|
||||
verifyLeafQueueGeneric(q, lqi);
|
||||
}
|
||||
}
|
||||
|
@ -321,7 +329,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
JSONObject info = json.getJSONObject("scheduler");
|
||||
assertEquals("incorrect number of elements", 1, info.length());
|
||||
info = info.getJSONObject("schedulerInfo");
|
||||
assertEquals("incorrect number of elements", 8, info.length());
|
||||
assertEquals("incorrect number of elements in: " + info, 12, info.length());
|
||||
verifyClusterSchedulerGeneric(info.getString("type"),
|
||||
(float) info.getDouble("usedCapacity"),
|
||||
(float) info.getDouble("capacity"),
|
||||
|
@ -330,6 +338,16 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
assertNotNull(health);
|
||||
assertEquals("incorrect number of elements", 3, health.length());
|
||||
|
||||
JSONObject maximumAllocation = info.getJSONObject("maximumAllocation");
|
||||
assertEquals("8192", maximumAllocation.getString("memory"));
|
||||
assertEquals("4", maximumAllocation.getString("vCores"));
|
||||
|
||||
JSONObject queueAcls = info.getJSONObject("queueAcls");
|
||||
assertEquals(1, queueAcls.length());
|
||||
|
||||
assertEquals("0", info.getString("queuePriority"));
|
||||
assertEquals("utilization", info.getString("orderingPolicyInfo"));
|
||||
|
||||
JSONArray arr = info.getJSONObject("queues").getJSONArray("queue");
|
||||
assertEquals("incorrect number of elements", 2, arr.length());
|
||||
|
||||
|
@ -383,6 +401,16 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
String q2 = q + "." + obj.getString("queueName");
|
||||
verifySubQueue(obj, q2, qi.absoluteCapacity, qi.absoluteMaxCapacity);
|
||||
}
|
||||
|
||||
JSONObject maximumAllocation = info.getJSONObject("maximumAllocation");
|
||||
assertEquals("8192", maximumAllocation.getString("memory"));
|
||||
assertEquals("4", maximumAllocation.getString("vCores"));
|
||||
|
||||
JSONObject queueAcls = info.getJSONObject("queueAcls");
|
||||
assertEquals(1, queueAcls.length());
|
||||
|
||||
assertEquals("0", info.getString("queuePriority"));
|
||||
assertEquals("utilization", info.getString("orderingPolicyInfo"));
|
||||
} else {
|
||||
Assert.assertEquals("\"type\" field is incorrect",
|
||||
"capacitySchedulerLeafQueueInfo", info.getString("type"));
|
||||
|
@ -394,6 +422,9 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
|
||||
lqi.userLimit = info.getInt("userLimit");
|
||||
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
|
||||
lqi.defaultApplicationLifetime =
|
||||
info.getLong("defaultApplicationLifetime");
|
||||
lqi.maxApplicationLifetime = info.getLong("maxApplicationLifetime");
|
||||
verifyLeafQueueGeneric(q, lqi);
|
||||
// resourcesUsed and users (per-user resources used) are checked in
|
||||
// testPerUserResource()
|
||||
|
@ -458,6 +489,15 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
info.userLimit);
|
||||
assertEquals("userLimitFactor doesn't match",
|
||||
csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f);
|
||||
|
||||
if (q.equals("root.a.a2")) {
|
||||
assertEquals("defaultApplicationLifetime doesn't match",
|
||||
csConf.getDefaultLifetimePerQueue(q),
|
||||
info.defaultApplicationLifetime);
|
||||
assertEquals("maxApplicationLifetime doesn't match",
|
||||
csConf.getMaximumLifetimePerQueue(q),
|
||||
info.maxApplicationLifetime);
|
||||
}
|
||||
}
|
||||
|
||||
//Return a child Node of node with the tagname or null if none exists
|
||||
|
|
|
@ -466,7 +466,7 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
|
|||
JSONObject info = json.getJSONObject("scheduler");
|
||||
assertEquals("incorrect number of elements", 1, info.length());
|
||||
info = info.getJSONObject("schedulerInfo");
|
||||
assertEquals("incorrect number of elements", 8, info.length());
|
||||
assertEquals("incorrect number of elements", 12, info.length());
|
||||
JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES);
|
||||
JSONArray partitionsCapsArray =
|
||||
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);
|
||||
|
|
Loading…
Reference in New Issue