YARN-482. FS: Extend SchedulingMode to intermediate queues. (kkambatl via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1469511 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27bef76501
commit
15d5394221
|
@ -23,6 +23,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
||||||
|
(kkambatl via tucu)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-365. Change NM heartbeat handling to not generate a scheduler event
|
YARN-365. Change NM heartbeat handling to not generate a scheduler event
|
||||||
|
|
|
@ -278,9 +278,7 @@ public class AppSchedulable extends Schedulable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
||||||
@Override
|
|
||||||
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
|
||||||
LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
|
LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
|
||||||
|
|
||||||
if (reserved) {
|
if (reserved) {
|
||||||
|
@ -345,4 +343,13 @@ public class AppSchedulable extends Schedulable {
|
||||||
}
|
}
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource assignReservedContainer(FSSchedulerNode node) {
|
||||||
|
return assignContainer(node, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
|
return assignContainer(node, false);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -40,9 +40,6 @@ public class FSLeafQueue extends FSQueue {
|
||||||
|
|
||||||
private final List<AppSchedulable> appScheds =
|
private final List<AppSchedulable> appScheds =
|
||||||
new ArrayList<AppSchedulable>();
|
new ArrayList<AppSchedulable>();
|
||||||
|
|
||||||
/** Scheduling mode for jobs inside the queue (fair or FIFO) */
|
|
||||||
private SchedulingMode schedulingMode;
|
|
||||||
|
|
||||||
private final FairScheduler scheduler;
|
private final FairScheduler scheduler;
|
||||||
private final QueueManager queueMgr;
|
private final QueueManager queueMgr;
|
||||||
|
@ -86,13 +83,18 @@ public class FSLeafQueue extends FSQueue {
|
||||||
return appScheds;
|
return appScheds;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setSchedulingMode(SchedulingMode mode) {
|
@Override
|
||||||
this.schedulingMode = mode;
|
public void setPolicy(SchedulingPolicy policy)
|
||||||
|
throws AllocationConfigurationException {
|
||||||
|
if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) {
|
||||||
|
throwPolicyDoesnotApplyException(policy);
|
||||||
|
}
|
||||||
|
super.policy = policy;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recomputeFairShares() {
|
public void recomputeShares() {
|
||||||
schedulingMode.computeShares(getAppSchedulables(), getFairShare());
|
policy.computeShares(getAppSchedulables(), getFairShare());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -136,42 +138,27 @@ public class FSLeafQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved);
|
Resource assigned = Resources.none();
|
||||||
// If this queue is over its limit, reject
|
if (LOG.isDebugEnabled()) {
|
||||||
if (Resources.greaterThan(getResourceUsage(),
|
LOG.debug("Node offered to queue: " + getName());
|
||||||
queueMgr.getMaxResources(getName()))) {
|
|
||||||
return Resources.none();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this node already has reserved resources for an app, first try to
|
if (!assignContainerPreCheck(node)) {
|
||||||
// finish allocating resources for that app.
|
return assigned;
|
||||||
if (reserved) {
|
}
|
||||||
for (AppSchedulable sched : appScheds) {
|
|
||||||
if (sched.getApp().getApplicationAttemptId() ==
|
Comparator<Schedulable> comparator = policy.getComparator();
|
||||||
node.getReservedContainer().getApplicationAttemptId()) {
|
Collections.sort(appScheds, comparator);
|
||||||
return sched.assignContainer(node, reserved);
|
for (AppSchedulable sched : appScheds) {
|
||||||
|
if (sched.getRunnable()) {
|
||||||
|
assigned = sched.assignContainer(node);
|
||||||
|
if (Resources.greaterThan(assigned, Resources.none())) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Resources.none(); // We should never get here
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise, chose app to schedule based on given policy.
|
|
||||||
else {
|
|
||||||
Comparator<Schedulable> comparator = schedulingMode.getComparator();
|
|
||||||
|
|
||||||
Collections.sort(appScheds, comparator);
|
|
||||||
for (AppSchedulable sched: appScheds) {
|
|
||||||
if (sched.getRunnable()) {
|
|
||||||
Resource assignedResource = sched.assignContainer(node, reserved);
|
|
||||||
if (!assignedResource.equals(Resources.none())) {
|
|
||||||
return assignedResource;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return Resources.none();
|
|
||||||
}
|
}
|
||||||
|
return assigned;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -33,7 +34,6 @@ public class FSParentQueue extends FSQueue {
|
||||||
private static final Log LOG = LogFactory.getLog(
|
private static final Log LOG = LogFactory.getLog(
|
||||||
FSParentQueue.class.getName());
|
FSParentQueue.class.getName());
|
||||||
|
|
||||||
|
|
||||||
private final List<FSQueue> childQueues =
|
private final List<FSQueue> childQueues =
|
||||||
new ArrayList<FSQueue>();
|
new ArrayList<FSQueue>();
|
||||||
private final QueueManager queueMgr;
|
private final QueueManager queueMgr;
|
||||||
|
@ -50,11 +50,11 @@ public class FSParentQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recomputeFairShares() {
|
public void recomputeShares() {
|
||||||
SchedulingMode.getDefault().computeShares(childQueues, getFairShare());
|
policy.computeShares(childQueues, getFairShare());
|
||||||
for (FSQueue childQueue : childQueues) {
|
for (FSQueue childQueue : childQueues) {
|
||||||
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
|
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
|
||||||
childQueue.recomputeFairShares();
|
childQueue.recomputeShares();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,13 +131,41 @@ public class FSParentQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
throw new IllegalStateException(
|
Resource assigned = Resources.none();
|
||||||
"Parent queue should not be assigned container");
|
|
||||||
|
// If this queue is over its limit, reject
|
||||||
|
if (Resources.greaterThan(getResourceUsage(),
|
||||||
|
queueMgr.getMaxResources(getName()))) {
|
||||||
|
return assigned;
|
||||||
|
}
|
||||||
|
|
||||||
|
Collections.sort(childQueues, policy.getComparator());
|
||||||
|
for (FSQueue child : childQueues) {
|
||||||
|
assigned = child.assignContainer(node);
|
||||||
|
if (node.getReservedContainer() != null
|
||||||
|
|| Resources.greaterThan(assigned, Resources.none())) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return assigned;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<FSQueue> getChildQueues() {
|
public Collection<FSQueue> getChildQueues() {
|
||||||
return childQueues;
|
return childQueues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPolicy(SchedulingPolicy policy)
|
||||||
|
throws AllocationConfigurationException {
|
||||||
|
boolean allowed =
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, (this == queueMgr
|
||||||
|
.getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT
|
||||||
|
: SchedulingPolicy.DEPTH_INTERMEDIATE);
|
||||||
|
if (!allowed) {
|
||||||
|
throwPolicyDoesnotApplyException(policy);
|
||||||
|
}
|
||||||
|
super.policy = policy;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,8 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
||||||
protected final RecordFactory recordFactory =
|
protected final RecordFactory recordFactory =
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
|
protected SchedulingPolicy policy = SchedulingPolicy.getDefault();
|
||||||
|
|
||||||
public FSQueue(String name, QueueManager queueMgr,
|
public FSQueue(String name, QueueManager queueMgr,
|
||||||
FairScheduler scheduler, FSParentQueue parent) {
|
FairScheduler scheduler, FSParentQueue parent) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
@ -63,6 +65,19 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SchedulingPolicy getPolicy() {
|
||||||
|
return policy;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
|
||||||
|
throws AllocationConfigurationException {
|
||||||
|
throw new AllocationConfigurationException("SchedulingPolicy " + policy
|
||||||
|
+ " does not apply to queue " + getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void setPolicy(SchedulingPolicy policy)
|
||||||
|
throws AllocationConfigurationException;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double getWeight() {
|
public double getWeight() {
|
||||||
return queueMgr.getQueueWeight(getName());
|
return queueMgr.getQueueWeight(getName());
|
||||||
|
@ -130,13 +145,27 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recomputes the fair shares for all queues and applications
|
* Recomputes the shares for all child queues and applications based on this
|
||||||
* under this queue.
|
* queue's current share
|
||||||
*/
|
*/
|
||||||
public abstract void recomputeFairShares();
|
public abstract void recomputeShares();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the children of this queue, if any.
|
* Gets the children of this queue, if any.
|
||||||
*/
|
*/
|
||||||
public abstract Collection<FSQueue> getChildQueues();
|
public abstract Collection<FSQueue> getChildQueues();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to check if the queue should attempt assigning resources
|
||||||
|
*
|
||||||
|
* @return true if check passes (can assign) or false otherwise
|
||||||
|
*/
|
||||||
|
protected boolean assignContainerPreCheck(FSSchedulerNode node) {
|
||||||
|
if (Resources.greaterThan(getResourceUsage(),
|
||||||
|
queueMgr.getMaxResources(getName()))
|
||||||
|
|| node.getReservedContainer() != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class FSSchedulerNode extends SchedulerNode {
|
||||||
private volatile int numContainers;
|
private volatile int numContainers;
|
||||||
|
|
||||||
private RMContainer reservedContainer;
|
private RMContainer reservedContainer;
|
||||||
|
private AppSchedulable reservedAppSchedulable;
|
||||||
|
|
||||||
/* set of containers that are allocated containers */
|
/* set of containers that are allocated containers */
|
||||||
private final Map<ContainerId, RMContainer> launchedContainers =
|
private final Map<ContainerId, RMContainer> launchedContainers =
|
||||||
|
@ -221,6 +222,7 @@ public class FSSchedulerNode extends SchedulerNode {
|
||||||
" on node " + this + " for application " + application);
|
" on node " + this + " for application " + application);
|
||||||
}
|
}
|
||||||
this.reservedContainer = reservedContainer;
|
this.reservedContainer = reservedContainer;
|
||||||
|
this.reservedAppSchedulable = application.getAppSchedulable();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void unreserveResource(
|
public synchronized void unreserveResource(
|
||||||
|
@ -237,11 +239,15 @@ public class FSSchedulerNode extends SchedulerNode {
|
||||||
" on node " + this);
|
" on node " + this);
|
||||||
}
|
}
|
||||||
|
|
||||||
reservedContainer = null;
|
this.reservedContainer = null;
|
||||||
|
this.reservedAppSchedulable = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized RMContainer getReservedContainer() {
|
public synchronized RMContainer getReservedContainer() {
|
||||||
return reservedContainer;
|
return reservedContainer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized AppSchedulable getReservedAppSchedulable() {
|
||||||
|
return reservedAppSchedulable;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,7 +161,7 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
protected boolean assignMultiple; // Allocate multiple containers per
|
protected boolean assignMultiple; // Allocate multiple containers per
|
||||||
// heartbeat
|
// heartbeat
|
||||||
protected int maxAssign; // Max containers to assign per heartbeat
|
protected int maxAssign; // Max containers to assign per heartbeat
|
||||||
|
|
||||||
public FairScheduler() {
|
public FairScheduler() {
|
||||||
clock = new SystemClock();
|
clock = new SystemClock();
|
||||||
queueMgr = new QueueManager(this);
|
queueMgr = new QueueManager(this);
|
||||||
|
@ -217,7 +217,7 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
rootQueue.setFairShare(clusterCapacity);
|
rootQueue.setFairShare(clusterCapacity);
|
||||||
// Recursively compute fair shares for all queues
|
// Recursively compute fair shares for all queues
|
||||||
// and update metrics
|
// and update metrics
|
||||||
rootQueue.recomputeFairShares();
|
rootQueue.recomputeShares();
|
||||||
|
|
||||||
// Update recorded capacity of root queue (child queues are updated
|
// Update recorded capacity of root queue (child queues are updated
|
||||||
// when fair share is calculated).
|
// when fair share is calculated).
|
||||||
|
@ -786,39 +786,24 @@ public class FairScheduler implements ResourceScheduler {
|
||||||
// 1. Check for reserved applications
|
// 1. Check for reserved applications
|
||||||
// 2. Schedule if there are no reservations
|
// 2. Schedule if there are no reservations
|
||||||
|
|
||||||
// If we have have an application that has reserved a resource on this node
|
AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
|
||||||
// already, we try to complete the reservation.
|
if (reservedAppSchedulable != null) {
|
||||||
RMContainer reservedContainer = node.getReservedContainer();
|
// Reservation exists; try to fulfill the reservation
|
||||||
if (reservedContainer != null) {
|
LOG.info("Trying to fulfill reservation for application "
|
||||||
FSSchedulerApp reservedApplication =
|
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
|
||||||
applications.get(reservedContainer.getApplicationAttemptId());
|
+ " on node: " + nm);
|
||||||
|
|
||||||
// Try to fulfill the reservation
|
node.getReservedAppSchedulable().assignReservedContainer(node);
|
||||||
LOG.info("Trying to fulfill reservation for application " +
|
|
||||||
reservedApplication.getApplicationId() + " on node: " + nm);
|
|
||||||
|
|
||||||
FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
|
|
||||||
queue.assignContainer(node, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, schedule at queue which is furthest below fair share
|
|
||||||
else {
|
else {
|
||||||
|
// No reservation, schedule at queue which is farthest below fair share
|
||||||
int assignedContainers = 0;
|
int assignedContainers = 0;
|
||||||
while (node.getReservedContainer() == null) {
|
while (node.getReservedContainer() == null) {
|
||||||
// At most one task is scheduled each iteration of this loop
|
|
||||||
List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
|
|
||||||
queueMgr.getLeafQueues());
|
|
||||||
Collections.sort(scheds, SchedulingMode.getDefault().getComparator());
|
|
||||||
boolean assignedContainer = false;
|
boolean assignedContainer = false;
|
||||||
for (FSLeafQueue sched : scheds) {
|
if (Resources.greaterThan(
|
||||||
Resource assigned = sched.assignContainer(node, false);
|
queueMgr.getRootQueue().assignContainer(node),
|
||||||
if (Resources.greaterThan(assigned, Resources.none()) ||
|
Resources.none())) {
|
||||||
node.getReservedContainer() != null) {
|
assignedContainer = true;
|
||||||
eventLog.log("ASSIGN", nm.getHostName(), assigned);
|
|
||||||
assignedContainers++;
|
|
||||||
assignedContainer = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (!assignedContainer) { break; }
|
if (!assignedContainer) { break; }
|
||||||
if (!assignMultiple) { break; }
|
if (!assignMultiple) { break; }
|
||||||
|
|
|
@ -143,7 +143,6 @@ public class QueueManager {
|
||||||
if (leafQueue == null) {
|
if (leafQueue == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
leafQueue.setSchedulingMode(info.defaultSchedulingMode);
|
|
||||||
queue = leafQueue;
|
queue = leafQueue;
|
||||||
} else if (queue instanceof FSParentQueue) {
|
} else if (queue instanceof FSParentQueue) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -302,7 +301,7 @@ public class QueueManager {
|
||||||
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
|
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
|
||||||
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
|
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
|
||||||
Map<String, Double> queueWeights = new HashMap<String, Double>();
|
Map<String, Double> queueWeights = new HashMap<String, Double>();
|
||||||
Map<String, SchedulingMode> queueModes = new HashMap<String, SchedulingMode>();
|
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
|
||||||
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
|
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
||||||
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||||
|
@ -310,7 +309,7 @@ public class QueueManager {
|
||||||
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
|
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
|
||||||
|
|
||||||
// Remember all queue names so we can display them on web UI, etc.
|
// Remember all queue names so we can display them on web UI, etc.
|
||||||
List<String> queueNamesInAllocFile = new ArrayList<String>();
|
List<String> queueNamesInAllocFile = new ArrayList<String>();
|
||||||
|
@ -339,7 +338,7 @@ public class QueueManager {
|
||||||
if ("queue".equals(element.getTagName()) ||
|
if ("queue".equals(element.getTagName()) ||
|
||||||
"pool".equals(element.getTagName())) {
|
"pool".equals(element.getTagName())) {
|
||||||
loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
|
loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
|
||||||
userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
|
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
|
||||||
queueAcls, queueNamesInAllocFile);
|
queueAcls, queueNamesInAllocFile);
|
||||||
} else if ("user".equals(element.getTagName())) {
|
} else if ("user".equals(element.getTagName())) {
|
||||||
String userName = element.getAttribute("name");
|
String userName = element.getAttribute("name");
|
||||||
|
@ -370,11 +369,12 @@ public class QueueManager {
|
||||||
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
|
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
int val = Integer.parseInt(text);
|
int val = Integer.parseInt(text);
|
||||||
queueMaxAppsDefault = val;}
|
queueMaxAppsDefault = val;
|
||||||
else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
|
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|
||||||
|
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
SchedulingMode.setDefault(text);
|
SchedulingPolicy.setDefault(text);
|
||||||
defaultSchedulingMode = SchedulingMode.getDefault();
|
defaultSchedPolicy = SchedulingPolicy.getDefault();
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
||||||
}
|
}
|
||||||
|
@ -385,7 +385,7 @@ public class QueueManager {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
info = new QueueManagerInfo(minQueueResources, maxQueueResources,
|
info = new QueueManagerInfo(minQueueResources, maxQueueResources,
|
||||||
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
|
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
|
||||||
queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
|
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||||
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
|
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
|
||||||
|
|
||||||
// Root queue should have empty ACLs. As a queue's ACL is the union of
|
// Root queue should have empty ACLs. As a queue's ACL is the union of
|
||||||
|
@ -396,14 +396,15 @@ public class QueueManager {
|
||||||
rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" "));
|
rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" "));
|
||||||
rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" "));
|
rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" "));
|
||||||
queueAcls.put(ROOT_QUEUE, rootAcls);
|
queueAcls.put(ROOT_QUEUE, rootAcls);
|
||||||
|
|
||||||
|
// Create all queus
|
||||||
for (String name: queueNamesInAllocFile) {
|
for (String name: queueNamesInAllocFile) {
|
||||||
FSLeafQueue queue = getLeafQueue(name);
|
getLeafQueue(name);
|
||||||
if (queueModes.containsKey(name)) {
|
}
|
||||||
queue.setSchedulingMode(queueModes.get(name));
|
|
||||||
} else {
|
// Set custom policies as specified
|
||||||
queue.setSchedulingMode(defaultSchedulingMode);
|
for (Map.Entry<String, SchedulingPolicy> entry : queuePolicies.entrySet()) {
|
||||||
}
|
queues.get(entry.getKey()).setPolicy(entry.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -414,7 +415,8 @@ public class QueueManager {
|
||||||
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
|
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
|
||||||
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
|
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
|
||||||
Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
|
Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
|
||||||
Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
|
Map<String, SchedulingPolicy> queuePolicies,
|
||||||
|
Map<String, Long> minSharePreemptionTimeouts,
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
|
||||||
throws AllocationConfigurationException {
|
throws AllocationConfigurationException {
|
||||||
String queueName = parentName + "." + element.getAttribute("name");
|
String queueName = parentName + "." + element.getAttribute("name");
|
||||||
|
@ -448,9 +450,10 @@ public class QueueManager {
|
||||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||||
long val = Long.parseLong(text) * 1000L;
|
long val = Long.parseLong(text) * 1000L;
|
||||||
minSharePreemptionTimeouts.put(queueName, val);
|
minSharePreemptionTimeouts.put(queueName, val);
|
||||||
} else if ("schedulingMode".equals(field.getTagName())) {
|
} else if ("schedulingPolicy".equals(field.getTagName())
|
||||||
|
|| "schedulingMode".equals(field.getTagName())) {
|
||||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||||
queueModes.put(queueName, SchedulingMode.parse(text));
|
queuePolicies.put(queueName, SchedulingPolicy.parse(text));
|
||||||
} else if ("aclSubmitApps".equals(field.getTagName())) {
|
} else if ("aclSubmitApps".equals(field.getTagName())) {
|
||||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||||
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
|
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
|
||||||
|
@ -459,8 +462,9 @@ public class QueueManager {
|
||||||
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
|
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
|
||||||
} else if ("queue".endsWith(field.getTagName()) ||
|
} else if ("queue".endsWith(field.getTagName()) ||
|
||||||
"pool".equals(field.getTagName())) {
|
"pool".equals(field.getTagName())) {
|
||||||
loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
|
loadQueue(queueName, field, minQueueResources, maxQueueResources,
|
||||||
userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
|
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
|
||||||
|
minSharePreemptionTimeouts,
|
||||||
queueAcls, queueNamesInAllocFile);
|
queueAcls, queueNamesInAllocFile);
|
||||||
isLeaf = false;
|
isLeaf = false;
|
||||||
}
|
}
|
||||||
|
@ -615,13 +619,13 @@ public class QueueManager {
|
||||||
// below half its fair share for this long, it is allowed to preempt tasks.
|
// below half its fair share for this long, it is allowed to preempt tasks.
|
||||||
public final long fairSharePreemptionTimeout;
|
public final long fairSharePreemptionTimeout;
|
||||||
|
|
||||||
public final SchedulingMode defaultSchedulingMode;
|
public final SchedulingPolicy defaultSchedulingPolicy;
|
||||||
|
|
||||||
public QueueManagerInfo(Map<String, Resource> minQueueResources,
|
public QueueManagerInfo(Map<String, Resource> minQueueResources,
|
||||||
Map<String, Resource> maxQueueResources,
|
Map<String, Resource> maxQueueResources,
|
||||||
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
||||||
Map<String, Double> queueWeights, int userMaxAppsDefault,
|
Map<String, Double> queueWeights, int userMaxAppsDefault,
|
||||||
int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode,
|
int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
|
||||||
Map<String, Long> minSharePreemptionTimeouts,
|
Map<String, Long> minSharePreemptionTimeouts,
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||||
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
|
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
|
||||||
|
@ -632,7 +636,7 @@ public class QueueManager {
|
||||||
this.queueWeights = queueWeights;
|
this.queueWeights = queueWeights;
|
||||||
this.userMaxAppsDefault = userMaxAppsDefault;
|
this.userMaxAppsDefault = userMaxAppsDefault;
|
||||||
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
||||||
this.defaultSchedulingMode = defaultSchedulingMode;
|
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
|
||||||
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
|
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
|
||||||
this.queueAcls = queueAcls;
|
this.queueAcls = queueAcls;
|
||||||
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
|
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
|
||||||
|
@ -651,7 +655,7 @@ public class QueueManager {
|
||||||
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||||
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
fairSharePreemptionTimeout = Long.MAX_VALUE;
|
fairSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
defaultSchedulingMode = SchedulingMode.getDefault();
|
defaultSchedulingPolicy = SchedulingPolicy.getDefault();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,11 +93,9 @@ public abstract class Schedulable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assign a container on this node if possible, and return the amount of
|
* Assign a container on this node if possible, and return the amount of
|
||||||
* resources assigned. If {@code reserved} is true, it means a reservation
|
* resources assigned.
|
||||||
* already exists on this node, and the schedulable should fulfill that
|
|
||||||
* reservation if possible.
|
|
||||||
*/
|
*/
|
||||||
public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved);
|
public abstract Resource assignContainer(FSSchedulerNode node);
|
||||||
|
|
||||||
/** Assign a fair share to this Schedulable. */
|
/** Assign a fair share to this Schedulable. */
|
||||||
public void setFairShare(Resource fairShare) {
|
public void setFairShare(Resource fairShare) {
|
||||||
|
|
|
@ -1,118 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
|
|
||||||
|
|
||||||
@Public
|
|
||||||
@Unstable
|
|
||||||
public abstract class SchedulingMode {
|
|
||||||
private static final ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode> instances =
|
|
||||||
new ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode>();
|
|
||||||
|
|
||||||
private static SchedulingMode DEFAULT_MODE =
|
|
||||||
getInstance(FairSchedulingMode.class);
|
|
||||||
|
|
||||||
public static SchedulingMode getDefault() {
|
|
||||||
return DEFAULT_MODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void setDefault(String className)
|
|
||||||
throws AllocationConfigurationException {
|
|
||||||
DEFAULT_MODE = parse(className);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a {@link SchedulingMode} instance corresponding to the passed clazz
|
|
||||||
*/
|
|
||||||
public static SchedulingMode getInstance(Class<? extends SchedulingMode> clazz) {
|
|
||||||
SchedulingMode mode = instances.get(clazz);
|
|
||||||
if (mode == null) {
|
|
||||||
mode = ReflectionUtils.newInstance(clazz, null);
|
|
||||||
instances.put(clazz, mode);
|
|
||||||
}
|
|
||||||
return mode;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns {@link SchedulingMode} instance corresponding to the
|
|
||||||
* {@link SchedulingMode} passed as a string. The mode can be "fair" for
|
|
||||||
* FairSchedulingMode of "fifo" for FifoSchedulingMode. For custom
|
|
||||||
* {@link SchedulingMode}s in the RM classpath, the mode should be canonical
|
|
||||||
* class name of the {@link SchedulingMode}.
|
|
||||||
*
|
|
||||||
* @param mode canonical class name or "fair" or "fifo"
|
|
||||||
* @throws AllocationConfigurationException
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public static SchedulingMode parse(String mode)
|
|
||||||
throws AllocationConfigurationException {
|
|
||||||
@SuppressWarnings("rawtypes")
|
|
||||||
Class clazz;
|
|
||||||
String text = mode.toLowerCase();
|
|
||||||
if (text.equals("fair")) {
|
|
||||||
clazz = FairSchedulingMode.class;
|
|
||||||
} else if (text.equals("fifo")) {
|
|
||||||
clazz = FifoSchedulingMode.class;
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
clazz = Class.forName(mode);
|
|
||||||
} catch (ClassNotFoundException cnfe) {
|
|
||||||
throw new AllocationConfigurationException(mode
|
|
||||||
+ " SchedulingMode class not found!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!SchedulingMode.class.isAssignableFrom(clazz)) {
|
|
||||||
throw new AllocationConfigurationException(mode
|
|
||||||
+ " does not extend SchedulingMode");
|
|
||||||
}
|
|
||||||
return getInstance(clazz);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return returns the name of SchedulingMode
|
|
||||||
*/
|
|
||||||
public abstract String getName();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The comparator returned by this method is to be used for sorting the
|
|
||||||
* {@link Schedulable}s in that queue.
|
|
||||||
*
|
|
||||||
* @return the comparator to sort by
|
|
||||||
*/
|
|
||||||
public abstract Comparator<Schedulable> getComparator();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Computes and updates the shares of {@link Schedulable}s as per the
|
|
||||||
* SchedulingMode, to be used later at schedule time.
|
|
||||||
*
|
|
||||||
* @param schedulables {@link Schedulable}s whose shares are to be updated
|
|
||||||
* @param totalResources Total {@link Resource}s in the cluster
|
|
||||||
*/
|
|
||||||
public abstract void computeShares(
|
|
||||||
Collection<? extends Schedulable> schedulables, Resource totalResources);
|
|
||||||
}
|
|
|
@ -0,0 +1,145 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract class SchedulingPolicy {
|
||||||
|
private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
|
||||||
|
new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();
|
||||||
|
|
||||||
|
private static SchedulingPolicy DEFAULT_POLICY =
|
||||||
|
getInstance(FairSharePolicy.class);
|
||||||
|
|
||||||
|
public static final byte DEPTH_LEAF = (byte) 1;
|
||||||
|
public static final byte DEPTH_INTERMEDIATE = (byte) 2;
|
||||||
|
public static final byte DEPTH_ROOT = (byte) 4;
|
||||||
|
public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate
|
||||||
|
public static final byte DEPTH_ANY = (byte) 7;
|
||||||
|
|
||||||
|
public static SchedulingPolicy getDefault() {
|
||||||
|
return DEFAULT_POLICY;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setDefault(String className)
|
||||||
|
throws AllocationConfigurationException {
|
||||||
|
DEFAULT_POLICY = parse(className);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
|
||||||
|
*/
|
||||||
|
public static SchedulingPolicy getInstance(Class<? extends SchedulingPolicy> clazz) {
|
||||||
|
SchedulingPolicy policy = instances.get(clazz);
|
||||||
|
if (policy == null) {
|
||||||
|
policy = ReflectionUtils.newInstance(clazz, null);
|
||||||
|
instances.put(clazz, policy);
|
||||||
|
}
|
||||||
|
return policy;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@link SchedulingPolicy} instance corresponding to the
|
||||||
|
* {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
|
||||||
|
* FairsharePolicy or "fifo" for FifoPolicy. For custom
|
||||||
|
* {@link SchedulingPolicy}s in the RM classpath, the policy should be
|
||||||
|
* canonical class name of the {@link SchedulingPolicy}.
|
||||||
|
*
|
||||||
|
* @param policy canonical class name or "fair" or "fifo"
|
||||||
|
* @throws AllocationConfigurationException
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static SchedulingPolicy parse(String policy)
|
||||||
|
throws AllocationConfigurationException {
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
Class clazz;
|
||||||
|
String text = policy.toLowerCase();
|
||||||
|
if (text.equals("fair")) {
|
||||||
|
clazz = FairSharePolicy.class;
|
||||||
|
} else if (text.equals("fifo")) {
|
||||||
|
clazz = FifoPolicy.class;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
clazz = Class.forName(policy);
|
||||||
|
} catch (ClassNotFoundException cnfe) {
|
||||||
|
throw new AllocationConfigurationException(policy
|
||||||
|
+ " SchedulingPolicy class not found!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!SchedulingPolicy.class.isAssignableFrom(clazz)) {
|
||||||
|
throw new AllocationConfigurationException(policy
|
||||||
|
+ " does not extend SchedulingPolicy");
|
||||||
|
}
|
||||||
|
return getInstance(clazz);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return returns the name of {@link SchedulingPolicy}
|
||||||
|
*/
|
||||||
|
public abstract String getName();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specifies the depths in the hierarchy, this {@link SchedulingPolicy}
|
||||||
|
* applies to
|
||||||
|
*
|
||||||
|
* @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_*
|
||||||
|
*/
|
||||||
|
public abstract byte getApplicableDepth();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the specified {@link SchedulingPolicy} can be used for a queue at
|
||||||
|
* the specified depth in the hierarchy
|
||||||
|
*
|
||||||
|
* @param policy {@link SchedulingPolicy} we are checking the
|
||||||
|
* depth-applicability for
|
||||||
|
* @param depth queue's depth in the hierarchy
|
||||||
|
* @return true if policy is applicable to passed depth, false otherwise
|
||||||
|
*/
|
||||||
|
public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) {
|
||||||
|
return ((policy.getApplicableDepth() & depth) == depth) ? true : false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The comparator returned by this method is to be used for sorting the
|
||||||
|
* {@link Schedulable}s in that queue.
|
||||||
|
*
|
||||||
|
* @return the comparator to sort by
|
||||||
|
*/
|
||||||
|
public abstract Comparator<Schedulable> getComparator();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes and updates the shares of {@link Schedulable}s as per the
|
||||||
|
* {@link SchedulingPolicy}, to be used later at schedule time.
|
||||||
|
*
|
||||||
|
* @param schedulables {@link Schedulable}s whose shares are to be updated
|
||||||
|
* @param totalResources Total {@link Resource}s in the cluster
|
||||||
|
*/
|
||||||
|
public abstract void computeShares(
|
||||||
|
Collection<? extends Schedulable> schedulables, Resource totalResources);
|
||||||
|
}
|
|
@ -15,7 +15,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -24,13 +24,13 @@ import java.util.Comparator;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
public class FairSchedulingMode extends SchedulingMode {
|
public class FairSharePolicy extends SchedulingPolicy {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final String NAME = "FairShare";
|
public static final String NAME = "Fairshare";
|
||||||
private FairShareComparator comparator = new FairShareComparator();
|
private FairShareComparator comparator = new FairShareComparator();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -211,4 +211,9 @@ public class FairSchedulingMode extends SchedulingMode {
|
||||||
share = Math.min(share, sched.getDemand().getMemory());
|
share = Math.min(share, sched.getDemand().getMemory());
|
||||||
return Resources.createResource((int) share);
|
return Resources.createResource((int) share);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte getApplicableDepth() {
|
||||||
|
return SchedulingPolicy.DEPTH_ANY;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -15,7 +15,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -24,11 +24,11 @@ import java.util.Comparator;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
public class FifoSchedulingMode extends SchedulingMode {
|
public class FifoPolicy extends SchedulingPolicy {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final String NAME = "FIFO";
|
public static final String NAME = "FIFO";
|
||||||
private FifoComparator comparator = new FifoComparator();
|
private FifoComparator comparator = new FifoComparator();
|
||||||
|
@ -73,4 +73,9 @@ public class FifoSchedulingMode extends SchedulingMode {
|
||||||
sched.setFairShare(Resources.createResource(0));
|
sched.setFairShare(Resources.createResource(0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte getApplicableDepth() {
|
||||||
|
return SchedulingPolicy.DEPTH_LEAF;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -68,7 +68,7 @@ public class FakeSchedulable extends Schedulable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ import java.util.List;
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -33,12 +33,12 @@ import org.junit.Test;
|
||||||
*/
|
*/
|
||||||
public class TestComputeFairShares {
|
public class TestComputeFairShares {
|
||||||
private List<Schedulable> scheds;
|
private List<Schedulable> scheds;
|
||||||
private SchedulingMode schedulingMode;
|
private SchedulingPolicy schedulingMode;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
scheds = new ArrayList<Schedulable>();
|
scheds = new ArrayList<Schedulable>();
|
||||||
schedulingMode = new FairSchedulingMode();
|
schedulingMode = new FairSharePolicy();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -73,7 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -284,7 +284,7 @@ public class TestFairScheduler {
|
||||||
assertEquals(capacity / 4, queue2.getFairShare().getMemory());
|
assertEquals(capacity / 4, queue2.getFairShare().getMemory());
|
||||||
assertEquals(capacity / 4, queue3.getFairShare().getMemory());
|
assertEquals(capacity / 4, queue3.getFairShare().getMemory());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHierarchicalQueuesSimilarParents() {
|
public void testHierarchicalQueuesSimilarParents() {
|
||||||
QueueManager queueManager = scheduler.getQueueManager();
|
QueueManager queueManager = scheduler.getQueueManager();
|
||||||
|
@ -1359,7 +1359,7 @@ public class TestFairScheduler {
|
||||||
FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
||||||
|
|
||||||
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1");
|
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1");
|
||||||
queue1.setSchedulingMode(new FifoSchedulingMode());
|
queue1.setPolicy(new FifoPolicy());
|
||||||
|
|
||||||
scheduler.update();
|
scheduler.update();
|
||||||
|
|
||||||
|
@ -1381,7 +1381,80 @@ public class TestFairScheduler {
|
||||||
assertEquals(2, app1.getLiveContainers().size());
|
assertEquals(2, app1.getLiveContainers().size());
|
||||||
assertEquals(1, app2.getLiveContainers().size());
|
assertEquals(1, app2.getLiveContainers().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify the behavior of
|
||||||
|
* {@link FSQueue#assignContainer(FSSchedulerNode)})
|
||||||
|
*
|
||||||
|
* Create two queues under root (fifoQueue and fairParent), and two queues
|
||||||
|
* under fairParent (fairChild1 and fairChild2). Submit two apps to the
|
||||||
|
* fifoQueue and one each to the fairChild* queues, all apps requiring 4
|
||||||
|
* containers each of the total 16 container capacity
|
||||||
|
*
|
||||||
|
* Assert the number of containers for each app after 4, 8, 12 and 16 updates.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testAssignContainer() throws Exception {
|
||||||
|
final String user = "user1";
|
||||||
|
final String fifoQueue = "fifo";
|
||||||
|
final String fairParent = "fairParent";
|
||||||
|
final String fairChild1 = fairParent + ".fairChild1";
|
||||||
|
final String fairChild2 = fairParent + ".fairChild2";
|
||||||
|
|
||||||
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
|
||||||
|
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
|
||||||
|
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
|
ApplicationAttemptId attId1 =
|
||||||
|
createSchedulingRequest(1024, fifoQueue, user, 4);
|
||||||
|
ApplicationAttemptId attId2 =
|
||||||
|
createSchedulingRequest(1024, fairChild1, user, 4);
|
||||||
|
ApplicationAttemptId attId3 =
|
||||||
|
createSchedulingRequest(1024, fairChild2, user, 4);
|
||||||
|
ApplicationAttemptId attId4 =
|
||||||
|
createSchedulingRequest(1024, fifoQueue, user, 4);
|
||||||
|
|
||||||
|
FSSchedulerApp app1 = scheduler.applications.get(attId1);
|
||||||
|
FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
||||||
|
FSSchedulerApp app3 = scheduler.applications.get(attId3);
|
||||||
|
FSSchedulerApp app4 = scheduler.applications.get(attId4);
|
||||||
|
|
||||||
|
scheduler.getQueueManager().getLeafQueue(fifoQueue)
|
||||||
|
.setPolicy(SchedulingPolicy.parse("fifo"));
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
|
||||||
|
|
||||||
|
for (int i = 0; i < 8; i++) {
|
||||||
|
scheduler.handle(updateEvent1);
|
||||||
|
scheduler.handle(updateEvent2);
|
||||||
|
if ((i + 1) % 2 == 0) {
|
||||||
|
// 4 node updates: fifoQueue should have received 2, and fairChild*
|
||||||
|
// should have received one each
|
||||||
|
String ERR =
|
||||||
|
"Wrong number of assigned containers after " + (i + 1) + " updates";
|
||||||
|
if (i < 4) {
|
||||||
|
// app1 req still not met
|
||||||
|
assertEquals(ERR, (i + 1), app1.getLiveContainers().size());
|
||||||
|
assertEquals(ERR, 0, app4.getLiveContainers().size());
|
||||||
|
} else {
|
||||||
|
// app1 req has been met, app4 should be served now
|
||||||
|
assertEquals(ERR, 4, app1.getLiveContainers().size());
|
||||||
|
assertEquals(ERR, (i - 3), app4.getLiveContainers().size());
|
||||||
|
}
|
||||||
|
assertEquals(ERR, (i + 1) / 2, app2.getLiveContainers().size());
|
||||||
|
assertEquals(ERR, (i + 1) / 2, app3.getLiveContainers().size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -1,59 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestSchedulingMode {
|
|
||||||
|
|
||||||
@Test(timeout = 1000)
|
|
||||||
public void testParseSchedulingMode() throws AllocationConfigurationException {
|
|
||||||
|
|
||||||
// Class name
|
|
||||||
SchedulingMode sm = SchedulingMode
|
|
||||||
.parse(FairSchedulingMode.class.getName());
|
|
||||||
assertTrue("Invalid scheduler name",
|
|
||||||
sm.getName().equals(FairSchedulingMode.NAME));
|
|
||||||
|
|
||||||
// Canonical name
|
|
||||||
sm = SchedulingMode.parse(FairSchedulingMode.class
|
|
||||||
.getCanonicalName());
|
|
||||||
assertTrue("Invalid scheduler name",
|
|
||||||
sm.getName().equals(FairSchedulingMode.NAME));
|
|
||||||
|
|
||||||
// Class
|
|
||||||
sm = SchedulingMode.getInstance(FairSchedulingMode.class);
|
|
||||||
assertTrue("Invalid scheduler name",
|
|
||||||
sm.getName().equals(FairSchedulingMode.NAME));
|
|
||||||
|
|
||||||
// Shortname - fair
|
|
||||||
sm = SchedulingMode.parse("fair");
|
|
||||||
assertTrue("Invalid scheduler name",
|
|
||||||
sm.getName().equals(FairSchedulingMode.NAME));
|
|
||||||
|
|
||||||
// Shortname - fifo
|
|
||||||
sm = SchedulingMode.parse("fifo");
|
|
||||||
assertTrue("Invalid scheduler name",
|
|
||||||
sm.getName().equals(FifoSchedulingMode.NAME));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestSchedulingPolicy {
|
||||||
|
|
||||||
|
@Test(timeout = 1000)
|
||||||
|
public void testParseSchedulingPolicy()
|
||||||
|
throws AllocationConfigurationException {
|
||||||
|
|
||||||
|
// Class name
|
||||||
|
SchedulingPolicy sm = SchedulingPolicy
|
||||||
|
.parse(FairSharePolicy.class.getName());
|
||||||
|
assertTrue("Invalid scheduler name",
|
||||||
|
sm.getName().equals(FairSharePolicy.NAME));
|
||||||
|
|
||||||
|
// Canonical name
|
||||||
|
sm = SchedulingPolicy.parse(FairSharePolicy.class
|
||||||
|
.getCanonicalName());
|
||||||
|
assertTrue("Invalid scheduler name",
|
||||||
|
sm.getName().equals(FairSharePolicy.NAME));
|
||||||
|
|
||||||
|
// Class
|
||||||
|
sm = SchedulingPolicy.getInstance(FairSharePolicy.class);
|
||||||
|
assertTrue("Invalid scheduler name",
|
||||||
|
sm.getName().equals(FairSharePolicy.NAME));
|
||||||
|
|
||||||
|
// Shortname - fair
|
||||||
|
sm = SchedulingPolicy.parse("fair");
|
||||||
|
assertTrue("Invalid scheduler name",
|
||||||
|
sm.getName().equals(FairSharePolicy.NAME));
|
||||||
|
|
||||||
|
// Shortname - fifo
|
||||||
|
sm = SchedulingPolicy.parse("fifo");
|
||||||
|
assertTrue("Invalid scheduler name",
|
||||||
|
sm.getName().equals(FifoPolicy.NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trivial tests that make sure
|
||||||
|
* {@link SchedulingPolicy#isApplicableTo(SchedulingPolicy, byte)} works as
|
||||||
|
* expected for the possible values of depth
|
||||||
|
*
|
||||||
|
* @throws AllocationConfigurationException
|
||||||
|
*/
|
||||||
|
@Test(timeout = 1000)
|
||||||
|
public void testIsApplicableTo() throws AllocationConfigurationException {
|
||||||
|
final String ERR = "Broken SchedulingPolicy#isApplicableTo";
|
||||||
|
|
||||||
|
// fifo
|
||||||
|
SchedulingPolicy policy = SchedulingPolicy.parse("fifo");
|
||||||
|
assertTrue(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
|
||||||
|
assertFalse(ERR, SchedulingPolicy.isApplicableTo(
|
||||||
|
SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_INTERMEDIATE));
|
||||||
|
assertFalse(ERR, SchedulingPolicy.isApplicableTo(
|
||||||
|
SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_ROOT));
|
||||||
|
|
||||||
|
|
||||||
|
// fair
|
||||||
|
policy = SchedulingPolicy.parse("fair");
|
||||||
|
assertTrue(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
|
||||||
|
assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
|
||||||
|
SchedulingPolicy.DEPTH_INTERMEDIATE));
|
||||||
|
assertTrue(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
|
||||||
|
assertTrue(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
|
||||||
|
assertTrue(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
||||||
|
|
||||||
|
policy = Mockito.mock(SchedulingPolicy.class);
|
||||||
|
Mockito.when(policy.getApplicableDepth()).thenReturn(
|
||||||
|
SchedulingPolicy.DEPTH_PARENT);
|
||||||
|
assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
|
||||||
|
SchedulingPolicy.DEPTH_INTERMEDIATE));
|
||||||
|
assertTrue(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
|
||||||
|
assertTrue(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
|
||||||
|
assertFalse(ERR,
|
||||||
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue