YARN-7953. [GQ] Data structures for federation global queues calculations. Contributed by Abhishek Modi.

This commit is contained in:
Botong Huang 2018-08-16 08:28:35 -07:00
parent b78f4e57d9
commit 1e8686b448
10 changed files with 1471 additions and 0 deletions

View File

@ -108,6 +108,9 @@
<excludes> <excludes>
<exclude>src/test/resources/schedulerInfo1.json</exclude> <exclude>src/test/resources/schedulerInfo1.json</exclude>
<exclude>src/test/resources/schedulerInfo2.json</exclude> <exclude>src/test/resources/schedulerInfo2.json</exclude>
<exclude>src/test/resources/globalqueues/basic-queue.json</exclude>
<exclude>src/test/resources/globalqueues/tree-queue.json</exclude>
<exclude>src/test/resources/globalqueues/tree-queue-adaptable.json</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
/**
 * Exception thrown when a {@code FederationQueue} (or a view built out of
 * such queues) is not valid.
 */
public class FederationGlobalQueueValidationException extends Exception {

  // Exception is Serializable, so pin the serialized form explicitly.
  private static final long serialVersionUID = 1L;

  /**
   * Creates a validation exception.
   *
   * @param s human readable description of the violated invariant
   */
  public FederationGlobalQueueValidationException(String s) {
    super(s);
  }

  /**
   * Creates a validation exception that keeps track of the underlying
   * failure.
   *
   * @param s human readable description of the violated invariant
   * @param cause the failure that triggered this exception, may be null
   */
  public FederationGlobalQueueValidationException(String s, Throwable cause) {
    super(s, cause);
  }
}

View File

@ -0,0 +1,198 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This class represents a set of root queues (one for each sub-cluster) of a
 * federation, plus the "global" queue obtained by merging them.
 */
public class FederationGlobalView implements Cloneable {
  private ResourceCalculator rc;
  public static final Logger LOG =
      LoggerFactory.getLogger(FederationGlobalView.class);

  private String name;
  // Merged queue; stays null until globalFromLocal() or setGlobal() is called.
  private FederationQueue global;
  private List<FederationQueue> subClusters;
  private Configuration conf;

  /**
   * Creates an empty view: no sub-cluster queues and no global queue.
   */
  public FederationGlobalView() {
    subClusters = new ArrayList<>();
  }

  /**
   * Creates an empty view bound to a configuration and resource calculator.
   *
   * @param config configuration kept by the view
   * @param rc resource calculator kept by the view
   */
  public FederationGlobalView(Configuration config, ResourceCalculator rc) {
    this();
    this.conf = config;
    this.rc = rc;
  }

  /**
   * Creates a view over the given sub-cluster root queues and immediately
   * derives the global queue from them.
   *
   * @param config configuration kept by the view
   * @param rc resource calculator kept by the view
   * @param subClusters root queues, one per sub-cluster
   */
  public FederationGlobalView(Configuration config, ResourceCalculator rc,
      List<FederationQueue> subClusters) {
    this(config, rc);
    setSubClusters(subClusters);
    globalFromLocal();
  }

  /**
   * This method checks that certain queue invariants are respected, both on
   * the global queue (when set) and on every sub-cluster queue.
   *
   * @throws FederationGlobalQueueValidationException upon violation.
   */
  public void validate() throws FederationGlobalQueueValidationException {
    try {
      if (global != null) {
        global.validate();
      }
      for (FederationQueue f : subClusters) {
        f.validate();
      }
    } catch (FederationGlobalQueueValidationException f) {
      // Log the whole view for context, then let the caller handle it.
      LOG.error("Error in validating {}", this.toQuickString());
      throw f;
    }
  }

  /**
   * Returns a FederationQueue matching the queueName
   * from the specified subCluster.
   *
   * @param queueName name of the queue to look up
   * @param subClusterName id of the sub-cluster whose tree is searched
   * @return FederationQueue corresponding to the queueName and subCluster,
   *         or null when the sub-cluster is unknown or has no such queue
   */
  public FederationQueue getQueue(String queueName, String subClusterName) {
    // Build the lookup key once instead of once per sub-cluster.
    SubClusterId wanted = SubClusterId.newInstance(subClusterName);
    for (FederationQueue f : subClusters) {
      if (f.getSubClusterId().equals(wanted)) {
        return f.getChildByName(queueName);
      }
    }
    return null;
  }

  /**
   * Get name of the FederationGlobalView.
   * @return name of the global view
   */
  public String getName() {
    return name;
  }

  /**
   * Set name of the FederationGlobalView.
   * @param name global view name
   */
  public void setName(String name) {
    this.name = name;
  }

  /**
   * Get global view subclusters.
   * @return subclusters associated with global view
   */
  public List<FederationQueue> getSubClusters() {
    return subClusters;
  }

  /**
   * Set global view subclusters.
   * @param subClusters subclusters associated with global view
   */
  public void setSubClusters(List<FederationQueue> subClusters) {
    this.subClusters = subClusters;
  }

  /**
   * Creates a global queue by merging queues of all subclusters. The merged
   * queue gets the "global" scope, its total capacity is set to the
   * component-wise max of its guaranteed and max capacity, and capacities
   * are then propagated through the merged tree.
   */
  public void globalFromLocal() {
    // filling out the global object and propagating totCap
    FederationQueue globalQueue = FederationQueue.mergeQueues(
        this.getSubClusters(), SubClusterId.newInstance("global"));
    Resource totCap =
        Resources.componentwiseMax(globalQueue.getGuarCap(),
            globalQueue.getMaxCap());
    globalQueue.setTotCap(totCap);
    globalQueue.propagateCapacities();
    this.setGlobal(globalQueue);
  }

  @Override
  public String toString() {
    return toQuickString();
  }

  /**
   * Produces a quick String representation of all the sub-cluster queues
   * associated with view (the global queue is not included).
   * Good for printing.
   *
   * @return one block of text per sub-cluster queue
   */
  public String toQuickString() {
    StringBuilder sb = new StringBuilder();
    subClusters.forEach(sc -> sb.append(sc.toQuickString()).append("\n"));
    return sb.toString();
  }

  /**
   * Returns global queue associated with the view.
   * @return global queue, or null if it has not been computed or set.
   */
  public FederationQueue getGlobal() {
    return global;
  }

  /**
   * Set global queue for FederationGlobalView.
   * @param global queue for FederationGlobalView
   */
  public void setGlobal(FederationQueue global) {
    this.global = global;
  }

  /**
   * Simply initializes the roots (global and per sub-cluster) to zero
   * preemption. A view without a global queue only resets the sub-cluster
   * roots.
   */
  protected void initializeRootPreemption() {
    if (global != null) {
      global.setToBePreempted(Resource.newInstance(0, 0));
    }
    for (FederationQueue lr : subClusters) {
      lr.setToBePreempted(Resource.newInstance(0, 0));
    }
  }

  /**
   * Clones the view; the global queue (when set) and every sub-cluster queue
   * are cloned recursively, the remaining fields are copied by reference.
   *
   * @return the cloned view
   * @throws CloneNotSupportedException if Object.clone() refuses the copy
   */
  @Override
  public FederationGlobalView clone() throws CloneNotSupportedException {
    FederationGlobalView copy = (FederationGlobalView) super.clone();
    // A view built with the no-arg constructor has no global queue yet.
    copy.setGlobal(global != null ? global.clone(true) : null);
    List<FederationQueue> clonedSubClusters = new ArrayList<>();
    for (FederationQueue localRoot : getSubClusters()) {
      clonedSubClusters.add(localRoot.clone(true));
    }
    copy.setSubClusters(clonedSubClusters);
    return copy;
  }
}

View File

@ -0,0 +1,761 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_RESOURCE_CALCULATOR_CLASS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This class represents a tree of queues in a sub-cluster YarnRM.
 * Useful to communicate with GPG and global policies.
 */
public class FederationQueue implements Iterable<FederationQueue> {

  private String queueName;
  private String queueType;

  // sub-cluster to which queue belongs.
  private SubClusterId subClusterId;

  // capacities associated with queue.
  private Resource totCap;
  private Resource guarCap;
  private Resource maxCap;
  private Resource usedCap;
  private Resource demandCap;
  private Resource idealAlloc;

  //resource that can be preempted in this queue.
  private Resource toBePreempted;

  // Used only for testing (to embed expected behavior)
  private Resource testExpectedIdealAlloc;

  private Map<String, FederationQueue> children;
  private ResourceCalculator rc;
  private Resource totalUnassigned;

  public static final Logger LOG =
      LoggerFactory.getLogger(FederationQueue.class);

  private Configuration conf;

  /**
   * Creates an empty queue backed by a fresh default Configuration.
   */
  public FederationQueue() {
    this(new Configuration());
  }

  /**
   * Creates an empty queue; the resource calculator is instantiated from the
   * calculator class configured in conf (or the default one).
   *
   * @param conf configuration of the queue
   */
  public FederationQueue(Configuration conf) {
    this(conf, newResourceCalculator(conf));
  }

  /**
   * Creates an empty queue with no children.
   *
   * @param conf configuration of the queue
   * @param rc resource calculator used for comparisons
   */
  public FederationQueue(Configuration conf, ResourceCalculator rc) {
    this.conf = conf;
    children = new HashMap<>();
    this.rc = rc;
  }

  /**
   * Creates a childless queue with the given capacities; the total capacity
   * is initialized to a copy of the guaranteed capacity.
   *
   * @param queuename name of the queue
   * @param subClusterId sub-cluster the queue belongs to
   * @param guar guaranteed capacity
   * @param max max capacity
   * @param used used capacity
   * @param pending demand (pending) capacity
   */
  public FederationQueue(String queuename, SubClusterId subClusterId,
      Resource guar, Resource max, Resource used, Resource pending) {
    // Same setup as the no-arg constructor: default conf and calculator.
    this(new Configuration());
    this.queueName = queuename;
    this.subClusterId = subClusterId;
    this.guarCap = guar;
    this.maxCap = max;
    this.usedCap = used;
    this.demandCap = pending;
    this.totCap = Resources.clone(guar);
  }

  /**
   * Instantiates the resource calculator configured in conf.
   */
  private static ResourceCalculator newResourceCalculator(Configuration conf) {
    return ReflectionUtils.newInstance(
        conf.getClass(RESOURCE_CALCULATOR_CLASS,
            DEFAULT_RESOURCE_CALCULATOR_CLASS, ResourceCalculator.class),
        conf);
  }

  /**
   * This method propagates from leaf to root all metrics, and pushes down the
   * total capacity from the root.
   */
  public void propagateCapacities() {
    rollDownCapacityFromRoot(totCap);
    rollUpMetricsFromChildren();
  }

  /**
   * Assigns rootCap (by reference) as total capacity of this queue and of
   * every descendant.
   */
  private void rollDownCapacityFromRoot(Resource rootCap) {
    totCap = rootCap;
    for (FederationQueue c : children.values()) {
      c.rollDownCapacityFromRoot(rootCap);
    }
  }

  /**
   * Recomputes guaranteed/max/used/demand capacity of every inner queue from
   * its children (post-order). Queues without children are left untouched.
   */
  private void rollUpMetricsFromChildren() {
    Resource childGuar = Resources.createResource(0L, 0);
    Resource childMax = Resources.createResource(0L, 0);
    Resource childUsed = Resources.createResource(0L, 0);
    Resource childDem = Resources.createResource(0L, 0);

    // this pull the leaf data up
    for (FederationQueue c : children.values()) {
      c.rollUpMetricsFromChildren();
      if (c.getGuarCap() != null) {
        Resources.addTo(childGuar, c.getGuarCap());
      }
      if (c.getMaxCap() != null) {
        // max capacity is the largest among the children, not their sum
        childMax = Resources.max(rc, totCap, childMax, c.getMaxCap());
      }
      if (c.getUsedCap() != null) {
        Resources.addTo(childUsed, c.getUsedCap());
      }
      if (c.getDemandCap() != null) {
        Resources.addTo(childDem, c.getDemandCap());
      }
    }

    if (!children.isEmpty()) {
      setGuarCap(childGuar);
      // at least the sum of guarantees, but never above the total capacity
      setMaxCap(Resources.componentwiseMin(
          Resources.componentwiseMax(childMax, childGuar), totCap));
      setUsedCap(childUsed);
      setDemandCap(childDem);
    }
  }

  /**
   * Identifier used in validation messages: queue name and sub-cluster.
   */
  private String queueId() {
    return this.getQueueName() + "@" + this.getSubClusterId();
  }

  /**
   * This method checks that certain queue invariants are respected, on this
   * queue and recursively on its children.
   *
   * @throws FederationGlobalQueueValidationException upon violation.
   */
  public void validate() throws FederationGlobalQueueValidationException {

    if (totCap == null) {
      throw new FederationGlobalQueueValidationException(
          "Total capacity must be configured");
    }

    // NOTE(review): this condition fires when usedCap compares below
    // Resources.none(), while the message talks about exceeding totCap.
    // Left unchanged so that the set of trees passing validation stays the
    // same; confirm which invariant was intended.
    if (Resources.lessThan(rc, totCap, usedCap, Resources.none())) {
      throw new FederationGlobalQueueValidationException(
          "usedCap (" + usedCap + ") exceeds totCap (" + totCap + ") for queue "
              + queueId());
    }

    if (!Resources.fitsIn(guarCap, totCap)) {
      throw new FederationGlobalQueueValidationException(
          "guarCap (" + guarCap + ") exceeds total capacity (" + totCap
              + ") for queue " + queueId());
    }

    if (Resources.lessThan(rc, totCap, guarCap, Resources.none())) {
      throw new FederationGlobalQueueValidationException(
          "guarCap (" + guarCap + ") is outside [0,+inf] range for queue "
              + queueId());
    }

    if (!Resources.fitsIn(guarCap, maxCap)) {
      throw new FederationGlobalQueueValidationException("maxCap (" + maxCap
          + ") is outside [" + guarCap + ",+inf] range for queue "
          + queueId());
    }

    if (Resources.lessThan(rc, totCap, usedCap, Resources.none())
        || !Resources.fitsIn(usedCap, maxCap)) {
      throw new FederationGlobalQueueValidationException("usedCap (" + usedCap
          + ") is outside [0," + maxCap + "] range for queue "
          + queueId());
    }

    if (Resources.lessThan(rc, totCap, demandCap, Resources.none())) {
      throw new FederationGlobalQueueValidationException(
          "demandCap (" + demandCap + ") is outside [0,+inf] range for queue "
              + queueId());
    }

    if (idealAlloc != null && !Resources.fitsIn(idealAlloc, totCap)) {
      throw new FederationGlobalQueueValidationException(
          "idealAlloc (" + idealAlloc + ") is greater than totCap (" + totCap
              + ") for queue " + queueId());
    }

    if (children != null && !children.isEmpty()) {
      Resource childGuar = Resources.createResource(0L, 0);
      Resource childMax = Resources.createResource(0L, 0);
      Resource childUsed = Resources.createResource(0L, 0);
      Resource childDem = Resources.createResource(0L, 0);
      Resource childIdealAlloc = Resources.createResource(0, 0);

      for (FederationQueue c : children.values()) {
        Resources.addTo(childGuar, c.getGuarCap());
        Resources.addTo(childUsed, c.getUsedCap());
        Resources.addTo(childDem, c.getDemandCap());
        if (c.idealAlloc != null) {
          Resources.addTo(childIdealAlloc, c.getIdealAlloc());
        }
        // NOTE(review): childMax is never accumulated in this loop, so this
        // check always compares a zero resource against maxCap. Left
        // unchanged; confirm whether a sum/max of children maxCap was meant.
        if (!Resources.lessThanOrEqual(rc, totCap, childMax, maxCap)) {
          throw new FederationGlobalQueueValidationException(
              "Sum of children maxCap (" + childMax
                  + ") mismatched with parent maxCap (" + maxCap
                  + ") for queue " + queueId());
        }

        c.validate();
      }

      if (!Resources.equals(childGuar, guarCap)) {
        throw new FederationGlobalQueueValidationException(
            "Sum of children guarCap (" + childGuar
                + ") mismatched with parent guarCap (" + guarCap
                + ") for queue " + queueId());
      }

      if (!Resources.equals(childUsed, usedCap)) {
        throw new FederationGlobalQueueValidationException(
            "Sum of children usedCap (" + childUsed
                + ") mismatched with parent usedCap (" + usedCap
                + ") for queue " + queueId());
      }

      if (!Resources.equals(childDem, demandCap)) {
        throw new FederationGlobalQueueValidationException(
            "Sum of children demandCap (" + childDem
                + ") mismatched with parent demandCap (" + demandCap
                + ") for queue " + queueId());
      }

      if (idealAlloc != null
          && !Resources.fitsIn(childIdealAlloc, idealAlloc)) {
        throw new FederationGlobalQueueValidationException(
            "Sum of children idealAlloc (" + childIdealAlloc
                + ") exceed the parent idealAlloc (" + idealAlloc
                + ") for queue " + queueId());
      }
    }
  }

  /**
   * Copies a resource, tolerating capacities that have not been set.
   */
  private static Resource cloneOrNull(Resource res) {
    return (res != null) ? Resources.clone(res) : null;
  }

  /**
   * This method clones the FederationQueue. Every capacity is copied;
   * capacities that are not set stay unset in the clone.
   *
   * @param recursive whether to clone recursively. When false, the clone
   *          gets its own children map that references the same child
   *          queues as this queue.
   * @return cloned object of Federation Queue.
   */
  public FederationQueue clone(boolean recursive) {
    FederationQueue metoo = new FederationQueue(this.conf, rc);
    metoo.queueName = queueName;
    metoo.queueType = queueType;
    metoo.subClusterId = subClusterId;
    metoo.totCap = cloneOrNull(totCap);
    metoo.guarCap = cloneOrNull(guarCap);
    metoo.maxCap = cloneOrNull(maxCap);
    metoo.usedCap = cloneOrNull(usedCap);
    metoo.demandCap = cloneOrNull(demandCap);
    metoo.idealAlloc = cloneOrNull(idealAlloc);
    metoo.toBePreempted = cloneOrNull(toBePreempted);
    metoo.testExpectedIdealAlloc = cloneOrNull(testExpectedIdealAlloc);
    metoo.totalUnassigned = cloneOrNull(totalUnassigned);
    for (Map.Entry<String, FederationQueue> c : children.entrySet()) {
      FederationQueue child = c.getValue();
      metoo.children.put(c.getKey(), recursive ? child.clone(true) : child);
    }
    return metoo;
  }

  /**
   * This operation combine every level of the queue and produces a merged tree.
   * The children of the merged tree are those of the first input queue; a
   * child that is missing from a later input queue contributes nothing to
   * the merged child.
   *
   * @param subClusterQueues the input queues to merge, at least one
   * @param newScope the sub-cluster id assigned to the merged queues
   * @return the root of the merged FederationQueue tree
   * @throws IllegalArgumentException if there is no queue to merge
   */
  public static FederationQueue mergeQueues(
      List<FederationQueue> subClusterQueues, SubClusterId newScope) {

    if (subClusterQueues == null || subClusterQueues.isEmpty()) {
      throw new IllegalArgumentException(
          "At least one queue is required to merge queues");
    }

    FederationQueue combined = null;

    for (FederationQueue root : subClusterQueues) {
      if (combined == null) {
        // The first queue seeds the result (children are still shared).
        combined = root.clone(false);
        combined.setSubClusterId(newScope);
        continue;
      }
      // Resources.add/componentwiseMax already return fresh instances.
      combined.setTotCap(
          Resources.add(combined.getTotCap(), root.getTotCap()));
      combined.setGuarCap(
          Resources.add(combined.getGuarCap(), root.getGuarCap()));
      combined.setMaxCap(Resources.componentwiseMax(combined.getTotCap(),
          Resources.add(combined.getMaxCap(), root.getMaxCap())));
      combined.setUsedCap(
          Resources.add(combined.getUsedCap(), root.getUsedCap()));
      combined.setDemandCap(
          Resources.add(combined.getDemandCap(), root.getDemandCap()));

      Map<String, FederationQueue> newChildren = new HashMap<>();
      for (Map.Entry<String, FederationQueue> mychild :
          combined.children.entrySet()) {
        FederationQueue theirchild = root.getChildren().get(mychild.getKey());
        List<FederationQueue> mergelist = new ArrayList<>(2);
        mergelist.add(mychild.getValue());
        // A sub-cluster may not define this queue; merging a null entry
        // would fail, so only merge what actually exists.
        if (theirchild != null) {
          mergelist.add(theirchild);
        }
        newChildren.put(mychild.getKey(), mergeQueues(mergelist, newScope));
      }
      combined.children = newChildren;
    }

    combined.propagateCapacities();
    return combined;
  }

