YARN-9949. Add missing queue configs for root queue in RMWebService#CapacitySchedulerInfo. Contributed by Prabhu Joseph.

This commit is contained in:
Sunil G 2019-11-03 08:47:27 +05:30
parent de6b8b0c0b
commit d462308e04
12 changed files with 146 additions and 4 deletions

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@ -166,6 +167,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
= "fifo-with-partitions";
public static final String FIFO_FOR_PENDING_APPS
= "fifo-for-pending-apps";
public static final String DEFAULT_APP_ORDERING_POLICY =
FIFO_APP_ORDERING_POLICY;
@ -579,6 +583,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
}
if (policyType.trim().equals(FIFO_FOR_PENDING_APPS)) {
policyType = FifoOrderingPolicyForPendingApps.class.getName();
}
try {
orderingPolicy = (OrderingPolicy<S>)
Class.forName(policyType).newInstance();

View File

@ -139,4 +139,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
@Override
public abstract String getInfo();
@Override
public abstract String getConfigName();
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@ -119,4 +120,9 @@ public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
return "FairOrderingPolicy" + sbw;
}
@Override
public String getConfigName() {
return CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY;
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
@ -61,5 +62,10 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
public String getInfo() {
return "FifoOrderingPolicy";
}
@Override
public String getConfigName() {
return CapacitySchedulerConfiguration.FIFO_APP_ORDERING_POLICY;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import java.util.concurrent.ConcurrentSkipListSet;
@ -55,6 +56,11 @@ public class FifoOrderingPolicyForPendingApps<S extends SchedulableEntity>
return "FifoOrderingPolicyForPendingApps";
}
@Override
public String getConfigName() {
return CapacitySchedulerConfiguration.FIFO_FOR_PENDING_APPS;
}
@Override
public void configure(Map<String, String> conf) {
}

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -136,6 +137,12 @@ public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEnti
return "FifoOrderingPolicyWithExclusivePartitions";
}
@Override
public String getConfigName() {
return CapacitySchedulerConfiguration
.FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY;
}
private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
String keyPartition = orderingPolicies.containsKey(partition) ?
partition : DEFAULT_PARTITION;

View File

@ -129,4 +129,10 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
*/
public String getInfo();
/**
* Return configuration name (which will be used to set ordering policy).
* @return configuration name
*/
String getConfigName();
}

View File

@ -201,7 +201,7 @@ class CapacitySchedulerPage extends RmView {
__("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
__("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
__("Ordering Policy: ", lqinfo.getOrderingPolicyDisplayName()).
__("Preemption:",
lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
__("Intra-queue Preemption:", lqinfo.getIntraQueuePreemptionDisabled()

View File

@ -24,11 +24,17 @@ import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import java.util.ArrayList;
import java.util.Map;
import java.util.List;
@XmlRootElement(name = "capacityScheduler")
@ -43,6 +49,10 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
protected CapacitySchedulerQueueInfoList queues;
protected QueueCapacitiesInfo capacities;
protected CapacitySchedulerHealthInfo health;
protected ResourceInfo maximumAllocation;
protected QueueAclsInfo queueAcls;
protected int queuePriority;
protected String orderingPolicyInfo;
@XmlTransient
static final float EPSILON = 1e-8f;
@ -63,6 +73,31 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
parent.getQueueResourceQuotas(), false);
queues = getQueues(cs, parent);
health = new CapacitySchedulerHealthInfo(cs);
maximumAllocation = new ResourceInfo(parent.getMaximumAllocation());
CapacitySchedulerConfiguration conf = cs.getConfiguration();
queueAcls = new QueueAclsInfo();
for (Map.Entry<AccessType, AccessControlList> e : conf
.getAcls(queueName).entrySet()) {
QueueAclInfo queueAcl = new QueueAclInfo(e.getKey().toString(),
e.getValue().getAclString());
queueAcls.add(queueAcl);
}
String aclApplicationMaxPriority = "acl_" +
StringUtils.toLowerCase(AccessType.APPLICATION_MAX_PRIORITY.toString());
String priorityAcls = conf.get(parent.getQueuePath()
+ aclApplicationMaxPriority, conf.ALL_ACL);
QueueAclInfo queueAcl = new QueueAclInfo(
AccessType.APPLICATION_MAX_PRIORITY.toString(), priorityAcls);
queueAcls.add(queueAcl);
queuePriority = parent.getPriority().getPriority();
if (parent instanceof ParentQueue) {
orderingPolicyInfo = ((ParentQueue) parent).getQueueOrderingPolicy()
.getConfigName();
}
}
public float getCapacity() {
@ -85,6 +120,22 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
return this.queueName;
}
public ResourceInfo getMaximumAllocation() {
return maximumAllocation;
}
public QueueAclsInfo getQueueAcls() {
return queueAcls;
}
public int getPriority() {
return queuePriority;
}
public String getOrderingPolicyInfo() {
return orderingPolicyInfo;
}
public CapacitySchedulerQueueInfoList getQueues() {
return this.queues;
}

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
@ -57,6 +58,9 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
protected long maxApplicationLifetime;
protected long defaultApplicationLifetime;
@XmlTransient
protected String orderingPolicyDisplayName;
CapacitySchedulerLeafQueueInfo() {
};
@ -75,7 +79,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
preemptionDisabled = q.getPreemptionDisabled();
intraQueuePreemptionDisabled = q.getIntraQueuePreemptionDisabled();
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
orderingPolicyDisplayName = q.getOrderingPolicy().getInfo();
orderingPolicyInfo = q.getOrderingPolicy().getConfigName();
defaultNodeLabelExpression = q.getDefaultNodeLabelExpression();
defaultPriority = q.getDefaultApplicationPriority().getPriority();
ArrayList<UserInfo> usersList = users.getUsersList();
@ -163,6 +168,10 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
public boolean getIntraQueuePreemptionDisabled() {
return intraQueuePreemptionDisabled;
}
public String getOrderingPolicyDisplayName() {
return orderingPolicyDisplayName;
}
public String getDefaultNodeLabelExpression() {
return defaultNodeLabelExpression;

View File

@ -87,6 +87,8 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
int maxApplicationsPerUser;
int userLimit;
float userLimitFactor;
long defaultApplicationLifetime;
long maxApplicationLifetime;
}
private static class WebServletModule extends ServletModule {
@ -133,6 +135,8 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
config.setQueues(A, new String[] {"a1", "a2"});
config.setCapacity(A1, 30);
config.setMaximumCapacity(A1, 50);
config.setMaximumLifetimePerQueue(A2, 100);
config.setDefaultLifetimePerQueue(A2, 50);
config.setUserLimitFactor(A1, 100.0f);
config.setCapacity(A2, 70);
@ -313,6 +317,10 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
lqi.userLimitFactor =
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
lqi.defaultApplicationLifetime =
WebServicesTestUtils.getXmlLong(qElem, "defaultApplicationLifetime");
lqi.maxApplicationLifetime =
WebServicesTestUtils.getXmlLong(qElem, "maxApplicationLifetime");
verifyLeafQueueGeneric(q, lqi);
}
}
@ -323,7 +331,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements in: " + info, 1, info.length());
info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements in: " + info, 8, info.length());
assertEquals("incorrect number of elements in: " + info, 12, info.length());
verifyClusterSchedulerGeneric(info.getString("type"),
(float) info.getDouble("usedCapacity"),
(float) info.getDouble("capacity"),
@ -339,6 +347,16 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
assertEquals("incorrect number of elements in: " + health, 3,
lastRunDetails.length());
JSONObject maximumAllocation = info.getJSONObject("maximumAllocation");
assertEquals("8192", maximumAllocation.getString("memory"));
assertEquals("4", maximumAllocation.getString("vCores"));
JSONObject queueAcls = info.getJSONObject("queueAcls");
assertEquals(1, queueAcls.length());
assertEquals("0", info.getString("queuePriority"));
assertEquals("utilization", info.getString("orderingPolicyInfo"));
JSONArray arr = info.getJSONObject("queues").getJSONArray("queue");
assertEquals("incorrect number of elements in: " + arr, 2, arr.length());
@ -393,6 +411,16 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
String q2 = q + "." + obj.getString("queueName");
verifySubQueue(obj, q2, qi.absoluteCapacity, qi.absoluteMaxCapacity);
}
JSONObject maximumAllocation = info.getJSONObject("maximumAllocation");
assertEquals("8192", maximumAllocation.getString("memory"));
assertEquals("4", maximumAllocation.getString("vCores"));
JSONObject queueAcls = info.getJSONObject("queueAcls");
assertEquals(1, queueAcls.length());
assertEquals("0", info.getString("queuePriority"));
assertEquals("utilization", info.getString("orderingPolicyInfo"));
} else {
Assert.assertEquals("\"type\" field is incorrect",
"capacitySchedulerLeafQueueInfo", info.getString("type"));
@ -404,6 +432,9 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
lqi.userLimit = info.getInt("userLimit");
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
lqi.defaultApplicationLifetime =
info.getLong("defaultApplicationLifetime");
lqi.maxApplicationLifetime = info.getLong("maxApplicationLifetime");
verifyLeafQueueGeneric(q, lqi);
// resourcesUsed and users (per-user resources used) are checked in
// testPerUserResource()
@ -468,6 +499,15 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
info.userLimit);
assertEquals("userLimitFactor doesn't match",
csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f);
if (q.equals("root.a.a2")) {
assertEquals("defaultApplicationLifetime doesn't match",
csConf.getDefaultLifetimePerQueue(q),
info.defaultApplicationLifetime);
assertEquals("maxApplicationLifetime doesn't match",
csConf.getMaximumLifetimePerQueue(q),
info.maxApplicationLifetime);
}
}
//Return a child Node of node with the tagname or null if none exists

View File

@ -563,7 +563,7 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 8, info.length());
assertEquals("incorrect number of elements", 12, info.length());
JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);