YARN-1707. Introduce APIs to add/remove/resize queues in the CapacityScheduler. Contributed by Carlo Curino and Subru Krishnan
(cherry picked from commitaac47fda7f
) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (cherry picked from commiteb3e40b833
) (cherry picked from commitcf5ef00b96
)
This commit is contained in:
parent
08ccd30063
commit
5ac08c5398
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||||
|
@ -220,6 +221,24 @@ public abstract class AbstractYarnScheduler
|
||||||
+ " does not support moving apps between queues");
|
+ " does not support moving apps between queues");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeQueue(String queueName) throws YarnException {
|
||||||
|
throw new YarnException(getClass().getSimpleName()
|
||||||
|
+ " does not support removing queues");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addQueue(Queue newQueue) throws YarnException {
|
||||||
|
throw new YarnException(getClass().getSimpleName()
|
||||||
|
+ " does not support this operation");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setEntitlement(String queue, QueueEntitlement entitlement)
|
||||||
|
throws YarnException {
|
||||||
|
throw new YarnException(getClass().getSimpleName()
|
||||||
|
+ " does not support this operation");
|
||||||
|
}
|
||||||
|
|
||||||
private void killOrphanContainerOnNode(RMNode node,
|
private void killOrphanContainerOnNode(RMNode node,
|
||||||
NMContainerStatus container) {
|
NMContainerStatus container) {
|
||||||
if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
|
if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
|
||||||
|
@ -503,4 +522,10 @@ public abstract class AbstractYarnScheduler
|
||||||
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
|
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
|
||||||
return EnumSet.of(SchedulerResourceTypes.MEMORY);
|
return EnumSet.of(SchedulerResourceTypes.MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getPlanQueues() throws YarnException {
|
||||||
|
throw new YarnException(getClass().getSimpleName()
|
||||||
|
+ " does not support reservations");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
|
public class SchedulerDynamicEditException extends YarnException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 7100374511387193257L;
|
||||||
|
|
||||||
|
public SchedulerDynamicEditException(String string) {
|
||||||
|
super(string);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
|
|
||||||
|
@ -223,6 +225,46 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
||||||
*/
|
*/
|
||||||
void killAllAppsInQueue(String queueName) throws YarnException;
|
void killAllAppsInQueue(String queueName) throws YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove an existing queue. Implementations might limit when a queue could be
|
||||||
|
* removed (e.g., must have zero entitlement, and no applications running, or
|
||||||
|
* must be a leaf, etc..).
|
||||||
|
*
|
||||||
|
* @param queueName name of the queue to remove
|
||||||
|
* @throws YarnException
|
||||||
|
*/
|
||||||
|
void removeQueue(String queueName) throws YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add to the scheduler a new Queue. Implementations might limit what type of
|
||||||
|
* queues can be dynamically added (e.g., Queue must be a leaf, must be
|
||||||
|
* attached to existing parent, must have zero entitlement).
|
||||||
|
*
|
||||||
|
* @param newQueue the queue being added.
|
||||||
|
* @throws YarnException
|
||||||
|
*/
|
||||||
|
void addQueue(Queue newQueue) throws YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method increase the entitlement for current queue (must respect
|
||||||
|
* invariants, e.g., no overcommit of parents, non negative, etc.).
|
||||||
|
* Entitlement is a general term for weights in FairScheduler, capacity for
|
||||||
|
* the CapacityScheduler, etc.
|
||||||
|
*
|
||||||
|
* @param queue the queue for which we change entitlement
|
||||||
|
* @param entitlement the new entitlement for the queue (capacity,
|
||||||
|
* maxCapacity, etc..)
|
||||||
|
* @throws YarnException
|
||||||
|
*/
|
||||||
|
void setEntitlement(String queue, QueueEntitlement entitlement)
|
||||||
|
throws YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the list of names for queues managed by the Reservation System
|
||||||
|
* @return the list of queues which support reservations
|
||||||
|
*/
|
||||||
|
public Set<String> getPlanQueues() throws YarnException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a collection of the resource types that are considered when
|
* Return a collection of the resource types that are considered when
|
||||||
* scheduling
|
* scheduling
|
||||||
|
|
|
@ -24,6 +24,8 @@ import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -90,6 +92,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
|
|
||||||
@LimitedPrivate("yarn")
|
@LimitedPrivate("yarn")
|
||||||
@Evolving
|
@Evolving
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -473,9 +480,12 @@ public class CapacityScheduler extends
|
||||||
private void validateExistingQueues(
|
private void validateExistingQueues(
|
||||||
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
|
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
for (String queue : queues.keySet()) {
|
// check that all static queues are included in the newQueues list
|
||||||
if (!newQueues.containsKey(queue)) {
|
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
|
||||||
throw new IOException(queue + " cannot be found during refresh!");
|
if (!(e.getValue() instanceof ReservationQueue)) {
|
||||||
|
if (!newQueues.containsKey(e.getKey())) {
|
||||||
|
throw new IOException(e.getKey() + " cannot be found during refresh!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -507,26 +517,42 @@ public class CapacityScheduler extends
|
||||||
Map<String, CSQueue> oldQueues,
|
Map<String, CSQueue> oldQueues,
|
||||||
QueueHook hook) throws IOException {
|
QueueHook hook) throws IOException {
|
||||||
CSQueue queue;
|
CSQueue queue;
|
||||||
|
String fullQueueName =
|
||||||
|
(parent == null) ? queueName
|
||||||
|
: (parent.getQueuePath() + "." + queueName);
|
||||||
String[] childQueueNames =
|
String[] childQueueNames =
|
||||||
conf.getQueues((parent == null) ?
|
conf.getQueues(fullQueueName);
|
||||||
queueName : (parent.getQueuePath()+"."+queueName));
|
boolean isReservableQueue = conf.isReservableQueue(fullQueueName);
|
||||||
if (childQueueNames == null || childQueueNames.length == 0) {
|
if (childQueueNames == null || childQueueNames.length == 0) {
|
||||||
if (null == parent) {
|
if (null == parent) {
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
"Queue configuration missing child queue names for " + queueName);
|
"Queue configuration missing child queue names for " + queueName);
|
||||||
}
|
}
|
||||||
queue =
|
// Check if the queue will be dynamically managed by the Reservation
|
||||||
new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName));
|
// system
|
||||||
|
if (isReservableQueue) {
|
||||||
// Used only for unit tests
|
queue =
|
||||||
queue = hook.hook(queue);
|
new PlanQueue(csContext, queueName, parent,
|
||||||
|
oldQueues.get(queueName));
|
||||||
|
} else {
|
||||||
|
queue =
|
||||||
|
new LeafQueue(csContext, queueName, parent,
|
||||||
|
oldQueues.get(queueName));
|
||||||
|
|
||||||
|
// Used only for unit tests
|
||||||
|
queue = hook.hook(queue);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (isReservableQueue) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Only Leaf Queues can be reservable for " + queueName);
|
||||||
|
}
|
||||||
ParentQueue parentQueue =
|
ParentQueue parentQueue =
|
||||||
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
|
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
|
||||||
|
|
||||||
// Used only for unit tests
|
// Used only for unit tests
|
||||||
queue = hook.hook(parentQueue);
|
queue = hook.hook(parentQueue);
|
||||||
|
|
||||||
List<CSQueue> childQueues = new ArrayList<CSQueue>();
|
List<CSQueue> childQueues = new ArrayList<CSQueue>();
|
||||||
for (String childQueueName : childQueueNames) {
|
for (String childQueueName : childQueueNames) {
|
||||||
CSQueue childQueue =
|
CSQueue childQueue =
|
||||||
|
@ -548,7 +574,7 @@ public class CapacityScheduler extends
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized CSQueue getQueue(String queueName) {
|
public synchronized CSQueue getQueue(String queueName) {
|
||||||
if (queueName == null) {
|
if (queueName == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -716,7 +742,7 @@ public class CapacityScheduler extends
|
||||||
ApplicationAttemptId applicationAttemptId,
|
ApplicationAttemptId applicationAttemptId,
|
||||||
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
|
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
|
||||||
LOG.info("Application Attempt " + applicationAttemptId + " is done." +
|
LOG.info("Application Attempt " + applicationAttemptId + " is done." +
|
||||||
" finalState=" + rmAppAttemptFinalState);
|
" finalState=" + rmAppAttemptFinalState);
|
||||||
|
|
||||||
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
|
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
|
||||||
SchedulerApplication<FiCaSchedulerApp> application =
|
SchedulerApplication<FiCaSchedulerApp> application =
|
||||||
|
@ -996,9 +1022,16 @@ public class CapacityScheduler extends
|
||||||
case APP_ADDED:
|
case APP_ADDED:
|
||||||
{
|
{
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
||||||
addApplication(appAddedEvent.getApplicationId(),
|
String queueName =
|
||||||
appAddedEvent.getQueue(),
|
resolveReservationQueueName(appAddedEvent.getQueue(),
|
||||||
appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
|
appAddedEvent.getApplicationId(),
|
||||||
|
appAddedEvent.getReservationID());
|
||||||
|
if (queueName != null) {
|
||||||
|
addApplication(appAddedEvent.getApplicationId(),
|
||||||
|
queueName,
|
||||||
|
appAddedEvent.getUser(),
|
||||||
|
appAddedEvent.getIsAppRecovering());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case APP_REMOVED:
|
case APP_REMOVED:
|
||||||
|
@ -1231,6 +1264,123 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized String resolveReservationQueueName(String queueName,
|
||||||
|
ApplicationId applicationId, ReservationId reservationID) {
|
||||||
|
CSQueue queue = getQueue(queueName);
|
||||||
|
// Check if the queue is a plan queue
|
||||||
|
if ((queue == null) || !(queue instanceof PlanQueue)) {
|
||||||
|
return queueName;
|
||||||
|
}
|
||||||
|
if (reservationID != null) {
|
||||||
|
String resQName = reservationID.toString();
|
||||||
|
queue = getQueue(resQName);
|
||||||
|
if (queue == null) {
|
||||||
|
String message =
|
||||||
|
"Application "
|
||||||
|
+ applicationId
|
||||||
|
+ " submitted to a reservation which is not yet currently active: "
|
||||||
|
+ resQName;
|
||||||
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
|
.handle(new RMAppRejectedEvent(applicationId, message));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
// use the reservation queue to run the app
|
||||||
|
queueName = resQName;
|
||||||
|
} else {
|
||||||
|
// use the default child queue of the plan for unreserved apps
|
||||||
|
queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
|
||||||
|
}
|
||||||
|
return queueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void removeQueue(String queueName)
|
||||||
|
throws SchedulerDynamicEditException {
|
||||||
|
LOG.info("Removing queue: " + queueName);
|
||||||
|
CSQueue q = this.getQueue(queueName);
|
||||||
|
if (!(q instanceof ReservationQueue)) {
|
||||||
|
throw new SchedulerDynamicEditException("The queue that we are asked "
|
||||||
|
+ "to remove (" + queueName + ") is not a ReservationQueue");
|
||||||
|
}
|
||||||
|
ReservationQueue disposableLeafQueue = (ReservationQueue) q;
|
||||||
|
// at this point we should have no more apps
|
||||||
|
if (disposableLeafQueue.getNumApplications() > 0) {
|
||||||
|
throw new SchedulerDynamicEditException("The queue " + queueName
|
||||||
|
+ " is not empty " + disposableLeafQueue.getApplications().size()
|
||||||
|
+ " active apps " + disposableLeafQueue.pendingApplications.size()
|
||||||
|
+ " pending apps");
|
||||||
|
}
|
||||||
|
|
||||||
|
((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
|
||||||
|
this.queues.remove(queueName);
|
||||||
|
LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void addQueue(Queue queue)
|
||||||
|
throws SchedulerDynamicEditException {
|
||||||
|
|
||||||
|
if (!(queue instanceof ReservationQueue)) {
|
||||||
|
throw new SchedulerDynamicEditException("Queue " + queue.getQueueName()
|
||||||
|
+ " is not a ReservationQueue");
|
||||||
|
}
|
||||||
|
|
||||||
|
ReservationQueue newQueue = (ReservationQueue) queue;
|
||||||
|
|
||||||
|
if (newQueue.getParent() == null
|
||||||
|
|| !(newQueue.getParent() instanceof PlanQueue)) {
|
||||||
|
throw new SchedulerDynamicEditException("ParentQueue for "
|
||||||
|
+ newQueue.getQueueName()
|
||||||
|
+ " is not properly set (should be set and be a PlanQueue)");
|
||||||
|
}
|
||||||
|
|
||||||
|
PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
|
||||||
|
String queuename = newQueue.getQueueName();
|
||||||
|
parentPlan.addChildQueue(newQueue);
|
||||||
|
this.queues.put(queuename, newQueue);
|
||||||
|
LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void setEntitlement(String inQueue,
|
||||||
|
QueueEntitlement entitlement) throws SchedulerDynamicEditException,
|
||||||
|
YarnException {
|
||||||
|
LeafQueue queue = getAndCheckLeafQueue(inQueue);
|
||||||
|
ParentQueue parent = (ParentQueue) queue.getParent();
|
||||||
|
|
||||||
|
if (!(queue instanceof ReservationQueue)) {
|
||||||
|
throw new SchedulerDynamicEditException("Entitlement can not be"
|
||||||
|
+ " modified dynamically since queue " + inQueue
|
||||||
|
+ " is not a ReservationQueue");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(parent instanceof PlanQueue)) {
|
||||||
|
throw new SchedulerDynamicEditException("The parent of ReservationQueue "
|
||||||
|
+ inQueue + " must be an PlanQueue");
|
||||||
|
}
|
||||||
|
|
||||||
|
ReservationQueue newQueue = (ReservationQueue) queue;
|
||||||
|
|
||||||
|
float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
|
||||||
|
float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity();
|
||||||
|
|
||||||
|
if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
|
||||||
|
// note: epsilon checks here are not ok, as the epsilons might accumulate
|
||||||
|
// and become a problem in aggregate
|
||||||
|
if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
|
||||||
|
&& Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
newQueue.setEntitlement(entitlement);
|
||||||
|
} else {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"Sum of child queues would exceed 100% for PlanQueue: "
|
||||||
|
+ parent.getQueueName());
|
||||||
|
}
|
||||||
|
LOG.info("Set entitlement for ReservationQueue " + inQueue + " to "
|
||||||
|
+ queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized String moveApplication(ApplicationId appId,
|
public synchronized String moveApplication(ApplicationId appId,
|
||||||
String targetQueueName) throws YarnException {
|
String targetQueueName) throws YarnException {
|
||||||
|
@ -1238,11 +1388,12 @@ public class CapacityScheduler extends
|
||||||
getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
|
getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
|
||||||
String sourceQueueName = app.getQueue().getQueueName();
|
String sourceQueueName = app.getQueue().getQueueName();
|
||||||
LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
|
LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
|
||||||
LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
|
String destQueueName = handleMoveToPlanQueue(targetQueueName);
|
||||||
|
LeafQueue dest = getAndCheckLeafQueue(destQueueName);
|
||||||
// Validation check - ACLs, submission limits for user & queue
|
// Validation check - ACLs, submission limits for user & queue
|
||||||
String user = app.getUser();
|
String user = app.getUser();
|
||||||
try {
|
try {
|
||||||
dest.submitApplication(appId, user, targetQueueName);
|
dest.submitApplication(appId, user, destQueueName);
|
||||||
} catch (AccessControlException e) {
|
} catch (AccessControlException e) {
|
||||||
throw new YarnException(e);
|
throw new YarnException(e);
|
||||||
}
|
}
|
||||||
|
@ -1261,7 +1412,7 @@ public class CapacityScheduler extends
|
||||||
dest.submitApplicationAttempt(app, user);
|
dest.submitApplicationAttempt(app, user);
|
||||||
applications.get(appId).setQueue(dest);
|
applications.get(appId).setQueue(dest);
|
||||||
LOG.info("App: " + app.getApplicationId() + " successfully moved from "
|
LOG.info("App: " + app.getApplicationId() + " successfully moved from "
|
||||||
+ sourceQueueName + " to: " + targetQueueName);
|
+ sourceQueueName + " to: " + destQueueName);
|
||||||
return targetQueueName;
|
return targetQueueName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1296,4 +1447,24 @@ public class CapacityScheduler extends
|
||||||
return EnumSet
|
return EnumSet
|
||||||
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String handleMoveToPlanQueue(String targetQueueName) {
|
||||||
|
CSQueue dest = getQueue(targetQueueName);
|
||||||
|
if (dest != null && dest instanceof PlanQueue) {
|
||||||
|
// use the default child reservation queue of the plan
|
||||||
|
targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
|
||||||
|
}
|
||||||
|
return targetQueueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getPlanQueues() {
|
||||||
|
Set<String> ret = new HashSet<String>();
|
||||||
|
for (Map.Entry<String, CSQueue> l : queues.entrySet()) {
|
||||||
|
if (l.getValue() instanceof PlanQueue) {
|
||||||
|
ret.add(l.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,8 +86,8 @@ public class LeafQueue implements CSQueue {
|
||||||
private int userLimit;
|
private int userLimit;
|
||||||
private float userLimitFactor;
|
private float userLimitFactor;
|
||||||
|
|
||||||
private int maxApplications;
|
protected int maxApplications;
|
||||||
private int maxApplicationsPerUser;
|
protected int maxApplicationsPerUser;
|
||||||
|
|
||||||
private float maxAMResourcePerQueuePercent;
|
private float maxAMResourcePerQueuePercent;
|
||||||
private int maxActiveApplications; // Based on absolute max capacity
|
private int maxActiveApplications; // Based on absolute max capacity
|
||||||
|
@ -153,8 +153,7 @@ public class LeafQueue implements CSQueue {
|
||||||
Resources.subtract(maximumAllocation, minimumAllocation),
|
Resources.subtract(maximumAllocation, minimumAllocation),
|
||||||
maximumAllocation);
|
maximumAllocation);
|
||||||
|
|
||||||
float capacity =
|
float capacity = getCapacityFromConf();
|
||||||
(float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
|
|
||||||
float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
||||||
|
|
||||||
float maximumCapacity =
|
float maximumCapacity =
|
||||||
|
@ -221,6 +220,11 @@ public class LeafQueue implements CSQueue {
|
||||||
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// externalizing in method, to allow overriding
|
||||||
|
protected float getCapacityFromConf() {
|
||||||
|
return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void setupQueueConfigs(
|
private synchronized void setupQueueConfigs(
|
||||||
Resource clusterResource,
|
Resource clusterResource,
|
||||||
float capacity, float absoluteCapacity,
|
float capacity, float absoluteCapacity,
|
||||||
|
@ -483,7 +487,7 @@ public class LeafQueue implements CSQueue {
|
||||||
* Set user limit factor - used only for testing.
|
* Set user limit factor - used only for testing.
|
||||||
* @param userLimitFactor new user limit factor
|
* @param userLimitFactor new user limit factor
|
||||||
*/
|
*/
|
||||||
synchronized void setUserLimitFactor(int userLimitFactor) {
|
synchronized void setUserLimitFactor(float userLimitFactor) {
|
||||||
this.userLimitFactor = userLimitFactor;
|
this.userLimitFactor = userLimitFactor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,7 +835,7 @@ public class LeafQueue implements CSQueue {
|
||||||
getApplication(reservedContainer.getApplicationAttemptId());
|
getApplication(reservedContainer.getApplicationAttemptId());
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
return assignReservedContainer(application, node, reservedContainer,
|
return assignReservedContainer(application, node, reservedContainer,
|
||||||
clusterResource);
|
clusterResource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1880,4 +1884,16 @@ public class LeafQueue implements CSQueue {
|
||||||
getParent().detachContainer(clusterResource, application, rmContainer);
|
getParent().detachContainer(clusterResource, application, rmContainer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setCapacity(float capacity) {
|
||||||
|
this.capacity = capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAbsoluteCapacity(float absoluteCapacity) {
|
||||||
|
this.absoluteCapacity = absoluteCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxApplications(int maxApplications) {
|
||||||
|
this.maxApplications = maxApplications;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class ParentQueue implements CSQueue {
|
||||||
|
|
||||||
private float usedCapacity = 0.0f;
|
private float usedCapacity = 0.0f;
|
||||||
|
|
||||||
private final Set<CSQueue> childQueues;
|
protected final Set<CSQueue> childQueues;
|
||||||
private final Comparator<CSQueue> queueComparator;
|
private final Comparator<CSQueue> queueComparator;
|
||||||
|
|
||||||
private Resource usedResources = Resources.createResource(0, 0);
|
private Resource usedResources = Resources.createResource(0, 0);
|
||||||
|
@ -159,7 +159,7 @@ public class ParentQueue implements CSQueue {
|
||||||
", fullname=" + getQueuePath());
|
", fullname=" + getQueuePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void setupQueueConfigs(
|
protected synchronized void setupQueueConfigs(
|
||||||
Resource clusterResource,
|
Resource clusterResource,
|
||||||
float capacity, float absoluteCapacity,
|
float capacity, float absoluteCapacity,
|
||||||
float maximumCapacity, float absoluteMaxCapacity,
|
float maximumCapacity, float absoluteMaxCapacity,
|
||||||
|
@ -881,4 +881,8 @@ public class ParentQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<QueueACL, AccessControlList> getACLs() {
|
||||||
|
return acls;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,193 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This represents a dynamic queue managed by the {@link ReservationSystem}.
|
||||||
|
* From the user perspective this is equivalent to a LeafQueue that respect
|
||||||
|
* reservations, but functionality wise is a sub-class of ParentQueue
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class PlanQueue extends ParentQueue {
|
||||||
|
|
||||||
|
public static final String DEFAULT_QUEUE_SUFFIX = "-default";
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
|
||||||
|
|
||||||
|
private int maxAppsForReservation;
|
||||||
|
private int maxAppsPerUserForReservation;
|
||||||
|
private int userLimit;
|
||||||
|
private float userLimitFactor;
|
||||||
|
protected CapacitySchedulerContext schedulerContext;
|
||||||
|
private boolean showReservationsAsQueues;
|
||||||
|
|
||||||
|
public PlanQueue(CapacitySchedulerContext cs, String queueName,
|
||||||
|
CSQueue parent, CSQueue old) {
|
||||||
|
super(cs, queueName, parent, old);
|
||||||
|
|
||||||
|
this.schedulerContext = cs;
|
||||||
|
// Set the reservation queue attributes for the Plan
|
||||||
|
CapacitySchedulerConfiguration conf = cs.getConfiguration();
|
||||||
|
String queuePath = super.getQueuePath();
|
||||||
|
int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
|
||||||
|
showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
|
||||||
|
if (maxAppsForReservation < 0) {
|
||||||
|
maxAppsForReservation =
|
||||||
|
(int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
|
||||||
|
.getAbsoluteCapacity());
|
||||||
|
}
|
||||||
|
int userLimit = conf.getUserLimit(queuePath);
|
||||||
|
float userLimitFactor = conf.getUserLimitFactor(queuePath);
|
||||||
|
int maxAppsPerUserForReservation =
|
||||||
|
(int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
|
||||||
|
updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
|
||||||
|
maxAppsPerUserForReservation);
|
||||||
|
|
||||||
|
StringBuffer queueInfo = new StringBuffer();
|
||||||
|
queueInfo.append("Created Plan Queue: ").append(queueName)
|
||||||
|
.append("\nwith capacity: [").append(super.getCapacity())
|
||||||
|
.append("]\nwith max capacity: [").append(super.getMaximumCapacity())
|
||||||
|
.append("\nwith max reservation apps: [").append(maxAppsForReservation)
|
||||||
|
.append("]\nwith max reservation apps per user: [")
|
||||||
|
.append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
|
||||||
|
.append(userLimit).append("]\nwith user limit factor: [")
|
||||||
|
.append(userLimitFactor).append("].");
|
||||||
|
LOG.info(queueInfo.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void reinitialize(CSQueue newlyParsedQueue,
|
||||||
|
Resource clusterResource) throws IOException {
|
||||||
|
// Sanity check
|
||||||
|
if (!(newlyParsedQueue instanceof PlanQueue)
|
||||||
|
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||||
|
throw new IOException("Trying to reinitialize " + getQueuePath()
|
||||||
|
+ " from " + newlyParsedQueue.getQueuePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
|
||||||
|
|
||||||
|
if (newlyParsedParentQueue.getChildQueues().size() > 0) {
|
||||||
|
throw new IOException(
|
||||||
|
"Reservable Queue should not have sub-queues in the"
|
||||||
|
+ "configuration");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set new configs
|
||||||
|
setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(),
|
||||||
|
newlyParsedParentQueue.getAbsoluteCapacity(),
|
||||||
|
newlyParsedParentQueue.getMaximumCapacity(),
|
||||||
|
newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
|
||||||
|
newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs());
|
||||||
|
|
||||||
|
updateQuotas(newlyParsedParentQueue.userLimit,
|
||||||
|
newlyParsedParentQueue.userLimitFactor,
|
||||||
|
newlyParsedParentQueue.maxAppsForReservation,
|
||||||
|
newlyParsedParentQueue.maxAppsPerUserForReservation);
|
||||||
|
|
||||||
|
// run reinitialize on each existing queue, to trigger absolute cap
|
||||||
|
// recomputations
|
||||||
|
for (CSQueue res : this.getChildQueues()) {
|
||||||
|
res.reinitialize(res, clusterResource);
|
||||||
|
}
|
||||||
|
showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void addChildQueue(CSQueue newQueue)
|
||||||
|
throws SchedulerDynamicEditException {
|
||||||
|
if (newQueue.getCapacity() > 0) {
|
||||||
|
throw new SchedulerDynamicEditException("Queue " + newQueue
|
||||||
|
+ " being added has non zero capacity.");
|
||||||
|
}
|
||||||
|
boolean added = this.childQueues.add(newQueue);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("updateChildQueues (action: add queue): " + added + " "
|
||||||
|
+ getChildQueuesToPrint());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void removeChildQueue(CSQueue remQueue)
|
||||||
|
throws SchedulerDynamicEditException {
|
||||||
|
if (remQueue.getCapacity() > 0) {
|
||||||
|
throw new SchedulerDynamicEditException("Queue " + remQueue
|
||||||
|
+ " being removed has non zero capacity.");
|
||||||
|
}
|
||||||
|
Iterator<CSQueue> qiter = childQueues.iterator();
|
||||||
|
while (qiter.hasNext()) {
|
||||||
|
CSQueue cs = qiter.next();
|
||||||
|
if (cs.equals(remQueue)) {
|
||||||
|
qiter.remove();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Removed child queue: {}", cs.getQueueName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized float sumOfChildCapacities() {
|
||||||
|
float ret = 0;
|
||||||
|
for (CSQueue l : childQueues) {
|
||||||
|
ret += l.getCapacity();
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateQuotas(int userLimit, float userLimitFactor,
|
||||||
|
int maxAppsForReservation, int maxAppsPerUserForReservation) {
|
||||||
|
this.userLimit = userLimit;
|
||||||
|
this.userLimitFactor = userLimitFactor;
|
||||||
|
this.maxAppsForReservation = maxAppsForReservation;
|
||||||
|
this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of maximum applications for each of the reservations in this Plan.
|
||||||
|
*
|
||||||
|
* @return maxAppsForreservation
|
||||||
|
*/
|
||||||
|
public int getMaxApplicationsForReservations() {
|
||||||
|
return maxAppsForReservation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of maximum applications per user for each of the reservations in
|
||||||
|
* this Plan.
|
||||||
|
*
|
||||||
|
* @return maxAppsPerUserForreservation
|
||||||
|
*/
|
||||||
|
public int getMaxApplicationsPerUserForReservation() {
|
||||||
|
return maxAppsPerUserForReservation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* User limit value for each of the reservations in this Plan.
|
||||||
|
*
|
||||||
|
* @return userLimit
|
||||||
|
*/
|
||||||
|
public int getUserLimitForReservation() {
|
||||||
|
return userLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* User limit factor value for each of the reservations in this Plan.
|
||||||
|
*
|
||||||
|
* @return userLimitFactor
|
||||||
|
*/
|
||||||
|
public float getUserLimitFactor() {
|
||||||
|
return userLimitFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine whether to hide/show the ReservationQueues
|
||||||
|
*/
|
||||||
|
public boolean showReservationsAsQueues() {
|
||||||
|
return showReservationsAsQueues;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This represents a dynamic {@link LeafQueue} managed by the
|
||||||
|
* {@link ReservationSystem}
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class ReservationQueue extends LeafQueue {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(ReservationQueue.class);
|
||||||
|
|
||||||
|
private PlanQueue parent;
|
||||||
|
|
||||||
|
private int maxSystemApps;
|
||||||
|
|
||||||
|
public ReservationQueue(CapacitySchedulerContext cs, String queueName,
|
||||||
|
PlanQueue parent) {
|
||||||
|
super(cs, queueName, parent, null);
|
||||||
|
maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
|
||||||
|
// the following parameters are common to all reservation in the plan
|
||||||
|
updateQuotas(parent.getUserLimitForReservation(),
|
||||||
|
parent.getUserLimitFactor(),
|
||||||
|
parent.getMaxApplicationsForReservations(),
|
||||||
|
parent.getMaxApplicationsPerUserForReservation());
|
||||||
|
this.parent = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void reinitialize(CSQueue newlyParsedQueue,
|
||||||
|
Resource clusterResource) throws IOException {
|
||||||
|
// Sanity check
|
||||||
|
if (!(newlyParsedQueue instanceof ReservationQueue)
|
||||||
|
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||||
|
throw new IOException("Trying to reinitialize " + getQueuePath()
|
||||||
|
+ " from " + newlyParsedQueue.getQueuePath());
|
||||||
|
}
|
||||||
|
CSQueueUtils.updateQueueStatistics(
|
||||||
|
parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
|
||||||
|
parent, parent.schedulerContext.getClusterResource(),
|
||||||
|
parent.schedulerContext.getMinimumResourceCapability());
|
||||||
|
super.reinitialize(newlyParsedQueue, clusterResource);
|
||||||
|
updateQuotas(parent.getUserLimitForReservation(),
|
||||||
|
parent.getUserLimitFactor(),
|
||||||
|
parent.getMaxApplicationsForReservations(),
|
||||||
|
parent.getMaxApplicationsPerUserForReservation());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This methods to change capacity for a queue and adjusts its
|
||||||
|
* absoluteCapacity
|
||||||
|
*
|
||||||
|
* @param entitlement the new entitlement for the queue (capacity,
|
||||||
|
* maxCapacity, etc..)
|
||||||
|
* @throws SchedulerDynamicEditException
|
||||||
|
*/
|
||||||
|
public synchronized void setEntitlement(QueueEntitlement entitlement)
|
||||||
|
throws SchedulerDynamicEditException {
|
||||||
|
float capacity = entitlement.getCapacity();
|
||||||
|
if (capacity < 0 || capacity > 1.0f) {
|
||||||
|
throw new SchedulerDynamicEditException(
|
||||||
|
"Capacity demand is not in the [0,1] range: " + capacity);
|
||||||
|
}
|
||||||
|
setCapacity(capacity);
|
||||||
|
setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
|
||||||
|
setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity()));
|
||||||
|
// note: we currently set maxCapacity to capacity
|
||||||
|
// this might be revised later
|
||||||
|
setMaxCapacity(entitlement.getMaxCapacity());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("successfully changed to " + capacity + " for queue "
|
||||||
|
+ this.getQueueName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateQuotas(int userLimit, float userLimitFactor,
|
||||||
|
int maxAppsForReservation, int maxAppsPerUserForReservation) {
|
||||||
|
setUserLimit(userLimit);
|
||||||
|
setUserLimitFactor(userLimitFactor);
|
||||||
|
setMaxApplications(maxAppsForReservation);
|
||||||
|
maxApplicationsPerUser = maxAppsPerUserForReservation;
|
||||||
|
}
|
||||||
|
|
||||||
|
// used by the super constructor, we initialize to zero
|
||||||
|
protected float getCapacityFromConf() {
|
||||||
|
return 0f;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
|
||||||
|
|
||||||
|
public class QueueEntitlement {
|
||||||
|
|
||||||
|
private float capacity;
|
||||||
|
private float maxCapacity;
|
||||||
|
|
||||||
|
public QueueEntitlement(float capacity, float maxCapacity){
|
||||||
|
this.setCapacity(capacity);
|
||||||
|
this.maxCapacity = maxCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getMaxCapacity() {
|
||||||
|
return maxCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxCapacity(float maxCapacity) {
|
||||||
|
this.maxCapacity = maxCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getCapacity() {
|
||||||
|
return capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCapacity(float capacity) {
|
||||||
|
this.capacity = capacity;
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
|
||||||
|
|
||||||
@XmlRootElement
|
@XmlRootElement
|
||||||
@XmlAccessorType(XmlAccessType.FIELD)
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
@ -48,6 +49,7 @@ public class CapacitySchedulerQueueInfo {
|
||||||
protected QueueState state;
|
protected QueueState state;
|
||||||
protected CapacitySchedulerQueueInfoList queues;
|
protected CapacitySchedulerQueueInfoList queues;
|
||||||
protected ResourceInfo resourcesUsed;
|
protected ResourceInfo resourcesUsed;
|
||||||
|
private boolean hideReservationQueues = true;
|
||||||
|
|
||||||
CapacitySchedulerQueueInfo() {
|
CapacitySchedulerQueueInfo() {
|
||||||
};
|
};
|
||||||
|
@ -69,6 +71,10 @@ public class CapacitySchedulerQueueInfo {
|
||||||
queueName = q.getQueueName();
|
queueName = q.getQueueName();
|
||||||
state = q.getState();
|
state = q.getState();
|
||||||
resourcesUsed = new ResourceInfo(q.getUsedResources());
|
resourcesUsed = new ResourceInfo(q.getUsedResources());
|
||||||
|
if(q instanceof PlanQueue &&
|
||||||
|
((PlanQueue)q).showReservationsAsQueues()) {
|
||||||
|
hideReservationQueues = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public float getCapacity() {
|
public float getCapacity() {
|
||||||
|
@ -112,6 +118,9 @@ public class CapacitySchedulerQueueInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CapacitySchedulerQueueInfoList getQueues() {
|
public CapacitySchedulerQueueInfoList getQueues() {
|
||||||
|
if(hideReservationQueues) {
|
||||||
|
return new CapacitySchedulerQueueInfoList();
|
||||||
|
}
|
||||||
return this.queues;
|
return this.queues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -410,7 +410,7 @@ public class TestCapacityScheduler {
|
||||||
cs.stop();
|
cs.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkQueueCapacities(CapacityScheduler cs,
|
void checkQueueCapacities(CapacityScheduler cs,
|
||||||
float capacityA, float capacityB) {
|
float capacityA, float capacityB) {
|
||||||
CSQueue rootQueue = cs.getRootQueue();
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
CSQueue queueA = findQueue(rootQueue, A);
|
CSQueue queueA = findQueue(rootQueue, A);
|
||||||
|
|
|
@ -0,0 +1,282 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestCapacitySchedulerDynamicBehavior {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestCapacitySchedulerDynamicBehavior.class);
|
||||||
|
private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
|
private static final String B1 = B + ".b1";
|
||||||
|
private static final String B2 = B + ".b2";
|
||||||
|
private static final String B3 = B + ".b3";
|
||||||
|
private static float A_CAPACITY = 10.5f;
|
||||||
|
private static float B_CAPACITY = 89.5f;
|
||||||
|
private static float A1_CAPACITY = 30;
|
||||||
|
private static float A2_CAPACITY = 70;
|
||||||
|
private static float B1_CAPACITY = 79.2f;
|
||||||
|
private static float B2_CAPACITY = 0.8f;
|
||||||
|
private static float B3_CAPACITY = 20;
|
||||||
|
|
||||||
|
private final TestCapacityScheduler tcs = new TestCapacityScheduler();
|
||||||
|
|
||||||
|
private int GB = 1024;
|
||||||
|
|
||||||
|
private MockRM rm;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupPlanQueueConfiguration(conf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_RESERVATIONS_ENABLE, false);
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshQueuesWithReservations() throws Exception {
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
// Test add one reservation dynamically and manually modify capacity
|
||||||
|
ReservationQueue a1 =
|
||||||
|
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
|
||||||
|
cs.addQueue(a1);
|
||||||
|
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
|
||||||
|
|
||||||
|
// Test add another reservation queue and use setEntitlement to modify
|
||||||
|
// capacity
|
||||||
|
ReservationQueue a2 =
|
||||||
|
new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
|
||||||
|
cs.addQueue(a2);
|
||||||
|
cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
|
||||||
|
|
||||||
|
// Verify all allocations match
|
||||||
|
tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
|
// Reinitialize and verify all dynamic queued survived
|
||||||
|
CapacitySchedulerConfiguration conf = cs.getConfiguration();
|
||||||
|
conf.setCapacity(A, 80f);
|
||||||
|
conf.setCapacity(B, 20f);
|
||||||
|
cs.reinitialize(conf, rm.getRMContext());
|
||||||
|
|
||||||
|
tcs.checkQueueCapacities(cs, 80f, 20f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddQueueFailCases() throws Exception {
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Test invalid addition (adding non-zero size queue)
|
||||||
|
ReservationQueue a1 =
|
||||||
|
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
|
||||||
|
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
|
||||||
|
cs.addQueue(a1);
|
||||||
|
fail();
|
||||||
|
} catch (Exception e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test add one reservation dynamically and manually modify capacity
|
||||||
|
ReservationQueue a1 =
|
||||||
|
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
|
||||||
|
cs.addQueue(a1);
|
||||||
|
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
|
||||||
|
|
||||||
|
// Test add another reservation queue and use setEntitlement to modify
|
||||||
|
// capacity
|
||||||
|
ReservationQueue a2 =
|
||||||
|
new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
|
||||||
|
|
||||||
|
cs.addQueue(a2);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Test invalid entitlement (sum of queues exceed 100%)
|
||||||
|
cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100 + 0.1f,
|
||||||
|
1.0f));
|
||||||
|
fail();
|
||||||
|
} catch (Exception e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
|
||||||
|
|
||||||
|
// Verify all allocations match
|
||||||
|
tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
|
cs.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveQueue() throws Exception {
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
// Test add one reservation dynamically and manually modify capacity
|
||||||
|
ReservationQueue a1 =
|
||||||
|
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
|
||||||
|
cs.addQueue(a1);
|
||||||
|
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
|
||||||
|
|
||||||
|
// submit an app
|
||||||
|
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||||
|
// check preconditions
|
||||||
|
List<ApplicationAttemptId> appsInA1 = cs.getAppsInQueue("a1");
|
||||||
|
assertEquals(1, appsInA1.size());
|
||||||
|
try {
|
||||||
|
cs.removeQueue("a1");
|
||||||
|
fail();
|
||||||
|
} catch (SchedulerDynamicEditException s) {
|
||||||
|
// expected a1 contains applications
|
||||||
|
}
|
||||||
|
// clear queue by killling all apps
|
||||||
|
cs.killAllAppsInQueue("a1");
|
||||||
|
// wait for events of move to propagate
|
||||||
|
rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
|
||||||
|
|
||||||
|
try {
|
||||||
|
cs.removeQueue("a1");
|
||||||
|
fail();
|
||||||
|
} catch (SchedulerDynamicEditException s) {
|
||||||
|
// expected a1 is not zero capacity
|
||||||
|
}
|
||||||
|
// set capacity to zero
|
||||||
|
cs.setEntitlement("a1", new QueueEntitlement(0f, 0f));
|
||||||
|
cs.removeQueue("a1");
|
||||||
|
|
||||||
|
assertTrue(cs.getQueue("a1") == null);
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoveAppToPlanQueue() throws Exception {
|
||||||
|
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
// submit an app
|
||||||
|
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "b1");
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
rm.getApplicationReport(app.getApplicationId())
|
||||||
|
.getCurrentApplicationAttemptId();
|
||||||
|
|
||||||
|
// check preconditions
|
||||||
|
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
assertEquals(1, appsInB1.size());
|
||||||
|
|
||||||
|
List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
|
||||||
|
assertEquals(1, appsInB.size());
|
||||||
|
assertTrue(appsInB.contains(appAttemptId));
|
||||||
|
|
||||||
|
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
|
||||||
|
assertTrue(appsInA.isEmpty());
|
||||||
|
|
||||||
|
String queue =
|
||||||
|
scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
|
||||||
|
.getQueueName();
|
||||||
|
Assert.assertTrue(queue.equals("b1"));
|
||||||
|
|
||||||
|
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
|
||||||
|
assertTrue(appsInRoot.contains(appAttemptId));
|
||||||
|
assertEquals(1, appsInRoot.size());
|
||||||
|
|
||||||
|
// create the default reservation queue
|
||||||
|
String defQName = "a" + PlanQueue.DEFAULT_QUEUE_SUFFIX;
|
||||||
|
ReservationQueue defQ =
|
||||||
|
new ReservationQueue(scheduler, defQName,
|
||||||
|
(PlanQueue) scheduler.getQueue("a"));
|
||||||
|
scheduler.addQueue(defQ);
|
||||||
|
defQ.setEntitlement(new QueueEntitlement(1f, 1f));
|
||||||
|
|
||||||
|
List<ApplicationAttemptId> appsInDefQ = scheduler.getAppsInQueue(defQName);
|
||||||
|
assertTrue(appsInDefQ.isEmpty());
|
||||||
|
|
||||||
|
// now move the app to plan queue
|
||||||
|
scheduler.moveApplication(app.getApplicationId(), "a");
|
||||||
|
|
||||||
|
// check postconditions
|
||||||
|
appsInDefQ = scheduler.getAppsInQueue(defQName);
|
||||||
|
assertEquals(1, appsInDefQ.size());
|
||||||
|
queue =
|
||||||
|
scheduler.getApplicationAttempt(appsInDefQ.get(0)).getQueue()
|
||||||
|
.getQueueName();
|
||||||
|
Assert.assertTrue(queue.equals(defQName));
|
||||||
|
|
||||||
|
appsInA = scheduler.getAppsInQueue("a");
|
||||||
|
assertTrue(appsInA.contains(appAttemptId));
|
||||||
|
assertEquals(1, appsInA.size());
|
||||||
|
|
||||||
|
appsInRoot = scheduler.getAppsInQueue("root");
|
||||||
|
assertTrue(appsInRoot.contains(appAttemptId));
|
||||||
|
assertEquals(1, appsInRoot.size());
|
||||||
|
|
||||||
|
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
assertTrue(appsInB1.isEmpty());
|
||||||
|
|
||||||
|
appsInB = scheduler.getAppsInQueue("b");
|
||||||
|
assertTrue(appsInB.isEmpty());
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupPlanQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||||
|
|
||||||
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] { "a", "b" });
|
||||||
|
|
||||||
|
conf.setCapacity(A, A_CAPACITY);
|
||||||
|
conf.setCapacity(B, B_CAPACITY);
|
||||||
|
|
||||||
|
// Define 2nd-level queues
|
||||||
|
conf.setQueues(B, new String[] { "b1", "b2", "b3" });
|
||||||
|
conf.setCapacity(B1, B1_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(B1, 100.0f);
|
||||||
|
conf.setCapacity(B2, B2_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(B2, 100.0f);
|
||||||
|
conf.setCapacity(B3, B3_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(B3, 100.0f);
|
||||||
|
|
||||||
|
conf.setReservableQueue(A, true);
|
||||||
|
conf.setReservationWindow(A, 86400 * 1000);
|
||||||
|
conf.setAverageCapacity(A, 1.0f);
|
||||||
|
|
||||||
|
LOG.info("Setup a as a plan queue");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,103 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestReservationQueue {
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration csConf;
|
||||||
|
CapacitySchedulerContext csContext;
|
||||||
|
final static int GB = 1024;
|
||||||
|
private final ResourceCalculator resourceCalculator =
|
||||||
|
new DefaultResourceCalculator();
|
||||||
|
ReservationQueue reservationQueue;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
|
||||||
|
// setup a context / conf
|
||||||
|
csConf = new CapacitySchedulerConfiguration();
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
csContext = mock(CapacitySchedulerContext.class);
|
||||||
|
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||||
|
when(csContext.getConf()).thenReturn(conf);
|
||||||
|
when(csContext.getMinimumResourceCapability()).thenReturn(
|
||||||
|
Resources.createResource(GB, 1));
|
||||||
|
when(csContext.getMaximumResourceCapability()).thenReturn(
|
||||||
|
Resources.createResource(16 * GB, 32));
|
||||||
|
when(csContext.getClusterResource()).thenReturn(
|
||||||
|
Resources.createResource(100 * 16 * GB, 100 * 32));
|
||||||
|
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
||||||
|
|
||||||
|
// create a queue
|
||||||
|
PlanQueue pq = new PlanQueue(csContext, "root", null, null);
|
||||||
|
reservationQueue = new ReservationQueue(csContext, "a", pq);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddSubtractCapacity() throws Exception {
|
||||||
|
|
||||||
|
// verify that setting, adding, subtracting capacity works
|
||||||
|
reservationQueue.setCapacity(1.0F);
|
||||||
|
assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
|
||||||
|
reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
|
||||||
|
reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
|
||||||
|
assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
|
||||||
|
reservationQueue.getCapacity() - 0.9 < CSQueueUtils.EPSILON);
|
||||||
|
reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f));
|
||||||
|
assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
|
||||||
|
reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
|
||||||
|
reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f));
|
||||||
|
assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
|
||||||
|
reservationQueue.getCapacity() < CSQueueUtils.EPSILON);
|
||||||
|
|
||||||
|
try {
|
||||||
|
reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f));
|
||||||
|
fail();
|
||||||
|
} catch (SchedulerDynamicEditException iae) {
|
||||||
|
// expected
|
||||||
|
assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
|
||||||
|
reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
reservationQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f));
|
||||||
|
fail();
|
||||||
|
} catch (SchedulerDynamicEditException iae) {
|
||||||
|
// expected
|
||||||
|
assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
|
||||||
|
reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue