YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. Contributed by Suma Shivaprasad.

(cherry picked from commit 821b0de4c5)
This commit is contained in:
Sunil G 2018-04-09 21:17:22 +05:30
parent 091db4d0eb
commit 8311fcc75b
17 changed files with 831 additions and 441 deletions

View File

@ -236,13 +236,14 @@ public class RMServerUtils {
*/
public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
Resource maximumResource, String queueName, YarnScheduler scheduler,
RMContext rmContext)
throws InvalidResourceRequestException {
RMContext rmContext) throws InvalidResourceRequestException {
// Get queue from scheduler
QueueInfo queueInfo = null;
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
//Queue may not exist since it could be auto-created in case of
// dynamic queues
}
for (ResourceRequest resReq : ask) {

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@ -75,6 +76,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
@ -1109,6 +1113,49 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
amBlacklist.getBlacklistAdditions() + ") and removals(" +
amBlacklist.getBlacklistRemovals() + ")");
}
QueueInfo queueInfo = null;
for (ResourceRequest amReq : appAttempt.amReqs) {
if (amReq.getNodeLabelExpression() == null && ResourceRequest.ANY
.equals(amReq.getResourceName())) {
String queue = appAttempt.rmApp.getQueue();
//Load queue only once since queue will be same across attempts
if (queueInfo == null) {
try {
queueInfo = appAttempt.scheduler.getQueueInfo(queue, false,
false);
} catch (IOException e) {
LOG.error("Could not find queue for application : ", e);
// Set application status to REJECTED since we cant find the
// queue
appAttempt.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttempt.getAppAttemptId(),
RMAppAttemptEventType.FAIL,
"Could not find queue for application : " +
appAttempt.rmApp.getQueue()));
appAttempt.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(appAttempt.rmApp.getApplicationId(), RMAppEventType
.APP_REJECTED,
"Could not find queue for application : " +
appAttempt.rmApp.getQueue()));
return RMAppAttemptState.FAILED;
}
}
String labelExp = RMNodeLabelsManager.NO_LABEL;
if (queueInfo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting default node label expression : " + queueInfo
.getDefaultNodeLabelExpression());
}
labelExp = queueInfo.getDefaultNodeLabelExpression();
}
amReq.setNodeLabelExpression(labelExp);
}
}
// AM resource has been checked when submission
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(

View File

@ -132,4 +132,16 @@ public class Allocation {
public void setResourceLimit(Resource resource) {
this.resourceLimit = resource;
}
@Override
public String toString() {
return "Allocation{" + "containers=" + containers + ", strictContainers="
+ strictContainers + ", fungibleContainers=" + fungibleContainers
+ ", fungibleResources=" + fungibleResources + ", nmTokens=" + nmTokens
+ ", increasedContainers=" + increasedContainers
+ ", decreasedContainers=" + decreasedContainers
+ ", promotedContainers=" + promotedContainers + ", demotedContainers="
+ demotedContainers + ", previousAttemptContainers="
+ previousAttemptContainers + ", resourceLimit=" + resourceLimit + '}';
}
}

View File

@ -186,20 +186,34 @@ public class SchedulerUtils {
ResourceRequest resReq, QueueInfo queueInfo) {
String labelExp = resReq.getNodeLabelExpression();
if (LOG.isDebugEnabled()) {
LOG.debug("Requested Node Label Expression : " + labelExp);
LOG.debug("Queue Info : " + queueInfo);
}
// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null && ResourceRequest.ANY
.equals(resReq.getResourceName())) {
if ( LOG.isDebugEnabled()) {
LOG.debug("Setting default node label expression : " + queueInfo
.getDefaultNodeLabelExpression());
}
labelExp = queueInfo.getDefaultNodeLabelExpression();
}
// If labelExp still equals to null, set it to be NO_LABEL
if (labelExp == null) {
// If labelExp still equals to null, it could either be a dynamic queue
// or the label is not configured
// set it to be NO_LABEL in case of a pre-configured queue. Dynamic
// queues are handled in RMAppAttemptImp.ScheduledTransition
if (labelExp == null && queueInfo != null) {
labelExp = RMNodeLabelsManager.NO_LABEL;
}
if ( labelExp != null) {
resReq.setNodeLabelExpression(labelExp);
}
}
public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
@ -209,6 +223,7 @@ public class SchedulerUtils {
isRecovery, rmContext, null);
}
public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
boolean isRecovery, RMContext rmContext, QueueInfo queueInfo)
@ -233,11 +248,12 @@ public class SchedulerUtils {
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
//Queue may not exist since it could be auto-created in case of
// dynamic queues
}
}
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
if (!isRecovery) {
validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
}
@ -245,8 +261,7 @@ public class SchedulerUtils {
public static void normalizeAndvalidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
RMContext rmContext)
throws InvalidResourceRequestException {
RMContext rmContext) throws InvalidResourceRequestException {
normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler,
rmContext, null);
}

View File

@ -148,11 +148,10 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
try {
for( String nodeLabel : parent.getQueueCapacities().getExistingNodeLabels
()) {
//TODO - update to use getMaximumCapacity(nodeLabel) in YARN-7574
setEntitlement(nodeLabel, new QueueEntitlement(0.0f,
parent.getLeafQueueTemplate()
.getQueueCapacities()
.getMaximumCapacity()));
.getMaximumCapacity(nodeLabel)));
}
} catch (SchedulerDynamicEditException e) {
throw new IOException(e);

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import java.io.IOException;
import java.util.List;
public interface AutoCreatedQueueManagementPolicy {
@ -26,14 +29,15 @@ public interface AutoCreatedQueueManagementPolicy {
* Initialize policy
* @param schedulerContext Capacity Scheduler context
*/
void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue);
void init(CapacitySchedulerContext schedulerContext, ParentQueue
parentQueue) throws IOException;
/**
* Reinitialize policy state ( if required )
* @param schedulerContext Capacity Scheduler context
*/
void reinitialize(CapacitySchedulerContext schedulerContext,
ParentQueue parentQueue);
ParentQueue parentQueue) throws IOException;
/**
* Get initial template for the specified leaf queue
@ -48,6 +52,10 @@ public interface AutoCreatedQueueManagementPolicy {
/**
* Compute/Adjust child queue capacities
* for auto created leaf queues
* This computes queue entitlements but does not update LeafQueueState or
* queue capacities. Scheduler calls commitQueueManagemetChanges after
* validation after applying queue changes and commits to LeafQueueState
* are done in commitQueueManagementChanges.
*
* @return returns a list of suggested QueueEntitlementChange(s) which may
* or may not be be enforced by the scheduler

View File

@ -1167,6 +1167,8 @@ public class CapacityScheduler extends
updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
}
LOG.info("Allocation for application " + applicationAttemptId + " : " +
allocation + " with cluster resource : " + getClusterResource());
return allocation;
}

View File

@ -1829,6 +1829,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setCapacity(leafQueueConfPrefix, val);
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
String label, float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setCapacityByLabel(leafQueueConfPrefix, label, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath,
@ -1838,6 +1847,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setMaximumCapacity(leafQueueConfPrefix, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
String label, float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setMaximumCapacityByLabel(leafQueueConfPrefix, label, val);
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
@ -1856,6 +1874,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setUserLimitFactor(leafQueueConfPrefix, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression(String
queuePath,
String expression) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setDefaultNodeLabelExpression(leafQueueConfPrefix, expression);
}
public static String getUnits(String resourceValue) {
String units;
for (int i = 0; i < resourceValue.length(); i++) {

View File

@ -778,6 +778,17 @@ public class LeafQueue extends AbstractCSQueue {
metrics.setAMResouceLimit(nodePartition, amResouceLimit);
queueUsage.setAMLimit(nodePartition, amResouceLimit);
if(LOG.isDebugEnabled()) {
LOG.debug("Queue: " + getQueueName() + ", node label : " +
nodePartition
+ ", queue "
+ "partition "
+ "resource : " + queuePartitionResource + ','
+ " queue current limit : " + queueCurrentLimit + ","
+ " queue partition usable resource : "
+ queuePartitionUsableResource + ","
+ " amResourceLimit : " + amResouceLimit);
}
return amResouceLimit;
} finally {
writeLock.unlock();

View File

@ -132,7 +132,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
}
private void initializeQueueManagementPolicy() {
private void initializeQueueManagementPolicy() throws IOException {
queueManagementPolicy =
csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
getQueuePath());
@ -140,7 +140,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
queueManagementPolicy.init(csContext, this);
}
private void reinitializeQueueManagementPolicy() {
private void reinitializeQueueManagementPolicy() throws IOException {
AutoCreatedQueueManagementPolicy managementPolicy =
csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
getQueuePath());
@ -339,6 +339,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
break;
}
}
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@ -63,8 +64,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CSQueueUtils.EPSILON;
@ -85,8 +84,6 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private static final Log LOG = LogFactory.getLog(
GuaranteedOrZeroCapacityOverTimePolicy.class);
private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT;
private ReentrantReadWriteLock.WriteLock writeLock;
private ReentrantReadWriteLock.ReadLock readLock;
@ -97,12 +94,70 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private QueueCapacities leafQueueTemplateCapacities;
private Map<String, LeafQueueState> leafQueueStateMap = new HashMap<>();
private Set<String> leafQueueTemplateNodeLabels;
private LeafQueueState leafQueueState = new LeafQueueState();
private Clock clock = new MonotonicClock();
private class LeafQueueState {
//map of partition-> queueName->{leaf queue's state}
private Map<String, Map<String, LeafQueueStatePerPartition>>
leafQueueStateMap = new HashMap<>();
public boolean containsLeafQueue(String leafQueueName, String partition) {
if (leafQueueStateMap.containsKey(partition)) {
return leafQueueStateMap.get(partition).containsKey(leafQueueName);
}
return false;
}
private boolean containsPartition(String partition) {
if (leafQueueStateMap.containsKey(partition)) {
return true;
}
return false;
}
private boolean addLeafQueueStateIfNotExists(String leafQueueName,
String partition, LeafQueueStatePerPartition leafQueueState) {
if (!containsPartition(partition)) {
leafQueueStateMap.put(partition, new HashMap<>());
}
if (!containsLeafQueue(leafQueueName, partition)) {
leafQueueStateMap.get(partition).put(leafQueueName, leafQueueState);
return true;
}
return false;
}
public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue,
String partition) {
return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), partition,
new LeafQueueStatePerPartition());
}
public LeafQueueStatePerPartition getLeafQueueStatePerPartition(
String leafQueueName, String partition) {
if (leafQueueStateMap.get(partition) != null) {
return leafQueueStateMap.get(partition).get(leafQueueName);
}
return null;
}
public Map<String, Map<String, LeafQueueStatePerPartition>>
getLeafQueueStateMap() {
return leafQueueStateMap;
}
private void clear() {
leafQueueStateMap.clear();
}
}
private class LeafQueueStatePerPartition {
private AtomicBoolean isActive = new AtomicBoolean(false);
private long mostRecentActivationTime;
@ -139,41 +194,16 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
}
private boolean containsLeafQueue(String leafQueueName) {
return leafQueueStateMap.containsKey(leafQueueName);
}
private boolean addLeafQueueStateIfNotExists(String leafQueueName,
LeafQueueState leafQueueState) {
if (!containsLeafQueue(leafQueueName)) {
leafQueueStateMap.put(leafQueueName, leafQueueState);
return true;
}
return false;
}
private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) {
return addLeafQueueStateIfNotExists(leafQueue.getQueueName(),
new LeafQueueState());
}
private void clearLeafQueueState() {
leafQueueStateMap.clear();
}
private class ParentQueueState {
private Map<String, Float> totalAbsoluteActivatedChildQueueCapacityByLabel =
new HashMap<String, Float>();
private float getAbsoluteActivatedChildQueueCapacity() {
return getAbsoluteActivatedChildQueueCapacity(NO_LABEL);
}
private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
try {
readLock.lock();
Float totalActivatedCapacity = getByLabel(nodeLabel);
Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (totalActivatedCapacity != null) {
return totalActivatedCapacity;
} else{
@ -188,11 +218,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
float childQueueCapacity) {
try {
writeLock.lock();
Float activatedChildCapacity = getByLabel(nodeLabel);
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity);
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
activatedChildCapacity + childQueueCapacity);
} else{
setByLabel(nodeLabel, childQueueCapacity);
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
childQueueCapacity);
}
} finally {
writeLock.unlock();
@ -203,22 +236,25 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
float childQueueCapacity) {
try {
writeLock.lock();
Float activatedChildCapacity = getByLabel(nodeLabel);
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
setByLabel(nodeLabel, activatedChildCapacity - childQueueCapacity);
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
activatedChildCapacity - childQueueCapacity);
} else{
setByLabel(nodeLabel, childQueueCapacity);
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
childQueueCapacity);
}
} finally {
writeLock.unlock();
}
}
Float getByLabel(String label) {
Float getAbsActivatedChildQueueCapacityByLabel(String label) {
return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label);
}
Float setByLabel(String label, float val) {
Float setAbsActivatedChildQueueCapacityByLabel(String label, float val) {
return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val);
}
@ -256,13 +292,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@Override
public void init(final CapacitySchedulerContext schedulerContext,
final ParentQueue parentQueue) {
final ParentQueue parentQueue) throws IOException {
this.scheduler = schedulerContext;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
if (!(parentQueue instanceof ManagedParentQueue)) {
throw new IllegalArgumentException(
"Expected instance of type " + ManagedParentQueue.class);
@ -278,15 +313,43 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
+ leafQueueTemplate.getQueueCapacities() + "]");
}
private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) {
private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue)
throws IOException {
leafQueueTemplate = parentQueue.getLeafQueueTemplate();
leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities();
ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f,
leafQueueTemplateCapacities.getMaximumCapacity());
Set<String> parentQueueLabels = parentQueue.getNodeLabelsForQueue();
for (String nodeLabel : leafQueueTemplateCapacities
.getExistingNodeLabels()) {
if (!parentQueueLabels.contains(nodeLabel)) {
LOG.error("Invalid node label " + nodeLabel
+ " on configured leaf template on parent" + " queue " + parentQueue
.getQueueName());
throw new IOException("Invalid node label " + nodeLabel
+ " on configured leaf template on parent" + " queue " + parentQueue
.getQueueName());
}
}
leafQueueTemplateNodeLabels =
leafQueueTemplateCapacities.getExistingNodeLabels();
}
/**
* Compute/Adjust child queue capacities
* for auto created leaf queues
* This computes queue entitlements but does not update LeafQueueState or
* queue capacities. Scheduler calls commitQueueManagemetChanges after
* validation after applying queue changes and commits to LeafQueueState
* are done in commitQueueManagementChanges.
*
* @return List of Queue Management change suggestions which could potentially
* be committed/rejected by the scheduler due to validation failures
* @throws SchedulerDynamicEditException
*/
@Override
public List<QueueManagementChange> computeQueueManagementChanges()
throws SchedulerDynamicEditException {
@ -298,30 +361,39 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
try {
readLock.lock();
List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
//Map of LeafQueue->QueueCapacities - keep adding the computed
// entitlements to this map and finally
// build the leaf queue configuration Template for all identified leaf
// queues
Map<String, QueueCapacities> leafQueueEntitlements = new HashMap<>();
for (String nodeLabel : leafQueueTemplateNodeLabels) {
// check if any leaf queues need to be deactivated based on pending
// applications and
// applications
float parentAbsoluteCapacity =
managedParentQueue.getQueueCapacities().getAbsoluteCapacity();
managedParentQueue.getQueueCapacities().getAbsoluteCapacity(
nodeLabel);
float leafQueueTemplateAbsoluteCapacity =
leafQueueTemplateCapacities.getAbsoluteCapacity();
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel);
Map<String, QueueCapacities> deactivatedLeafQueues =
deactivateLeafQueuesIfInActive(managedParentQueue, queueManagementChanges);
deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel,
leafQueueEntitlements);
float deactivatedCapacity = getTotalDeactivatedCapacity(
deactivatedLeafQueues);
deactivatedLeafQueues, nodeLabel);
float sumOfChildQueueActivatedCapacity = parentQueueState.
getAbsoluteActivatedChildQueueCapacity();
getAbsoluteActivatedChildQueueCapacity(nodeLabel);
//Check if we need to activate anything at all?
float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity,
deactivatedCapacity, sumOfChildQueueActivatedCapacity);
float availableCapacity =
parentAbsoluteCapacity - sumOfChildQueueActivatedCapacity
+ deactivatedCapacity + EPSILON;
if (LOG.isDebugEnabled()) {
LOG.debug(
"Parent queue : " + managedParentQueue.getQueueName() + " absCapacity = "
LOG.debug("Parent queue : " + managedParentQueue.getQueueName()
+ ", nodeLabel = " + nodeLabel + ", absCapacity = "
+ parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = "
+ leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = "
+ deactivatedCapacity + " , absChildActivatedCapacity = "
@ -331,37 +403,50 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
//sort applications across leaf queues by submit time
List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
if (pendingApps.size() > 0) {
int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
availableCapacity, leafQueueTemplateAbsoluteCapacity,
pendingApps.size());
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + maxLeafQueuesTobeActivated
+ " leaf queues to be activated with " + pendingApps.size()
+ " apps ");
LOG.debug("Found " + maxLeafQueuesTobeActivated + " leaf queues"
+ " to be activated with " + pendingApps.size() + " apps ");
}
LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
pendingApps, maxLeafQueuesTobeActivated,
nodeLabel, pendingApps, maxLeafQueuesTobeActivated,
deactivatedLeafQueues.keySet());
//Compute entitlement changes for the identified leaf queues
// which is appended to the List of queueManagementChanges
computeQueueManagementChanges(leafQueuesToBeActivated,
queueManagementChanges, availableCapacity,
leafQueueTemplateAbsoluteCapacity);
// which is appended to the List of computedEntitlements
updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated,
leafQueueEntitlements);
if (LOG.isDebugEnabled()) {
if (leafQueuesToBeActivated.size() > 0) {
LOG.debug(
"Activated leaf queues : [" + leafQueuesToBeActivated + "]");
LOG.debug("Activated leaf queues : [" + leafQueuesToBeActivated
+ "]");
}
}
}
}
}
//Populate new entitlements
for (final Iterator<Map.Entry<String, QueueCapacities>> iterator =
leafQueueEntitlements.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, QueueCapacities> queueCapacities = iterator.next();
String leafQueueName = queueCapacities.getKey();
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
.getQueue(leafQueueName);
AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
queueCapacities.getValue());
queueManagementChanges.add(
new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
}
return queueManagementChanges;
} finally {
readLock.unlock();
@ -369,14 +454,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
private float getTotalDeactivatedCapacity(
Map<String, QueueCapacities> deactivatedLeafQueues) {
Map<String, QueueCapacities> deactivatedLeafQueues, String nodeLabel) {
float deactivatedCapacity = 0;
for (Iterator<Map.Entry<String, QueueCapacities>> iterator =
deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, QueueCapacities> deactivatedQueueCapacity =
iterator.next();
deactivatedCapacity +=
deactivatedQueueCapacity.getValue().getAbsoluteCapacity();
deactivatedQueueCapacity.getValue().getAbsoluteCapacity(nodeLabel);
}
return deactivatedCapacity;
}
@ -385,20 +470,42 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
void updateLeafQueueState() {
try {
writeLock.lock();
Set<String> newPartitions = new HashSet<>();
Set<String> newQueues = new HashSet<>();
for (CSQueue newQueue : managedParentQueue.getChildQueues()) {
if (newQueue instanceof LeafQueue) {
addLeafQueueStateIfNotExists((LeafQueue) newQueue);
for (String nodeLabel : leafQueueTemplateNodeLabels) {
leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) newQueue,
nodeLabel);
newPartitions.add(nodeLabel);
}
newQueues.add(newQueue.getQueueName());
}
}
for (Iterator<Map.Entry<String, LeafQueueState>> itr =
leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) {
Map.Entry<String, LeafQueueState> e = itr.next();
String queueName = e.getKey();
if (!newQueues.contains(queueName)) {
for (Iterator<Map.Entry<String, Map<String, LeafQueueStatePerPartition>>>
itr = leafQueueState.getLeafQueueStateMap().entrySet().iterator();
itr.hasNext(); ) {
Map.Entry<String, Map<String, LeafQueueStatePerPartition>> e =
itr.next();
String partition = e.getKey();
if (!newPartitions.contains(partition)) {
itr.remove();
LOG.info(
"Removed partition " + partition + " from leaf queue " + "state");
} else{
Map<String, LeafQueueStatePerPartition> queues = e.getValue();
for (
Iterator<Map.Entry<String, LeafQueueStatePerPartition>> queueItr =
queues.entrySet().iterator(); queueItr.hasNext(); ) {
String queue = queueItr.next().getKey();
if (!newQueues.contains(queue)) {
queueItr.remove();
LOG.info("Removed queue " + queue + " from leaf queue "
+ "state from partition " + partition);
}
}
}
}
} finally {
@ -406,22 +513,20 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
}
private LinkedHashSet<String> getSortedLeafQueues(
private LinkedHashSet<String> getSortedLeafQueues(String nodeLabel,
final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded,
Set<String> deactivatedQueues) throws SchedulerDynamicEditException {
LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded);
int ctr = 0;
for (FiCaSchedulerApp app : pendingApps) {
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) app.getCSLeafQueue();
String leafQueueName = leafQueue.getQueueName();
//Check if leafQueue is not active already and has any pending apps
if (ctr < leafQueuesNeeded) {
if (!isActive(leafQueue)) {
if (!isActive(leafQueue, nodeLabel)) {
if (!deactivatedQueues.contains(leafQueueName)) {
if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) {
ctr++;
@ -445,11 +550,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
@VisibleForTesting
public boolean isActive(final AutoCreatedLeafQueue leafQueue)
throws SchedulerDynamicEditException {
public boolean isActive(final AutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
try {
readLock.lock();
LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue);
LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue,
nodeLabel);
return leafQueueStatus.isActive();
} finally {
readLock.unlock();
@ -457,64 +563,52 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive(
ParentQueue parentQueue,
List<QueueManagementChange> queueManagementChanges)
ParentQueue parentQueue, String nodeLabel,
Map<String, QueueCapacities> leafQueueEntitlements)
throws SchedulerDynamicEditException {
Map<String, QueueCapacities> deactivatedQueues = new HashMap<>();
for (CSQueue childQueue : parentQueue.getChildQueues()) {
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
if (leafQueue != null) {
if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) {
if (!leafQueueEntitlements.containsKey(leafQueue.getQueueName())) {
leafQueueEntitlements.put(leafQueue.getQueueName(),
new QueueCapacities(false));
}
if (isActive(leafQueue) && !hasPendingApps(leafQueue)) {
queueManagementChanges.add(
new QueueManagementChange.UpdateQueue(leafQueue,
ZERO_CAPACITY_ENTITLEMENT));
QueueCapacities capacities = leafQueueEntitlements.get(
leafQueue.getQueueName());
updateToZeroCapacity(capacities, nodeLabel);
deactivatedQueues.put(leafQueue.getQueueName(),
leafQueueTemplateCapacities);
} else{
if (LOG.isDebugEnabled()) {
LOG.debug(" Leaf queue has pending applications : " + leafQueue
.getNumApplications() + ".Skipping deactivation for "
+ leafQueue);
LOG.debug(" Leaf queue has pending applications or is " + "inactive"
+ " : " + leafQueue.getNumApplications()
+ ".Skipping deactivation for " + leafQueue);
}
}
} else{
LOG.warn("Could not find queue in scheduler while trying" + " to "
+ "deactivate for " + parentQueue);
}
}
if (LOG.isDebugEnabled()) {
if (deactivatedQueues.size() > 0) {
LOG.debug("Deactivated leaf queues : " + deactivatedQueues);
}
}
return deactivatedQueues;
}
private void computeQueueManagementChanges(
private void updateLeafQueueCapacitiesByLabel(String nodeLabel,
Set<String> leafQueuesToBeActivated,
List<QueueManagementChange> queueManagementChanges,
final float availableCapacity,
final float leafQueueTemplateAbsoluteCapacity) {
float curAvailableCapacity = availableCapacity;
Map<String, QueueCapacities> leafQueueEntitlements) {
for (String curLeafQueue : leafQueuesToBeActivated) {
if (!leafQueueEntitlements.containsKey(curLeafQueue)) {
leafQueueEntitlements.put(curLeafQueue, new QueueCapacities(false));
// Activate queues if capacity is available
if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) {
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
.getQueue(curLeafQueue);
if (leafQueue != null) {
AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
leafQueueTemplateCapacities.getCapacity(),
leafQueueTemplateCapacities.getMaximumCapacity());
queueManagementChanges.add(
new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity;
} else{
LOG.warn(
"Could not find queue in scheduler while trying to deactivate "
+ curLeafQueue);
}
}
QueueCapacities capacities = leafQueueEntitlements.get(curLeafQueue);
updateCapacityFromTemplate(capacities, nodeLabel);
}
}
@ -528,17 +622,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
availableCapacity / childQueueAbsoluteCapacity);
return Math.min(numLeafQueuesNeeded, numPendingApps);
} else{
throw new SchedulerDynamicEditException("Child queue absolute capacity "
+ "is initialized to 0. Check parent queue's " + managedParentQueue
.getQueueName() + " leaf queue template configuration");
}
}
private float getAvailableCapacity(float parentAbsCapacity,
float deactivatedAbsCapacity, float totalChildQueueActivatedCapacity) {
return parentAbsCapacity - totalChildQueueActivatedCapacity
+ deactivatedAbsCapacity + EPSILON;
return 0;
}
/**
@ -567,57 +652,55 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue;
if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) {
if (isActive(leafQueue)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Queue is already active. Skipping activation : " + queue
.getQueuePath());
}
} else{
activate(leafQueue);
}
} else{
if (!isActive(leafQueue)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Queue is already de-activated. " + "Skipping de-activation "
+ ": " + leafQueue.getQueuePath());
}
} else{
deactivate(leafQueue);
}
}
}
} finally {
writeLock.unlock();
}
}
private void activate(final AutoCreatedLeafQueue leafQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
getLeafQueueState(leafQueue).activate();
parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL,
leafQueueTemplateCapacities.getAbsoluteCapacity());
} finally {
writeLock.unlock();
}
}
private void deactivate(final AutoCreatedLeafQueue leafQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
getLeafQueueState(leafQueue).deactivate();
for (String nodeLabel : managedParentQueue.getQueueCapacities()
for (String nodeLabel : updatedQueueTemplate.getQueueCapacities()
.getExistingNodeLabels()) {
parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity());
if (updatedQueueTemplate.getQueueCapacities().
getCapacity(nodeLabel) > 0) {
if (isActive(leafQueue, nodeLabel)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Queue is already active." + " Skipping activation : "
+ queue.getQueuePath());
}
} else{
activate(leafQueue, nodeLabel);
}
} else{
if (!isActive(leafQueue, nodeLabel)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Queue is already de-activated. Skipping "
+ "de-activation : " + leafQueue.getQueuePath());
}
} else{
deactivate(leafQueue, nodeLabel);
}
}
}
}
} finally {
writeLock.unlock();
}
}
private void activate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
try {
writeLock.lock();
getLeafQueueState(leafQueue, nodeLabel).activate();
parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
} finally {
writeLock.unlock();
}
}
private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
try {
writeLock.lock();
getLeafQueueState(leafQueue, nodeLabel).deactivate();
parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
} finally {
writeLock.unlock();
}
@ -629,7 +712,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@Override
public void reinitialize(CapacitySchedulerContext schedulerContext,
final ParentQueue parentQueue) {
final ParentQueue parentQueue) throws IOException {
if (!(parentQueue instanceof ManagedParentQueue)) {
throw new IllegalStateException(
"Expected instance of type " + ManagedParentQueue.class + " found "
@ -649,12 +732,11 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
//clear state
parentQueueState.clear();
clearLeafQueueState();
leafQueueState.clear();
LOG.info(
"Reinitialized queue management policy for parent queue "
+ parentQueue.getQueueName() +" with leaf queue template "
+ "capacities : ["
"Reinitialized queue management policy for parent queue " + parentQueue
.getQueueName() + " with leaf queue template " + "capacities : ["
+ leafQueueTemplate.getQueueCapacities() + "]");
}
@ -663,51 +745,74 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
AbstractAutoCreatedLeafQueue leafQueue)
throws SchedulerDynamicEditException {
if ( !(leafQueue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException("Not an instance of "
+ "AutoCreatedLeafQueue : " + leafQueue.getClass());
AutoCreatedLeafQueueConfig template;
if (!(leafQueue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"Not an instance of " + "AutoCreatedLeafQueue : " + leafQueue
.getClass());
}
AutoCreatedLeafQueue autoCreatedLeafQueue =
(AutoCreatedLeafQueue) leafQueue;
AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT;
try {
writeLock.lock();
if (!addLeafQueueStateIfNotExists(leafQueue)) {
LOG.error("Leaf queue already exists in state : " + getLeafQueueState(
leafQueue));
throw new SchedulerDynamicEditException(
QueueCapacities capacities = new QueueCapacities(false);
for (String nodeLabel : leafQueueTemplateNodeLabels) {
if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue,
nodeLabel)) {
String message =
"Leaf queue already exists in state : " + getLeafQueueState(
leafQueue));
leafQueue, nodeLabel);
LOG.error(message);
}
float availableCapacity = getAvailableCapacity(
managedParentQueue.getQueueCapacities().getAbsoluteCapacity(), 0,
parentQueueState.getAbsoluteActivatedChildQueueCapacity());
float availableCapacity = managedParentQueue.getQueueCapacities().
getAbsoluteCapacity(nodeLabel) - parentQueueState.
getAbsoluteActivatedChildQueueCapacity(nodeLabel) + EPSILON;
if (availableCapacity >= leafQueueTemplateCapacities
.getAbsoluteCapacity()) {
activate(autoCreatedLeafQueue);
template = buildTemplate(leafQueueTemplateCapacities.getCapacity(),
leafQueueTemplateCapacities.getMaximumCapacity());
.getAbsoluteCapacity(nodeLabel)) {
updateCapacityFromTemplate(capacities, nodeLabel);
activate(leafQueue, nodeLabel);
} else{
updateToZeroCapacity(capacities, nodeLabel);
}
}
template = buildTemplate(capacities);
} finally {
writeLock.unlock();
}
return template;
}
private void updateToZeroCapacity(QueueCapacities capacities,
String nodeLabel) {
capacities.setCapacity(nodeLabel, 0.0f);
capacities.setMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
}
private void updateCapacityFromTemplate(QueueCapacities capacities,
String nodeLabel) {
capacities.setCapacity(nodeLabel,
leafQueueTemplateCapacities.getCapacity(nodeLabel));
capacities.setMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
}
@VisibleForTesting
LeafQueueState getLeafQueueState(LeafQueue queue)
throws SchedulerDynamicEditException {
LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
String partition) throws SchedulerDynamicEditException {
try {
readLock.lock();
String queueName = queue.getQueueName();
if (!containsLeafQueue(queueName)) {
if (!leafQueueState.containsLeafQueue(queueName, partition)) {
throw new SchedulerDynamicEditException(
"Could not find leaf queue in " + "state " + queueName);
} else{
return leafQueueStateMap.get(queueName);
return leafQueueState.
getLeafQueueStatePerPartition(queueName, partition);
}
} finally {
readLock.unlock();
@ -715,8 +820,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
@VisibleForTesting
public float getAbsoluteActivatedChildQueueCapacity() {
return parentQueueState.getAbsoluteActivatedChildQueueCapacity();
public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
return parentQueueState.getAbsoluteActivatedChildQueueCapacity(nodeLabel);
}
private List<FiCaSchedulerApp> getSortedPendingApplications() {
@ -726,20 +831,10 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
return apps;
}
private AutoCreatedLeafQueueConfig buildTemplate(float capacity,
float maxCapacity) {
private AutoCreatedLeafQueueConfig buildTemplate(QueueCapacities capacities) {
AutoCreatedLeafQueueConfig.Builder templateBuilder =
new AutoCreatedLeafQueueConfig.Builder();
QueueCapacities capacities = new QueueCapacities(false);
templateBuilder.capacities(capacities);
for (String nodeLabel : managedParentQueue.getQueueCapacities()
.getExistingNodeLabels()) {
capacities.setCapacity(nodeLabel, capacity);
capacities.setMaximumCapacity(nodeLabel, maxCapacity);
}
return new AutoCreatedLeafQueueConfig(templateBuilder);
}
}

View File

@ -62,4 +62,12 @@ public class PendingAskUpdateResult {
public String getNewNodePartition() {
return newNodePartition;
}
@Override
public String toString() {
return "PendingAskUpdateResult{" + "lastPendingAsk=" + lastPendingAsk
+ ", lastNodePartition='" + lastNodePartition + '\''
+ ", newPendingAsk=" + newPendingAsk + ", newNodePartition='"
+ newNodePartition + '\'' + '}';
}
}

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -65,6 +67,7 @@ public class MockNM {
new HashMap<ContainerId, ContainerStatus>();
private Map<ApplicationId, AppCollectorData> registeringCollectors
= new ConcurrentHashMap<>();
private Set<NodeLabel> nodeLabels;
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
// scale vcores based on the requested memory
@ -101,6 +104,13 @@ public class MockNM {
nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1]));
}
public MockNM(String nodeIdStr, Resource capability,
ResourceTrackerService resourceTracker, String version, Set<NodeLabel>
nodeLabels) {
this(nodeIdStr, capability, resourceTracker, version);
this.nodeLabels = nodeLabels;
}
public NodeId getNodeId() {
return nodeId;
}
@ -164,12 +174,17 @@ public class MockNM {
List<ApplicationId> runningApplications) throws Exception {
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(httpPort);
req.setResource(capability);
req.setContainerStatuses(containerReports);
req.setNMVersion(version);
req.setRunningApplications(runningApplications);
if ( nodeLabels != null && nodeLabels.size() > 0) {
req.setNodeLabels(nodeLabels);
}
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);
this.currentContainerTokenMasterKey =

View File

@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -247,10 +249,11 @@ public class TestAppManager{
private TestRMAppManager appMonitor;
private ApplicationSubmissionContext asContext;
private ApplicationId appId;
private QueueInfo mockDefaultQueueInfo;
@SuppressWarnings("deprecation")
@Before
public void setUp() {
public void setUp() throws IOException {
long now = System.currentTimeMillis();
rmContext = mockRMContext(1, now - 10);
@ -258,6 +261,7 @@ public class TestAppManager{
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
ResourceScheduler scheduler = mockResourceScheduler();
((RMContextImpl)rmContext).setScheduler(scheduler);
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
((RMContextImpl) rmContext).setYarnConfiguration(conf);
@ -275,6 +279,11 @@ public class TestAppManager{
asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
asContext.setResource(mockResource());
asContext.setPriority(Priority.newInstance(0));
asContext.setQueue("default");
mockDefaultQueueInfo = mock(QueueInfo.class);
when(scheduler.getQueueInfo("default", false, false))
.thenReturn(mockDefaultQueueInfo);
setupDispatcher(rmContext, conf);
}
@ -709,6 +718,7 @@ public class TestAppManager{
for (ResourceRequest req : reqs) {
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
}
// setAMContainerResourceRequests has priority over
// setAMContainerResourceRequest and setResource
Assert.assertEquals(reqs, app.getAMResourceRequests());
@ -722,6 +732,7 @@ public class TestAppManager{
ResourceRequest req =
ResourceRequest.newInstance(Priority.newInstance(0),
ResourceRequest.ANY, Resources.createResource(1025), 1, true);
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
asContext.setAMContainerResourceRequest(cloneResourceRequest(req));
// getAMContainerResourceRequests uses a singleton list of
// getAMContainerResourceRequest
@ -729,7 +740,6 @@ public class TestAppManager{
Assert.assertEquals(req, asContext.getAMContainerResourceRequests().get(0));
Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size());
RMApp app = testRMAppSubmit();
req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
// setAMContainerResourceRequest has priority over setResource
Assert.assertEquals(Collections.singletonList(req),
app.getAMResourceRequests());
@ -740,10 +750,12 @@ public class TestAppManager{
asContext.setResource(Resources.createResource(1024));
asContext.setAMContainerResourceRequests(null);
RMApp app = testRMAppSubmit();
// setResource
Assert.assertEquals(Collections.singletonList(
ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")),
ResourceRequest.ANY, Resources.createResource(1024), 1, true,
"")),
app.getAMResourceRequests());
}
@ -766,6 +778,8 @@ public class TestAppManager{
throws Exception {
asContext.setResource(null);
List<ResourceRequest> reqs = new ArrayList<>();
when(mockDefaultQueueInfo.getAccessibleNodeLabels()).thenReturn
(new HashSet<String>() {{ add("label1"); add(""); }});
ResourceRequest anyReq = ResourceRequest.newInstance(
Priority.newInstance(1),
ResourceRequest.ANY, Resources.createResource(1024), 1, false, "label1",

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.security.TestGroupsCaching;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -65,6 +66,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.SimpleGroupsMapping;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
@ -89,6 +92,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CapacitySchedulerConfiguration.DOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CapacitySchedulerConfiguration.ROOT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -99,7 +104,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
private static final Log LOG = LogFactory.getLog(
TestCapacitySchedulerAutoCreatedQueueBase.class);
public static final int GB = 1024;
public final static ContainerUpdates NULL_UPDATE_REQUESTS =
public static final ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
public static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
@ -112,9 +117,6 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String B1 = B + ".b1";
public static final String B2 = B + ".b2";
public static final String B3 = B + ".b3";
public static final String C1 = C + ".c1";
public static final String C2 = C + ".c2";
public static final String C3 = C + ".c3";
public static final float A_CAPACITY = 20f;
public static final float B_CAPACITY = 40f;
public static final float C_CAPACITY = 20f;
@ -124,8 +126,6 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final float B1_CAPACITY = 60f;
public static final float B2_CAPACITY = 20f;
public static final float B3_CAPACITY = 20f;
public static final float C1_CAPACITY = 20f;
public static final float C2_CAPACITY = 20f;
public static final int NODE_MEMORY = 16;
@ -147,12 +147,14 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String NODEL_LABEL_GPU = "GPU";
public static final String NODEL_LABEL_SSD = "SSD";
public static final float NODE_LABEL_GPU_TEMPLATE_CAPACITY = 30.0f;
public static final float NODEL_LABEL_SSD_TEMPLATE_CAPACITY = 40.0f;
protected MockRM mockRM = null;
protected MockNM nm1 = null;
protected MockNM nm2 = null;
protected MockNM nm3 = null;
protected CapacityScheduler cs;
private final TestCapacityScheduler tcs = new TestCapacityScheduler();
protected SpyDispatcher dispatcher;
private static EventHandler<Event> rmAppEventEventHandler;
@ -215,15 +217,29 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
}
protected void setupNodes(MockRM newMockRM) throws Exception {
NodeLabel ssdLabel = Records.newRecord(NodeLabel.class);
ssdLabel.setName(NODEL_LABEL_SSD);
ssdLabel.setExclusivity(true);
nm1 = // label = SSD
new MockNM("h1:1234", NODE_MEMORY * GB, NODE1_VCORES, newMockRM
.getResourceTrackerService());
new MockNM("h1:1234",
Resource.newInstance(NODE_MEMORY * GB, NODE1_VCORES),
newMockRM.getResourceTrackerService(),
YarnVersionInfo.getVersion(),
new HashSet<NodeLabel>() {{ add(ssdLabel); }});
nm1.registerNode();
nm2 = // label = GPU
new MockNM("h2:1234", NODE_MEMORY * GB, NODE2_VCORES, newMockRM
.getResourceTrackerService
());
NodeLabel gpuLabel = Records.newRecord(NodeLabel.class);
ssdLabel.setName(NODEL_LABEL_GPU);
ssdLabel.setExclusivity(true);
//Label = GPU
nm2 = new MockNM("h2:1234",
Resource.newInstance(NODE_MEMORY * GB, NODE2_VCORES),
newMockRM.getResourceTrackerService(),
YarnVersionInfo.getVersion(),
new HashSet<NodeLabel>() {{ add(gpuLabel); }});
nm2.registerNode();
nm3 = // label = ""
@ -295,19 +311,23 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
/**
* @param conf, to be modified
* @return, CS configuration which has C as an auto creation enabled parent
* queue
* @return, CS configuration which has C
* as an auto creation enabled parent queue
* <p>
* root / \ \ \ a b c d / \ / | \ a1 a2 b1
* b2 b3
* root
* / \ \ \
* a b c d
* / \ / | \
* a1 a2 b1 b2 b3
*/
public static CapacitySchedulerConfiguration setupQueueConfiguration(
CapacitySchedulerConfiguration conf) {
//setup new queues with one of them auto enabled
// Define top-level queues
// Set childQueue for root
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
conf.setQueues(ROOT,
new String[] { "a", "b", "c", "d" });
conf.setCapacity(A, A_CAPACITY);
@ -339,6 +359,19 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_GPU,
NODE_LABEL_GPU_TEMPLATE_CAPACITY);
conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_GPU, 100.0f);
conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_SSD,
NODEL_LABEL_SSD_TEMPLATE_CAPACITY);
conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_SSD,
100.0f);
conf.setDefaultNodeLabelExpression(C, NODEL_LABEL_GPU);
conf.setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression
(C, NODEL_LABEL_SSD);
LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
conf.setUserLimitFactor(D, 1.0f);
@ -363,8 +396,13 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
accessibleNodeLabelsOnC.add(NO_LABEL);
conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC);
conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 50);
conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 50);
conf.setAccessibleNodeLabels(ROOT, accessibleNodeLabelsOnC);
conf.setCapacityByLabel(ROOT, NODEL_LABEL_GPU, 100f);
conf.setCapacityByLabel(ROOT, NODEL_LABEL_SSD, 100f);
conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC);
conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 100f);
conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 100f);
LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
@ -541,19 +579,21 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
autoCreatedLeafQueue.getMaxApplicationsPerUser());
}
protected void validateInitialQueueEntitlement(CSQueue parentQueue,
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
protected void validateInitialQueueEntitlement(CSQueue parentQueue, String
leafQueueName, Map<String, Float>
expectedTotalChildQueueAbsCapacityByLabel,
Set<String> nodeLabels)
throws SchedulerDynamicEditException {
throws SchedulerDynamicEditException, InterruptedException {
validateInitialQueueEntitlement(cs, parentQueue, leafQueueName,
expectedTotalChildQueueAbsCapacity, nodeLabels);
expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
}
protected void validateInitialQueueEntitlement(
CapacityScheduler capacityScheduler, CSQueue parentQueue,
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
String leafQueueName,
Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
Set<String> nodeLabels)
throws SchedulerDynamicEditException {
throws SchedulerDynamicEditException, InterruptedException {
ManagedParentQueue autoCreateEnabledParentQueue =
(ManagedParentQueue) parentQueue;
@ -561,11 +601,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
(GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
.getAutoCreatedQueueManagementPolicy();
assertEquals(expectedTotalChildQueueAbsCapacity,
policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
@ -573,6 +609,10 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
for (String label : nodeLabels) {
validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label);
assertEquals(true, policy.isActive(leafQueue, label));
assertEquals(expectedTotalChildQueueAbsCapacityByLabel.get(label),
policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);
QueueEntitlement expectedEntitlement = new QueueEntitlement(
cap.getCapacity(label), cap.getMaximumCapacity(label));
@ -581,21 +621,19 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
validateEffectiveMinResource(leafQueue, label, expectedEntitlements);
}
assertEquals(true, policy.isActive(leafQueue));
}
protected void validateCapacitiesByLabel(
ManagedParentQueue autoCreateEnabledParentQueue,
AutoCreatedLeafQueue leafQueue, String label) {
assertEquals(
autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities()
.getCapacity(), leafQueue.getQueueCapacities().getCapacity(label),
EPSILON);
assertEquals(
autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities()
.getMaximumCapacity(),
leafQueue.getQueueCapacities().getMaximumCapacity(label), EPSILON);
protected void validateCapacitiesByLabel(ManagedParentQueue
autoCreateEnabledParentQueue, AutoCreatedLeafQueue leafQueue, String
label) throws InterruptedException {
assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate()
.getQueueCapacities().getCapacity(label),
leafQueue.getQueueCapacities()
.getCapacity(label), EPSILON);
assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate()
.getQueueCapacities().getMaximumCapacity(label),
leafQueue.getQueueCapacities()
.getMaximumCapacity(label), EPSILON);
}
protected void validateEffectiveMinResource(CSQueue leafQueue,
@ -621,8 +659,10 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
}
protected void validateActivatedQueueEntitlement(CSQueue parentQueue,
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
List<QueueManagementChange> queueManagementChanges)
String leafQueueName, Map<String, Float>
expectedTotalChildQueueAbsCapacity,
List<QueueManagementChange> queueManagementChanges, Set<String>
expectedNodeLabels)
throws SchedulerDynamicEditException {
ManagedParentQueue autoCreateEnabledParentQueue =
(ManagedParentQueue) parentQueue;
@ -633,67 +673,84 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
.getQueueCapacities();
QueueEntitlement expectedEntitlement = new QueueEntitlement(
cap.getCapacity(), cap.getMaximumCapacity());
//validate capacity
validateQueueEntitlements(leafQueueName, expectedEntitlement,
queueManagementChanges);
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue)
cs.getQueue(leafQueueName);
Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
for (String label : expectedNodeLabels) {
//validate leaf queue state
assertEquals(true, policy.isActive(leafQueue, label));
QueueEntitlement expectedEntitlement = new QueueEntitlement(
cap.getCapacity(label), cap.getMaximumCapacity(label));
//validate parent queue state
assertEquals(expectedTotalChildQueueAbsCapacity,
policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
assertEquals(expectedTotalChildQueueAbsCapacity.get(label),
policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
leafQueueName);
expectedEntitlements.put(label, expectedEntitlement);
}
//validate leaf queue state
assertEquals(true, policy.isActive(leafQueue));
//validate capacity
validateQueueEntitlements(leafQueueName, expectedEntitlements,
queueManagementChanges, expectedNodeLabels);
}
protected void validateDeactivatedQueueEntitlement(CSQueue parentQueue,
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
List<QueueManagementChange> queueManagementChanges)
String leafQueueName, Map<String, Float>
expectedTotalChildQueueAbsCapacity,
List<QueueManagementChange>
queueManagementChanges)
throws SchedulerDynamicEditException {
QueueEntitlement expectedEntitlement = new QueueEntitlement(0.0f, 1.0f);
QueueEntitlement expectedEntitlement =
new QueueEntitlement(0.0f, 1.0f);
ManagedParentQueue autoCreateEnabledParentQueue =
(ManagedParentQueue) parentQueue;
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
leafQueueName);
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) cs.getQueue(leafQueueName);
GuaranteedOrZeroCapacityOverTimePolicy policy =
(GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
.getAutoCreatedQueueManagementPolicy();
Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
for (String label : accessibleNodeLabelsOnC) {
//validate parent queue state
assertEquals(expectedTotalChildQueueAbsCapacity,
policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
LOG.info("Validating label " + label);
assertEquals(expectedTotalChildQueueAbsCapacity.get(label), policy
.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);
//validate leaf queue state
assertEquals(false, policy.isActive(leafQueue));
assertEquals(false, policy.isActive(leafQueue, label));
expectedEntitlements.put(label, expectedEntitlement);
}
//validate capacity
validateQueueEntitlements(leafQueueName, expectedEntitlement,
queueManagementChanges);
validateQueueEntitlements(leafQueueName, expectedEntitlements,
queueManagementChanges, accessibleNodeLabelsOnC);
}
private void validateQueueEntitlements(String leafQueueName,
QueueEntitlement expectedEntitlement,
List<QueueManagementChange> queueEntitlementChanges) {
void validateQueueEntitlements(String leafQueueName,
Map<String, QueueEntitlement> expectedEntitlements,
List<QueueManagementChange>
queueEntitlementChanges, Set<String> expectedNodeLabels) {
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
leafQueueName);
validateQueueEntitlementChangesForLeafQueue(leafQueue, expectedEntitlement,
queueEntitlementChanges);
validateQueueEntitlementChanges(leafQueue, expectedEntitlements,
queueEntitlementChanges, expectedNodeLabels);
}
private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue,
QueueEntitlement expectedQueueEntitlement,
final List<QueueManagementChange> queueEntitlementChanges) {
private void validateQueueEntitlementChanges(AutoCreatedLeafQueue leafQueue,
Map<String, QueueEntitlement> expectedQueueEntitlements,
final List<QueueManagementChange> queueEntitlementChanges, Set<String>
expectedNodeLabels) {
boolean found = false;
Map<String, QueueEntitlement> expectedQueueEntitlements = new HashMap<>();
for (QueueManagementChange entitlementChange : queueEntitlementChanges) {
if (leafQueue.getQueueName().equals(
entitlementChange.getQueue().getQueueName())) {
@ -701,13 +758,12 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
AutoCreatedLeafQueueConfig updatedQueueTemplate =
entitlementChange.getUpdatedQueueTemplate();
for (String label : accessibleNodeLabelsOnC) {
for (String label : expectedNodeLabels) {
QueueEntitlement newEntitlement = new QueueEntitlement(
updatedQueueTemplate.getQueueCapacities().getCapacity(label),
updatedQueueTemplate.getQueueCapacities()
.getMaximumCapacity(label));
assertEquals(expectedQueueEntitlement, newEntitlement);
expectedQueueEntitlements.put(label, expectedQueueEntitlement);
updatedQueueTemplate.getQueueCapacities().getMaximumCapacity
(label));
assertEquals(expectedQueueEntitlements.get(label), newEntitlement);
validateEffectiveMinResource(leafQueue, label,
expectedQueueEntitlements);
}
@ -716,9 +772,20 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
}
}
if (!found) {
fail("Could not find the specified leaf queue in entitlement changes : "
fail(
"Could not find the specified leaf queue in entitlement changes : "
+ leafQueue.getQueueName());
}
}
protected Map<String, Float> populateExpectedAbsCapacityByLabelForParentQueue
(int numLeafQueues) {
Map<String, Float> expectedChildQueueAbsCapacity = new HashMap<>();
expectedChildQueueAbsCapacity.put(NODEL_LABEL_GPU,
NODE_LABEL_GPU_TEMPLATE_CAPACITY/100 * numLeafQueues);
expectedChildQueueAbsCapacity.put(NODEL_LABEL_SSD,
NODEL_LABEL_SSD_TEMPLATE_CAPACITY/100 * numLeafQueues);
expectedChildQueueAbsCapacity.put(NO_LABEL, 0.1f * numLeafQueues);
return expectedChildQueueAbsCapacity;
}
}

View File

@ -72,15 +72,18 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.placement
.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CSQueueUtils.EPSILON;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -90,7 +93,7 @@ import static org.mockito.Mockito.when;
/**
* Tests for creation and reinitialization of auto created leaf queues
* under a ManagedParentQueue.
* and capacity management under a ManagedParentQueue.
*/
public class TestCapacitySchedulerAutoQueueCreation
extends TestCapacitySchedulerAutoCreatedQueueBase {
@ -105,7 +108,7 @@ public class TestCapacitySchedulerAutoQueueCreation
4);
@Test(timeout = 10000)
@Test(timeout = 20000)
public void testAutoCreateLeafQueueCreation() throws Exception {
try {
@ -122,7 +125,12 @@ public class TestCapacitySchedulerAutoQueueCreation
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
PARENT_QUEUE);
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
validateInitialQueueEntitlement(parentQueue, USER0, 0.1f, accessibleNodeLabelsOnC);
Map<String, Float> expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(parentQueue, USER0,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000);
assertTrue(autoCreatedLeafQueue
@ -136,7 +144,14 @@ public class TestCapacitySchedulerAutoQueueCreation
(AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
parentQueue = (ManagedParentQueue) cs.getQueue("d");
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER, 0.02f,
expectedChildQueueAbsCapacity =
new HashMap<String, Float>() {{
put(NO_LABEL, 0.02f);
}};
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER,
expectedChildQueueAbsCapacity,
new HashSet<String>() {{ add(NO_LABEL); }});
} finally {
@ -173,10 +188,17 @@ public class TestCapacitySchedulerAutoQueueCreation
USER0);
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
PARENT_QUEUE);
assertEquals(parentQueue, user0Queue.getParent());
assertEquals(parentQueue, user1Queue.getParent());
validateInitialQueueEntitlement(parentQueue, USER0, 0.2f, accessibleNodeLabelsOnC);
validateInitialQueueEntitlement(parentQueue, USER1, 0.2f, accessibleNodeLabelsOnC);
Map<String, Float>
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(parentQueue, USER0,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
validateInitialQueueEntitlement(parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
ApplicationAttemptId appAttemptId = appsInC.get(0);
@ -184,7 +206,8 @@ public class TestCapacitySchedulerAutoQueueCreation
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(
null);
ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 1, true, priority, recordFactory);
1 * GB, 1, true, priority,
recordFactory);
cs.allocate(appAttemptId, Collections.<ResourceRequest>singletonList(r1),
null, Collections.<ContainerId>emptyList(), Collections.singletonList(host),
@ -216,8 +239,12 @@ public class TestCapacitySchedulerAutoQueueCreation
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
USER1);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(),
0.1f, accessibleNodeLabelsOnC);
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
} finally {
cleanupQueue(USER0);
@ -498,52 +525,80 @@ public class TestCapacitySchedulerAutoQueueCreation
CSQueue parentQueue = cs.getQueue(PARENT_QUEUE);
//submit app1 as USER1
submitApp(mockRM, parentQueue, USER1, USER1, 1, 1);
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1,
1, 1);
Map<String, Float> expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
1);
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(parentQueue, USER2,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app3 as USER1
submitApp(mockRM, parentQueue, USER1, USER1, 3, 2);
//validate total activated abs capacity remains the same
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
parentQueue)
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue)
.getAutoCreatedQueueManagementPolicy();
assertEquals(autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
//submit user_3 app. This cant be scheduled since there is no capacity
for (String nodeLabel : accessibleNodeLabelsOnC) {
assertEquals(expectedAbsChildQueueCapacity.get(nodeLabel),
autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(nodeLabel), EPSILON);
}
//submit user_3 app. This cant be allocated since there is no capacity
// in NO_LABEL, SSD but can be in GPU label
submitApp(mockRM, parentQueue, USER3, USER3, 4, 1);
final CSQueue user3LeafQueue = cs.getQueue(USER3);
validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f,
1.0f, 1.0f);
validateCapacitiesByLabel((ManagedParentQueue) parentQueue,
(AutoCreatedLeafQueue)
user3LeafQueue, NODEL_LABEL_GPU);
assertEquals(autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
assertEquals(0.2f, autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
assertEquals(0.9f, autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(NODEL_LABEL_GPU),
EPSILON);
//deactivate USER2 queue
//Verify that AMs can be allocated
//Node 1 has SSD and default node label expression on C is SSD.
//This validates that the default node label expression with SSD is set
// on the AM attempt
// and app attempt reaches ALLOCATED state for a dynamic queue 'USER1'
mockRM.launchAM(mockRM.getRMContext().getRMApps().get(user1AppId),
mockRM, nm1);
// //deactivate USER2 queue
cs.killAllAppsInQueue(USER2);
mockRM.waitForState(user2AppId, RMAppState.KILLED);
//Verify if USER_2 can be deactivated since it has no pending appsA
//Verify if USER_2 can be deactivated since it has no pending apps
List<QueueManagementChange> queueManagementChanges =
autoCreatedQueueManagementPolicy.computeQueueManagementChanges();
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue;
managedParentQueue.validateAndApplyQueueManagementChanges(
queueManagementChanges);
managedParentQueue.
validateAndApplyQueueManagementChanges(queueManagementChanges);
validateDeactivatedQueueEntitlement(parentQueue, USER2, 0.2f,
queueManagementChanges);
validateDeactivatedQueueEntitlement(parentQueue, USER2,
expectedAbsChildQueueCapacity, queueManagementChanges);
//USER_3 should now get activated
validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f,
queueManagementChanges);
//USER_3 should now get activated for SSD, NO_LABEL
Set<String> expectedNodeLabelsUpdated = new HashSet<>();
expectedNodeLabelsUpdated.add(NO_LABEL);
expectedNodeLabelsUpdated.add(NODEL_LABEL_SSD);
validateActivatedQueueEntitlement(parentQueue, USER3,
expectedAbsChildQueueCapacity , queueManagementChanges, expectedNodeLabelsUpdated);
} finally {
cleanupQueue(USER1);
@ -565,13 +620,18 @@ public class TestCapacitySchedulerAutoQueueCreation
//submit app1 as USER1
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
CSQueue user1LeafQueue = newCS.getQueue(USER1);
Map<String, Float> expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(newCS, parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
submitApp(newMockRM, parentQueue, USER2, USER2, 2, 1);
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
CSQueue user2LeafQueue = newCS.getQueue(USER2);
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2, 2,
1);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(newCS, parentQueue, USER2,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//validate total activated abs capacity remains the same
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
@ -579,7 +639,7 @@ public class TestCapacitySchedulerAutoQueueCreation
parentQueue)
.getAutoCreatedQueueManagementPolicy();
assertEquals(autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
//submit user_3 app. This cant be scheduled since there is no capacity
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
@ -588,7 +648,7 @@ public class TestCapacitySchedulerAutoQueueCreation
1.0f, 1.0f);
assertEquals(autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
// add new NM.
newMockRM.registerNode("127.0.0.3:1234", 125 * GB, 20);
@ -596,31 +656,33 @@ public class TestCapacitySchedulerAutoQueueCreation
// There will be change in effective resource when nodes are added
// since we deal with percentages
Resource MAX_RES = Resources.addTo(TEMPLATE_MAX_RES,
Resources.createResource(125 * GB, 20));
Resource MAX_RES = Resources.addTo(TEMPLATE_MAX_RES, Resources.createResource(125 *
GB, 20));
Resource MIN_RES = Resources.createResource(14438, 6);
Assert.assertEquals("Effective Min resource for USER3 is not correct",
Resources.none(),
user3LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
Resources.none(), user3LeafQueue.getQueueResourceQuotas()
.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for USER3 is not correct",
MAX_RES,
user3LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
MAX_RES, user3LeafQueue
.getQueueResourceQuotas()
.getEffectiveMaxResource());
CSQueue user1LeafQueue = newCS.getQueue(USER1);
CSQueue user2LeafQueue = newCS.getQueue(USER2);
Assert.assertEquals("Effective Min resource for USER2 is not correct",
MIN_RES,
user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
MIN_RES, user1LeafQueue.getQueueResourceQuotas()
.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for USER2 is not correct",
MAX_RES,
user1LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
MAX_RES, user1LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
Assert.assertEquals("Effective Min resource for USER1 is not correct",
MIN_RES,
user2LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
MIN_RES, user2LeafQueue.getQueueResourceQuotas()
.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for USER1 is not correct",
MAX_RES,
user2LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
MAX_RES, user2LeafQueue.getQueueResourceQuotas()
.getEffectiveMaxResource());
// unregister one NM.
newMockRM.unRegisterNode(nm3);
@ -629,11 +691,11 @@ public class TestCapacitySchedulerAutoQueueCreation
// After loosing one NM, resources will reduce
Assert.assertEquals("Effective Min resource for USER2 is not correct",
MIN_RES_UPDATED,
user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource());
MIN_RES_UPDATED, user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource
());
Assert.assertEquals("Effective Max resource for USER2 is not correct",
MAX_RES_UPDATED,
user2LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource());
MAX_RES_UPDATED, user2LeafQueue.getQueueResourceQuotas()
.getEffectiveMaxResource());
} finally {
cleanupQueue(USER1);
@ -646,25 +708,6 @@ public class TestCapacitySchedulerAutoQueueCreation
}
}
@Test
public void testAutoCreatedQueueInheritsNodeLabels() throws Exception {
try {
String host = "127.0.0.1";
RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
host);
cs.handle(new NodeAddedSchedulerEvent(node));
CSQueue parentQueue = cs.getQueue(PARENT_QUEUE);
submitApp(USER1, USER1, NODEL_LABEL_GPU);
//submit app1 as USER1
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
} finally {
cleanupQueue(USER1);
}
}
@Test
public void testReinitializeQueuesWithAutoCreatedLeafQueues()
throws Exception {
@ -679,12 +722,20 @@ public class TestCapacitySchedulerAutoQueueCreation
//submit app1 as USER1
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
Map<String, Float> expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(newCS, parentQueue, USER1,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2,
2, 1);
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2,
USER2, 2,
1);
expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(newCS, parentQueue, USER2,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
//update parent queue capacity
conf.setCapacity(C, 30f);
@ -709,19 +760,27 @@ public class TestCapacitySchedulerAutoQueueCreation
//submit app1 as USER3
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f, accessibleNodeLabelsOnC);
AutoCreatedLeafQueue user3Queue = (AutoCreatedLeafQueue) newCS.getQueue(
USER1);
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
AutoCreatedLeafQueue user3Queue =
(AutoCreatedLeafQueue) newCS.getQueue(USER1);
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
validateUserAndAppLimits(user3Queue, 900, 900);
//submit app1 as USER1 - is already activated. there should be no diff
// in capacities
submitApp(newMockRM, parentQueue, USER3, USER3, 4, 2);
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f, accessibleNodeLabelsOnC);
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
validateUserAndAppLimits(user3Queue, 900, 900);
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
parentQueue)
.getAutoCreatedQueueManagementPolicy();
assertEquals(0.27f, autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity
(NO_LABEL), EPSILON);
} finally {
cleanupQueue(USER1);
cleanupQueue(USER2);

View File

@ -24,7 +24,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CSQueueUtils.EPSILON;
import static org.junit.Assert.assertEquals;
@ -54,21 +57,27 @@ public class TestQueueManagementDynamicEditPolicy extends
parentQueue)
.getAutoCreatedQueueManagementPolicy();
assertEquals(0f, autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
//submit app1 as USER1
ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, 1,
1);
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
Map<String, Float> expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
1);
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(parentQueue, USER2,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//validate total activated abs capacity
assertEquals(0.2f, autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(), EPSILON);
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
//submit user_3 app. This cant be scheduled since there is no capacity
submitApp(mockRM, parentQueue, USER3, USER3, 3, 1);
@ -77,7 +86,7 @@ public class TestQueueManagementDynamicEditPolicy extends
1.0f, 1.0f);
assertEquals(autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON);
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
//deactivate USER2 queue
cs.killAllAppsInQueue(USER2);
@ -88,8 +97,8 @@ public class TestQueueManagementDynamicEditPolicy extends
mockRM.waitForState(user1AppId, RMAppState.KILLED);
policy.editSchedule();
waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, 1000);
waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, NO_LABEL,
1000);
validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.5f, 0.1f,
1.0f, 1.0f);
@ -105,13 +114,12 @@ public class TestQueueManagementDynamicEditPolicy extends
}
private void waitForPolicyState(float expectedVal,
GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, int
timesec) throws
InterruptedException {
GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, String
nodeLabel, int timesec) throws InterruptedException {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timesec * 1000) {
if (Float.compare(expectedVal, queueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity()) != 0) {
.getAbsoluteActivatedChildQueueCapacity(nodeLabel)) > EPSILON) {
Thread.sleep(100);
} else {
break;