YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity by partition to REST API. (Naganarasimha G R via wangda)

This commit is contained in:
Wangda Tan 2015-10-16 15:05:47 -07:00
parent 79b8d60d08
commit 4337b263aa
16 changed files with 1147 additions and 94 deletions

View File

@ -507,6 +507,9 @@ Release 2.8.0 - UNRELEASED
YARN-4258. Add support for controlling capabilities for docker containers. YARN-4258. Add support for controlling capabilities for docker containers.
(Sidharta Seethana via vvasudev) (Sidharta Seethana via vvasudev)
YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity
by partition to REST API. (Naganarasimha G R via wangda)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -29,12 +29,11 @@ import org.apache.hadoop.yarn.util.Records;
public abstract class NodeLabel implements Comparable<NodeLabel> { public abstract class NodeLabel implements Comparable<NodeLabel> {
/** /**
* Default node label partition. * Default node label partition used for displaying.
*/ */
@Private @Private
@Unstable @Unstable
public static final String DEFAULT_NODE_LABEL_PARTITION = public static final String DEFAULT_NODE_LABEL_PARTITION = "<DEFAULT_PARTITION>";
"<DEFAULT_PARTITION>";
/** /**
* Node Label expression not set . * Node Label expression not set .

View File

@ -58,8 +58,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -425,7 +425,7 @@ public class LeafQueue extends AbstractCSQueue {
.getAllUsed()), user.getActiveApplications(), user .getAllUsed()), user.getActiveApplications(), user
.getPendingApplications(), Resources.clone(user .getPendingApplications(), Resources.clone(user
.getConsumedAMResources()), Resources.clone(user .getConsumedAMResources()), Resources.clone(user
.getUserResourceLimit()))); .getUserResourceLimit()), user.getResourceUsage()));
} }
return usersToReturn; return usersToReturn;
} }

View File

@ -23,7 +23,9 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceUsageInfo;
@XmlRootElement @XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@ -34,17 +36,19 @@ public class UserInfo {
protected int numActiveApplications; protected int numActiveApplications;
protected ResourceInfo AMResourceUsed; protected ResourceInfo AMResourceUsed;
protected ResourceInfo userResourceLimit; protected ResourceInfo userResourceLimit;
protected ResourceUsageInfo resources;
UserInfo() {} UserInfo() {}
UserInfo(String username, Resource resUsed, int activeApps, int pendingApps, UserInfo(String username, Resource resUsed, int activeApps, int pendingApps,
Resource amResUsed, Resource resourceLimit) { Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) {
this.username = username; this.username = username;
this.resourcesUsed = new ResourceInfo(resUsed); this.resourcesUsed = new ResourceInfo(resUsed);
this.numActiveApplications = activeApps; this.numActiveApplications = activeApps;
this.numPendingApplications = pendingApps; this.numPendingApplications = pendingApps;
this.AMResourceUsed = new ResourceInfo(amResUsed); this.AMResourceUsed = new ResourceInfo(amResUsed);
this.userResourceLimit = new ResourceInfo(resourceLimit); this.userResourceLimit = new ResourceInfo(resourceLimit);
this.resources = new ResourceUsageInfo(resourceUsage);
} }
public String getUsername() { public String getUsername() {
@ -70,4 +74,8 @@ public class UserInfo {
public ResourceInfo getUserResourceLimit() { public ResourceInfo getUserResourceLimit() {
return userResourceLimit; return userResourceLimit;
} }
public ResourceUsageInfo getResourceUsageInfo() {
return resources;
}
} }

View File

@ -34,11 +34,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionQueueCapacitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionResourceUsageInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.ResponseInfo; import org.apache.hadoop.yarn.webapp.ResponseInfo;
@ -70,6 +72,7 @@ class CapacitySchedulerPage extends RmView {
CapacitySchedulerInfo csinfo; CapacitySchedulerInfo csinfo;
CapacitySchedulerQueueInfo qinfo; CapacitySchedulerQueueInfo qinfo;
String label; String label;
boolean isExclusiveNodeLabel;
} }
static class LeafQueueInfoBlock extends HtmlBlock { static class LeafQueueInfoBlock extends HtmlBlock {
@ -92,13 +95,13 @@ class CapacitySchedulerPage extends RmView {
} }
private void renderLeafQueueInfoWithPartition(Block html) { private void renderLeafQueueInfoWithPartition(Block html) {
nodeLabel = nodeLabel.length() == 0 String nodeLabelDisplay = nodeLabel.length() == 0
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : nodeLabel; ? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : nodeLabel;
// first display the queue's label specific details : // first display the queue's label specific details :
ResponseInfo ri = ResponseInfo ri =
info("\'" + lqinfo.getQueuePath().substring(5) info("\'" + lqinfo.getQueuePath().substring(5)
+ "\' Queue Status for Partition \'" + nodeLabel + "\'"); + "\' Queue Status for Partition \'" + nodeLabelDisplay + "\'");
renderQueueCapacityInfo(ri); renderQueueCapacityInfo(ri, nodeLabel);
html._(InfoBlock.class); html._(InfoBlock.class);
// clear the info contents so this queue's info doesn't accumulate into // clear the info contents so this queue's info doesn't accumulate into
// another queue's info // another queue's info
@ -120,7 +123,7 @@ class CapacitySchedulerPage extends RmView {
ResponseInfo ri = ResponseInfo ri =
info("\'" + lqinfo.getQueuePath().substring(5) + "\' Queue Status") info("\'" + lqinfo.getQueuePath().substring(5) + "\' Queue Status")
._("Queue State:", lqinfo.getQueueState()); ._("Queue State:", lqinfo.getQueueState());
renderQueueCapacityInfo(ri); renderQueueCapacityInfo(ri, "");
renderCommonLeafQueueInfo(ri); renderCommonLeafQueueInfo(ri);
html._(InfoBlock.class); html._(InfoBlock.class);
// clear the info contents so this queue's info doesn't accumulate into // clear the info contents so this queue's info doesn't accumulate into
@ -128,15 +131,19 @@ class CapacitySchedulerPage extends RmView {
ri.clear(); ri.clear();
} }
private void renderQueueCapacityInfo(ResponseInfo ri) { private void renderQueueCapacityInfo(ResponseInfo ri, String label) {
PartitionQueueCapacitiesInfo capacities =
lqinfo.getCapacities().getPartitionQueueCapacitiesInfo(label);
PartitionResourceUsageInfo resourceUsages =
lqinfo.getResources().getPartitionResourceUsageInfo(label);
ri. ri.
_("Used Capacity:", percent(lqinfo.getUsedCapacity() / 100)). _("Used Capacity:", percent(capacities.getUsedCapacity() / 100)).
_("Configured Capacity:", percent(lqinfo.getCapacity() / 100)). _("Configured Capacity:", percent(capacities.getCapacity() / 100)).
_("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)). _("Configured Max Capacity:", percent(capacities.getMaxCapacity() / 100)).
_("Absolute Used Capacity:", percent(lqinfo.getAbsoluteUsedCapacity() / 100)). _("Absolute Used Capacity:", percent(capacities.getAbsoluteUsedCapacity() / 100)).
_("Absolute Configured Capacity:", percent(lqinfo.getAbsoluteCapacity() / 100)). _("Absolute Configured Capacity:", percent(capacities.getAbsoluteCapacity() / 100)).
_("Absolute Configured Max Capacity:", percent(lqinfo.getAbsoluteMaxCapacity() / 100)). _("Absolute Configured Max Capacity:", percent(capacities.getAbsoluteMaxCapacity() / 100)).
_("Used Resources:", lqinfo.getResourcesUsed().toString()); _("Used Resources:", resourceUsages.getUsed().toString());
} }
private void renderCommonLeafQueueInfo(ResponseInfo ri) { private void renderCommonLeafQueueInfo(ResponseInfo ri) {
@ -166,11 +173,13 @@ class CapacitySchedulerPage extends RmView {
static class QueueUsersInfoBlock extends HtmlBlock { static class QueueUsersInfoBlock extends HtmlBlock {
final CapacitySchedulerLeafQueueInfo lqinfo; final CapacitySchedulerLeafQueueInfo lqinfo;
private String nodeLabel;
@Inject @Inject
QueueUsersInfoBlock(ViewContext ctx, CSQInfo info) { QueueUsersInfoBlock(ViewContext ctx, CSQInfo info) {
super(ctx); super(ctx);
lqinfo = (CapacitySchedulerLeafQueueInfo) info.qinfo; lqinfo = (CapacitySchedulerLeafQueueInfo) info.qinfo;
nodeLabel = info.label;
} }
@Override @Override
@ -188,9 +197,14 @@ class CapacitySchedulerPage extends RmView {
ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList(); ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
for (UserInfo userInfo : users) { for (UserInfo userInfo : users) {
ResourceInfo resourcesUsed = userInfo.getResourcesUsed();
if (nodeLabel != null) {
resourcesUsed = userInfo.getResourceUsageInfo()
.getPartitionResourceUsageInfo(nodeLabel).getUsed();
}
tbody.tr().td(userInfo.getUsername()) tbody.tr().td(userInfo.getUsername())
.td(userInfo.getUserResourceLimit().toString()) .td(userInfo.getUserResourceLimit().toString())
.td(userInfo.getResourcesUsed().toString()) .td(resourcesUsed.toString())
.td(lqinfo.getUserAMResourceLimit().toString()) .td(lqinfo.getUserAMResourceLimit().toString())
.td(userInfo.getAMResourcesUsed().toString()) .td(userInfo.getAMResourcesUsed().toString())
.td(Integer.toString(userInfo.getNumActiveApplications())) .td(Integer.toString(userInfo.getNumActiveApplications()))
@ -211,15 +225,32 @@ class CapacitySchedulerPage extends RmView {
@Override @Override
public void render(Block html) { public void render(Block html) {
ArrayList<CapacitySchedulerQueueInfo> subQueues = ArrayList<CapacitySchedulerQueueInfo> subQueues = (csqinfo.qinfo == null)
(csqinfo.qinfo == null) ? csqinfo.csinfo.getQueues().getQueueInfoList() ? csqinfo.csinfo.getQueues().getQueueInfoList()
: csqinfo.qinfo.getQueues().getQueueInfoList(); : csqinfo.qinfo.getQueues().getQueueInfoList();
UL<Hamlet> ul = html.ul("#pq"); UL<Hamlet> ul = html.ul("#pq");
float used;
float absCap;
float absMaxCap;
float absUsedCap;
for (CapacitySchedulerQueueInfo info : subQueues) { for (CapacitySchedulerQueueInfo info : subQueues) {
float used = info.getUsedCapacity() / 100; String nodeLabel = (csqinfo.label == null) ? "" : csqinfo.label;
float absCap = info.getAbsoluteCapacity() / 100; //DEFAULT_NODE_LABEL_PARTITION is accessible to all queues
float absMaxCap = info.getAbsoluteMaxCapacity() / 100; //other exclsiveNodeLabels are accessible only if configured
float absUsedCap = info.getAbsoluteUsedCapacity() / 100; if (!nodeLabel.isEmpty()// i.e. its DEFAULT_NODE_LABEL_PARTITION
&& csqinfo.isExclusiveNodeLabel
&& !info.getNodeLabels().contains("*")
&& !info.getNodeLabels().contains(nodeLabel)) {
continue;
}
PartitionQueueCapacitiesInfo partitionQueueCapsInfo = info
.getCapacities().getPartitionQueueCapacitiesInfo(nodeLabel);
used = partitionQueueCapsInfo.getUsedCapacity() / 100;
absCap = partitionQueueCapsInfo.getAbsoluteCapacity() / 100;
absMaxCap = partitionQueueCapsInfo.getAbsoluteMaxCapacity() / 100;
absUsedCap = partitionQueueCapsInfo.getAbsoluteUsedCapacity() / 100;
LI<UL<Hamlet>> li = ul. LI<UL<Hamlet>> li = ul.
li(). li().
a(_Q).$style(width(absMaxCap * Q_MAX_WIDTH)). a(_Q).$style(width(absMaxCap * Q_MAX_WIDTH)).
@ -343,16 +374,13 @@ class CapacitySchedulerPage extends RmView {
_(); _();
float used = 0; float used = 0;
if (null == nodeLabelsInfo
|| (nodeLabelsInfo.size() == 1 && nodeLabelsInfo.get(0)
.getLabelName().isEmpty())) {
CSQueue root = cs.getRootQueue();
CapacitySchedulerInfo sinfo =
new CapacitySchedulerInfo(root, cs, new RMNodeLabel(
RMNodeLabelsManager.NO_LABEL));
csqinfo.csinfo = sinfo;
csqinfo.qinfo = null;
CSQueue root = cs.getRootQueue();
CapacitySchedulerInfo sinfo = new CapacitySchedulerInfo(root, cs);
csqinfo.csinfo = sinfo;
if (null == nodeLabelsInfo || (nodeLabelsInfo.size() == 1
&& nodeLabelsInfo.get(0).getLabelName().isEmpty())) {
used = sinfo.getUsedCapacity() / 100; used = sinfo.getUsedCapacity() / 100;
//label is not enabled in the cluster or there's only "default" label, //label is not enabled in the cluster or there's only "default" label,
ul.li(). ul.li().
@ -365,18 +393,16 @@ class CapacitySchedulerPage extends RmView {
_(QueueBlock.class)._(); _(QueueBlock.class)._();
} else { } else {
for (RMNodeLabel label : nodeLabelsInfo) { for (RMNodeLabel label : nodeLabelsInfo) {
CSQueue root = cs.getRootQueue();
CapacitySchedulerInfo sinfo =
new CapacitySchedulerInfo(root, cs, label);
csqinfo.csinfo = sinfo;
csqinfo.qinfo = null; csqinfo.qinfo = null;
csqinfo.label = label.getLabelName(); csqinfo.label = label.getLabelName();
String nodeLabel = csqinfo.label.length() == 0 csqinfo.isExclusiveNodeLabel = label.getIsExclusive();
String nodeLabelDisplay = csqinfo.label.length() == 0
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : csqinfo.label; ? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : csqinfo.label;
QueueCapacities queueCapacities = root.getQueueCapacities(); PartitionQueueCapacitiesInfo capacities = sinfo.getCapacities()
used = queueCapacities.getUsedCapacity(label.getLabelName()); .getPartitionQueueCapacitiesInfo(csqinfo.label);
used = capacities.getUsedCapacity() / 100;
String partitionUiTag = String partitionUiTag =
"Partition: " + nodeLabel + " " + label.getResource(); "Partition: " + nodeLabelDisplay + " " + label.getResource();
ul.li(). ul.li().
a(_Q).$style(width(Q_MAX_WIDTH)). a(_Q).$style(width(Q_MAX_WIDTH)).
span().$style(join(width(used), ";left:0%;", span().$style(join(width(used), ";left:0%;",

View File

@ -105,14 +105,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -144,8 +142,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntry; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntry;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
@ -249,8 +247,7 @@ public class RMWebServices {
CapacityScheduler cs = (CapacityScheduler) rs; CapacityScheduler cs = (CapacityScheduler) rs;
CSQueue root = cs.getRootQueue(); CSQueue root = cs.getRootQueue();
sinfo = sinfo =
new CapacitySchedulerInfo(root, cs, new RMNodeLabel( new CapacitySchedulerInfo(root, cs);
RMNodeLabelsManager.NO_LABEL));
} else if (rs instanceof FairScheduler) { } else if (rs instanceof FairScheduler) {
FairScheduler fs = (FairScheduler) rs; FairScheduler fs = (FairScheduler) rs;
sinfo = new FairSchedulerInfo(fs); sinfo = new FairSchedulerInfo(fs);

View File

@ -24,12 +24,9 @@ import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType; import javax.xml.bind.annotation.XmlType;
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
@XmlRootElement(name = "capacityScheduler") @XmlRootElement(name = "capacityScheduler")
@XmlType(name = "capacityScheduler") @XmlType(name = "capacityScheduler")
@ -41,6 +38,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
protected float maxCapacity; protected float maxCapacity;
protected String queueName; protected String queueName;
protected CapacitySchedulerQueueInfoList queues; protected CapacitySchedulerQueueInfoList queues;
protected QueueCapacitiesInfo capacities;
protected CapacitySchedulerHealthInfo health; protected CapacitySchedulerHealthInfo health;
@XmlTransient @XmlTransient
@ -49,19 +47,17 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
public CapacitySchedulerInfo() { public CapacitySchedulerInfo() {
} // JAXB needs this } // JAXB needs this
public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs, public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {
RMNodeLabel nodeLabel) {
String label = nodeLabel.getLabelName();
QueueCapacities parentQueueCapacities = parent.getQueueCapacities();
this.queueName = parent.getQueueName(); this.queueName = parent.getQueueName();
this.usedCapacity = parentQueueCapacities.getUsedCapacity(label) * 100; this.usedCapacity = parent.getUsedCapacity() * 100;
this.capacity = parentQueueCapacities.getCapacity(label) * 100; this.capacity = parent.getCapacity() * 100;
float max = parentQueueCapacities.getMaximumCapacity(label); float max = parent.getMaximumCapacity();
if (max < EPSILON || max > 1f) if (max < EPSILON || max > 1f)
max = 1f; max = 1f;
this.maxCapacity = max * 100; this.maxCapacity = max * 100;
queues = getQueues(parent, nodeLabel); capacities = new QueueCapacitiesInfo(parent.getQueueCapacities());
queues = getQueues(parent);
health = new CapacitySchedulerHealthInfo(cs); health = new CapacitySchedulerHealthInfo(cs);
} }
@ -73,6 +69,10 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
return this.usedCapacity; return this.usedCapacity;
} }
public QueueCapacitiesInfo getCapacities() {
return capacities;
}
public float getMaxCapacity() { public float getMaxCapacity() {
return this.maxCapacity; return this.maxCapacity;
} }
@ -85,31 +85,21 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
return this.queues; return this.queues;
} }
protected CapacitySchedulerQueueInfoList getQueues(CSQueue parent, protected CapacitySchedulerQueueInfoList getQueues(CSQueue parent) {
RMNodeLabel nodeLabel) {
CSQueue parentQueue = parent; CSQueue parentQueue = parent;
CapacitySchedulerQueueInfoList queuesInfo = CapacitySchedulerQueueInfoList queuesInfo =
new CapacitySchedulerQueueInfoList(); new CapacitySchedulerQueueInfoList();
for (CSQueue queue : parentQueue.getChildQueues()) { for (CSQueue queue : parentQueue.getChildQueues()) {
if (nodeLabel.getIsExclusive()
&& !((AbstractCSQueue) queue).accessibleToPartition(nodeLabel
.getLabelName())) {
// Skip displaying the hierarchy for the queues for which the exclusive
// labels are not accessible
continue;
}
CapacitySchedulerQueueInfo info; CapacitySchedulerQueueInfo info;
if (queue instanceof LeafQueue) { if (queue instanceof LeafQueue) {
info = info =
new CapacitySchedulerLeafQueueInfo((LeafQueue) queue, new CapacitySchedulerLeafQueueInfo((LeafQueue) queue);
nodeLabel.getLabelName());
} else { } else {
info = new CapacitySchedulerQueueInfo(queue, nodeLabel.getLabelName()); info = new CapacitySchedulerQueueInfo(queue);
info.queues = getQueues(queue, nodeLabel); info.queues = getQueues(queue);
} }
queuesInfo.addToQueueInfoList(info); queuesInfo.addToQueueInfoList(info);
} }
return queuesInfo; return queuesInfo;
} }
} }

View File

@ -22,6 +22,7 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@XmlRootElement @XmlRootElement
@ -49,8 +50,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
CapacitySchedulerLeafQueueInfo() { CapacitySchedulerLeafQueueInfo() {
}; };
CapacitySchedulerLeafQueueInfo(LeafQueue q, String nodeLabel) { CapacitySchedulerLeafQueueInfo(LeafQueue q) {
super(q, nodeLabel); super(q);
numActiveApplications = q.getNumActiveApplications(); numActiveApplications = q.getNumActiveApplications();
numPendingApplications = q.getNumPendingApplications(); numPendingApplications = q.getNumPendingApplications();
numContainers = q.getNumContainers(); numContainers = q.getNumContainers();
@ -68,6 +69,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
defaultPriority = q.getDefaultApplicationPriority().getPriority(); defaultPriority = q.getDefaultApplicationPriority().getPriority();
} }
@Override
protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
resources = new ResourceUsageInfo(queueResourceUsage);
}
public int getNumActiveApplications() { public int getNumActiveApplications() {
return numActiveApplications; return numActiveApplications;
} }

View File

@ -60,36 +60,36 @@ public class CapacitySchedulerQueueInfo {
protected long allocatedContainers; protected long allocatedContainers;
protected long reservedContainers; protected long reservedContainers;
protected long pendingContainers; protected long pendingContainers;
protected QueueCapacitiesInfo capacities;
protected ResourceUsageInfo resources;
CapacitySchedulerQueueInfo() { CapacitySchedulerQueueInfo() {
}; };
CapacitySchedulerQueueInfo(CSQueue q, String nodeLabel) { CapacitySchedulerQueueInfo(CSQueue q) {
QueueCapacities qCapacities = q.getQueueCapacities();
ResourceUsage queueResourceUsage = q.getQueueResourceUsage();
queuePath = q.getQueuePath(); queuePath = q.getQueuePath();
capacity = qCapacities.getCapacity(nodeLabel) * 100; capacity = q.getCapacity() * 100;
usedCapacity = qCapacities.getUsedCapacity(nodeLabel) * 100; usedCapacity = q.getUsedCapacity() * 100;
maxCapacity = qCapacities.getMaximumCapacity(nodeLabel); maxCapacity = q.getMaximumCapacity();
if (maxCapacity < EPSILON || maxCapacity > 1f) if (maxCapacity < EPSILON || maxCapacity > 1f)
maxCapacity = 1f; maxCapacity = 1f;
maxCapacity *= 100; maxCapacity *= 100;
absoluteCapacity = absoluteCapacity =
cap(qCapacities.getAbsoluteCapacity(nodeLabel), 0f, 1f) * 100; cap(q.getAbsoluteCapacity(), 0f, 1f) * 100;
absoluteMaxCapacity = absoluteMaxCapacity =
cap(qCapacities.getAbsoluteMaximumCapacity(nodeLabel), 0f, 1f) * 100; cap(q.getAbsoluteMaximumCapacity(), 0f, 1f) * 100;
absoluteUsedCapacity = absoluteUsedCapacity =
cap(qCapacities.getAbsoluteUsedCapacity(nodeLabel), 0f, 1f) * 100; cap(q.getAbsoluteUsedCapacity(), 0f, 1f) * 100;
numApplications = q.getNumApplications(); numApplications = q.getNumApplications();
allocatedContainers = q.getMetrics().getAllocatedContainers(); allocatedContainers = q.getMetrics().getAllocatedContainers();
pendingContainers = q.getMetrics().getPendingContainers(); pendingContainers = q.getMetrics().getPendingContainers();
reservedContainers = q.getMetrics().getReservedContainers(); reservedContainers = q.getMetrics().getReservedContainers();
queueName = q.getQueueName(); queueName = q.getQueueName();
state = q.getState(); state = q.getState();
resourcesUsed = new ResourceInfo(queueResourceUsage.getUsed(nodeLabel)); resourcesUsed = new ResourceInfo(q.getUsedResources());
if (q instanceof PlanQueue && !((PlanQueue) q).showReservationsAsQueues()) { if (q instanceof PlanQueue && !((PlanQueue) q).showReservationsAsQueues()) {
hideReservationQueues = true; hideReservationQueues = true;
} }
@ -100,6 +100,15 @@ public class CapacitySchedulerQueueInfo {
nodeLabels.addAll(labelSet); nodeLabels.addAll(labelSet);
Collections.sort(nodeLabels); Collections.sort(nodeLabels);
} }
QueueCapacities qCapacities = q.getQueueCapacities();
capacities = new QueueCapacitiesInfo(qCapacities);
ResourceUsage queueResourceUsage = q.getQueueResourceUsage();
populateQueueResourceUsage(queueResourceUsage);
}
protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
resources = new ResourceUsageInfo(queueResourceUsage, false);
} }
public float getCapacity() { public float getCapacity() {
@ -179,4 +188,12 @@ public class CapacitySchedulerQueueInfo {
public ArrayList<String> getNodeLabels() { public ArrayList<String> getNodeLabels() {
return this.nodeLabels; return this.nodeLabels;
} }
public QueueCapacitiesInfo getCapacities() {
return capacities;
}
public ResourceUsageInfo getResources() {
return resources;
}
} }

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
/**
* This class represents queue capacities for a given partition
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class PartitionQueueCapacitiesInfo {
private String partitionName;
private float capacity;
private float usedCapacity;
private float maxCapacity = 100;
private float absoluteCapacity;
private float absoluteUsedCapacity;
private float absoluteMaxCapacity = 100;
public PartitionQueueCapacitiesInfo() {
}
public PartitionQueueCapacitiesInfo(String partitionName, float capacity,
float usedCapacity, float maxCapacity, float absCapacity,
float absUsedCapacity, float absMaxCapacity) {
super();
this.partitionName = partitionName;
this.capacity = capacity;
this.usedCapacity = usedCapacity;
this.maxCapacity = maxCapacity;
this.absoluteCapacity = absCapacity;
this.absoluteUsedCapacity = absUsedCapacity;
this.absoluteMaxCapacity = absMaxCapacity;
}
public float getCapacity() {
return capacity;
}
public void setCapacity(float capacity) {
this.capacity = capacity;
}
public float getUsedCapacity() {
return usedCapacity;
}
public void setUsedCapacity(float usedCapacity) {
this.usedCapacity = usedCapacity;
}
public float getMaxCapacity() {
return maxCapacity;
}
public void setMaxCapacity(float maxCapacity) {
this.maxCapacity = maxCapacity;
}
public String getPartitionName() {
return partitionName;
}
public void setPartitionName(String partitionName) {
this.partitionName = partitionName;
}
public float getAbsoluteCapacity() {
return absoluteCapacity;
}
public void setAbsoluteCapacity(float absoluteCapacity) {
this.absoluteCapacity = absoluteCapacity;
}
public float getAbsoluteUsedCapacity() {
return absoluteUsedCapacity;
}
public void setAbsoluteUsedCapacity(float absoluteUsedCapacity) {
this.absoluteUsedCapacity = absoluteUsedCapacity;
}
public float getAbsoluteMaxCapacity() {
return absoluteMaxCapacity;
}
public void setAbsoluteMaxCapacity(float absoluteMaxCapacity) {
this.absoluteMaxCapacity = absoluteMaxCapacity;
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
/**
* This class represents queue/user resource usage info for a given partition
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class PartitionResourceUsageInfo {
private String partitionName;
private ResourceInfo used = new ResourceInfo();
private ResourceInfo reserved;
private ResourceInfo pending;
private ResourceInfo amUsed;
public PartitionResourceUsageInfo() {
}
public PartitionResourceUsageInfo(String partitionName, ResourceInfo used,
ResourceInfo reserved, ResourceInfo pending,
ResourceInfo amResourceUsed) {
super();
this.partitionName = partitionName;
this.used = used;
this.reserved = reserved;
this.pending = pending;
this.amUsed = amResourceUsed;
}
public String getPartitionName() {
return partitionName;
}
public void setPartitionName(String partitionName) {
this.partitionName = partitionName;
}
public ResourceInfo getUsed() {
return used;
}
public void setUsed(ResourceInfo used) {
this.used = used;
}
public ResourceInfo getReserved() {
return reserved;
}
public void setReserved(ResourceInfo reserved) {
this.reserved = reserved;
}
public ResourceInfo getPending() {
return pending;
}
public void setPending(ResourceInfo pending) {
this.pending = pending;
}
public ResourceInfo getAmUsed() {
return amUsed;
}
public void setAmUsed(ResourceInfo amResourceUsed) {
this.amUsed = amResourceUsed;
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
/**
* DAO which wraps PartitionQueueCapacitiesInfo applicable for a queue
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class QueueCapacitiesInfo {
protected List<PartitionQueueCapacitiesInfo> queueCapacitiesByPartition =
new ArrayList<>();
public QueueCapacitiesInfo() {
}
public QueueCapacitiesInfo(QueueCapacities capacities) {
if (capacities == null) {
return;
}
float capacity;
float usedCapacity;
float maxCapacity;
float absCapacity;
float absUsedCapacity;
float absMaxCapacity;
for (String partitionName : capacities.getExistingNodeLabels()) {
usedCapacity = capacities.getUsedCapacity(partitionName) * 100;
capacity = capacities.getCapacity(partitionName) * 100;
maxCapacity = capacities.getMaximumCapacity(partitionName);
absCapacity = CapacitySchedulerQueueInfo
.cap(capacities.getAbsoluteCapacity(partitionName), 0f, 1f) * 100;
absUsedCapacity = CapacitySchedulerQueueInfo
.cap(capacities.getAbsoluteUsedCapacity(partitionName), 0f, 1f) * 100;
absMaxCapacity = CapacitySchedulerQueueInfo.cap(
capacities.getAbsoluteMaximumCapacity(partitionName), 0f, 1f) * 100;
if (maxCapacity < CapacitySchedulerQueueInfo.EPSILON || maxCapacity > 1f)
maxCapacity = 1f;
maxCapacity = maxCapacity * 100;
queueCapacitiesByPartition.add(
new PartitionQueueCapacitiesInfo(partitionName, capacity, usedCapacity,
maxCapacity, absCapacity, absUsedCapacity, absMaxCapacity));
}
}
public void add(PartitionQueueCapacitiesInfo partitionQueueCapacitiesInfo) {
queueCapacitiesByPartition.add(partitionQueueCapacitiesInfo);
}
public List<PartitionQueueCapacitiesInfo> getQueueCapacitiesByPartition() {
return queueCapacitiesByPartition;
}
public void setQueueCapacitiesByPartition(
List<PartitionQueueCapacitiesInfo> capacities) {
this.queueCapacitiesByPartition = capacities;
}
public PartitionQueueCapacitiesInfo getPartitionQueueCapacitiesInfo(
String partitionName) {
for (PartitionQueueCapacitiesInfo partitionQueueCapacitiesInfo : queueCapacitiesByPartition) {
if (partitionQueueCapacitiesInfo.getPartitionName()
.equals(partitionName)) {
return partitionQueueCapacitiesInfo;
}
}
return new PartitionQueueCapacitiesInfo();
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.ArrayList;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
/**
* DAO which wraps PartitionResourceUsageInfo applicable for a queue/user
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class ResourceUsageInfo {
List<PartitionResourceUsageInfo> resourceUsagesByPartition =
new ArrayList<>();
public ResourceUsageInfo() {
}
public ResourceUsageInfo(ResourceUsage resourceUsage,
boolean considerAMUsage) {
if (resourceUsage == null) {
return;
}
for (String partitionName : resourceUsage.getNodePartitionsSet()) {
resourceUsagesByPartition.add(new PartitionResourceUsageInfo(
partitionName, new ResourceInfo(resourceUsage.getUsed(partitionName)),
new ResourceInfo(resourceUsage.getReserved(partitionName)),
new ResourceInfo(resourceUsage.getPending(partitionName)),
considerAMUsage
? new ResourceInfo(resourceUsage.getAMUsed(partitionName))
: null));
}
}
public ResourceUsageInfo(ResourceUsage resourceUsage) {
this(resourceUsage, true);
}
public List<PartitionResourceUsageInfo> getPartitionResourceUsages() {
return resourceUsagesByPartition;
}
public void setPartitionResourceUsages(
List<PartitionResourceUsageInfo> resources) {
this.resourceUsagesByPartition = resources;
}
public PartitionResourceUsageInfo getPartitionResourceUsageInfo(
String partitionName) {
for (PartitionResourceUsageInfo partitionResourceUsageInfo : resourceUsagesByPartition) {
if (partitionResourceUsageInfo.getPartitionName().equals(partitionName)) {
return partitionResourceUsageInfo;
}
}
return new PartitionResourceUsageInfo();
}
}

View File

@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.Application;
@ -1778,8 +1777,7 @@ public class TestCapacityScheduler {
(CapacityScheduler) resourceManager.getResourceScheduler(); (CapacityScheduler) resourceManager.getResourceScheduler();
CSQueue origRootQ = cs.getRootQueue(); CSQueue origRootQ = cs.getRootQueue();
CapacitySchedulerInfo oldInfo = CapacitySchedulerInfo oldInfo =
new CapacitySchedulerInfo(origRootQ, cs, new RMNodeLabel( new CapacitySchedulerInfo(origRootQ, cs);
RMNodeLabelsManager.NO_LABEL));
int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues()); int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
int origNumAppsRoot = origRootQ.getNumApplications(); int origNumAppsRoot = origRootQ.getNumApplications();
@ -1789,8 +1787,7 @@ public class TestCapacityScheduler {
int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues()); int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
int newNumAppsRoot = newRootQ.getNumApplications(); int newNumAppsRoot = newRootQ.getNumApplications();
CapacitySchedulerInfo newInfo = CapacitySchedulerInfo newInfo =
new CapacitySchedulerInfo(newRootQ, cs, new RMNodeLabel( new CapacitySchedulerInfo(newRootQ, cs);
RMNodeLabelsManager.NO_LABEL));
CapacitySchedulerLeafQueueInfo origOldA1 = CapacitySchedulerLeafQueueInfo origOldA1 =
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues()); (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
CapacitySchedulerLeafQueueInfo origNewA1 = CapacitySchedulerLeafQueueInfo origNewA1 =

View File

@ -316,7 +316,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
JSONObject info = json.getJSONObject("scheduler"); JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length()); assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo"); info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 7, info.length()); assertEquals("incorrect number of elements", 8, info.length());
verifyClusterSchedulerGeneric(info.getString("type"), verifyClusterSchedulerGeneric(info.getString("type"),
(float) info.getDouble("usedCapacity"), (float) info.getDouble("usedCapacity"),
(float) info.getDouble("capacity"), (float) info.getDouble("capacity"),
@ -349,10 +349,10 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
private void verifySubQueue(JSONObject info, String q, private void verifySubQueue(JSONObject info, String q,
float parentAbsCapacity, float parentAbsMaxCapacity) float parentAbsCapacity, float parentAbsMaxCapacity)
throws JSONException, Exception { throws JSONException, Exception {
int numExpectedElements = 16; int numExpectedElements = 18;
boolean isParentQueue = true; boolean isParentQueue = true;
if (!info.has("queues")) { if (!info.has("queues")) {
numExpectedElements = 29; numExpectedElements = 31;
isParentQueue = false; isParentQueue = false;
} }
assertEquals("incorrect number of elements", numExpectedElements, info.length()); assertEquals("incorrect number of elements", numExpectedElements, info.length());

View File

@ -0,0 +1,639 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
private static final String DEFAULT_PARTITION = "";
private static final String CAPACITIES = "capacities";
private static final String RESOURCE_USAGES_BY_PARTITION =
"resourceUsagesByPartition";
private static final String QUEUE_CAPACITIES_BY_PARTITION =
"queueCapacitiesByPartition";
private static final String QUEUE_C = "Qc";
private static final String LEAF_QUEUE_C1 = "Qc1";
private static final String LEAF_QUEUE_C2 = "Qc2";
private static final String QUEUE_B = "Qb";
private static final String QUEUE_A = "Qa";
private static final String LABEL_LY = "Ly";
private static final String LABEL_LX = "Lx";
private static final ImmutableSet<String> CLUSTER_LABELS =
ImmutableSet.of(LABEL_LX, LABEL_LY, DEFAULT_PARTITION);
private static MockRM rm;
private CapacitySchedulerConfiguration csConf;
private YarnConfiguration conf;
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
csConf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf, rm);
conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
rm = new MockRM(conf);
Set<NodeLabel> labels = new HashSet<NodeLabel>();
labels.add(NodeLabel.newInstance(LABEL_LX));
labels.add(NodeLabel.newInstance(LABEL_LY));
try {
RMNodeLabelsManager nodeLabelManager =
rm.getRMContext().getNodeLabelManager();
nodeLabelManager.addToCluserNodeLabels(labels);
} catch (Exception e) {
Assert.fail();
}
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);
}
});
public class GuiceServletConfig extends GuiceServletContextListener {
@Override
protected Injector getInjector() {
return injector;
}
}
private static void setupQueueConfiguration(
CapacitySchedulerConfiguration conf, ResourceManager rm) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { QUEUE_A, QUEUE_B, QUEUE_C });
String interMediateQueueC =
CapacitySchedulerConfiguration.ROOT + "." + QUEUE_C;
conf.setQueues(interMediateQueueC,
new String[] { LEAF_QUEUE_C1, LEAF_QUEUE_C2 });
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL_LX, 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL_LY, 100);
String leafQueueA = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_A;
conf.setCapacity(leafQueueA, 30);
conf.setMaximumCapacity(leafQueueA, 50);
String leafQueueB = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_B;
conf.setCapacity(leafQueueB, 30);
conf.setMaximumCapacity(leafQueueB, 50);
conf.setCapacity(interMediateQueueC, 40);
conf.setMaximumCapacity(interMediateQueueC, 50);
String leafQueueC1 = interMediateQueueC + "." + LEAF_QUEUE_C1;
conf.setCapacity(leafQueueC1, 50);
conf.setMaximumCapacity(leafQueueC1, 60);
String leafQueueC2 = interMediateQueueC + "." + LEAF_QUEUE_C2;
conf.setCapacity(leafQueueC2, 50);
conf.setMaximumCapacity(leafQueueC2, 70);
// Define label specific configuration
conf.setAccessibleNodeLabels(leafQueueA, ImmutableSet.of(DEFAULT_PARTITION));
conf.setAccessibleNodeLabels(leafQueueB, ImmutableSet.of(LABEL_LX));
conf.setAccessibleNodeLabels(interMediateQueueC,
ImmutableSet.of(LABEL_LX, LABEL_LY));
conf.setAccessibleNodeLabels(leafQueueC1,
ImmutableSet.of(LABEL_LX, LABEL_LY));
conf.setAccessibleNodeLabels(leafQueueC2,
ImmutableSet.of(LABEL_LX, LABEL_LY));
conf.setDefaultNodeLabelExpression(leafQueueB, LABEL_LX);
conf.setDefaultNodeLabelExpression(leafQueueC1, LABEL_LX);
conf.setDefaultNodeLabelExpression(leafQueueC2, LABEL_LY);
conf.setCapacityByLabel(leafQueueB, LABEL_LX, 30);
conf.setCapacityByLabel(interMediateQueueC, LABEL_LX, 70);
conf.setCapacityByLabel(leafQueueC1, LABEL_LX, 40);
conf.setCapacityByLabel(leafQueueC2, LABEL_LX, 60);
conf.setCapacityByLabel(interMediateQueueC, LABEL_LY, 100);
conf.setCapacityByLabel(leafQueueC1, LABEL_LY, 50);
conf.setCapacityByLabel(leafQueueC2, LABEL_LY, 50);
conf.setMaximumCapacityByLabel(leafQueueC1, LABEL_LY, 75);
conf.setMaximumCapacityByLabel(leafQueueC2, LABEL_LY, 75);
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
public TestRMWebServicesForCSWithPartitions() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
}
@Test
public void testSchedulerPartitions() throws JSONException, Exception {
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("scheduler")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
verifySchedulerInfoJson(json);
}
@Test
public void testSchedulerPartitionsSlash() throws JSONException, Exception {
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("scheduler/")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
verifySchedulerInfoJson(json);
}
@Test
public void testSchedulerPartitionsDefault() throws JSONException, Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("scheduler").get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
verifySchedulerInfoJson(json);
}
@Test
public void testSchedulerPartitionsXML() throws JSONException, Exception {
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("scheduler")
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String xml = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
verifySchedulerInfoXML(dom);
}
private void verifySchedulerInfoXML(Document dom) throws Exception {
NodeList scheduler = dom.getElementsByTagName("scheduler");
assertEquals("incorrect number of elements", 1, scheduler.getLength());
NodeList schedulerInfo = dom.getElementsByTagName("schedulerInfo");
assertEquals("incorrect number of elements", 1, schedulerInfo.getLength());
for (int i = 0; i < schedulerInfo.getLength(); i++) {
Element element = (Element) schedulerInfo.item(i);
NodeList children = element.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element schedulerInfoElem = (Element) children.item(j);
if (schedulerInfoElem.getTagName().equals("queues")) {
NodeList qListInfos = schedulerInfoElem.getChildNodes();
for (int k = 0; k < qListInfos.getLength(); k++) {
Element qElem2 = (Element) qListInfos.item(k);
String queue =
WebServicesTestUtils.getXmlString(qElem2, "queueName");
switch (queue) {
case QUEUE_A:
verifyQueueAInfoXML(qElem2);
break;
case QUEUE_B:
verifyQueueBInfoXML(qElem2);
break;
case QUEUE_C:
verifyQueueCInfoXML(qElem2);
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
} else if (schedulerInfoElem.getTagName().equals(CAPACITIES)) {
NodeList capacitiesListInfos = schedulerInfoElem.getChildNodes();
assertEquals("incorrect number of partitions", 3,
capacitiesListInfos.getLength());
for (int k = 0; k < capacitiesListInfos.getLength(); k++) {
Element partitionCapacitiesInfo =
(Element) capacitiesListInfos.item(k);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
assertTrue("invalid PartitionCapacityInfo",
CLUSTER_LABELS.contains(partitionName));
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 100, 0, 100,
100, 0, 100);
}
}
}
}
}
private void verifyQueueAInfoXML(Element queueElem) {
NodeList children = queueElem.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element queueChildElem = (Element) children.item(j);
if (queueChildElem.getTagName().equals(CAPACITIES)) {
NodeList capacitiesListInfos = queueChildElem.getChildNodes();
assertEquals("incorrect number of partitions", 1,
capacitiesListInfos.getLength());
Element partitionCapacitiesInfo = (Element) capacitiesListInfos.item(0);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
assertTrue("invalid PartitionCapacityInfo",
partitionName.isEmpty());
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 30, 0, 50, 30,
0, 50);
} else if (queueChildElem.getTagName().equals("resources")) {
verifyResourceUsageInfoXML(queueChildElem);
}
}
}
private void verifyQueueBInfoXML(Element queueElem) {
assertEquals("Invalid default Label expression", LABEL_LX,
WebServicesTestUtils.getXmlString(queueElem,
"defaultNodeLabelExpression"));
NodeList children = queueElem.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element queueChildElem = (Element) children.item(j);
if (queueChildElem.getTagName().equals(CAPACITIES)) {
NodeList capacitiesListInfos = queueChildElem.getChildNodes();
assertEquals("incorrect number of partitions", 2,
capacitiesListInfos.getLength());
for (int k = 0; k < capacitiesListInfos.getLength(); k++) {
Element partitionCapacitiesInfo =
(Element) capacitiesListInfos.item(k);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 30, 0, 100,
30, 0, 100);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, 30, 0, 50,
30, 0, 50);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
} else if (queueChildElem.getTagName().equals("resources")) {
verifyResourceUsageInfoXML(queueChildElem);
}
}
assertEquals("Node Labels are not matching", LABEL_LX,
WebServicesTestUtils.getXmlString(queueElem, "nodeLabels"));
}
private void verifyQueueCInfoXML(Element queueElem) {
NodeList children = queueElem.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Element queueChildElem = (Element) children.item(j);
if (queueChildElem.getTagName().equals(CAPACITIES)) {
verifyQcCapacitiesInfoXML(queueChildElem, 70, 100, 70, 100, 100, 100,
100, 100, 40, 50, 40, 50);
} else if (queueChildElem.getTagName().equals("resources")) {
verifyResourceUsageInfoXML(queueChildElem);
} else if (queueChildElem.getTagName().equals("queues")) {
NodeList qListInfos = queueChildElem.getChildNodes();
for (int k = 0; k < qListInfos.getLength(); k++) {
Element qElem2 = (Element) qListInfos.item(k);
String queue = WebServicesTestUtils.getXmlString(qElem2, "queueName");
switch (queue) {
case LEAF_QUEUE_C1:
assertEquals("Invalid default Label expression", LABEL_LX,
WebServicesTestUtils.getXmlString(qElem2,
"defaultNodeLabelExpression"));
NodeList queuec1Children = qElem2.getChildNodes();
for (int l = 0; l < queuec1Children.getLength(); l++) {
Element queueC1ChildElem = (Element) queuec1Children.item(l);
if (queueC1ChildElem.getTagName().equals(CAPACITIES)) {
verifyQcCapacitiesInfoXML(queueC1ChildElem, 40, 100, 28, 100,
50, 75, 50, 75, 50, 60, 20, 30);
}
}
break;
case LEAF_QUEUE_C2:
assertEquals("Invalid default Label expression", LABEL_LY,
WebServicesTestUtils.getXmlString(qElem2,
"defaultNodeLabelExpression"));
NodeList queuec2Children = qElem2.getChildNodes();
for (int l = 0; l < queuec2Children.getLength(); l++) {
Element queueC2ChildElem = (Element) queuec2Children.item(l);
if (queueC2ChildElem.getTagName().equals(CAPACITIES)) {
verifyQcCapacitiesInfoXML(queueC2ChildElem, 60, 100, 42, 100,
50, 75, 50, 75, 50, 70, 20, 35);
}
}
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
}
}
}
private void verifyQcCapacitiesInfoXML(Element partitionCapacitiesElem,
float lxCaps, float lxMaxCaps, float lxAbsCaps, float lxAbsMaxCaps,
float lyCaps, float lyMaxCaps, float lyAbsCaps, float lyAbsMaxCaps,
float defCaps, float defMaxCaps, float defAbsCaps, float defAbsMaxCaps) {
NodeList capacitiesListInfos = partitionCapacitiesElem.getChildNodes();
assertEquals("incorrect number of partitions", 3,
capacitiesListInfos.getLength());
for (int k = 0; k < capacitiesListInfos.getLength(); k++) {
Element partitionCapacitiesInfo = (Element) capacitiesListInfos.item(k);
String partitionName = WebServicesTestUtils
.getXmlString(partitionCapacitiesInfo, "partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, lxCaps, 0,
lxMaxCaps, lxAbsCaps, 0, lxAbsMaxCaps);
break;
case LABEL_LY:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, lyCaps, 0,
lyMaxCaps, lyAbsCaps, 0, lyAbsMaxCaps);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoXML(partitionCapacitiesInfo, defCaps, 0,
defMaxCaps, defAbsCaps, 0, defAbsMaxCaps);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
}
private void verifyResourceUsageInfoXML(Element queueChildElem) {
NodeList resourceUsageInfo = queueChildElem.getChildNodes();
assertEquals("incorrect number of partitions", 1,
resourceUsageInfo.getLength());
Element partitionResourceUsageInfo = (Element) resourceUsageInfo.item(0);
String partitionName = WebServicesTestUtils
.getXmlString(partitionResourceUsageInfo, "partitionName");
assertTrue("invalid PartitionCapacityInfo",
DEFAULT_PARTITION.equals(partitionName));
}
private void verifyPartitionCapacityInfoXML(Element partitionInfo,
float capacity, float usedCapacity, float maxCapacity,
float absoluteCapacity, float absoluteUsedCapacity,
float absoluteMaxCapacity) {
assertEquals("capacity doesn't match", capacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "capacity"), 1e-3f);
assertEquals("capacity doesn't match", usedCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "usedCapacity"), 1e-3f);
assertEquals("capacity doesn't match", maxCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "maxCapacity"), 1e-3f);
assertEquals("capacity doesn't match", absoluteCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "absoluteCapacity"),
1e-3f);
assertEquals("capacity doesn't match", absoluteUsedCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "absoluteUsedCapacity"),
1e-3f);
assertEquals("capacity doesn't match", absoluteMaxCapacity,
WebServicesTestUtils.getXmlFloat(partitionInfo, "absoluteMaxCapacity"),
1e-3f);
}
private void verifySchedulerInfoJson(JSONObject json)
throws JSONException, Exception {
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 8, info.length());
JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);
assertEquals("incorrect number of elements", CLUSTER_LABELS.size(),
partitionsCapsArray.length());
for (int i = 0; i < partitionsCapsArray.length(); i++) {
JSONObject partitionInfo = partitionsCapsArray.getJSONObject(i);
String partitionName = partitionInfo.getString("partitionName");
assertTrue("Unknown partition received",
CLUSTER_LABELS.contains(partitionName));
verifyPartitionCapacityInfoJson(partitionInfo, 100, 0, 100, 100, 0, 100);
}
JSONObject jsonQueuesObject = info.getJSONObject("queues");
JSONArray queuesArray = jsonQueuesObject.getJSONArray("queue");
for (int i = 0; i < queuesArray.length(); i++) {
JSONObject queueJson = queuesArray.getJSONObject(i);
String queue = queueJson.getString("queueName");
assertEquals("Partition resourceInfo is wrong", 1,
queueJson.getJSONObject("resources")
.getJSONArray(RESOURCE_USAGES_BY_PARTITION).length());
JSONObject resourcesJsonObject = queueJson.getJSONObject("resources");
JSONArray partitionsResourcesArray =
resourcesJsonObject.getJSONArray("resourceUsagesByPartition");
assertEquals("incorrect number of elements", 1,
partitionsResourcesArray.length());
capacitiesJsonObject = queueJson.getJSONObject(CAPACITIES);
partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);
JSONObject partitionInfo = null;
String partitionName = null;
switch (queue) {
case QUEUE_A:
assertEquals("incorrect number of partitions", 1,
partitionsCapsArray.length());
partitionInfo = partitionsCapsArray.getJSONObject(0);
partitionName = partitionInfo.getString("partitionName");
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0, 50);
assertEquals("incorrect number of elements", 5,
partitionsResourcesArray.getJSONObject(0).length());
break;
case QUEUE_B:
assertEquals("Invalid default Label expression", LABEL_LX,
queueJson.getString("defaultNodeLabelExpression"));
assertEquals("incorrect number of elements", 5,
partitionsResourcesArray.getJSONObject(0).length());
verifyAccesibleNodeLabels(queueJson, ImmutableSet.of(LABEL_LX));
assertEquals("incorrect number of partitions", 2,
partitionsCapsArray.length());
for (int j = 0; j < partitionsCapsArray.length(); j++) {
partitionInfo = partitionsCapsArray.getJSONObject(j);
partitionName = partitionInfo.getString("partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 100, 30, 0,
100);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0,
50);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
break;
case QUEUE_C:
verifyAccesibleNodeLabels(queueJson,
ImmutableSet.of(LABEL_LX, LABEL_LY));
assertEquals("incorrect number of elements", 4,
partitionsResourcesArray.getJSONObject(0).length());
verifyQcPartitionsCapacityInfoJson(partitionsCapsArray, 70, 100, 70,
100, 100, 100, 100, 100, 40, 50, 40, 50);
verifySubQueuesOfQc(queueJson);
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
}
private void verifyAccesibleNodeLabels(JSONObject queueJson,
Set<String> accesibleNodeLabels) throws JSONException {
JSONArray nodeLabels = queueJson.getJSONArray("nodeLabels");
assertEquals("number of accessible Node Labels not matching",
accesibleNodeLabels.size(), nodeLabels.length());
for (int i = 0; i < nodeLabels.length(); i++) {
assertTrue("Invalid accessible node label : " + nodeLabels.getString(i),
accesibleNodeLabels.contains(nodeLabels.getString(i)));
}
}
private void verifySubQueuesOfQc(JSONObject queueCJson) throws JSONException {
JSONObject jsonQueuesObject = queueCJson.getJSONObject("queues");
JSONArray queuesArray = jsonQueuesObject.getJSONArray("queue");
for (int i = 0; i < queuesArray.length(); i++) {
JSONObject queueJson = queuesArray.getJSONObject(i);
String queue = queueJson.getString("queueName");
JSONObject capacitiesJsonObject = queueJson.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);
switch (queue) {
case LEAF_QUEUE_C1:
verifyAccesibleNodeLabels(queueJson,
ImmutableSet.of(LABEL_LX, LABEL_LY));
assertEquals("Invalid default Label expression", LABEL_LX,
queueJson.getString("defaultNodeLabelExpression"));
verifyQcPartitionsCapacityInfoJson(partitionsCapsArray, 40, 100, 28,
100, 50, 75, 50, 75, 50, 60, 20, 30);
break;
case LEAF_QUEUE_C2:
verifyAccesibleNodeLabels(queueJson,
ImmutableSet.of(LABEL_LX, LABEL_LY));
assertEquals("Invalid default Label expression", LABEL_LY,
queueJson.getString("defaultNodeLabelExpression"));
verifyQcPartitionsCapacityInfoJson(partitionsCapsArray, 60, 100, 42,
100, 50, 75, 50, 75, 50, 70, 20, 35);
break;
default:
Assert.fail("Unexpected queue" + queue);
}
}
}
private void verifyQcPartitionsCapacityInfoJson(JSONArray partitionsCapsArray,
float lxCaps, float lxMaxCaps, float lxAbsCaps, float lxAbsMaxCaps,
float lyCaps, float lyMaxCaps, float lyAbsCaps, float lyAbsMaxCaps,
float defCaps, float defMaxCaps, float defAbsCaps, float defAbsMaxCaps)
throws JSONException {
assertEquals("incorrect number of partitions", CLUSTER_LABELS.size(),
partitionsCapsArray.length());
for (int j = 0; j < partitionsCapsArray.length(); j++) {
JSONObject partitionInfo = partitionsCapsArray.getJSONObject(j);
String partitionName = partitionInfo.getString("partitionName");
switch (partitionName) {
case LABEL_LX:
verifyPartitionCapacityInfoJson(partitionInfo, lxCaps, 0, lxMaxCaps,
lxAbsCaps, 0, lxAbsMaxCaps);
break;
case LABEL_LY:
verifyPartitionCapacityInfoJson(partitionInfo, lyCaps, 0, lyMaxCaps,
lyAbsCaps, 0, lyAbsMaxCaps);
break;
case DEFAULT_PARTITION:
verifyPartitionCapacityInfoJson(partitionInfo, defCaps, 0, defMaxCaps,
defAbsCaps, 0, defAbsMaxCaps);
break;
default:
Assert.fail("Unexpected partition" + partitionName);
}
}
}
private void verifyPartitionCapacityInfoJson(
JSONObject partitionCapacityInfoJson, float capacity, float usedCapacity,
float maxCapacity, float absoluteCapacity, float absoluteUsedCapacity,
float absoluteMaxCapacity) throws JSONException {
assertEquals("capacity doesn't match", capacity,
(float) partitionCapacityInfoJson.getDouble("capacity"), 1e-3f);
assertEquals("capacity doesn't match", usedCapacity,
(float) partitionCapacityInfoJson.getDouble("usedCapacity"), 1e-3f);
assertEquals("capacity doesn't match", maxCapacity,
(float) partitionCapacityInfoJson.getDouble("maxCapacity"), 1e-3f);
assertEquals("capacity doesn't match", absoluteCapacity,
(float) partitionCapacityInfoJson.getDouble("absoluteCapacity"), 1e-3f);
assertEquals("capacity doesn't match", absoluteUsedCapacity,
(float) partitionCapacityInfoJson.getDouble("absoluteUsedCapacity"),
1e-3f);
assertEquals("capacity doesn't match", absoluteMaxCapacity,
(float) partitionCapacityInfoJson.getDouble("absoluteMaxCapacity"),
1e-3f);
}
}