  /**
   * Get child FederationQueue by name. The search starts at this queue
   * (which is returned if its own name matches) and goes down the tree.
   *
   * @param queueName name of the queue.
   * @return the matching FederationQueue, or null if there is none.
   */
  public FederationQueue getChildByName(String queueName) {
    return recursiveChildByName(this, queueName);
  }

  /**
   * Depth-first lookup of the queue named a in the tree rooted at f.
   */
  private static FederationQueue recursiveChildByName(FederationQueue f,
      String a) {
    if (f == null) {
      return null;
    }
    if (f.getQueueName() != null && f.getQueueName().equals(a)) {
      return f;
    }
    // Direct children are indexed by name, check them before descending.
    FederationQueue direct = f.getChildren().get(a);
    if (direct != null) {
      return direct;
    }

    for (FederationQueue c : f.getChildren().values()) {
      FederationQueue ret = recursiveChildByName(c, a);
      if (ret != null) {
        return ret;
      }
    }
    return null;
  }

  /**
   * Sets total capacity for FederationQueue and children.
   * @param totCapacity total capacity of the queue.
   */
  public void recursiveSetOfTotCap(Resource totCapacity) {
    this.setTotCap(totCapacity);
    for (FederationQueue child : this.getChildren().values()) {
      child.recursiveSetOfTotCap(totCapacity);
    }
  }

  /**
   * Get the queue total unassigned resources.
   * @return queue unassigned resources.
   */
  public Resource getTotalUnassigned() {
    return totalUnassigned;
  }

  /**
   * Set the queue total unassigned resources.
   * @param totalUnassigned queue totalUnassigned resources.
   */
  public void setTotalUnassigned(Resource totalUnassigned) {
    this.totalUnassigned = totalUnassigned;
  }

  /**
   * Get the queue configuration.
   * @return queue configuration
   */
  public Configuration getConf() {
    return conf;
  }

  /**
   * Set the queue configuration.
   * @param conf queue configuration
   */
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get queue guaranteed capacity.
   * @return queue guaranteed capacity
   */
  public Resource getGuarCap() {
    return guarCap;
  }

  /**
   * Set queue guaranteed capacity.
   * @param guarCap queue guaranteed capacity
   */
  public void setGuarCap(Resource guarCap) {
    this.guarCap = guarCap;
  }

  /**
   * Get queue max capacity.
   * @return queue max capacity
   */
  public Resource getMaxCap() {
    return maxCap;
  }

  /**
   * Set queue max capacity.
   * @param maxCap max capacity of the queue
   */
  public void setMaxCap(Resource maxCap) {
    this.maxCap = maxCap;
  }

  /**
   * Get queue used capacity.
   * @return queue used capacity
   */
  public Resource getUsedCap() {
    return usedCap;
  }

  /**
   * Set queue used capacity.
   * @param usedCap queue used capacity
   */
  public void setUsedCap(Resource usedCap) {
    this.usedCap = usedCap;
  }

  /**
   * Get queue demand capacity.
   * @return queue demand capacity
   */
  public Resource getDemandCap() {
    return demandCap;
  }

  /**
   * Set queue demand capacity.
   * @param demandCap queue demand capacity
   */
  public void setDemandCap(Resource demandCap) {
    this.demandCap = demandCap;
  }

  /**
   * Get queue children.
   * @return queue children, indexed by queue name
   */
  public Map<String, FederationQueue> getChildren() {
    return children;
  }

  /**
   * Set queue children.
   * @param children queue children
   */
  public void setChildren(Map<String, FederationQueue> children) {
    this.children = children;
  }

  /**
   * Get queue name.
   * @return queue name
   */
  public String getQueueName() {
    return queueName;
  }

  /**
   * Get queue total capacity.
   * @return queue total capacity
   */
  public Resource getTotCap() {
    return totCap;
  }

  /**
   * Set queue total capacity.
   * @param totCap queue total capacity
   */
  public void setTotCap(Resource totCap) {
    this.totCap = totCap;
  }

  /**
   * Set queue name.
   * @param queueName queue name
   */
  public void setQueueName(String queueName) {
    this.queueName = queueName;
  }

  /**
   * Get queue ideal allocation.
   * @return queue ideal allocation
   */
  public Resource getIdealAlloc() {
    return idealAlloc;
  }

  /**
   * Set queue ideal allocation.
   * @param idealAlloc queue ideal allocation
   */
  public void setIdealAlloc(Resource idealAlloc) {
    this.idealAlloc = idealAlloc;
  }

  /**
   * Get queue resources to be preempted.
   * @return queue resources to be preempted
   */
  public Resource getToBePreempted() {
    return toBePreempted;
  }

  /**
   * Set queue resources to be preempted.
   * @param toBePreempted queue resources to be preempted
   */
  public void setToBePreempted(Resource toBePreempted) {
    this.toBePreempted = toBePreempted;
  }

  /**
   * Get queue subcluster id.
   * @return queue subcluster id
   */
  public SubClusterId getSubClusterId() {
    return subClusterId;
  }

  /**
   * Set queue subcluster id.
   * @param subClusterId queue subcluster id
   */
  public void setSubClusterId(SubClusterId subClusterId) {
    this.subClusterId = subClusterId;
  }

  /**
   * Set queue type.
   * @param queueType queue type
   */
  public void setQueueType(String queueType) {
    this.queueType = queueType;
  }

  /**
   * Get queue type.
   * @return queue type
   */
  public String getQueueType() {
    return queueType;
  }

  /**
   * Get queue expected ideal allocation (used only for testing).
   * @return queue expected ideal allocation
   */
  public Resource getExpectedIdealAlloc() {
    return testExpectedIdealAlloc;
  }

  @Override
  public String toString() {
    return toQuickString();
  }

  /**
   * Produces a quick String representation of the queue rooted at this node.
   * Good for printing.
   *
   * @return one line per queue, indented by depth
   */
  public String toQuickString() {
    return this.appendToSB(new StringBuilder(), 0).toString();
  }

  /**
   * Appends "memory/vcores" for a capacity, or "null" when it is not set, so
   * that printing a partially initialized queue does not fail.
   */
  private static void appendCap(StringBuilder sb, Resource cap) {
    if (cap == null) {
      sb.append("null");
      return;
    }
    sb.append(cap.getMemorySize()).append("/").append(cap.getVirtualCores());
  }

  /**
   * Append queue hierarchy rooted at this node to the given StringBuilder.
   */
  private StringBuilder appendToSB(StringBuilder sb, int depth) {
    sb.append("\n").append(String.join("", Collections.nCopies(depth, "\t")))
        .append(queueName);
    // only the root line carries the sub-cluster id
    if (depth == 0) {
      sb.append("(").append(subClusterId).append(")");
    }
    sb.append(" [g: ");
    appendCap(sb, guarCap);
    sb.append(", m: ");
    appendCap(sb, maxCap);
    sb.append(", u: ");
    appendCap(sb, usedCap);
    sb.append(", d: ");
    appendCap(sb, demandCap);
    sb.append(", t: ");
    appendCap(sb, totCap);
    if (idealAlloc != null) {
      sb.append(", i: ");
      appendCap(sb, idealAlloc);
    }
    sb.append("]");
    if (children != null && !children.isEmpty()) {
      children.values().forEach(c -> c.appendToSB(sb, depth + 1));
    }
    return sb;
  }

  /**
   * Count the queues in the tree rooted at this queue, this queue included.
   * @return total number of queues
   */
  public long countQueues() {
    long count = 1;
    for (FederationQueue q : getChildren().values()) {
      count += q.countQueues();
    }
    return count;
  }

  /**
   * Checks whether queue is leaf queue.
   * @return is queue leaf queue
   */
  public boolean isLeaf() {
    return this.getChildren() == null || this.getChildren().isEmpty();
  }

  /**
   * Get queue number of children.
   * @return number of queue children
   */
  public int childrenNum() {
    return this.getChildren() != null ? this.getChildren().size() : 0;
  }

  /**
   * True if the sum of used and pending resources for this queue fits in
   * the guaranteed resources.
   *
   * @return whether used plus demand fits in the guaranteed capacity
   */
  public boolean isUnderutilized() {
    return Resources.fitsIn(
        Resources.add(this.getUsedCap(), this.getDemandCap()),
        this.getGuarCap());
  }

  /**
   * Return a stream of the current FederationQueue (uses the FedQueueIterator).
   *
   * @return sequential stream over this queue and all its descendants
   */
  public Stream<FederationQueue> stream() {
    return StreamSupport.stream(this.spliterator(), false);
  }

  /**
   * Stream all leaf nodes of the FederationQueue hierarchy.
   *
   * @return stream of the leaf queues
   */
  public Stream<FederationQueue> streamLeafQs() {
    return this.stream().filter(FederationQueue::isLeaf);
  }

  /**
   * Stream all leaf nodes whose guaranteed capacity has non-zero memory.
   *
   * @return stream of the leaf queues with guaranteed memory
   */
  public Stream<FederationQueue> streamNonEmptyLeafQs() {
    return this.streamLeafQs()
        .filter(leafQ -> leafQ.getGuarCap().getMemorySize() > 0);
  }

  /**
   * Stream all inner nodes of the FederationQueue hierarchy.
   *
   * @return stream of the queues that have children
   */
  public Stream<FederationQueue> streamInnerQs() {
    return this.stream().filter(q -> !q.isLeaf());
  }

  @Override
  public Iterator<FederationQueue> iterator() {
    return new FedQueueIterator(this);
  }

  /**
   * Iterator for FederationQueue. Queues are taken from the head of a deque
   * while their children are appended to its tail, so the root comes first
   * and a queue is always returned before its children.
   */
  private static final class FedQueueIterator implements
      Iterator<FederationQueue> {

    private final Deque<FederationQueue> state;

    FedQueueIterator(FederationQueue root) {
      this.state = new ArrayDeque<>();
      state.push(root);
    }

    @Override
    public boolean hasNext() {
      return !state.isEmpty();
    }

    @Override
    public FederationQueue next() {
      // pop() throws NoSuchElementException once the traversal is exhausted
      FederationQueue crt = state.pop();
      if (crt.getChildren() != null && !crt.getChildren().isEmpty()) {
        state.addAll(crt.getChildren().values());
      }
      return crt;
    }
  }
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
* This class provides support methods for all global queue tests.
*/
public final class GlobalQueueTestUtil {
private GlobalQueueTestUtil() {
}
public static List<FederationQueue> generateFedCluster() throws IOException {
int numSubclusters = 20;
List<FederationQueue> toMerge = new ArrayList<>();
String queueJson = loadFile("globalqueues/tree-queue.json");
for (int i = 0; i < numSubclusters; i++) {
FederationQueue temp = parseQueue(queueJson);
toMerge.add(temp);
}
return toMerge;
}
public static String loadFile(String filename) throws IOException {
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
URL submitURI = cL.getResource(filename);
return FileUtils.readFileToString(new File(submitURI.getFile()),
Charset.defaultCharset());
}
public static String toJSONString(Resource resInf) {
StringBuilder builder = new StringBuilder();
builder.append("{");
builder.append("\"memory\"");
builder.append(" : ");
builder.append(resInf.getMemorySize());
builder.append(", ");
builder.append("\"vCores\"");
builder.append(" : ");
builder.append(resInf.getVirtualCores());
builder.append("}");
return builder.toString();
}
public static FederationQueue parseQueue(String queueJson)
throws IOException {
JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper();
Map jsonMap = mapper.readValue(jsonF.createParser(queueJson), Map.class);
FederationQueue fq = parseFedQueue(jsonMap);
fq.propagateCapacities();
return fq;
}
private static FederationQueue parseFedQueue(Map jsonMap) {
FederationQueue fq = new FederationQueue(new Configuration());
fq.setQueueName(jsonMap.get("queueName").toString());
fq.setSubClusterId(SubClusterId.newInstance(
((Map)(jsonMap.get("scope"))).get("id").toString()));
if (jsonMap.get("guarCap") != null) {
fq.setGuarCap(parseResource((Map)jsonMap.get("guarCap")));
}
if (jsonMap.get("maxCap") != null) {
fq.setMaxCap(parseResource((Map)jsonMap.get("maxCap")));
}
if (jsonMap.get("usedCap") != null) {
fq.setUsedCap(parseResource((Map)jsonMap.get("usedCap")));
}
if (jsonMap.get("totCap") != null) {
fq.setTotCap(parseResource((Map)jsonMap.get("totCap")));
}
if (jsonMap.get("demandCap") != null) {
fq.setDemandCap(parseResource((Map)jsonMap.get("demandCap")));
}
if (jsonMap.get("children") != null) {
List children = (List) jsonMap.get("children");
Map<String, FederationQueue> childrenFedQueue = new HashMap<>();
for (Object o : children) {
FederationQueue child = parseFedQueue((Map)(o));
childrenFedQueue.put(child.getQueueName(), child);
}
fq.setChildren(childrenFedQueue);
}
return fq;
}
/**
 * Builds a {@link Resource} from an untyped JSON map.
 *
 * The map may carry a {@code memory} and/or a {@code vCores} entry; any
 * entry that is absent is left at 0. Values are converted through
 * {@code toString()}, so both JSON numbers and quoted numeric strings are
 * accepted.
 *
 * @param resMap the map representation of a resource.
 * @return a new resource holding the parsed memory and vCores.
 * @throws NumberFormatException if an entry is present but not a valid
 *           integral number.
 */
private static Resource parseResource(Map resMap) {
  Resource res = Resource.newInstance(0, 0);
  Object memory = resMap.get("memory");
  if (memory != null) {
    // Resource stores memory as a long; parsing as long avoids rejecting
    // values larger than Integer.MAX_VALUE.
    res.setMemorySize(Long.parseLong(memory.toString()));
  }
  Object vCores = resMap.get("vCores");
  if (vCores != null) {
    res.setVirtualCores(Integer.parseInt(vCores.toString()));
  }
  return res;
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
import static org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues.GlobalQueueTestUtil.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This class provides simple tests for the {@code FederationQueue} data class.
 */
public class TestFederationQueue {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestFederationQueue.class);

  /**
   * Parses a queue hierarchy from a JSON fixture, validates it, and checks a
   * few values of the root queue and of its child "A".
   */
  @Test
  public void testParseQueue() throws Exception {
    String queueJson = loadFile("globalqueues/tree-queue.json");
    FederationQueue fq = parseQueue(queueJson);
    fq.validate();
    Assert.assertEquals("root", fq.getQueueName());
    Assert.assertEquals(2, fq.getChildren().size());
    Assert.assertEquals(100000, fq.getGuarCap().getMemorySize());
    FederationQueue queueA = fq.getChildByName("A");
    Assert.assertEquals(2, queueA.getChildren().size());
    Assert.assertEquals(750, queueA.getUsedCap().getVirtualCores());
  }

  /**
   * Merges the queues returned by {@code generateFedCluster()} and checks
   * that the merged queue still validates after capacities are propagated.
   */
  @Test
  public void testMergeFedQueue() throws Exception {
    List<FederationQueue> toMerge = generateFedCluster();
    FederationQueue merged =
        FederationQueue.mergeQueues(toMerge,
            SubClusterId.newInstance("merged"));
    merged.propagateCapacities();
    merged.validate();
    LOG.info(merged.toQuickString());
  }

  /**
   * Instantiates the adaptable JSON template once per sub-cluster, validates
   * each parsed queue, then merges them all and validates the result.
   */
  @Test
  public void testPropagateFedQueue() throws Exception {
    // Keep the template in its own variable: it contains format
    // placeholders and must not be overwritten by the formatted output.
    String template = loadFile("globalqueues/tree-queue-adaptable.json");
    int numSubclusters = 10;
    Resource guar = Resource.newInstance(5 * 1024, 10);
    Resource max = Resource.newInstance(8 * 1024, 10);
    Resource used = Resource.newInstance(4 * 1024, 10);
    Resource dem = Resource.newInstance(1 * 1024, 10);

    // The substituted values are the same for every sub-cluster, so the
    // template is filled in once; each iteration parses its own queue tree.
    String queueJson = String.format(template, "A1", toJSONString(guar),
        toJSONString(max), toJSONString(used), toJSONString(dem));

    List<FederationQueue> toMerge = new ArrayList<>();
    for (int i = 0; i < numSubclusters; i++) {
      FederationQueue temp = parseQueue(queueJson);
      temp.propagateCapacities();
      temp.validate();
      toMerge.add(temp);
    }
    FederationQueue merged =
        FederationQueue.mergeQueues(toMerge,
            SubClusterId.newInstance("merged"));
    merged.propagateCapacities();
    merged.validate();
    LOG.info(merged.toQuickString());
  }
}

View File

@ -0,0 +1,9 @@
{
"queueName" : "%s",
"totCap" : %s,
"guarCap" : %s,
"maxCap" : %s,
"usedCap" : %s,
"demandCap" : %s,
"scope" : {"id" : "sc1"}
}

View File

@ -0,0 +1,96 @@
{
"queueName": "root",
"scope" : {"id" : "sc1"},
"totCap": {
"memory": "100000",
"vCores": "1000"
},
"guarCap": {
"memory": "100000",
"vCores": "1000"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "50000",
"vCores": "500"
},
"demandCap": {
"memory": "1000",
"vCores": "10"
},
"children": [
{
"queueName": "A",
"scope" : {"id" : "sc1"},
"guarCap": {
"memory": "50000",
"vCores": "500"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "25000",
"vCores": "250"
},
"demandCap": {
"memory": "0",
"vCores": "0"
},
"children": [
{
"queueName": "%s",
"guarCap": %s,
"maxCap": %s,
"usedCap": %s,
"demandCap": %s,
"scope" : {"id" : "sc1"}
},
{
"queueName": "A2",
"scope" : {"id" : "sc1"},
"guarCap": {
"memory": "0",
"vCores": "0"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "0",
"vCores": "0"
},
"demandCap": {
"memory": "0",
"vCores": "0"
}
}
]
},
{
"queueName": "B",
"scope" : {"id" : "sc1"},
"guarCap": {
"memory": "50000",
"vCores": "500"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "50000",
"vCores": "500"
},
"demandCap": {
"memory": "1000",
"vCores": "10"
}
}
]
}

View File

@ -0,0 +1,128 @@
{
"queueName": "root",
"scope" : {"id" : "sc1"},
"totCap": {
"memory": "100000",
"vCores": "1000"
},
"guarCap": {
"memory": "100000",
"vCores": "1000"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "100000",
"vCores": "1000"
},
"demandCap": {
"memory": "1000",
"vCores": "10"
},
"testExpectedToBePreempted": {
"memory": "0",
"vCores": "0"
},
"children": [
{
"queueName": "A",
"scope" : {"id" : "sc1"},
"guarCap": {
"memory": "50000",
"vCores": "500"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "75000",
"vCores": "750"
},
"demandCap": {
"memory": "0",
"vCores": "0"
},
"testExpectedToBePreempted": {
"memory": "10000",
"vCores": "100"
},
"children": [
{
"queueName": "A1",
"scope" : {"id" : "sc1"},
"guarCap": {
"memory": "25000",
"vCores": "250"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "50000",
"vCores": "500"
},
"demandCap": {
"memory": "0",
"vCores": "0"
},
"testExpectedToBePreempted": {
"memory": "10000",
"vCores": "100"
}
},
{
"queueName": "A2",
"scope" : {"id" : "sc1"},
"guarCap": {
"memory": "25000",
"vCores": "250"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "25000",
"vCores": "250"
},
"demandCap": {
"memory": "0",
"vCores": "0"
},
"testExpectedToBePreempted": {
"memory": "0",
"vCores": "0"
}
}
]
},
{
"queueName": "B",
"scope" : {"id" : "sc1"},
"guarCap": {
"memory": "50000",
"vCores": "500"
},
"maxCap": {
"memory": "100000",
"vCores": "1000"
},
"usedCap": {
"memory": "25000",
"vCores": "250"
},
"demandCap": {
"memory": "10000",
"vCores": "100"
},
"testExpectedToBePreempted": {
"memory": "0",
"vCores": "0"
}
}
]
}