YARN-4212. FairScheduler: Can't create a DRF queue under a FAIR policy queue. (Yufei Gu via kasha)
(cherry picked from commit 11be3f70e0
)
This commit is contained in:
parent
46b6c95e0a
commit
fefac1276a
|
@ -408,9 +408,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
* Initialize a {@link FSQueue} with queue-specific properties and its
|
* Initialize a {@link FSQueue} with queue-specific properties and its
|
||||||
* metrics.
|
* metrics.
|
||||||
* @param queue the FSQueue needed to be initialized
|
* @param queue the FSQueue needed to be initialized
|
||||||
* @param scheduler the scheduler which the queue belonged to
|
|
||||||
*/
|
*/
|
||||||
public void initFSQueue(FSQueue queue, FairScheduler scheduler){
|
public void initFSQueue(FSQueue queue){
|
||||||
// Set queue-specific properties.
|
// Set queue-specific properties.
|
||||||
String name = queue.getName();
|
String name = queue.getName();
|
||||||
queue.setWeights(getQueueWeight(name));
|
queue.setWeights(getQueueWeight(name));
|
||||||
|
@ -419,14 +418,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
queue.setMaxRunningApps(getQueueMaxApps(name));
|
queue.setMaxRunningApps(getQueueMaxApps(name));
|
||||||
queue.setMaxAMShare(getQueueMaxAMShare(name));
|
queue.setMaxAMShare(getQueueMaxAMShare(name));
|
||||||
queue.setMaxChildQueueResource(getMaxChildResources(name));
|
queue.setMaxChildQueueResource(getMaxChildResources(name));
|
||||||
try {
|
|
||||||
SchedulingPolicy policy = getSchedulingPolicy(name);
|
|
||||||
policy.initialize(scheduler.getClusterResource());
|
|
||||||
queue.setPolicy(policy);
|
|
||||||
} catch (AllocationConfigurationException ex) {
|
|
||||||
LOG.warn("Failed to set the scheduling policy "
|
|
||||||
+ getDefaultSchedulingPolicy(), ex);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set queue metrics.
|
// Set queue metrics.
|
||||||
queue.getMetrics().setMinShare(getMinResources(name));
|
queue.getMetrics().setMinShare(getMinResources(name));
|
||||||
|
|
|
@ -197,15 +197,6 @@ public class FSLeafQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPolicy(SchedulingPolicy policy)
|
|
||||||
throws AllocationConfigurationException {
|
|
||||||
if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) {
|
|
||||||
throwPolicyDoesnotApplyException(policy);
|
|
||||||
}
|
|
||||||
super.policy = policy;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateInternal(boolean checkStarvation) {
|
public void updateInternal(boolean checkStarvation) {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
|
|
|
@ -239,19 +239,6 @@ public class FSParentQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPolicy(SchedulingPolicy policy)
|
|
||||||
throws AllocationConfigurationException {
|
|
||||||
boolean allowed =
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, (parent == null)
|
|
||||||
? SchedulingPolicy.DEPTH_ROOT
|
|
||||||
: SchedulingPolicy.DEPTH_INTERMEDIATE);
|
|
||||||
if (!allowed) {
|
|
||||||
throwPolicyDoesnotApplyException(policy);
|
|
||||||
}
|
|
||||||
super.policy = policy;
|
|
||||||
}
|
|
||||||
|
|
||||||
void incrementRunnableApps() {
|
void incrementRunnableApps() {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -91,20 +91,23 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name);
|
this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name);
|
||||||
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
|
setPolicy(scheduler.getAllocationConfiguration().getSchedulingPolicy(name));
|
||||||
reinit(false);
|
reinit(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize a queue by setting its queue-specific properties and its
|
* Initialize a queue by setting its queue-specific properties and its
|
||||||
* metrics.
|
* metrics. This method is invoked when creating a new queue or reloading
|
||||||
* This function is invoked when a new queue is created or reloading the
|
* the allocation file.
|
||||||
* allocation configuration.
|
* This method does not set policies for queues when reloading the allocation
|
||||||
|
* file since we need to either set all new policies or nothing, which is
|
||||||
|
* handled by method {@link #verifyAndSetPolicyFromConf}.
|
||||||
*
|
*
|
||||||
* @param recursive whether child queues should be reinitialized recursively
|
* @param recursive whether child queues should be reinitialized recursively
|
||||||
*/
|
*/
|
||||||
public void reinit(boolean recursive) {
|
public void reinit(boolean recursive) {
|
||||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||||
allocConf.initFSQueue(this, scheduler);
|
allocConf.initFSQueue(this);
|
||||||
updatePreemptionVariables();
|
updatePreemptionVariables();
|
||||||
|
|
||||||
if (recursive) {
|
if (recursive) {
|
||||||
|
@ -131,15 +134,11 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
return parent;
|
return parent;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
|
public void setPolicy(SchedulingPolicy policy) {
|
||||||
throws AllocationConfigurationException {
|
policy.initialize(scheduler.getClusterResource());
|
||||||
throw new AllocationConfigurationException("SchedulingPolicy " + policy
|
this.policy = policy;
|
||||||
+ " does not apply to queue " + getName());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract void setPolicy(SchedulingPolicy policy)
|
|
||||||
throws AllocationConfigurationException;
|
|
||||||
|
|
||||||
public void setWeights(ResourceWeights weights){
|
public void setWeights(ResourceWeights weights){
|
||||||
this.weights = weights;
|
this.weights = weights;
|
||||||
}
|
}
|
||||||
|
@ -463,4 +462,33 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively check policies for queues in pre-order. Get queue policies
|
||||||
|
* from the allocation file instead of properties of {@link FSQueue} objects.
|
||||||
|
* Set the policy for current queue if there is no policy violation for its
|
||||||
|
* children. This method is invoked while reloading the allocation file.
|
||||||
|
*
|
||||||
|
* @param queueConf allocation configuration
|
||||||
|
* @return true if no policy violation and successfully set polices
|
||||||
|
* for queues; false otherwise
|
||||||
|
*/
|
||||||
|
public boolean verifyAndSetPolicyFromConf(AllocationConfiguration queueConf) {
|
||||||
|
SchedulingPolicy queuePolicy = queueConf.getSchedulingPolicy(getName());
|
||||||
|
|
||||||
|
for (FSQueue child : getChildQueues()) {
|
||||||
|
if (!queuePolicy.isChildPolicyAllowed(
|
||||||
|
queueConf.getSchedulingPolicy(child.getName()))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
boolean success = child.verifyAndSetPolicyFromConf(queueConf);
|
||||||
|
if (!success) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the policy if no policy violation for all children
|
||||||
|
setPolicy(queuePolicy);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
import org.xml.sax.SAXException;
|
import org.xml.sax.SAXException;
|
||||||
|
|
||||||
import com.google.common.base.CharMatcher;
|
import com.google.common.base.CharMatcher;
|
||||||
|
@ -42,10 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maintains a list of queues as well as scheduling parameters for each queue,
|
* Maintains a list of queues as well as scheduling parameters for each queue,
|
||||||
* such as guaranteed share allocations, from the fair scheduler config file.
|
* such as guaranteed share allocations, from the fair scheduler config file.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
@ -72,6 +73,9 @@ public class QueueManager {
|
||||||
|
|
||||||
public void initialize(Configuration conf) throws IOException,
|
public void initialize(Configuration conf) throws IOException,
|
||||||
SAXException, AllocationConfigurationException, ParserConfigurationException {
|
SAXException, AllocationConfigurationException, ParserConfigurationException {
|
||||||
|
// Policies of root and default queue are set to
|
||||||
|
// SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
|
||||||
|
// loaded yet.
|
||||||
rootQueue = new FSParentQueue("root", scheduler, null);
|
rootQueue = new FSParentQueue("root", scheduler, null);
|
||||||
queues.put(rootQueue.getName(), rootQueue);
|
queues.put(rootQueue.getName(), rootQueue);
|
||||||
|
|
||||||
|
@ -80,7 +84,7 @@ public class QueueManager {
|
||||||
// Recursively reinitialize to propagate queue properties
|
// Recursively reinitialize to propagate queue properties
|
||||||
rootQueue.reinit(true);
|
rootQueue.reinit(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a leaf queue by name, creating it if the create param is true and is necessary.
|
* Get a leaf queue by name, creating it if the create param is true and is necessary.
|
||||||
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
|
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
|
||||||
|
@ -272,12 +276,25 @@ public class QueueManager {
|
||||||
FSParentQueue newParent = null;
|
FSParentQueue newParent = null;
|
||||||
String queueName = i.next();
|
String queueName = i.next();
|
||||||
|
|
||||||
|
// Check if child policy is allowed
|
||||||
|
SchedulingPolicy childPolicy = scheduler.getAllocationConfiguration().
|
||||||
|
getSchedulingPolicy(queueName);
|
||||||
|
if (!parent.getPolicy().isChildPolicyAllowed(childPolicy)) {
|
||||||
|
LOG.error("Can't create queue '" + queueName + "'.");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
// Only create a leaf queue at the very end
|
// Only create a leaf queue at the very end
|
||||||
if (!i.hasNext() && (queueType != FSQueueType.PARENT)) {
|
if (!i.hasNext() && (queueType != FSQueueType.PARENT)) {
|
||||||
FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent);
|
FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent);
|
||||||
leafQueues.add(leafQueue);
|
leafQueues.add(leafQueue);
|
||||||
queue = leafQueue;
|
queue = leafQueue;
|
||||||
} else {
|
} else {
|
||||||
|
if (childPolicy instanceof FifoPolicy) {
|
||||||
|
LOG.error("Can't create queue '" + queueName + "', since "
|
||||||
|
+ FifoPolicy.NAME + " is only for leaf queues.");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
newParent = new FSParentQueue(queueName, scheduler, parent);
|
newParent = new FSParentQueue(queueName, scheduler, parent);
|
||||||
queue = newParent;
|
queue = newParent;
|
||||||
}
|
}
|
||||||
|
@ -479,6 +496,13 @@ public class QueueManager {
|
||||||
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
||||||
// Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
|
// Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
|
||||||
synchronized (queues) {
|
synchronized (queues) {
|
||||||
|
// Verify and set scheduling policies for existing queues before creating
|
||||||
|
// any queue, since we need parent policies to determine if we can create
|
||||||
|
// its children.
|
||||||
|
if (!rootQueue.verifyAndSetPolicyFromConf(queueConf)) {
|
||||||
|
LOG.error("Setting scheduling policies for existing queues failed!");
|
||||||
|
}
|
||||||
|
|
||||||
for (String name : queueConf.getConfiguredQueues().get(
|
for (String name : queueConf.getConfiguredQueues().get(
|
||||||
FSQueueType.LEAF)) {
|
FSQueueType.LEAF)) {
|
||||||
if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
|
if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
|
||||||
|
|
|
@ -42,12 +42,6 @@ public abstract class SchedulingPolicy {
|
||||||
public static final SchedulingPolicy DEFAULT_POLICY =
|
public static final SchedulingPolicy DEFAULT_POLICY =
|
||||||
getInstance(FairSharePolicy.class);
|
getInstance(FairSharePolicy.class);
|
||||||
|
|
||||||
public static final byte DEPTH_LEAF = (byte) 1;
|
|
||||||
public static final byte DEPTH_INTERMEDIATE = (byte) 2;
|
|
||||||
public static final byte DEPTH_ROOT = (byte) 4;
|
|
||||||
public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate
|
|
||||||
public static final byte DEPTH_ANY = (byte) 7;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
|
* Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
|
||||||
*/
|
*/
|
||||||
|
@ -113,27 +107,6 @@ public abstract class SchedulingPolicy {
|
||||||
*/
|
*/
|
||||||
public abstract String getName();
|
public abstract String getName();
|
||||||
|
|
||||||
/**
|
|
||||||
* Specifies the depths in the hierarchy, this {@link SchedulingPolicy}
|
|
||||||
* applies to
|
|
||||||
*
|
|
||||||
* @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_*
|
|
||||||
*/
|
|
||||||
public abstract byte getApplicableDepth();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if the specified {@link SchedulingPolicy} can be used for a queue at
|
|
||||||
* the specified depth in the hierarchy
|
|
||||||
*
|
|
||||||
* @param policy {@link SchedulingPolicy} we are checking the
|
|
||||||
* depth-applicability for
|
|
||||||
* @param depth queue's depth in the hierarchy
|
|
||||||
* @return true if policy is applicable to passed depth, false otherwise
|
|
||||||
*/
|
|
||||||
public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) {
|
|
||||||
return ((policy.getApplicableDepth() & depth) == depth) ? true : false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The comparator returned by this method is to be used for sorting the
|
* The comparator returned by this method is to be used for sorting the
|
||||||
* {@link Schedulable}s in that queue.
|
* {@link Schedulable}s in that queue.
|
||||||
|
@ -191,4 +164,13 @@ public abstract class SchedulingPolicy {
|
||||||
public abstract Resource getHeadroom(Resource queueFairShare,
|
public abstract Resource getHeadroom(Resource queueFairShare,
|
||||||
Resource queueUsage, Resource maxAvailable);
|
Resource queueUsage, Resource maxAvailable);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether the policy of a child queue is allowed.
|
||||||
|
*
|
||||||
|
* @param childPolicy the policy of child queue
|
||||||
|
* @return true if the child policy is allowed; false otherwise
|
||||||
|
*/
|
||||||
|
public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,11 +57,6 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
||||||
return NAME;
|
return NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte getApplicableDepth() {
|
|
||||||
return SchedulingPolicy.DEPTH_ANY;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Comparator<Schedulable> getComparator() {
|
public Comparator<Schedulable> getComparator() {
|
||||||
return COMPARATOR;
|
return COMPARATOR;
|
||||||
|
|
|
@ -21,6 +21,8 @@ import java.io.Serializable;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
@ -40,6 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class FairSharePolicy extends SchedulingPolicy {
|
public class FairSharePolicy extends SchedulingPolicy {
|
||||||
|
private static final Log LOG = LogFactory.getLog(FifoPolicy.class);
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final String NAME = "fair";
|
public static final String NAME = "fair";
|
||||||
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
|
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
|
||||||
|
@ -175,7 +178,15 @@ public class FairSharePolicy extends SchedulingPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte getApplicableDepth() {
|
public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) {
|
||||||
return SchedulingPolicy.DEPTH_ANY;
|
if (childPolicy instanceof DominantResourceFairnessPolicy) {
|
||||||
|
LOG.error("Queue policy can't be " + DominantResourceFairnessPolicy.NAME
|
||||||
|
+ " if the parent policy is " + getName() + ". Choose " +
|
||||||
|
getName() + " or " + FifoPolicy.NAME + " for child queues instead."
|
||||||
|
+ " Please note that " + FifoPolicy.NAME
|
||||||
|
+ " is only for leaf queues.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,14 +21,14 @@ import java.io.Serializable;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -38,6 +38,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class FifoPolicy extends SchedulingPolicy {
|
public class FifoPolicy extends SchedulingPolicy {
|
||||||
|
private static final Log LOG = LogFactory.getLog(FifoPolicy.class);
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final String NAME = "FIFO";
|
public static final String NAME = "FIFO";
|
||||||
private static final FifoComparator COMPARATOR = new FifoComparator();
|
private static final FifoComparator COMPARATOR = new FifoComparator();
|
||||||
|
@ -127,9 +129,11 @@ public class FifoPolicy extends SchedulingPolicy {
|
||||||
return headroom;
|
return headroom;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte getApplicableDepth() {
|
public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) {
|
||||||
return SchedulingPolicy.DEPTH_LEAF;
|
LOG.error(getName() + " policy is only for leaf queues. Please choose "
|
||||||
|
+ DominantResourceFairnessPolicy.NAME + " or " + FairSharePolicy.NAME
|
||||||
|
+ " for parent queues.");
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -197,7 +197,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
|
||||||
out.println("<fairSharePreemptionTimeout>0" +
|
out.println("<fairSharePreemptionTimeout>0" +
|
||||||
"</fairSharePreemptionTimeout>");
|
"</fairSharePreemptionTimeout>");
|
||||||
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
addChildQueue(out);
|
addChildQueue(out, "fair");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
|
|
||||||
// DRF queue with fairshare preemption enabled
|
// DRF queue with fairshare preemption enabled
|
||||||
|
@ -207,9 +207,10 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
|
||||||
out.println("<fairSharePreemptionTimeout>0" +
|
out.println("<fairSharePreemptionTimeout>0" +
|
||||||
"</fairSharePreemptionTimeout>");
|
"</fairSharePreemptionTimeout>");
|
||||||
out.println("<schedulingPolicy>drf</schedulingPolicy>");
|
out.println("<schedulingPolicy>drf</schedulingPolicy>");
|
||||||
addChildQueue(out);
|
addChildQueue(out, "drf");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>drf" +
|
||||||
|
"</defaultQueueSchedulingPolicy>");
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
|
@ -237,13 +238,14 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
|
||||||
assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
|
assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addChildQueue(PrintWriter out) {
|
private void addChildQueue(PrintWriter out, String policy) {
|
||||||
// Child queue under fairshare with same settings
|
// Child queue under fairshare with same settings
|
||||||
out.println("<queue name=\"child\">");
|
out.println("<queue name=\"child\">");
|
||||||
out.println("<fairSharePreemptionThreshold>1" +
|
out.println("<fairSharePreemptionThreshold>1" +
|
||||||
"</fairSharePreemptionThreshold>");
|
"</fairSharePreemptionThreshold>");
|
||||||
out.println("<fairSharePreemptionTimeout>0" +
|
out.println("<fairSharePreemptionTimeout>0" +
|
||||||
"</fairSharePreemptionTimeout>");
|
"</fairSharePreemptionTimeout>");
|
||||||
|
out.println("<schedulingPolicy>" + policy + "</schedulingPolicy>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5155,5 +5155,4 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
Resources.equals(aQueue.getDemand(), maxResource) &&
|
Resources.equals(aQueue.getDemand(), maxResource) &&
|
||||||
Resources.equals(bQueue.getDemand(), maxResource));
|
Resources.equals(bQueue.getDemand(), maxResource));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import java.io.File;
|
||||||
import static org.junit.Assert.assertTrue;
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.Stack;
|
import java.util.Stack;
|
||||||
|
@ -30,16 +31,29 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
public class TestSchedulingPolicy {
|
public class TestSchedulingPolicy {
|
||||||
private static final Log LOG = LogFactory.getLog(TestSchedulingPolicy.class);
|
private static final Log LOG = LogFactory.getLog(TestSchedulingPolicy.class);
|
||||||
|
private final static String ALLOC_FILE =
|
||||||
|
new File(FairSchedulerTestBase.TEST_DIR, "test-queues").getAbsolutePath();
|
||||||
|
private FairSchedulerConfiguration conf;
|
||||||
|
private FairScheduler scheduler;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
scheduler = new FairScheduler();
|
||||||
|
conf = new FairSchedulerConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 1000)
|
@Test(timeout = 1000)
|
||||||
public void testParseSchedulingPolicy()
|
public void testParseSchedulingPolicy()
|
||||||
|
@ -78,66 +92,6 @@ public class TestSchedulingPolicy {
|
||||||
sm.getName().equals(FifoPolicy.NAME));
|
sm.getName().equals(FifoPolicy.NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Trivial tests that make sure
|
|
||||||
* {@link SchedulingPolicy#isApplicableTo(SchedulingPolicy, byte)} works as
|
|
||||||
* expected for the possible values of depth
|
|
||||||
*
|
|
||||||
* @throws AllocationConfigurationException
|
|
||||||
*/
|
|
||||||
@Test(timeout = 1000)
|
|
||||||
public void testIsApplicableTo() throws AllocationConfigurationException {
|
|
||||||
final String ERR = "Broken SchedulingPolicy#isApplicableTo";
|
|
||||||
|
|
||||||
// fifo
|
|
||||||
SchedulingPolicy policy = SchedulingPolicy.parse("fifo");
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
|
|
||||||
assertFalse(ERR, SchedulingPolicy.isApplicableTo(
|
|
||||||
SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_INTERMEDIATE));
|
|
||||||
assertFalse(ERR, SchedulingPolicy.isApplicableTo(
|
|
||||||
SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_ROOT));
|
|
||||||
|
|
||||||
|
|
||||||
// fair
|
|
||||||
policy = SchedulingPolicy.parse("fair");
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
|
|
||||||
assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
|
|
||||||
SchedulingPolicy.DEPTH_INTERMEDIATE));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
|
||||||
|
|
||||||
// drf
|
|
||||||
policy = SchedulingPolicy.parse("drf");
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
|
|
||||||
assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
|
|
||||||
SchedulingPolicy.DEPTH_INTERMEDIATE));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
|
||||||
|
|
||||||
policy = Mockito.mock(SchedulingPolicy.class);
|
|
||||||
Mockito.when(policy.getApplicableDepth()).thenReturn(
|
|
||||||
SchedulingPolicy.DEPTH_PARENT);
|
|
||||||
assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
|
|
||||||
SchedulingPolicy.DEPTH_INTERMEDIATE));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
|
|
||||||
assertTrue(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
|
|
||||||
assertFalse(ERR,
|
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test whether {@link FairSharePolicy.FairShareComparator} is transitive.
|
* Test whether {@link FairSharePolicy.FairShareComparator} is transitive.
|
||||||
*/
|
*/
|
||||||
|
@ -353,4 +307,222 @@ public class TestSchedulingPolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulingPolicyViolation() throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"child1\">");
|
||||||
|
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" <queue name=\"child2\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>drf" +
|
||||||
|
"</defaultQueueSchedulingPolicy>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
|
||||||
|
FSQueue child1 = scheduler.getQueueManager().getQueue("child1");
|
||||||
|
assertNull("Queue 'child1' should be null since its policy isn't allowed to"
|
||||||
|
+ " be 'drf' if its parent policy is 'fair'.", child1);
|
||||||
|
|
||||||
|
// dynamic queue
|
||||||
|
FSQueue dynamicQueue = scheduler.getQueueManager().
|
||||||
|
getLeafQueue("dynamicQueue", true);
|
||||||
|
assertNull("Dynamic queue should be null since it isn't allowed to be 'drf'"
|
||||||
|
+ " policy if its parent policy is 'fair'.", dynamicQueue);
|
||||||
|
|
||||||
|
// Set child1 to 'fair' and child2 to 'drf', the reload the allocation file.
|
||||||
|
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"child1\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" <queue name=\"child2\">");
|
||||||
|
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>drf" +
|
||||||
|
"</defaultQueueSchedulingPolicy>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.reinitialize(conf, null);
|
||||||
|
child1 = scheduler.getQueueManager().getQueue("child1");
|
||||||
|
assertNotNull("Queue 'child1' should be not null since its policy is "
|
||||||
|
+ "allowed to be 'fair' if its parent policy is 'fair'.", child1);
|
||||||
|
|
||||||
|
// Detect the policy violation of Child2, keep the original policy instead
|
||||||
|
// of setting the new policy.
|
||||||
|
FSQueue child2 = scheduler.getQueueManager().getQueue("child2");
|
||||||
|
assertTrue("Queue 'child2' should be 'fair' since its new policy 'drf' "
|
||||||
|
+ "is not allowed.", child2.getPolicy() instanceof FairSharePolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulingPolicyViolationInTheMiddleLevel()
|
||||||
|
throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"level2\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"level3\">");
|
||||||
|
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"leaf\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
|
||||||
|
FSQueue level2 = scheduler.getQueueManager().getQueue("level2");
|
||||||
|
assertNotNull("Queue 'level2' shouldn't be null since its policy is allowed"
|
||||||
|
+ " to be 'fair' if its parent policy is 'fair'.", level2);
|
||||||
|
FSQueue level3 = scheduler.getQueueManager().getQueue("level2.level3");
|
||||||
|
assertNull("Queue 'level3' should be null since its policy isn't allowed"
|
||||||
|
+ " to be 'drf' if its parent policy is 'fair'.", level3);
|
||||||
|
FSQueue leaf = scheduler.getQueueManager().getQueue("level2.level3.leaf");
|
||||||
|
assertNull("Queue 'leaf' should be null since its parent failed to create.",
|
||||||
|
leaf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFIFOPolicyOnlyForLeafQueues()
|
||||||
|
throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println(" <queue name=\"intermediate\">");
|
||||||
|
out.println(" <schedulingPolicy>fifo</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"leaf\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
|
||||||
|
FSQueue intermediate = scheduler.getQueueManager().getQueue("intermediate");
|
||||||
|
assertNull("Queue 'intermediate' should be null since 'fifo' is only for "
|
||||||
|
+ "leaf queue.", intermediate);
|
||||||
|
|
||||||
|
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println(" <queue name=\"intermediate\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"leaf\">");
|
||||||
|
out.println(" <schedulingPolicy>fifo</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.reinitialize(conf, null);
|
||||||
|
|
||||||
|
assertNotNull(scheduler.getQueueManager().getQueue("intermediate"));
|
||||||
|
|
||||||
|
FSQueue leaf = scheduler.getQueueManager().getQueue("intermediate.leaf");
|
||||||
|
assertNotNull("Queue 'leaf' should be null since 'fifo' is only for "
|
||||||
|
+ "leaf queue.", leaf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPolicyReinitilization() throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"child1\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" <queue name=\"child2\">");
|
||||||
|
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
|
||||||
|
// Set child1 to 'drf' which is not allowed, then reload the allocation file
|
||||||
|
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"child1\">");
|
||||||
|
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" <queue name=\"child2\">");
|
||||||
|
out.println(" <schedulingPolicy>fifo</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.reinitialize(conf, null);
|
||||||
|
|
||||||
|
FSQueue child1 = scheduler.getQueueManager().getQueue("child1");
|
||||||
|
assertTrue("Queue 'child1' should still be 'fair' since 'drf' isn't allowed"
|
||||||
|
+ " if its parent policy is 'fair'.",
|
||||||
|
child1.getPolicy() instanceof FairSharePolicy);
|
||||||
|
FSQueue child2 = scheduler.getQueueManager().getQueue("child2");
|
||||||
|
assertTrue("Queue 'child2' should still be 'fair' there is a policy"
|
||||||
|
+ " violation while reinitialization.",
|
||||||
|
child2.getPolicy() instanceof FairSharePolicy);
|
||||||
|
|
||||||
|
// Set both child1 and root to 'drf', then reload the allocation file
|
||||||
|
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"child1\">");
|
||||||
|
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" <queue name=\"child2\">");
|
||||||
|
out.println(" <schedulingPolicy>fifo</schedulingPolicy>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.reinitialize(conf, null);
|
||||||
|
|
||||||
|
child1 = scheduler.getQueueManager().getQueue("child1");
|
||||||
|
assertTrue("Queue 'child1' should be 'drf' since both 'root' and 'child1'"
|
||||||
|
+ " are 'drf'.",
|
||||||
|
child1.getPolicy() instanceof DominantResourceFairnessPolicy);
|
||||||
|
child2 = scheduler.getQueueManager().getQueue("child2");
|
||||||
|
assertTrue("Queue 'child2' should still be 'fifo' there is no policy"
|
||||||
|
+ " violation while reinitialization.",
|
||||||
|
child2.getPolicy() instanceof FifoPolicy);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue