YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. Contributed by Suma Shivaprasad.
This commit is contained in:
parent
5700556cd6
commit
821b0de4c5
|
@ -236,13 +236,14 @@ public class RMServerUtils {
|
|||
*/
|
||||
public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler,
|
||||
RMContext rmContext)
|
||||
throws InvalidResourceRequestException {
|
||||
RMContext rmContext) throws InvalidResourceRequestException {
|
||||
// Get queue from scheduler
|
||||
QueueInfo queueInfo = null;
|
||||
try {
|
||||
queueInfo = scheduler.getQueueInfo(queueName, false, false);
|
||||
} catch (IOException e) {
|
||||
//Queue may not exist since it could be auto-created in case of
|
||||
// dynamic queues
|
||||
}
|
||||
|
||||
for (ResourceRequest resReq : ask) {
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
|
@ -75,6 +76,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
|
||||
.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||
|
@ -1109,6 +1113,49 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
amBlacklist.getBlacklistAdditions() + ") and removals(" +
|
||||
amBlacklist.getBlacklistRemovals() + ")");
|
||||
}
|
||||
|
||||
QueueInfo queueInfo = null;
|
||||
for (ResourceRequest amReq : appAttempt.amReqs) {
|
||||
if (amReq.getNodeLabelExpression() == null && ResourceRequest.ANY
|
||||
.equals(amReq.getResourceName())) {
|
||||
String queue = appAttempt.rmApp.getQueue();
|
||||
|
||||
//Load queue only once since queue will be same across attempts
|
||||
if (queueInfo == null) {
|
||||
try {
|
||||
queueInfo = appAttempt.scheduler.getQueueInfo(queue, false,
|
||||
false);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Could not find queue for application : ", e);
|
||||
// Set application status to REJECTED since we cant find the
|
||||
// queue
|
||||
appAttempt.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptEvent(appAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.FAIL,
|
||||
"Could not find queue for application : " +
|
||||
appAttempt.rmApp.getQueue()));
|
||||
appAttempt.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(appAttempt.rmApp.getApplicationId(), RMAppEventType
|
||||
.APP_REJECTED,
|
||||
"Could not find queue for application : " +
|
||||
appAttempt.rmApp.getQueue()));
|
||||
return RMAppAttemptState.FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
String labelExp = RMNodeLabelsManager.NO_LABEL;
|
||||
if (queueInfo != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Setting default node label expression : " + queueInfo
|
||||
.getDefaultNodeLabelExpression());
|
||||
}
|
||||
labelExp = queueInfo.getDefaultNodeLabelExpression();
|
||||
}
|
||||
|
||||
amReq.setNodeLabelExpression(labelExp);
|
||||
}
|
||||
}
|
||||
|
||||
// AM resource has been checked when submission
|
||||
Allocation amContainerAllocation =
|
||||
appAttempt.scheduler.allocate(
|
||||
|
|
|
@ -132,4 +132,16 @@ public class Allocation {
|
|||
public void setResourceLimit(Resource resource) {
|
||||
this.resourceLimit = resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Allocation{" + "containers=" + containers + ", strictContainers="
|
||||
+ strictContainers + ", fungibleContainers=" + fungibleContainers
|
||||
+ ", fungibleResources=" + fungibleResources + ", nmTokens=" + nmTokens
|
||||
+ ", increasedContainers=" + increasedContainers
|
||||
+ ", decreasedContainers=" + decreasedContainers
|
||||
+ ", promotedContainers=" + promotedContainers + ", demotedContainers="
|
||||
+ demotedContainers + ", previousAttemptContainers="
|
||||
+ previousAttemptContainers + ", resourceLimit=" + resourceLimit + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -186,20 +186,34 @@ public class SchedulerUtils {
|
|||
ResourceRequest resReq, QueueInfo queueInfo) {
|
||||
|
||||
String labelExp = resReq.getNodeLabelExpression();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Requested Node Label Expression : " + labelExp);
|
||||
LOG.debug("Queue Info : " + queueInfo);
|
||||
}
|
||||
|
||||
// if queue has default label expression, and RR doesn't have, use the
|
||||
// default label expression of queue
|
||||
if (labelExp == null && queueInfo != null && ResourceRequest.ANY
|
||||
.equals(resReq.getResourceName())) {
|
||||
if ( LOG.isDebugEnabled()) {
|
||||
LOG.debug("Setting default node label expression : " + queueInfo
|
||||
.getDefaultNodeLabelExpression());
|
||||
}
|
||||
labelExp = queueInfo.getDefaultNodeLabelExpression();
|
||||
}
|
||||
|
||||
// If labelExp still equals to null, set it to be NO_LABEL
|
||||
if (labelExp == null) {
|
||||
// If labelExp still equals to null, it could either be a dynamic queue
|
||||
// or the label is not configured
|
||||
// set it to be NO_LABEL in case of a pre-configured queue. Dynamic
|
||||
// queues are handled in RMAppAttemptImp.ScheduledTransition
|
||||
if (labelExp == null && queueInfo != null) {
|
||||
labelExp = RMNodeLabelsManager.NO_LABEL;
|
||||
}
|
||||
|
||||
if ( labelExp != null) {
|
||||
resReq.setNodeLabelExpression(labelExp);
|
||||
}
|
||||
}
|
||||
|
||||
public static void normalizeAndValidateRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler,
|
||||
|
@ -209,6 +223,7 @@ public class SchedulerUtils {
|
|||
isRecovery, rmContext, null);
|
||||
}
|
||||
|
||||
|
||||
public static void normalizeAndValidateRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler,
|
||||
boolean isRecovery, RMContext rmContext, QueueInfo queueInfo)
|
||||
|
@ -233,11 +248,12 @@ public class SchedulerUtils {
|
|||
try {
|
||||
queueInfo = scheduler.getQueueInfo(queueName, false, false);
|
||||
} catch (IOException e) {
|
||||
// it is possible queue cannot get when queue mapping is set, just ignore
|
||||
// the queueInfo here, and move forward
|
||||
//Queue may not exist since it could be auto-created in case of
|
||||
// dynamic queues
|
||||
}
|
||||
}
|
||||
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
|
||||
|
||||
if (!isRecovery) {
|
||||
validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
|
||||
}
|
||||
|
@ -245,8 +261,7 @@ public class SchedulerUtils {
|
|||
|
||||
public static void normalizeAndvalidateRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, String queueName, YarnScheduler scheduler,
|
||||
RMContext rmContext)
|
||||
throws InvalidResourceRequestException {
|
||||
RMContext rmContext) throws InvalidResourceRequestException {
|
||||
normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler,
|
||||
rmContext, null);
|
||||
}
|
||||
|
|
|
@ -148,11 +148,10 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
|
|||
try {
|
||||
for( String nodeLabel : parent.getQueueCapacities().getExistingNodeLabels
|
||||
()) {
|
||||
//TODO - update to use getMaximumCapacity(nodeLabel) in YARN-7574
|
||||
setEntitlement(nodeLabel, new QueueEntitlement(0.0f,
|
||||
parent.getLeafQueueTemplate()
|
||||
.getQueueCapacities()
|
||||
.getMaximumCapacity()));
|
||||
.getMaximumCapacity(nodeLabel)));
|
||||
}
|
||||
} catch (SchedulerDynamicEditException e) {
|
||||
throw new IOException(e);
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public interface AutoCreatedQueueManagementPolicy {
|
||||
|
@ -26,14 +29,15 @@ public interface AutoCreatedQueueManagementPolicy {
|
|||
* Initialize policy
|
||||
* @param schedulerContext Capacity Scheduler context
|
||||
*/
|
||||
void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue);
|
||||
void init(CapacitySchedulerContext schedulerContext, ParentQueue
|
||||
parentQueue) throws IOException;
|
||||
|
||||
/**
|
||||
* Reinitialize policy state ( if required )
|
||||
* @param schedulerContext Capacity Scheduler context
|
||||
*/
|
||||
void reinitialize(CapacitySchedulerContext schedulerContext,
|
||||
ParentQueue parentQueue);
|
||||
ParentQueue parentQueue) throws IOException;
|
||||
|
||||
/**
|
||||
* Get initial template for the specified leaf queue
|
||||
|
@ -48,6 +52,10 @@ public interface AutoCreatedQueueManagementPolicy {
|
|||
/**
|
||||
* Compute/Adjust child queue capacities
|
||||
* for auto created leaf queues
|
||||
* This computes queue entitlements but does not update LeafQueueState or
|
||||
* queue capacities. Scheduler calls commitQueueManagemetChanges after
|
||||
* validation after applying queue changes and commits to LeafQueueState
|
||||
* are done in commitQueueManagementChanges.
|
||||
*
|
||||
* @return returns a list of suggested QueueEntitlementChange(s) which may
|
||||
* or may not be be enforced by the scheduler
|
||||
|
|
|
@ -1204,6 +1204,8 @@ public class CapacityScheduler extends
|
|||
updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
|
||||
}
|
||||
|
||||
LOG.info("Allocation for application " + applicationAttemptId + " : " +
|
||||
allocation + " with cluster resource : " + getClusterResource());
|
||||
return allocation;
|
||||
}
|
||||
|
||||
|
|
|
@ -1878,6 +1878,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
setCapacity(leafQueueConfPrefix, val);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@Private
|
||||
public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
|
||||
String label, float val) {
|
||||
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
|
||||
queuePath);
|
||||
setCapacityByLabel(leafQueueConfPrefix, label, val);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath,
|
||||
|
@ -1887,6 +1896,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
setMaximumCapacity(leafQueueConfPrefix, val);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
|
||||
String label, float val) {
|
||||
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
|
||||
queuePath);
|
||||
setMaximumCapacityByLabel(leafQueueConfPrefix, label, val);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@Private
|
||||
public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
|
||||
|
@ -1905,6 +1923,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
setUserLimitFactor(leafQueueConfPrefix, val);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public void setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression(String
|
||||
queuePath,
|
||||
String expression) {
|
||||
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
|
||||
queuePath);
|
||||
setDefaultNodeLabelExpression(leafQueueConfPrefix, expression);
|
||||
}
|
||||
|
||||
public static String getUnits(String resourceValue) {
|
||||
String units;
|
||||
for (int i = 0; i < resourceValue.length(); i++) {
|
||||
|
|
|
@ -778,6 +778,17 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
metrics.setAMResouceLimit(nodePartition, amResouceLimit);
|
||||
queueUsage.setAMLimit(nodePartition, amResouceLimit);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Queue: " + getQueueName() + ", node label : " +
|
||||
nodePartition
|
||||
+ ", queue "
|
||||
+ "partition "
|
||||
+ "resource : " + queuePartitionResource + ','
|
||||
+ " queue current limit : " + queueCurrentLimit + ","
|
||||
+ " queue partition usable resource : "
|
||||
+ queuePartitionUsableResource + ","
|
||||
+ " amResourceLimit : " + amResouceLimit);
|
||||
}
|
||||
return amResouceLimit;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
|
|
@ -132,7 +132,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
|
|||
}
|
||||
}
|
||||
|
||||
private void initializeQueueManagementPolicy() {
|
||||
private void initializeQueueManagementPolicy() throws IOException {
|
||||
queueManagementPolicy =
|
||||
csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
|
||||
getQueuePath());
|
||||
|
@ -140,7 +140,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
|
|||
queueManagementPolicy.init(csContext, this);
|
||||
}
|
||||
|
||||
private void reinitializeQueueManagementPolicy() {
|
||||
private void reinitializeQueueManagementPolicy() throws IOException {
|
||||
AutoCreatedQueueManagementPolicy managementPolicy =
|
||||
csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
|
||||
getQueuePath());
|
||||
|
@ -339,6 +339,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
|
|||
((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
|
|||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -63,8 +64,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
|
||||
.NO_LABEL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.capacity.CSQueueUtils.EPSILON;
|
||||
|
||||
|
@ -85,8 +84,6 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
private static final Log LOG = LogFactory.getLog(
|
||||
GuaranteedOrZeroCapacityOverTimePolicy.class);
|
||||
|
||||
private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT;
|
||||
|
||||
private ReentrantReadWriteLock.WriteLock writeLock;
|
||||
|
||||
private ReentrantReadWriteLock.ReadLock readLock;
|
||||
|
@ -97,12 +94,70 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
|
||||
private QueueCapacities leafQueueTemplateCapacities;
|
||||
|
||||
private Map<String, LeafQueueState> leafQueueStateMap = new HashMap<>();
|
||||
private Set<String> leafQueueTemplateNodeLabels;
|
||||
|
||||
private LeafQueueState leafQueueState = new LeafQueueState();
|
||||
|
||||
private Clock clock = new MonotonicClock();
|
||||
|
||||
private class LeafQueueState {
|
||||
|
||||
//map of partition-> queueName->{leaf queue's state}
|
||||
private Map<String, Map<String, LeafQueueStatePerPartition>>
|
||||
leafQueueStateMap = new HashMap<>();
|
||||
|
||||
public boolean containsLeafQueue(String leafQueueName, String partition) {
|
||||
if (leafQueueStateMap.containsKey(partition)) {
|
||||
return leafQueueStateMap.get(partition).containsKey(leafQueueName);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean containsPartition(String partition) {
|
||||
if (leafQueueStateMap.containsKey(partition)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean addLeafQueueStateIfNotExists(String leafQueueName,
|
||||
String partition, LeafQueueStatePerPartition leafQueueState) {
|
||||
if (!containsPartition(partition)) {
|
||||
leafQueueStateMap.put(partition, new HashMap<>());
|
||||
}
|
||||
if (!containsLeafQueue(leafQueueName, partition)) {
|
||||
leafQueueStateMap.get(partition).put(leafQueueName, leafQueueState);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue,
|
||||
String partition) {
|
||||
return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), partition,
|
||||
new LeafQueueStatePerPartition());
|
||||
}
|
||||
|
||||
public LeafQueueStatePerPartition getLeafQueueStatePerPartition(
|
||||
String leafQueueName, String partition) {
|
||||
if (leafQueueStateMap.get(partition) != null) {
|
||||
return leafQueueStateMap.get(partition).get(leafQueueName);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public Map<String, Map<String, LeafQueueStatePerPartition>>
|
||||
getLeafQueueStateMap() {
|
||||
return leafQueueStateMap;
|
||||
}
|
||||
|
||||
private void clear() {
|
||||
leafQueueStateMap.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private class LeafQueueStatePerPartition {
|
||||
|
||||
private AtomicBoolean isActive = new AtomicBoolean(false);
|
||||
|
||||
private long mostRecentActivationTime;
|
||||
|
@ -139,41 +194,16 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
}
|
||||
}
|
||||
|
||||
private boolean containsLeafQueue(String leafQueueName) {
|
||||
return leafQueueStateMap.containsKey(leafQueueName);
|
||||
}
|
||||
|
||||
private boolean addLeafQueueStateIfNotExists(String leafQueueName,
|
||||
LeafQueueState leafQueueState) {
|
||||
if (!containsLeafQueue(leafQueueName)) {
|
||||
leafQueueStateMap.put(leafQueueName, leafQueueState);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) {
|
||||
return addLeafQueueStateIfNotExists(leafQueue.getQueueName(),
|
||||
new LeafQueueState());
|
||||
}
|
||||
|
||||
private void clearLeafQueueState() {
|
||||
leafQueueStateMap.clear();
|
||||
}
|
||||
|
||||
private class ParentQueueState {
|
||||
|
||||
private Map<String, Float> totalAbsoluteActivatedChildQueueCapacityByLabel =
|
||||
new HashMap<String, Float>();
|
||||
|
||||
private float getAbsoluteActivatedChildQueueCapacity() {
|
||||
return getAbsoluteActivatedChildQueueCapacity(NO_LABEL);
|
||||
}
|
||||
|
||||
private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
|
||||
try {
|
||||
readLock.lock();
|
||||
Float totalActivatedCapacity = getByLabel(nodeLabel);
|
||||
Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel(
|
||||
nodeLabel);
|
||||
if (totalActivatedCapacity != null) {
|
||||
return totalActivatedCapacity;
|
||||
} else{
|
||||
|
@ -188,11 +218,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
float childQueueCapacity) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
Float activatedChildCapacity = getByLabel(nodeLabel);
|
||||
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
|
||||
nodeLabel);
|
||||
if (activatedChildCapacity != null) {
|
||||
setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity);
|
||||
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
|
||||
activatedChildCapacity + childQueueCapacity);
|
||||
} else{
|
||||
setByLabel(nodeLabel, childQueueCapacity);
|
||||
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
|
||||
childQueueCapacity);
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
@ -203,22 +236,25 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
float childQueueCapacity) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
Float activatedChildCapacity = getByLabel(nodeLabel);
|
||||
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
|
||||
nodeLabel);
|
||||
if (activatedChildCapacity != null) {
|
||||
setByLabel(nodeLabel, activatedChildCapacity - childQueueCapacity);
|
||||
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
|
||||
activatedChildCapacity - childQueueCapacity);
|
||||
} else{
|
||||
setByLabel(nodeLabel, childQueueCapacity);
|
||||
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
|
||||
childQueueCapacity);
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
Float getByLabel(String label) {
|
||||
Float getAbsActivatedChildQueueCapacityByLabel(String label) {
|
||||
return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label);
|
||||
}
|
||||
|
||||
Float setByLabel(String label, float val) {
|
||||
Float setAbsActivatedChildQueueCapacityByLabel(String label, float val) {
|
||||
return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val);
|
||||
}
|
||||
|
||||
|
@ -256,13 +292,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
|
||||
@Override
|
||||
public void init(final CapacitySchedulerContext schedulerContext,
|
||||
final ParentQueue parentQueue) {
|
||||
final ParentQueue parentQueue) throws IOException {
|
||||
this.scheduler = schedulerContext;
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
readLock = lock.readLock();
|
||||
writeLock = lock.writeLock();
|
||||
|
||||
if (!(parentQueue instanceof ManagedParentQueue)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Expected instance of type " + ManagedParentQueue.class);
|
||||
|
@ -278,15 +313,43 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
+ leafQueueTemplate.getQueueCapacities() + "]");
|
||||
}
|
||||
|
||||
private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) {
|
||||
private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue)
|
||||
throws IOException {
|
||||
leafQueueTemplate = parentQueue.getLeafQueueTemplate();
|
||||
|
||||
leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities();
|
||||
|
||||
ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f,
|
||||
leafQueueTemplateCapacities.getMaximumCapacity());
|
||||
Set<String> parentQueueLabels = parentQueue.getNodeLabelsForQueue();
|
||||
for (String nodeLabel : leafQueueTemplateCapacities
|
||||
.getExistingNodeLabels()) {
|
||||
|
||||
if (!parentQueueLabels.contains(nodeLabel)) {
|
||||
LOG.error("Invalid node label " + nodeLabel
|
||||
+ " on configured leaf template on parent" + " queue " + parentQueue
|
||||
.getQueueName());
|
||||
throw new IOException("Invalid node label " + nodeLabel
|
||||
+ " on configured leaf template on parent" + " queue " + parentQueue
|
||||
.getQueueName());
|
||||
}
|
||||
}
|
||||
|
||||
leafQueueTemplateNodeLabels =
|
||||
leafQueueTemplateCapacities.getExistingNodeLabels();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute/Adjust child queue capacities
|
||||
* for auto created leaf queues
|
||||
* This computes queue entitlements but does not update LeafQueueState or
|
||||
* queue capacities. Scheduler calls commitQueueManagemetChanges after
|
||||
* validation after applying queue changes and commits to LeafQueueState
|
||||
* are done in commitQueueManagementChanges.
|
||||
*
|
||||
* @return List of Queue Management change suggestions which could potentially
|
||||
* be committed/rejected by the scheduler due to validation failures
|
||||
* @throws SchedulerDynamicEditException
|
||||
*/
|
||||
@Override
|
||||
public List<QueueManagementChange> computeQueueManagementChanges()
|
||||
throws SchedulerDynamicEditException {
|
||||
|
@ -298,30 +361,39 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
try {
|
||||
readLock.lock();
|
||||
List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
|
||||
List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
|
||||
|
||||
//Map of LeafQueue->QueueCapacities - keep adding the computed
|
||||
// entitlements to this map and finally
|
||||
// build the leaf queue configuration Template for all identified leaf
|
||||
// queues
|
||||
Map<String, QueueCapacities> leafQueueEntitlements = new HashMap<>();
|
||||
for (String nodeLabel : leafQueueTemplateNodeLabels) {
|
||||
// check if any leaf queues need to be deactivated based on pending
|
||||
// applications and
|
||||
// applications
|
||||
float parentAbsoluteCapacity =
|
||||
managedParentQueue.getQueueCapacities().getAbsoluteCapacity();
|
||||
|
||||
managedParentQueue.getQueueCapacities().getAbsoluteCapacity(
|
||||
nodeLabel);
|
||||
float leafQueueTemplateAbsoluteCapacity =
|
||||
leafQueueTemplateCapacities.getAbsoluteCapacity();
|
||||
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel);
|
||||
Map<String, QueueCapacities> deactivatedLeafQueues =
|
||||
deactivateLeafQueuesIfInActive(managedParentQueue, queueManagementChanges);
|
||||
deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel,
|
||||
leafQueueEntitlements);
|
||||
|
||||
float deactivatedCapacity = getTotalDeactivatedCapacity(
|
||||
deactivatedLeafQueues);
|
||||
deactivatedLeafQueues, nodeLabel);
|
||||
|
||||
float sumOfChildQueueActivatedCapacity = parentQueueState.
|
||||
getAbsoluteActivatedChildQueueCapacity();
|
||||
getAbsoluteActivatedChildQueueCapacity(nodeLabel);
|
||||
|
||||
//Check if we need to activate anything at all?
|
||||
float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity,
|
||||
deactivatedCapacity, sumOfChildQueueActivatedCapacity);
|
||||
float availableCapacity =
|
||||
parentAbsoluteCapacity - sumOfChildQueueActivatedCapacity
|
||||
+ deactivatedCapacity + EPSILON;
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"Parent queue : " + managedParentQueue.getQueueName() + " absCapacity = "
|
||||
LOG.debug("Parent queue : " + managedParentQueue.getQueueName()
|
||||
+ ", nodeLabel = " + nodeLabel + ", absCapacity = "
|
||||
+ parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = "
|
||||
+ leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = "
|
||||
+ deactivatedCapacity + " , absChildActivatedCapacity = "
|
||||
|
@ -331,37 +403,50 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
|
||||
if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
|
||||
//sort applications across leaf queues by submit time
|
||||
List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
|
||||
|
||||
if (pendingApps.size() > 0) {
|
||||
int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
|
||||
availableCapacity, leafQueueTemplateAbsoluteCapacity,
|
||||
pendingApps.size());
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found " + maxLeafQueuesTobeActivated
|
||||
+ " leaf queues to be activated with " + pendingApps.size()
|
||||
+ " apps ");
|
||||
LOG.debug("Found " + maxLeafQueuesTobeActivated + " leaf queues"
|
||||
+ " to be activated with " + pendingApps.size() + " apps ");
|
||||
}
|
||||
|
||||
LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
|
||||
pendingApps, maxLeafQueuesTobeActivated,
|
||||
nodeLabel, pendingApps, maxLeafQueuesTobeActivated,
|
||||
deactivatedLeafQueues.keySet());
|
||||
|
||||
//Compute entitlement changes for the identified leaf queues
|
||||
// which is appended to the List of queueManagementChanges
|
||||
computeQueueManagementChanges(leafQueuesToBeActivated,
|
||||
queueManagementChanges, availableCapacity,
|
||||
leafQueueTemplateAbsoluteCapacity);
|
||||
// which is appended to the List of computedEntitlements
|
||||
updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated,
|
||||
leafQueueEntitlements);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (leafQueuesToBeActivated.size() > 0) {
|
||||
LOG.debug(
|
||||
"Activated leaf queues : [" + leafQueuesToBeActivated + "]");
|
||||
LOG.debug("Activated leaf queues : [" + leafQueuesToBeActivated
|
||||
+ "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Populate new entitlements
|
||||
|
||||
for (final Iterator<Map.Entry<String, QueueCapacities>> iterator =
|
||||
leafQueueEntitlements.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, QueueCapacities> queueCapacities = iterator.next();
|
||||
String leafQueueName = queueCapacities.getKey();
|
||||
AutoCreatedLeafQueue leafQueue =
|
||||
(AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
|
||||
.getQueue(leafQueueName);
|
||||
AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
|
||||
queueCapacities.getValue());
|
||||
queueManagementChanges.add(
|
||||
new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
|
||||
|
||||
}
|
||||
return queueManagementChanges;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
|
@ -369,14 +454,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
}
|
||||
|
||||
private float getTotalDeactivatedCapacity(
|
||||
Map<String, QueueCapacities> deactivatedLeafQueues) {
|
||||
Map<String, QueueCapacities> deactivatedLeafQueues, String nodeLabel) {
|
||||
float deactivatedCapacity = 0;
|
||||
for (Iterator<Map.Entry<String, QueueCapacities>> iterator =
|
||||
deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, QueueCapacities> deactivatedQueueCapacity =
|
||||
iterator.next();
|
||||
deactivatedCapacity +=
|
||||
deactivatedQueueCapacity.getValue().getAbsoluteCapacity();
|
||||
deactivatedQueueCapacity.getValue().getAbsoluteCapacity(nodeLabel);
|
||||
}
|
||||
return deactivatedCapacity;
|
||||
}
|
||||
|
@ -385,20 +470,42 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
void updateLeafQueueState() {
|
||||
try {
|
||||
writeLock.lock();
|
||||
Set<String> newPartitions = new HashSet<>();
|
||||
Set<String> newQueues = new HashSet<>();
|
||||
|
||||
for (CSQueue newQueue : managedParentQueue.getChildQueues()) {
|
||||
if (newQueue instanceof LeafQueue) {
|
||||
addLeafQueueStateIfNotExists((LeafQueue) newQueue);
|
||||
for (String nodeLabel : leafQueueTemplateNodeLabels) {
|
||||
leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) newQueue,
|
||||
nodeLabel);
|
||||
newPartitions.add(nodeLabel);
|
||||
}
|
||||
newQueues.add(newQueue.getQueueName());
|
||||
}
|
||||
}
|
||||
|
||||
for (Iterator<Map.Entry<String, LeafQueueState>> itr =
|
||||
leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) {
|
||||
Map.Entry<String, LeafQueueState> e = itr.next();
|
||||
String queueName = e.getKey();
|
||||
if (!newQueues.contains(queueName)) {
|
||||
for (Iterator<Map.Entry<String, Map<String, LeafQueueStatePerPartition>>>
|
||||
itr = leafQueueState.getLeafQueueStateMap().entrySet().iterator();
|
||||
itr.hasNext(); ) {
|
||||
Map.Entry<String, Map<String, LeafQueueStatePerPartition>> e =
|
||||
itr.next();
|
||||
String partition = e.getKey();
|
||||
if (!newPartitions.contains(partition)) {
|
||||
itr.remove();
|
||||
LOG.info(
|
||||
"Removed partition " + partition + " from leaf queue " + "state");
|
||||
} else{
|
||||
Map<String, LeafQueueStatePerPartition> queues = e.getValue();
|
||||
for (
|
||||
Iterator<Map.Entry<String, LeafQueueStatePerPartition>> queueItr =
|
||||
queues.entrySet().iterator(); queueItr.hasNext(); ) {
|
||||
String queue = queueItr.next().getKey();
|
||||
if (!newQueues.contains(queue)) {
|
||||
queueItr.remove();
|
||||
LOG.info("Removed queue " + queue + " from leaf queue "
|
||||
+ "state from partition " + partition);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -406,22 +513,20 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
}
|
||||
}
|
||||
|
||||
private LinkedHashSet<String> getSortedLeafQueues(
|
||||
private LinkedHashSet<String> getSortedLeafQueues(String nodeLabel,
|
||||
final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded,
|
||||
Set<String> deactivatedQueues) throws SchedulerDynamicEditException {
|
||||
|
||||
LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded);
|
||||
int ctr = 0;
|
||||
for (FiCaSchedulerApp app : pendingApps) {
|
||||
|
||||
AutoCreatedLeafQueue leafQueue =
|
||||
(AutoCreatedLeafQueue) app.getCSLeafQueue();
|
||||
String leafQueueName = leafQueue.getQueueName();
|
||||
|
||||
//Check if leafQueue is not active already and has any pending apps
|
||||
if (ctr < leafQueuesNeeded) {
|
||||
|
||||
if (!isActive(leafQueue)) {
|
||||
if (!isActive(leafQueue, nodeLabel)) {
|
||||
if (!deactivatedQueues.contains(leafQueueName)) {
|
||||
if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) {
|
||||
ctr++;
|
||||
|
@ -445,11 +550,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public boolean isActive(final AutoCreatedLeafQueue leafQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
public boolean isActive(final AutoCreatedLeafQueue leafQueue,
|
||||
String nodeLabel) throws SchedulerDynamicEditException {
|
||||
try {
|
||||
readLock.lock();
|
||||
LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue);
|
||||
LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue,
|
||||
nodeLabel);
|
||||
return leafQueueStatus.isActive();
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
|
@ -457,64 +563,52 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
}
|
||||
|
||||
private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive(
|
||||
ParentQueue parentQueue,
|
||||
List<QueueManagementChange> queueManagementChanges)
|
||||
ParentQueue parentQueue, String nodeLabel,
|
||||
Map<String, QueueCapacities> leafQueueEntitlements)
|
||||
throws SchedulerDynamicEditException {
|
||||
Map<String, QueueCapacities> deactivatedQueues = new HashMap<>();
|
||||
|
||||
for (CSQueue childQueue : parentQueue.getChildQueues()) {
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
|
||||
if (leafQueue != null) {
|
||||
if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) {
|
||||
if (!leafQueueEntitlements.containsKey(leafQueue.getQueueName())) {
|
||||
leafQueueEntitlements.put(leafQueue.getQueueName(),
|
||||
new QueueCapacities(false));
|
||||
}
|
||||
|
||||
if (isActive(leafQueue) && !hasPendingApps(leafQueue)) {
|
||||
queueManagementChanges.add(
|
||||
new QueueManagementChange.UpdateQueue(leafQueue,
|
||||
ZERO_CAPACITY_ENTITLEMENT));
|
||||
QueueCapacities capacities = leafQueueEntitlements.get(
|
||||
leafQueue.getQueueName());
|
||||
updateToZeroCapacity(capacities, nodeLabel);
|
||||
deactivatedQueues.put(leafQueue.getQueueName(),
|
||||
leafQueueTemplateCapacities);
|
||||
} else{
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(" Leaf queue has pending applications : " + leafQueue
|
||||
.getNumApplications() + ".Skipping deactivation for "
|
||||
+ leafQueue);
|
||||
LOG.debug(" Leaf queue has pending applications or is " + "inactive"
|
||||
+ " : " + leafQueue.getNumApplications()
|
||||
+ ".Skipping deactivation for " + leafQueue);
|
||||
}
|
||||
}
|
||||
} else{
|
||||
LOG.warn("Could not find queue in scheduler while trying" + " to "
|
||||
+ "deactivate for " + parentQueue);
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (deactivatedQueues.size() > 0) {
|
||||
LOG.debug("Deactivated leaf queues : " + deactivatedQueues);
|
||||
}
|
||||
}
|
||||
return deactivatedQueues;
|
||||
}
|
||||
|
||||
private void computeQueueManagementChanges(
|
||||
private void updateLeafQueueCapacitiesByLabel(String nodeLabel,
|
||||
Set<String> leafQueuesToBeActivated,
|
||||
List<QueueManagementChange> queueManagementChanges,
|
||||
final float availableCapacity,
|
||||
final float leafQueueTemplateAbsoluteCapacity) {
|
||||
|
||||
float curAvailableCapacity = availableCapacity;
|
||||
|
||||
Map<String, QueueCapacities> leafQueueEntitlements) {
|
||||
for (String curLeafQueue : leafQueuesToBeActivated) {
|
||||
if (!leafQueueEntitlements.containsKey(curLeafQueue)) {
|
||||
leafQueueEntitlements.put(curLeafQueue, new QueueCapacities(false));
|
||||
// Activate queues if capacity is available
|
||||
if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) {
|
||||
AutoCreatedLeafQueue leafQueue =
|
||||
(AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
|
||||
.getQueue(curLeafQueue);
|
||||
if (leafQueue != null) {
|
||||
AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
|
||||
leafQueueTemplateCapacities.getCapacity(),
|
||||
leafQueueTemplateCapacities.getMaximumCapacity());
|
||||
queueManagementChanges.add(
|
||||
new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
|
||||
curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity;
|
||||
} else{
|
||||
LOG.warn(
|
||||
"Could not find queue in scheduler while trying to deactivate "
|
||||
+ curLeafQueue);
|
||||
}
|
||||
}
|
||||
|
||||
QueueCapacities capacities = leafQueueEntitlements.get(curLeafQueue);
|
||||
updateCapacityFromTemplate(capacities, nodeLabel);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -528,17 +622,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
availableCapacity / childQueueAbsoluteCapacity);
|
||||
|
||||
return Math.min(numLeafQueuesNeeded, numPendingApps);
|
||||
} else{
|
||||
throw new SchedulerDynamicEditException("Child queue absolute capacity "
|
||||
+ "is initialized to 0. Check parent queue's " + managedParentQueue
|
||||
.getQueueName() + " leaf queue template configuration");
|
||||
}
|
||||
}
|
||||
|
||||
private float getAvailableCapacity(float parentAbsCapacity,
|
||||
float deactivatedAbsCapacity, float totalChildQueueActivatedCapacity) {
|
||||
return parentAbsCapacity - totalChildQueueActivatedCapacity
|
||||
+ deactivatedAbsCapacity + EPSILON;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -567,57 +652,55 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue;
|
||||
|
||||
if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) {
|
||||
if (isActive(leafQueue)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"Queue is already active. Skipping activation : " + queue
|
||||
.getQueuePath());
|
||||
}
|
||||
} else{
|
||||
activate(leafQueue);
|
||||
}
|
||||
} else{
|
||||
if (!isActive(leafQueue)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"Queue is already de-activated. " + "Skipping de-activation "
|
||||
+ ": " + leafQueue.getQueuePath());
|
||||
}
|
||||
} else{
|
||||
deactivate(leafQueue);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void activate(final AutoCreatedLeafQueue leafQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
getLeafQueueState(leafQueue).activate();
|
||||
|
||||
parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL,
|
||||
leafQueueTemplateCapacities.getAbsoluteCapacity());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void deactivate(final AutoCreatedLeafQueue leafQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
getLeafQueueState(leafQueue).deactivate();
|
||||
|
||||
for (String nodeLabel : managedParentQueue.getQueueCapacities()
|
||||
for (String nodeLabel : updatedQueueTemplate.getQueueCapacities()
|
||||
.getExistingNodeLabels()) {
|
||||
parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
|
||||
leafQueueTemplateCapacities.getAbsoluteCapacity());
|
||||
if (updatedQueueTemplate.getQueueCapacities().
|
||||
getCapacity(nodeLabel) > 0) {
|
||||
if (isActive(leafQueue, nodeLabel)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Queue is already active." + " Skipping activation : "
|
||||
+ queue.getQueuePath());
|
||||
}
|
||||
} else{
|
||||
activate(leafQueue, nodeLabel);
|
||||
}
|
||||
} else{
|
||||
if (!isActive(leafQueue, nodeLabel)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Queue is already de-activated. Skipping "
|
||||
+ "de-activation : " + leafQueue.getQueuePath());
|
||||
}
|
||||
} else{
|
||||
deactivate(leafQueue, nodeLabel);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void activate(final AbstractAutoCreatedLeafQueue leafQueue,
|
||||
String nodeLabel) throws SchedulerDynamicEditException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
getLeafQueueState(leafQueue, nodeLabel).activate();
|
||||
parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel,
|
||||
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue,
|
||||
String nodeLabel) throws SchedulerDynamicEditException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
getLeafQueueState(leafQueue, nodeLabel).deactivate();
|
||||
|
||||
parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
|
||||
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -629,7 +712,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
|
||||
@Override
|
||||
public void reinitialize(CapacitySchedulerContext schedulerContext,
|
||||
final ParentQueue parentQueue) {
|
||||
final ParentQueue parentQueue) throws IOException {
|
||||
if (!(parentQueue instanceof ManagedParentQueue)) {
|
||||
throw new IllegalStateException(
|
||||
"Expected instance of type " + ManagedParentQueue.class + " found "
|
||||
|
@ -649,12 +732,11 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
|
||||
//clear state
|
||||
parentQueueState.clear();
|
||||
clearLeafQueueState();
|
||||
leafQueueState.clear();
|
||||
|
||||
LOG.info(
|
||||
"Reinitialized queue management policy for parent queue "
|
||||
+ parentQueue.getQueueName() +" with leaf queue template "
|
||||
+ "capacities : ["
|
||||
"Reinitialized queue management policy for parent queue " + parentQueue
|
||||
.getQueueName() + " with leaf queue template " + "capacities : ["
|
||||
+ leafQueueTemplate.getQueueCapacities() + "]");
|
||||
}
|
||||
|
||||
|
@ -663,51 +745,74 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
AbstractAutoCreatedLeafQueue leafQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
|
||||
if ( !(leafQueue instanceof AutoCreatedLeafQueue)) {
|
||||
throw new SchedulerDynamicEditException("Not an instance of "
|
||||
+ "AutoCreatedLeafQueue : " + leafQueue.getClass());
|
||||
AutoCreatedLeafQueueConfig template;
|
||||
|
||||
if (!(leafQueue instanceof AutoCreatedLeafQueue)) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Not an instance of " + "AutoCreatedLeafQueue : " + leafQueue
|
||||
.getClass());
|
||||
}
|
||||
|
||||
AutoCreatedLeafQueue autoCreatedLeafQueue =
|
||||
(AutoCreatedLeafQueue) leafQueue;
|
||||
AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT;
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (!addLeafQueueStateIfNotExists(leafQueue)) {
|
||||
LOG.error("Leaf queue already exists in state : " + getLeafQueueState(
|
||||
leafQueue));
|
||||
throw new SchedulerDynamicEditException(
|
||||
|
||||
QueueCapacities capacities = new QueueCapacities(false);
|
||||
for (String nodeLabel : leafQueueTemplateNodeLabels) {
|
||||
if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue,
|
||||
nodeLabel)) {
|
||||
String message =
|
||||
"Leaf queue already exists in state : " + getLeafQueueState(
|
||||
leafQueue));
|
||||
leafQueue, nodeLabel);
|
||||
LOG.error(message);
|
||||
}
|
||||
|
||||
float availableCapacity = getAvailableCapacity(
|
||||
managedParentQueue.getQueueCapacities().getAbsoluteCapacity(), 0,
|
||||
parentQueueState.getAbsoluteActivatedChildQueueCapacity());
|
||||
float availableCapacity = managedParentQueue.getQueueCapacities().
|
||||
getAbsoluteCapacity(nodeLabel) - parentQueueState.
|
||||
getAbsoluteActivatedChildQueueCapacity(nodeLabel) + EPSILON;
|
||||
|
||||
if (availableCapacity >= leafQueueTemplateCapacities
|
||||
.getAbsoluteCapacity()) {
|
||||
activate(autoCreatedLeafQueue);
|
||||
template = buildTemplate(leafQueueTemplateCapacities.getCapacity(),
|
||||
leafQueueTemplateCapacities.getMaximumCapacity());
|
||||
.getAbsoluteCapacity(nodeLabel)) {
|
||||
updateCapacityFromTemplate(capacities, nodeLabel);
|
||||
activate(leafQueue, nodeLabel);
|
||||
} else{
|
||||
updateToZeroCapacity(capacities, nodeLabel);
|
||||
}
|
||||
}
|
||||
|
||||
template = buildTemplate(capacities);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
return template;
|
||||
}
|
||||
|
||||
private void updateToZeroCapacity(QueueCapacities capacities,
|
||||
String nodeLabel) {
|
||||
capacities.setCapacity(nodeLabel, 0.0f);
|
||||
capacities.setMaximumCapacity(nodeLabel,
|
||||
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
|
||||
}
|
||||
|
||||
private void updateCapacityFromTemplate(QueueCapacities capacities,
|
||||
String nodeLabel) {
|
||||
capacities.setCapacity(nodeLabel,
|
||||
leafQueueTemplateCapacities.getCapacity(nodeLabel));
|
||||
capacities.setMaximumCapacity(nodeLabel,
|
||||
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
LeafQueueState getLeafQueueState(LeafQueue queue)
|
||||
throws SchedulerDynamicEditException {
|
||||
LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
|
||||
String partition) throws SchedulerDynamicEditException {
|
||||
try {
|
||||
readLock.lock();
|
||||
String queueName = queue.getQueueName();
|
||||
if (!containsLeafQueue(queueName)) {
|
||||
if (!leafQueueState.containsLeafQueue(queueName, partition)) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not find leaf queue in " + "state " + queueName);
|
||||
} else{
|
||||
return leafQueueStateMap.get(queueName);
|
||||
return leafQueueState.
|
||||
getLeafQueueStatePerPartition(queueName, partition);
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
|
@ -715,8 +820,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public float getAbsoluteActivatedChildQueueCapacity() {
|
||||
return parentQueueState.getAbsoluteActivatedChildQueueCapacity();
|
||||
public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
|
||||
return parentQueueState.getAbsoluteActivatedChildQueueCapacity(nodeLabel);
|
||||
}
|
||||
|
||||
private List<FiCaSchedulerApp> getSortedPendingApplications() {
|
||||
|
@ -726,20 +831,10 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
|
|||
return apps;
|
||||
}
|
||||
|
||||
private AutoCreatedLeafQueueConfig buildTemplate(float capacity,
|
||||
float maxCapacity) {
|
||||
private AutoCreatedLeafQueueConfig buildTemplate(QueueCapacities capacities) {
|
||||
AutoCreatedLeafQueueConfig.Builder templateBuilder =
|
||||
new AutoCreatedLeafQueueConfig.Builder();
|
||||
|
||||
QueueCapacities capacities = new QueueCapacities(false);
|
||||
templateBuilder.capacities(capacities);
|
||||
|
||||
for (String nodeLabel : managedParentQueue.getQueueCapacities()
|
||||
.getExistingNodeLabels()) {
|
||||
capacities.setCapacity(nodeLabel, capacity);
|
||||
capacities.setMaximumCapacity(nodeLabel, maxCapacity);
|
||||
}
|
||||
|
||||
return new AutoCreatedLeafQueueConfig(templateBuilder);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,4 +62,12 @@ public class PendingAskUpdateResult {
|
|||
public String getNewNodePartition() {
|
||||
return newNodePartition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PendingAskUpdateResult{" + "lastPendingAsk=" + lastPendingAsk
|
||||
+ ", lastNodePartition='" + lastNodePartition + '\''
|
||||
+ ", newPendingAsk=" + newPendingAsk + ", newNodePartition='"
|
||||
+ newNodePartition + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
|
@ -65,6 +67,7 @@ public class MockNM {
|
|||
new HashMap<ContainerId, ContainerStatus>();
|
||||
private Map<ApplicationId, AppCollectorData> registeringCollectors
|
||||
= new ConcurrentHashMap<>();
|
||||
private Set<NodeLabel> nodeLabels;
|
||||
|
||||
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
|
||||
// scale vcores based on the requested memory
|
||||
|
@ -101,6 +104,13 @@ public class MockNM {
|
|||
nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1]));
|
||||
}
|
||||
|
||||
public MockNM(String nodeIdStr, Resource capability,
|
||||
ResourceTrackerService resourceTracker, String version, Set<NodeLabel>
|
||||
nodeLabels) {
|
||||
this(nodeIdStr, capability, resourceTracker, version);
|
||||
this.nodeLabels = nodeLabels;
|
||||
}
|
||||
|
||||
public NodeId getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
@ -164,12 +174,17 @@ public class MockNM {
|
|||
List<ApplicationId> runningApplications) throws Exception {
|
||||
RegisterNodeManagerRequest req = Records.newRecord(
|
||||
RegisterNodeManagerRequest.class);
|
||||
|
||||
req.setNodeId(nodeId);
|
||||
req.setHttpPort(httpPort);
|
||||
req.setResource(capability);
|
||||
req.setContainerStatuses(containerReports);
|
||||
req.setNMVersion(version);
|
||||
req.setRunningApplications(runningApplications);
|
||||
if ( nodeLabels != null && nodeLabels.size() > 0) {
|
||||
req.setNodeLabels(nodeLabels);
|
||||
}
|
||||
|
||||
RegisterNodeManagerResponse registrationResponse =
|
||||
resourceTracker.registerNodeManager(req);
|
||||
this.currentContainerTokenMasterKey =
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -247,10 +249,11 @@ public class TestAppManager{
|
|||
private TestRMAppManager appMonitor;
|
||||
private ApplicationSubmissionContext asContext;
|
||||
private ApplicationId appId;
|
||||
private QueueInfo mockDefaultQueueInfo;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Before
|
||||
public void setUp() {
|
||||
public void setUp() throws IOException {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
rmContext = mockRMContext(1, now - 10);
|
||||
|
@ -258,6 +261,7 @@ public class TestAppManager{
|
|||
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
|
||||
ResourceScheduler scheduler = mockResourceScheduler();
|
||||
((RMContextImpl)rmContext).setScheduler(scheduler);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
((RMContextImpl) rmContext).setYarnConfiguration(conf);
|
||||
|
@ -275,6 +279,11 @@ public class TestAppManager{
|
|||
asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
|
||||
asContext.setResource(mockResource());
|
||||
asContext.setPriority(Priority.newInstance(0));
|
||||
asContext.setQueue("default");
|
||||
mockDefaultQueueInfo = mock(QueueInfo.class);
|
||||
when(scheduler.getQueueInfo("default", false, false))
|
||||
.thenReturn(mockDefaultQueueInfo);
|
||||
|
||||
setupDispatcher(rmContext, conf);
|
||||
}
|
||||
|
||||
|
@ -709,6 +718,7 @@ public class TestAppManager{
|
|||
for (ResourceRequest req : reqs) {
|
||||
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
|
||||
}
|
||||
|
||||
// setAMContainerResourceRequests has priority over
|
||||
// setAMContainerResourceRequest and setResource
|
||||
Assert.assertEquals(reqs, app.getAMResourceRequests());
|
||||
|
@ -722,6 +732,7 @@ public class TestAppManager{
|
|||
ResourceRequest req =
|
||||
ResourceRequest.newInstance(Priority.newInstance(0),
|
||||
ResourceRequest.ANY, Resources.createResource(1025), 1, true);
|
||||
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
|
||||
asContext.setAMContainerResourceRequest(cloneResourceRequest(req));
|
||||
// getAMContainerResourceRequests uses a singleton list of
|
||||
// getAMContainerResourceRequest
|
||||
|
@ -729,7 +740,6 @@ public class TestAppManager{
|
|||
Assert.assertEquals(req, asContext.getAMContainerResourceRequests().get(0));
|
||||
Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size());
|
||||
RMApp app = testRMAppSubmit();
|
||||
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
|
||||
// setAMContainerResourceRequest has priority over setResource
|
||||
Assert.assertEquals(Collections.singletonList(req),
|
||||
app.getAMResourceRequests());
|
||||
|
@ -740,10 +750,12 @@ public class TestAppManager{
|
|||
asContext.setResource(Resources.createResource(1024));
|
||||
asContext.setAMContainerResourceRequests(null);
|
||||
RMApp app = testRMAppSubmit();
|
||||
|
||||
// setResource
|
||||
Assert.assertEquals(Collections.singletonList(
|
||||
ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
|
||||
ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")),
|
||||
ResourceRequest.ANY, Resources.createResource(1024), 1, true,
|
||||
"")),
|
||||
app.getAMResourceRequests());
|
||||
}
|
||||
|
||||
|
@ -766,6 +778,8 @@ public class TestAppManager{
|
|||
throws Exception {
|
||||
asContext.setResource(null);
|
||||
List<ResourceRequest> reqs = new ArrayList<>();
|
||||
when(mockDefaultQueueInfo.getAccessibleNodeLabels()).thenReturn
|
||||
(new HashSet<String>() {{ add("label1"); add(""); }});
|
||||
ResourceRequest anyReq = ResourceRequest.newInstance(
|
||||
Priority.newInstance(1),
|
||||
ResourceRequest.ANY, Resources.createResource(1024), 1, false, "label1",
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.security.TestGroupsCaching;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
|
@ -65,6 +66,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
|
||||
.SimpleGroupsMapping;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -89,6 +92,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|||
.capacity.CapacitySchedulerConfiguration.DOT;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.capacity.CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.capacity.CapacitySchedulerConfiguration.ROOT;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -99,7 +104,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
private static final Log LOG = LogFactory.getLog(
|
||||
TestCapacitySchedulerAutoCreatedQueueBase.class);
|
||||
public static final int GB = 1024;
|
||||
public final static ContainerUpdates NULL_UPDATE_REQUESTS =
|
||||
public static final ContainerUpdates NULL_UPDATE_REQUESTS =
|
||||
new ContainerUpdates();
|
||||
|
||||
public static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
|
@ -112,9 +117,6 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
public static final String B1 = B + ".b1";
|
||||
public static final String B2 = B + ".b2";
|
||||
public static final String B3 = B + ".b3";
|
||||
public static final String C1 = C + ".c1";
|
||||
public static final String C2 = C + ".c2";
|
||||
public static final String C3 = C + ".c3";
|
||||
public static final float A_CAPACITY = 20f;
|
||||
public static final float B_CAPACITY = 40f;
|
||||
public static final float C_CAPACITY = 20f;
|
||||
|
@ -124,8 +126,6 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
public static final float B1_CAPACITY = 60f;
|
||||
public static final float B2_CAPACITY = 20f;
|
||||
public static final float B3_CAPACITY = 20f;
|
||||
public static final float C1_CAPACITY = 20f;
|
||||
public static final float C2_CAPACITY = 20f;
|
||||
|
||||
public static final int NODE_MEMORY = 16;
|
||||
|
||||
|
@ -147,12 +147,14 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
public static final String NODEL_LABEL_GPU = "GPU";
|
||||
public static final String NODEL_LABEL_SSD = "SSD";
|
||||
|
||||
public static final float NODE_LABEL_GPU_TEMPLATE_CAPACITY = 30.0f;
|
||||
public static final float NODEL_LABEL_SSD_TEMPLATE_CAPACITY = 40.0f;
|
||||
|
||||
protected MockRM mockRM = null;
|
||||
protected MockNM nm1 = null;
|
||||
protected MockNM nm2 = null;
|
||||
protected MockNM nm3 = null;
|
||||
protected CapacityScheduler cs;
|
||||
private final TestCapacityScheduler tcs = new TestCapacityScheduler();
|
||||
protected SpyDispatcher dispatcher;
|
||||
private static EventHandler<Event> rmAppEventEventHandler;
|
||||
|
||||
|
@ -215,15 +217,29 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
}
|
||||
|
||||
protected void setupNodes(MockRM newMockRM) throws Exception {
|
||||
NodeLabel ssdLabel = Records.newRecord(NodeLabel.class);
|
||||
ssdLabel.setName(NODEL_LABEL_SSD);
|
||||
ssdLabel.setExclusivity(true);
|
||||
|
||||
nm1 = // label = SSD
|
||||
new MockNM("h1:1234", NODE_MEMORY * GB, NODE1_VCORES, newMockRM
|
||||
.getResourceTrackerService());
|
||||
new MockNM("h1:1234",
|
||||
Resource.newInstance(NODE_MEMORY * GB, NODE1_VCORES),
|
||||
newMockRM.getResourceTrackerService(),
|
||||
YarnVersionInfo.getVersion(),
|
||||
new HashSet<NodeLabel>() {{ add(ssdLabel); }});
|
||||
|
||||
nm1.registerNode();
|
||||
|
||||
nm2 = // label = GPU
|
||||
new MockNM("h2:1234", NODE_MEMORY * GB, NODE2_VCORES, newMockRM
|
||||
.getResourceTrackerService
|
||||
());
|
||||
NodeLabel gpuLabel = Records.newRecord(NodeLabel.class);
|
||||
ssdLabel.setName(NODEL_LABEL_GPU);
|
||||
ssdLabel.setExclusivity(true);
|
||||
|
||||
//Label = GPU
|
||||
nm2 = new MockNM("h2:1234",
|
||||
Resource.newInstance(NODE_MEMORY * GB, NODE2_VCORES),
|
||||
newMockRM.getResourceTrackerService(),
|
||||
YarnVersionInfo.getVersion(),
|
||||
new HashSet<NodeLabel>() {{ add(gpuLabel); }});
|
||||
nm2.registerNode();
|
||||
|
||||
nm3 = // label = ""
|
||||
|
@ -295,19 +311,23 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
|
||||
/**
|
||||
* @param conf, to be modified
|
||||
* @return, CS configuration which has C as an auto creation enabled parent
|
||||
* queue
|
||||
* @return, CS configuration which has C
|
||||
* as an auto creation enabled parent queue
|
||||
* <p>
|
||||
* root / \ \ \ a b c d / \ / | \ a1 a2 b1
|
||||
* b2 b3
|
||||
* root
|
||||
* / \ \ \
|
||||
* a b c d
|
||||
* / \ / | \
|
||||
* a1 a2 b1 b2 b3
|
||||
*/
|
||||
|
||||
public static CapacitySchedulerConfiguration setupQueueConfiguration(
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
|
||||
//setup new queues with one of them auto enabled
|
||||
// Define top-level queues
|
||||
// Set childQueue for root
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
conf.setQueues(ROOT,
|
||||
new String[] { "a", "b", "c", "d" });
|
||||
|
||||
conf.setCapacity(A, A_CAPACITY);
|
||||
|
@ -339,6 +359,19 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
|
||||
conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
|
||||
|
||||
conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_GPU,
|
||||
NODE_LABEL_GPU_TEMPLATE_CAPACITY);
|
||||
conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_GPU, 100.0f);
|
||||
conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_SSD,
|
||||
NODEL_LABEL_SSD_TEMPLATE_CAPACITY);
|
||||
conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_SSD,
|
||||
100.0f);
|
||||
|
||||
conf.setDefaultNodeLabelExpression(C, NODEL_LABEL_GPU);
|
||||
conf.setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression
|
||||
(C, NODEL_LABEL_SSD);
|
||||
|
||||
|
||||
LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
|
||||
|
||||
conf.setUserLimitFactor(D, 1.0f);
|
||||
|
@ -363,8 +396,13 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
accessibleNodeLabelsOnC.add(NO_LABEL);
|
||||
|
||||
conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC);
|
||||
conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 50);
|
||||
conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 50);
|
||||
conf.setAccessibleNodeLabels(ROOT, accessibleNodeLabelsOnC);
|
||||
conf.setCapacityByLabel(ROOT, NODEL_LABEL_GPU, 100f);
|
||||
conf.setCapacityByLabel(ROOT, NODEL_LABEL_SSD, 100f);
|
||||
|
||||
conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC);
|
||||
conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 100f);
|
||||
conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 100f);
|
||||
|
||||
LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
|
||||
|
||||
|
@ -541,19 +579,21 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
autoCreatedLeafQueue.getMaxApplicationsPerUser());
|
||||
}
|
||||
|
||||
protected void validateInitialQueueEntitlement(CSQueue parentQueue,
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
|
||||
protected void validateInitialQueueEntitlement(CSQueue parentQueue, String
|
||||
leafQueueName, Map<String, Float>
|
||||
expectedTotalChildQueueAbsCapacityByLabel,
|
||||
Set<String> nodeLabels)
|
||||
throws SchedulerDynamicEditException {
|
||||
throws SchedulerDynamicEditException, InterruptedException {
|
||||
validateInitialQueueEntitlement(cs, parentQueue, leafQueueName,
|
||||
expectedTotalChildQueueAbsCapacity, nodeLabels);
|
||||
expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
|
||||
}
|
||||
|
||||
protected void validateInitialQueueEntitlement(
|
||||
CapacityScheduler capacityScheduler, CSQueue parentQueue,
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
|
||||
String leafQueueName,
|
||||
Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
|
||||
Set<String> nodeLabels)
|
||||
throws SchedulerDynamicEditException {
|
||||
throws SchedulerDynamicEditException, InterruptedException {
|
||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||
(ManagedParentQueue) parentQueue;
|
||||
|
||||
|
@ -561,11 +601,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
(GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
|
||||
.getAutoCreatedQueueManagementPolicy();
|
||||
|
||||
assertEquals(expectedTotalChildQueueAbsCapacity,
|
||||
policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
|
||||
|
||||
AutoCreatedLeafQueue leafQueue =
|
||||
(AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
|
||||
|
||||
Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
|
||||
QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
|
||||
|
@ -573,6 +609,10 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
|
||||
for (String label : nodeLabels) {
|
||||
validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label);
|
||||
assertEquals(true, policy.isActive(leafQueue, label));
|
||||
|
||||
assertEquals(expectedTotalChildQueueAbsCapacityByLabel.get(label),
|
||||
policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);
|
||||
|
||||
QueueEntitlement expectedEntitlement = new QueueEntitlement(
|
||||
cap.getCapacity(label), cap.getMaximumCapacity(label));
|
||||
|
@ -581,21 +621,19 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
|
||||
validateEffectiveMinResource(leafQueue, label, expectedEntitlements);
|
||||
}
|
||||
|
||||
assertEquals(true, policy.isActive(leafQueue));
|
||||
}
|
||||
|
||||
protected void validateCapacitiesByLabel(
|
||||
ManagedParentQueue autoCreateEnabledParentQueue,
|
||||
AutoCreatedLeafQueue leafQueue, String label) {
|
||||
assertEquals(
|
||||
autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities()
|
||||
.getCapacity(), leafQueue.getQueueCapacities().getCapacity(label),
|
||||
EPSILON);
|
||||
assertEquals(
|
||||
autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities()
|
||||
.getMaximumCapacity(),
|
||||
leafQueue.getQueueCapacities().getMaximumCapacity(label), EPSILON);
|
||||
protected void validateCapacitiesByLabel(ManagedParentQueue
|
||||
autoCreateEnabledParentQueue, AutoCreatedLeafQueue leafQueue, String
|
||||
label) throws InterruptedException {
|
||||
assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate()
|
||||
.getQueueCapacities().getCapacity(label),
|
||||
leafQueue.getQueueCapacities()
|
||||
.getCapacity(label), EPSILON);
|
||||
assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate()
|
||||
.getQueueCapacities().getMaximumCapacity(label),
|
||||
leafQueue.getQueueCapacities()
|
||||
.getMaximumCapacity(label), EPSILON);
|
||||
}
|
||||
|
||||
protected void validateEffectiveMinResource(CSQueue leafQueue,
|
||||
|
@ -621,8 +659,10 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
}
|
||||
|
||||
protected void validateActivatedQueueEntitlement(CSQueue parentQueue,
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
|
||||
List<QueueManagementChange> queueManagementChanges)
|
||||
String leafQueueName, Map<String, Float>
|
||||
expectedTotalChildQueueAbsCapacity,
|
||||
List<QueueManagementChange> queueManagementChanges, Set<String>
|
||||
expectedNodeLabels)
|
||||
throws SchedulerDynamicEditException {
|
||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||
(ManagedParentQueue) parentQueue;
|
||||
|
@ -633,67 +673,84 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
|
||||
QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
|
||||
.getQueueCapacities();
|
||||
QueueEntitlement expectedEntitlement = new QueueEntitlement(
|
||||
cap.getCapacity(), cap.getMaximumCapacity());
|
||||
|
||||
//validate capacity
|
||||
validateQueueEntitlements(leafQueueName, expectedEntitlement,
|
||||
queueManagementChanges);
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue)
|
||||
cs.getQueue(leafQueueName);
|
||||
|
||||
Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
|
||||
|
||||
for (String label : expectedNodeLabels) {
|
||||
//validate leaf queue state
|
||||
assertEquals(true, policy.isActive(leafQueue, label));
|
||||
|
||||
QueueEntitlement expectedEntitlement = new QueueEntitlement(
|
||||
cap.getCapacity(label), cap.getMaximumCapacity(label));
|
||||
|
||||
//validate parent queue state
|
||||
assertEquals(expectedTotalChildQueueAbsCapacity,
|
||||
policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
|
||||
assertEquals(expectedTotalChildQueueAbsCapacity.get(label),
|
||||
policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);
|
||||
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
|
||||
leafQueueName);
|
||||
expectedEntitlements.put(label, expectedEntitlement);
|
||||
}
|
||||
|
||||
//validate leaf queue state
|
||||
assertEquals(true, policy.isActive(leafQueue));
|
||||
//validate capacity
|
||||
validateQueueEntitlements(leafQueueName, expectedEntitlements,
|
||||
queueManagementChanges, expectedNodeLabels);
|
||||
}
|
||||
|
||||
protected void validateDeactivatedQueueEntitlement(CSQueue parentQueue,
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
|
||||
List<QueueManagementChange> queueManagementChanges)
|
||||
String leafQueueName, Map<String, Float>
|
||||
expectedTotalChildQueueAbsCapacity,
|
||||
List<QueueManagementChange>
|
||||
queueManagementChanges)
|
||||
throws SchedulerDynamicEditException {
|
||||
QueueEntitlement expectedEntitlement = new QueueEntitlement(0.0f, 1.0f);
|
||||
QueueEntitlement expectedEntitlement =
|
||||
new QueueEntitlement(0.0f, 1.0f);
|
||||
|
||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||
(ManagedParentQueue) parentQueue;
|
||||
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
|
||||
leafQueueName);
|
||||
AutoCreatedLeafQueue leafQueue =
|
||||
(AutoCreatedLeafQueue) cs.getQueue(leafQueueName);
|
||||
|
||||
GuaranteedOrZeroCapacityOverTimePolicy policy =
|
||||
(GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
|
||||
.getAutoCreatedQueueManagementPolicy();
|
||||
|
||||
Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
|
||||
|
||||
for (String label : accessibleNodeLabelsOnC) {
|
||||
//validate parent queue state
|
||||
assertEquals(expectedTotalChildQueueAbsCapacity,
|
||||
policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
|
||||
LOG.info("Validating label " + label);
|
||||
assertEquals(expectedTotalChildQueueAbsCapacity.get(label), policy
|
||||
.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);
|
||||
|
||||
//validate leaf queue state
|
||||
assertEquals(false, policy.isActive(leafQueue));
|
||||
assertEquals(false, policy.isActive(leafQueue, label));
|
||||
expectedEntitlements.put(label, expectedEntitlement);
|
||||
}
|
||||
|
||||
//validate capacity
|
||||
validateQueueEntitlements(leafQueueName, expectedEntitlement,
|
||||
queueManagementChanges);
|
||||
validateQueueEntitlements(leafQueueName, expectedEntitlements,
|
||||
queueManagementChanges, accessibleNodeLabelsOnC);
|
||||
}
|
||||
|
||||
private void validateQueueEntitlements(String leafQueueName,
|
||||
QueueEntitlement expectedEntitlement,
|
||||
List<QueueManagementChange> queueEntitlementChanges) {
|
||||
void validateQueueEntitlements(String leafQueueName,
|
||||
Map<String, QueueEntitlement> expectedEntitlements,
|
||||
List<QueueManagementChange>
|
||||
queueEntitlementChanges, Set<String> expectedNodeLabels) {
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
|
||||
leafQueueName);
|
||||
validateQueueEntitlementChangesForLeafQueue(leafQueue, expectedEntitlement,
|
||||
queueEntitlementChanges);
|
||||
validateQueueEntitlementChanges(leafQueue, expectedEntitlements,
|
||||
queueEntitlementChanges, expectedNodeLabels);
|
||||
}
|
||||
|
||||
private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue,
|
||||
QueueEntitlement expectedQueueEntitlement,
|
||||
final List<QueueManagementChange> queueEntitlementChanges) {
|
||||
private void validateQueueEntitlementChanges(AutoCreatedLeafQueue leafQueue,
|
||||
Map<String, QueueEntitlement> expectedQueueEntitlements,
|
||||
final List<QueueManagementChange> queueEntitlementChanges, Set<String>
|
||||
expectedNodeLabels) {
|
||||
boolean found = false;
|
||||
|
||||
Map<String, QueueEntitlement> expectedQueueEntitlements = new HashMap<>();
|
||||
for (QueueManagementChange entitlementChange : queueEntitlementChanges) {
|
||||
if (leafQueue.getQueueName().equals(
|
||||
entitlementChange.getQueue().getQueueName())) {
|
||||
|
@ -701,13 +758,12 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
AutoCreatedLeafQueueConfig updatedQueueTemplate =
|
||||
entitlementChange.getUpdatedQueueTemplate();
|
||||
|
||||
for (String label : accessibleNodeLabelsOnC) {
|
||||
for (String label : expectedNodeLabels) {
|
||||
QueueEntitlement newEntitlement = new QueueEntitlement(
|
||||
updatedQueueTemplate.getQueueCapacities().getCapacity(label),
|
||||
updatedQueueTemplate.getQueueCapacities()
|
||||
.getMaximumCapacity(label));
|
||||
assertEquals(expectedQueueEntitlement, newEntitlement);
|
||||
expectedQueueEntitlements.put(label, expectedQueueEntitlement);
|
||||
updatedQueueTemplate.getQueueCapacities().getMaximumCapacity
|
||||
(label));
|
||||
assertEquals(expectedQueueEntitlements.get(label), newEntitlement);
|
||||
validateEffectiveMinResource(leafQueue, label,
|
||||
expectedQueueEntitlements);
|
||||
}
|
||||
|
@ -716,9 +772,20 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
|||
}
|
||||
}
|
||||
if (!found) {
|
||||
fail("Could not find the specified leaf queue in entitlement changes : "
|
||||
fail(
|
||||
"Could not find the specified leaf queue in entitlement changes : "
|
||||
+ leafQueue.getQueueName());
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, Float> populateExpectedAbsCapacityByLabelForParentQueue
|
||||
(int numLeafQueues) {
|
||||
Map<String, Float> expectedChildQueueAbsCapacity = new HashMap<>();
|
||||
expectedChildQueueAbsCapacity.put(NODEL_LABEL_GPU,
|
||||
NODE_LABEL_GPU_TEMPLATE_CAPACITY/100 * numLeafQueues);
|
||||
expectedChildQueueAbsCapacity.put(NODEL_LABEL_SSD,
|
||||
NODEL_LABEL_SSD_TEMPLATE_CAPACITY/100 * numLeafQueues);
|
||||
expectedChildQueueAbsCapacity.put(NO_LABEL, 0.1f * numLeafQueues);
|
||||
return expectedChildQueueAbsCapacity;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,15 +72,18 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
|
||||
.NO_LABEL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||
.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.capacity.CSQueueUtils.EPSILON;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -90,7 +93,7 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
/**
|
||||
* Tests for creation and reinitialization of auto created leaf queues
|
||||
* under a ManagedParentQueue.
|
||||
* and capacity management under a ManagedParentQueue.
|
||||
*/
|
||||
public class TestCapacitySchedulerAutoQueueCreation
|
||||
extends TestCapacitySchedulerAutoCreatedQueueBase {
|
||||
|
@ -105,7 +108,7 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
4);
|
||||
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@Test(timeout = 20000)
|
||||
public void testAutoCreateLeafQueueCreation() throws Exception {
|
||||
|
||||
try {
|
||||
|
@ -122,7 +125,12 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
|
||||
PARENT_QUEUE);
|
||||
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
|
||||
validateInitialQueueEntitlement(parentQueue, USER0, 0.1f, accessibleNodeLabelsOnC);
|
||||
|
||||
Map<String, Float> expectedChildQueueAbsCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER0,
|
||||
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000);
|
||||
|
||||
assertTrue(autoCreatedLeafQueue
|
||||
|
@ -136,7 +144,14 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
(AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
|
||||
parentQueue = (ManagedParentQueue) cs.getQueue("d");
|
||||
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
|
||||
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER, 0.02f,
|
||||
|
||||
expectedChildQueueAbsCapacity =
|
||||
new HashMap<String, Float>() {{
|
||||
put(NO_LABEL, 0.02f);
|
||||
}};
|
||||
|
||||
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER,
|
||||
expectedChildQueueAbsCapacity,
|
||||
new HashSet<String>() {{ add(NO_LABEL); }});
|
||||
|
||||
} finally {
|
||||
|
@ -173,10 +188,17 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
USER0);
|
||||
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
|
||||
PARENT_QUEUE);
|
||||
|
||||
assertEquals(parentQueue, user0Queue.getParent());
|
||||
assertEquals(parentQueue, user1Queue.getParent());
|
||||
validateInitialQueueEntitlement(parentQueue, USER0, 0.2f, accessibleNodeLabelsOnC);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.2f, accessibleNodeLabelsOnC);
|
||||
|
||||
Map<String, Float>
|
||||
expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(2);
|
||||
validateInitialQueueEntitlement(parentQueue, USER0,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
ApplicationAttemptId appAttemptId = appsInC.get(0);
|
||||
|
||||
|
@ -184,7 +206,8 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(
|
||||
null);
|
||||
ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY,
|
||||
1 * GB, 1, true, priority, recordFactory);
|
||||
1 * GB, 1, true, priority,
|
||||
recordFactory);
|
||||
|
||||
cs.allocate(appAttemptId, Collections.<ResourceRequest>singletonList(r1),
|
||||
null, Collections.<ContainerId>emptyList(), Collections.singletonList(host),
|
||||
|
@ -216,8 +239,12 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
|
||||
USER1);
|
||||
|
||||
expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(1);
|
||||
|
||||
validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(),
|
||||
0.1f, accessibleNodeLabelsOnC);
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
} finally {
|
||||
cleanupQueue(USER0);
|
||||
|
@ -498,52 +525,80 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
CSQueue parentQueue = cs.getQueue(PARENT_QUEUE);
|
||||
|
||||
//submit app1 as USER1
|
||||
submitApp(mockRM, parentQueue, USER1, USER1, 1, 1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1,
|
||||
1, 1);
|
||||
Map<String, Float> expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app2 as USER2
|
||||
ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
|
||||
1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
|
||||
expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(2);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app3 as USER1
|
||||
submitApp(mockRM, parentQueue, USER1, USER1, 3, 2);
|
||||
|
||||
//validate total activated abs capacity remains the same
|
||||
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
|
||||
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
|
||||
parentQueue)
|
||||
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue)
|
||||
.getAutoCreatedQueueManagementPolicy();
|
||||
assertEquals(autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
|
||||
|
||||
//submit user_3 app. This cant be scheduled since there is no capacity
|
||||
for (String nodeLabel : accessibleNodeLabelsOnC) {
|
||||
assertEquals(expectedAbsChildQueueCapacity.get(nodeLabel),
|
||||
autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(nodeLabel), EPSILON);
|
||||
}
|
||||
|
||||
//submit user_3 app. This cant be allocated since there is no capacity
|
||||
// in NO_LABEL, SSD but can be in GPU label
|
||||
submitApp(mockRM, parentQueue, USER3, USER3, 4, 1);
|
||||
final CSQueue user3LeafQueue = cs.getQueue(USER3);
|
||||
validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f,
|
||||
1.0f, 1.0f);
|
||||
validateCapacitiesByLabel((ManagedParentQueue) parentQueue,
|
||||
(AutoCreatedLeafQueue)
|
||||
user3LeafQueue, NODEL_LABEL_GPU);
|
||||
|
||||
assertEquals(autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
|
||||
assertEquals(0.2f, autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
|
||||
assertEquals(0.9f, autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(NODEL_LABEL_GPU),
|
||||
EPSILON);
|
||||
|
||||
//deactivate USER2 queue
|
||||
//Verify that AMs can be allocated
|
||||
//Node 1 has SSD and default node label expression on C is SSD.
|
||||
//This validates that the default node label expression with SSD is set
|
||||
// on the AM attempt
|
||||
// and app attempt reaches ALLOCATED state for a dynamic queue 'USER1'
|
||||
mockRM.launchAM(mockRM.getRMContext().getRMApps().get(user1AppId),
|
||||
mockRM, nm1);
|
||||
|
||||
// //deactivate USER2 queue
|
||||
cs.killAllAppsInQueue(USER2);
|
||||
mockRM.waitForState(user2AppId, RMAppState.KILLED);
|
||||
|
||||
//Verify if USER_2 can be deactivated since it has no pending appsA
|
||||
//Verify if USER_2 can be deactivated since it has no pending apps
|
||||
List<QueueManagementChange> queueManagementChanges =
|
||||
autoCreatedQueueManagementPolicy.computeQueueManagementChanges();
|
||||
|
||||
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue;
|
||||
managedParentQueue.validateAndApplyQueueManagementChanges(
|
||||
queueManagementChanges);
|
||||
managedParentQueue.
|
||||
validateAndApplyQueueManagementChanges(queueManagementChanges);
|
||||
|
||||
validateDeactivatedQueueEntitlement(parentQueue, USER2, 0.2f,
|
||||
queueManagementChanges);
|
||||
validateDeactivatedQueueEntitlement(parentQueue, USER2,
|
||||
expectedAbsChildQueueCapacity, queueManagementChanges);
|
||||
|
||||
//USER_3 should now get activated
|
||||
validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f,
|
||||
queueManagementChanges);
|
||||
//USER_3 should now get activated for SSD, NO_LABEL
|
||||
Set<String> expectedNodeLabelsUpdated = new HashSet<>();
|
||||
expectedNodeLabelsUpdated.add(NO_LABEL);
|
||||
expectedNodeLabelsUpdated.add(NODEL_LABEL_SSD);
|
||||
|
||||
validateActivatedQueueEntitlement(parentQueue, USER3,
|
||||
expectedAbsChildQueueCapacity , queueManagementChanges, expectedNodeLabelsUpdated);
|
||||
|
||||
} finally {
|
||||
cleanupQueue(USER1);
|
||||
|
@ -565,13 +620,18 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
|
||||
//submit app1 as USER1
|
||||
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
CSQueue user1LeafQueue = newCS.getQueue(USER1);
|
||||
Map<String, Float> expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app2 as USER2
|
||||
submitApp(newMockRM, parentQueue, USER2, USER2, 2, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
CSQueue user2LeafQueue = newCS.getQueue(USER2);
|
||||
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2, 2,
|
||||
1);
|
||||
expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(2);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//validate total activated abs capacity remains the same
|
||||
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
|
||||
|
@ -579,7 +639,7 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
parentQueue)
|
||||
.getAutoCreatedQueueManagementPolicy();
|
||||
assertEquals(autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
|
||||
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
|
||||
|
||||
//submit user_3 app. This cant be scheduled since there is no capacity
|
||||
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
|
||||
|
@ -588,7 +648,7 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
1.0f, 1.0f);
|
||||
|
||||
assertEquals(autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
|
||||
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
|
||||
|
||||
// add new NM.
|
||||
newMockRM.registerNode("127.0.0.3:1234", 125 * GB, 20);
|
||||
|
@ -596,31 +656,33 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
// There will be change in effective resource when nodes are added
|
||||
// since we deal with percentages
|
||||
|
||||
Resource MAX_RES = Resources.addTo(TEMPLATE_MAX_RES,
|
||||
Resources.createResource(125 * GB, 20));
|
||||
Resource MAX_RES = Resources.addTo(TEMPLATE_MAX_RES, Resources.createResource(125 *
|
||||
GB, 20));
|
||||
|
||||
Resource MIN_RES = Resources.createResource(14438, 6);
|
||||
|
||||
Assert.assertEquals("Effective Min resource for USER3 is not correct",
|
||||
Resources.none(),
|
||||
user3LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
|
||||
Resources.none(), user3LeafQueue.getQueueResourceQuotas()
|
||||
.getEffectiveMinResource());
|
||||
Assert.assertEquals("Effective Max resource for USER3 is not correct",
|
||||
MAX_RES,
|
||||
user3LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
|
||||
MAX_RES, user3LeafQueue
|
||||
.getQueueResourceQuotas()
|
||||
.getEffectiveMaxResource());
|
||||
|
||||
CSQueue user1LeafQueue = newCS.getQueue(USER1);
|
||||
CSQueue user2LeafQueue = newCS.getQueue(USER2);
|
||||
Assert.assertEquals("Effective Min resource for USER2 is not correct",
|
||||
MIN_RES,
|
||||
user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
|
||||
MIN_RES, user1LeafQueue.getQueueResourceQuotas()
|
||||
.getEffectiveMinResource());
|
||||
Assert.assertEquals("Effective Max resource for USER2 is not correct",
|
||||
MAX_RES,
|
||||
user1LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
|
||||
MAX_RES, user1LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
|
||||
|
||||
Assert.assertEquals("Effective Min resource for USER1 is not correct",
|
||||
MIN_RES,
|
||||
user2LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
|
||||
MIN_RES, user2LeafQueue.getQueueResourceQuotas()
|
||||
.getEffectiveMinResource());
|
||||
Assert.assertEquals("Effective Max resource for USER1 is not correct",
|
||||
MAX_RES,
|
||||
user2LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
|
||||
MAX_RES, user2LeafQueue.getQueueResourceQuotas()
|
||||
.getEffectiveMaxResource());
|
||||
|
||||
// unregister one NM.
|
||||
newMockRM.unRegisterNode(nm3);
|
||||
|
@ -629,11 +691,11 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
|
||||
// After loosing one NM, resources will reduce
|
||||
Assert.assertEquals("Effective Min resource for USER2 is not correct",
|
||||
MIN_RES_UPDATED,
|
||||
user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
|
||||
MIN_RES_UPDATED, user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource
|
||||
());
|
||||
Assert.assertEquals("Effective Max resource for USER2 is not correct",
|
||||
MAX_RES_UPDATED,
|
||||
user2LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
|
||||
MAX_RES_UPDATED, user2LeafQueue.getQueueResourceQuotas()
|
||||
.getEffectiveMaxResource());
|
||||
|
||||
} finally {
|
||||
cleanupQueue(USER1);
|
||||
|
@ -646,25 +708,6 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreatedQueueInheritsNodeLabels() throws Exception {
|
||||
|
||||
try {
|
||||
String host = "127.0.0.1";
|
||||
RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
|
||||
host);
|
||||
cs.handle(new NodeAddedSchedulerEvent(node));
|
||||
|
||||
CSQueue parentQueue = cs.getQueue(PARENT_QUEUE);
|
||||
|
||||
submitApp(USER1, USER1, NODEL_LABEL_GPU);
|
||||
//submit app1 as USER1
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
} finally {
|
||||
cleanupQueue(USER1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReinitializeQueuesWithAutoCreatedLeafQueues()
|
||||
throws Exception {
|
||||
|
@ -679,12 +722,20 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
|
||||
//submit app1 as USER1
|
||||
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
|
||||
Map<String, Float> expectedChildQueueAbsCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1,
|
||||
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app2 as USER2
|
||||
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2,
|
||||
2, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2,
|
||||
USER2, 2,
|
||||
1);
|
||||
expectedChildQueueAbsCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(2);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2,
|
||||
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//update parent queue capacity
|
||||
conf.setCapacity(C, 30f);
|
||||
|
@ -709,19 +760,27 @@ public class TestCapacitySchedulerAutoQueueCreation
|
|||
|
||||
//submit app1 as USER3
|
||||
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f, accessibleNodeLabelsOnC);
|
||||
AutoCreatedLeafQueue user3Queue = (AutoCreatedLeafQueue) newCS.getQueue(
|
||||
USER1);
|
||||
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
|
||||
AutoCreatedLeafQueue user3Queue =
|
||||
(AutoCreatedLeafQueue) newCS.getQueue(USER1);
|
||||
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
|
||||
|
||||
validateUserAndAppLimits(user3Queue, 900, 900);
|
||||
|
||||
//submit app1 as USER1 - is already activated. there should be no diff
|
||||
// in capacities
|
||||
submitApp(newMockRM, parentQueue, USER3, USER3, 4, 2);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f, accessibleNodeLabelsOnC);
|
||||
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
|
||||
|
||||
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
|
||||
|
||||
validateUserAndAppLimits(user3Queue, 900, 900);
|
||||
|
||||
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
|
||||
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
|
||||
parentQueue)
|
||||
.getAutoCreatedQueueManagementPolicy();
|
||||
assertEquals(0.27f, autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity
|
||||
(NO_LABEL), EPSILON);
|
||||
} finally {
|
||||
cleanupQueue(USER1);
|
||||
cleanupQueue(USER2);
|
||||
|
|
|
@ -24,7 +24,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
|
||||
.NO_LABEL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.capacity.CSQueueUtils.EPSILON;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -54,21 +57,27 @@ public class TestQueueManagementDynamicEditPolicy extends
|
|||
parentQueue)
|
||||
.getAutoCreatedQueueManagementPolicy();
|
||||
assertEquals(0f, autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
|
||||
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
|
||||
|
||||
//submit app1 as USER1
|
||||
ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, 1,
|
||||
1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
Map<String, Float> expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app2 as USER2
|
||||
ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
|
||||
1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
expectedAbsChildQueueCapacity =
|
||||
populateExpectedAbsCapacityByLabelForParentQueue(2);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2,
|
||||
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
|
||||
|
||||
//validate total activated abs capacity
|
||||
assertEquals(0.2f, autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
|
||||
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
|
||||
|
||||
//submit user_3 app. This cant be scheduled since there is no capacity
|
||||
submitApp(mockRM, parentQueue, USER3, USER3, 3, 1);
|
||||
|
@ -77,7 +86,7 @@ public class TestQueueManagementDynamicEditPolicy extends
|
|||
1.0f, 1.0f);
|
||||
|
||||
assertEquals(autoCreatedQueueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
|
||||
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
|
||||
|
||||
//deactivate USER2 queue
|
||||
cs.killAllAppsInQueue(USER2);
|
||||
|
@ -88,8 +97,8 @@ public class TestQueueManagementDynamicEditPolicy extends
|
|||
mockRM.waitForState(user1AppId, RMAppState.KILLED);
|
||||
|
||||
policy.editSchedule();
|
||||
|
||||
waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, 1000);
|
||||
waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, NO_LABEL,
|
||||
1000);
|
||||
|
||||
validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.5f, 0.1f,
|
||||
1.0f, 1.0f);
|
||||
|
@ -105,13 +114,12 @@ public class TestQueueManagementDynamicEditPolicy extends
|
|||
}
|
||||
|
||||
private void waitForPolicyState(float expectedVal,
|
||||
GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, int
|
||||
timesec) throws
|
||||
InterruptedException {
|
||||
GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, String
|
||||
nodeLabel, int timesec) throws InterruptedException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timesec * 1000) {
|
||||
if (Float.compare(expectedVal, queueManagementPolicy
|
||||
.getAbsoluteActivatedChildQueueCapacity()) != 0) {
|
||||
.getAbsoluteActivatedChildQueueCapacity(nodeLabel)) > EPSILON) {
|
||||
Thread.sleep(100);
|
||||
} else {
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue