YARN-3388. Allocation in LeafQueue could get stuck because DRF calculator isn't well supported when computing user-limit. (Nathan Roberts via wangda)
(cherry picked from commit 444b2ea7af)
(cherry picked from commit c7d782d2f6)
commit 3bf2e16f76 (parent e2bd490236)
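What the change addresses, in brief: the user-limit computation in LeafQueue did not account for how the DominantResourceCalculator measures a user's consumption (by the larger of its memory share and vcore share), so under DRF the computed limit could come out below what a vcore-dominant user had already consumed and the queue would stop assigning containers; the new testDRFUsageRatioRounding below exercises exactly that situation. A minimal, plain-Java illustration of memory share versus dominant share follows; the numbers are invented and this is not the YARN API:

// Plain-Java illustration of dominant share vs. memory share; all numbers are
// invented for the example and none of this is the Hadoop Resource API.
public class DominantShareExample {
  public static void main(String[] args) {
    double partitionMemMB = 80_000d * 1000;   // e.g. 1000 nodes of 80 GB
    double partitionVcores = 100d * 1000;     // e.g. 1000 nodes of 100 vcores
    double usedMemMB = 30 * 1024d;            // one user's containers
    double usedVcores = 58;

    double memoryShare = usedMemMB / partitionMemMB;
    double vcoreShare = usedVcores / partitionVcores;
    // DRF charges the user by its dominant share: the larger of the two.
    double dominantShare = Math.max(memoryShare, vcoreShare);

    System.out.printf("memory share   = %.6f%n", memoryShare);
    System.out.printf("dominant share = %.6f%n", dominantShare);
    // This user is vcore dominant, so a memory-only view understates what the
    // DRF comparison sees as already consumed.
  }
}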
@@ -18,17 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -71,9 +61,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 @Private
 @Unstable
@@ -120,6 +123,10 @@ public class LeafQueue extends AbstractCSQueue {
 
   private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
+  // Summation of consumed ratios for all users in queue
+  private float totalUserConsumedRatio = 0;
+  private UsageRatios qUsageRatios;
+
   // record all ignore partition exclusivityRMContainer, this will be used to do
   // preemption, key is the partition of the RMContainer allocated on
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -136,6 +143,8 @@ public class LeafQueue extends AbstractCSQueue {
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
 
+    qUsageRatios = new UsageRatios();
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("LeafQueue:" + " name=" + queueName
           + ", fullname=" + getQueuePath());
@@ -1071,6 +1080,9 @@
   private Resource computeUserLimit(FiCaSchedulerApp application,
       Resource clusterResource, User user,
       String nodePartition, SchedulingMode schedulingMode) {
+    Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
+
     // What is our current capacity?
     // * It is equal to the max(required, queue-capacity) if
     //   we're running below capacity. The 'max' ensures that jobs in queues
@@ -1079,7 +1091,7 @@
     //   (usedResources + required) (which extra resources we are allocating)
     Resource queueCapacity =
         Resources.multiplyAndNormalizeUp(resourceCalculator,
-            labelManager.getResourceByLabel(nodePartition, clusterResource),
+            partitionResource,
            queueCapacities.getAbsoluteCapacity(nodePartition),
            minimumAllocation);
 
@@ -1091,15 +1103,30 @@
     // Allow progress for queues with miniscule capacity
     queueCapacity =
         Resources.max(
-            resourceCalculator, clusterResource,
+            resourceCalculator, partitionResource,
             queueCapacity,
             required);
 
-    Resource currentCapacity =
-        Resources.lessThan(resourceCalculator, clusterResource,
-            queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity
-            : Resources.add(queueUsage.getUsed(nodePartition), required);
 
+    /* We want to base the userLimit calculation on
+     * max(queueCapacity, usedResources+required). However, we want
+     * usedResources to be based on the combined ratios of all the users in the
+     * queue so we use consumedRatio to calculate such.
+     * The calculation is dependent on how the resourceCalculator calculates the
+     * ratio between two Resources. DRF Example: If usedResources is
+     * greater than queueCapacity and users have the following [mem,cpu] usages:
+     * User1: [10%,20%] - Dominant resource is 20%
+     * User2: [30%,10%] - Dominant resource is 30%
+     * Then total consumedRatio is then 20+30=50%. Yes, this value can be
+     * larger than 100% but for the purposes of making sure all users are
+     * getting their fair share, it works.
+     */
+    Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
+        partitionResource, qUsageRatios.getUsageRatio(nodePartition),
+        minimumAllocation);
+    Resource currentCapacity =
+        Resources.lessThan(resourceCalculator, partitionResource, consumed,
+            queueCapacity) ? queueCapacity : Resources.add(consumed, required);
     // Never allow a single user to take more than the
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than
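The arithmetic in the DRF example of the comment above can be spelled out in a few lines. This is an illustrative sketch only: it uses percentages and plain Java collections instead of Hadoop's Resource/ResourceCalculator classes, and the partition size used for the projection is an invented number.

import java.util.Map;

// Plain-Java walk-through of the DRF example from the comment above; the user
// shares are the ones in the comment, everything else is made up.
public class DrfConsumedRatioExample {
  public static void main(String[] args) {
    // Per-user usage expressed as {memory share %, vcore share %} of the partition.
    Map<String, int[]> userSharesPct = Map.of(
        "user1", new int[] {10, 20},   // dominant share 20%
        "user2", new int[] {30, 10});  // dominant share 30%

    int consumedRatioPct = 0;
    for (int[] p : userSharesPct.values()) {
      consumedRatioPct += Math.max(p[0], p[1]);   // add each user's dominant share
    }
    System.out.println("consumedRatio = " + consumedRatioPct + "%");   // 50%

    // Projected back onto an invented partition of 100,000 MB / 1,000 vcores,
    // this gives the "consumed" Resource that the patched code compares against
    // queueCapacity when choosing currentCapacity.
    long consumedMemMB = 100_000L * consumedRatioPct / 100;   // 50,000 MB
    long consumedVcores = 1_000L * consumedRatioPct / 100;    // 500 vcores
    System.out.println("consumed = <" + consumedMemMB + " MB, "
        + consumedVcores + " vcores>");
  }
}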
@@ -1108,9 +1135,10 @@
     final int activeUsers = activeUsersManager.getNumActiveUsers();
 
     // User limit resource is determined by:
-    // max{currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%)
+    // max{currentCapacity / #activeUsers, currentCapacity *
+    // user-limit-percentage%)
     Resource userLimitResource = Resources.max(
-        resourceCalculator, clusterResource,
+        resourceCalculator, partitionResource,
         Resources.divideAndCeil(
             resourceCalculator, currentCapacity, activeUsers),
         Resources.divideAndCeil(
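As a quick worked example of the formula in that comment (numbers invented for illustration): with a currentCapacity equivalent to 100 GB, 4 active users and a user-limit percentage of 50, the per-user limit is max(100/4, 100 * 0.50) = max(25, 50) = 50 GB; with the percentage at 100 the division by active users can never lower it, since max(100/4, 100) = 100 GB. The result is then capped by maxUserLimit, computed just below.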
@@ -1134,8 +1162,7 @@
       maxUserLimit =
           Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
     } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-      maxUserLimit =
-          labelManager.getResourceByLabel(nodePartition, clusterResource);
+      maxUserLimit = partitionResource;
     }
 
     // Cap final user limit with maxUserLimit
@@ -1143,7 +1170,7 @@
         Resources.roundUp(
             resourceCalculator,
             Resources.min(
-                resourceCalculator, clusterResource,
+                resourceCalculator, partitionResource,
                 userLimitResource,
                 maxUserLimit
                 ),
@@ -1156,13 +1183,17 @@
           " userLimitPercent=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required +
-          " consumed: " + user.getUsed() +
+          " consumed: " + consumed +
           " user-limit-resource: " + userLimitResource +
           " queueCapacity: " + queueCapacity +
           " qconsumed: " + queueUsage.getUsed() +
+          " consumedRatio: " + totalUserConsumedRatio +
           " currentCapacity: " + currentCapacity +
           " activeUsers: " + activeUsers +
-          " clusterCapacity: " + clusterResource
+          " clusterCapacity: " + clusterResource +
+          " resourceByLabel: " + partitionResource +
+          " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
+          " Partition: " + nodePartition
       );
     }
     user.setUserResourceLimit(userLimitResource);
@@ -1248,6 +1279,42 @@
     }
   }
 
+  private synchronized float calculateUserUsageRatio(Resource clusterResource,
+      String nodePartition) {
+    Resource resourceByLabel =
+        labelManager.getResourceByLabel(nodePartition, clusterResource);
+    float consumed = 0;
+    User user;
+    for (Map.Entry<String, User> entry : users.entrySet()) {
+      user = entry.getValue();
+      consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
+          resourceByLabel, nodePartition);
+    }
+    return consumed;
+  }
+
+  private synchronized void recalculateQueueUsageRatio(Resource clusterResource,
+      String nodePartition) {
+    ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
+
+    if (nodePartition == null) {
+      for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
+          queueResourceUsage.getNodePartitionsSet())) {
+        qUsageRatios.setUsageRatio(partition,
+            calculateUserUsageRatio(clusterResource, partition));
+      }
+    } else {
+      qUsageRatios.setUsageRatio(nodePartition,
+          calculateUserUsageRatio(clusterResource, nodePartition));
+    }
+  }
+
+  private synchronized void updateQueueUsageRatio(String nodePartition,
+      float delta) {
+    qUsageRatios.incUsageRatio(nodePartition, delta);
+  }
+
   @Override
   public void completedContainer(Resource clusterResource,
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
@@ -1311,6 +1378,8 @@
       boolean isIncreasedAllocation) {
     super.allocateResource(clusterResource, resource, nodePartition,
         isIncreasedAllocation);
+    Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
 
     // handle ignore exclusivity container
     if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1329,6 +1398,12 @@
     String userName = application.getUser();
     User user = getUser(userName);
     user.assignContainer(resource, nodePartition);
+
+    // Update usage ratios
+    updateQueueUsageRatio(nodePartition,
+        user.updateUsageRatio(resourceCalculator, resourceByLabel,
+            nodePartition));
+
     // Note this is a bit unconventional since it gets the object and modifies
     // it here, rather then using set routine
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
@@ -1349,6 +1424,8 @@
       RMContainer rmContainer, boolean isChangeResource) {
     super.releaseResource(clusterResource, resource, nodePartition,
         isChangeResource);
+    Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
 
     // handle ignore exclusivity container
     if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1368,6 +1445,12 @@
     String userName = application.getUser();
     User user = getUser(userName);
     user.releaseContainer(resource, nodePartition);
+
+    // Update usage ratios
+    updateQueueUsageRatio(nodePartition,
+        user.updateUsageRatio(resourceCalculator, resourceByLabel,
+            nodePartition));
+
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
 
     if (LOG.isDebugEnabled()) {
@@ -1408,6 +1491,9 @@
     // during allocation
     setQueueResourceLimitsInfo(clusterResource);
 
+    // Update user consumedRatios
+    recalculateQueueUsageRatio(clusterResource, null);
+
     // Update metrics
     CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
         minimumAllocation, this, labelManager, null);
@@ -1459,17 +1545,93 @@
     queueUsage.decAMUsed(nodeLabel, resourceToDec);
   }
 
+  /*
+   * Usage Ratio
+   */
+  static private class UsageRatios {
+    private Map<String, Float> usageRatios;
+    private ReadLock readLock;
+    private WriteLock writeLock;
+
+    public UsageRatios() {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      readLock = lock.readLock();
+      writeLock = lock.writeLock();
+      usageRatios = new HashMap<String, Float>();
+    }
+
+    private void incUsageRatio(String label, float delta) {
+      try {
+        writeLock.lock();
+        Float fl = usageRatios.get(label);
+        if (null == fl) {
+          fl = new Float(0.0);
+        }
+        fl += delta;
+        usageRatios.put(label, new Float(fl));
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    float getUsageRatio(String label) {
+      try {
+        readLock.lock();
+        Float f = usageRatios.get(label);
+        if (null == f) {
+          return 0.0f;
+        }
+        return f;
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private void setUsageRatio(String label, float ratio) {
+      try {
+        writeLock.lock();
+        usageRatios.put(label, new Float(ratio));
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public float getUsageRatio(String label) {
+    return qUsageRatios.getUsageRatio(label);
+  }
+
   @VisibleForTesting
   public static class User {
     ResourceUsage userResourceUsage = new ResourceUsage();
     volatile Resource userResourceLimit = Resource.newInstance(0, 0);
     int pendingApplications = 0;
     int activeApplications = 0;
+    private UsageRatios userUsageRatios = new UsageRatios();
 
     public ResourceUsage getResourceUsage() {
       return userResourceUsage;
     }
 
+    public synchronized float resetAndUpdateUsageRatio(
+        ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      userUsageRatios.setUsageRatio(nodePartition, 0);
+      return updateUsageRatio(resourceCalculator, resource, nodePartition);
+    }
+
+    public synchronized float updateUsageRatio(
+        ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      float delta;
+      float newRatio =
+          Resources.ratio(resourceCalculator, getUsed(nodePartition), resource);
+      delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
+      userUsageRatios.setUsageRatio(nodePartition, newRatio);
+      return delta;
+    }
+
     public Resource getUsed() {
       return userResourceUsage.getUsed();
     }
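One design point worth noting in the LeafQueue changes above: the queue-wide ratio is maintained incrementally. On each allocation or release, User.updateUsageRatio recomputes that user's dominant share and returns only the delta, which updateQueueUsageRatio adds to the per-partition total; a full recalculation (recalculateQueueUsageRatio) only happens when the cluster resource changes. A minimal plain-Java sketch of that pattern, with a made-up ratio source instead of the real DRF computation:

import java.util.HashMap;
import java.util.Map;

// Sketch of the incremental bookkeeping used above: the queue-wide ratio is
// adjusted by (newRatio - oldRatio) on every change instead of being recomputed.
public class UsageRatioDeltaSketch {
  private final Map<String, Double> userRatio = new HashMap<>();
  private double queueRatio = 0;

  // Called when a user's usage changes; in the real scheduler newRatio would be
  // the user's DRF dominant share of the partition after the change.
  void onUserUsageChanged(String user, double newRatio) {
    double old = userRatio.getOrDefault(user, 0d);
    userRatio.put(user, newRatio);
    queueRatio += newRatio - old;   // O(1) incremental update per container event
  }

  // Full recomputation, analogous to recalculateQueueUsageRatio on a
  // cluster-resource change.
  void recalculate() {
    queueRatio = userRatio.values().stream().mapToDouble(Double::doubleValue).sum();
  }

  public static void main(String[] args) {
    UsageRatioDeltaSketch q = new UsageRatioDeltaSketch();
    q.onUserUsageChanged("user_0", 0.20);  // user_0 now dominant at 20%
    q.onUserUsageChanged("user_1", 0.30);  // user_1 now dominant at 30%
    q.onUserUsageChanged("user_0", 0.25);  // user_0 grows: a delta of +0.05
    System.out.printf("queueRatio = %.2f%n", q.queueRatio);  // 0.55
  }
}

Keeping a running sum this way is presumably what lets computeUserLimit read a single per-partition number (qUsageRatios.getUsageRatio) on the hot allocation path instead of iterating every user's usage there.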
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
@@ -91,6 +95,7 @@ import static org.mockito.Mockito.when;
 public class TestLeafQueue {
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
+  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
 
   RMContext rmContext;
   RMContext spyRMContext;
@@ -100,16 +105,29 @@
   CapacitySchedulerContext csContext;
 
   CSQueue root;
-  Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+  Map<String, CSQueue> queues;
 
   final static int GB = 1024;
   final static String DEFAULT_RACK = "/default";
 
-  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+  private final ResourceCalculator resourceCalculator =
+      new DefaultResourceCalculator();
+
+  private final ResourceCalculator dominantResourceCalculator =
+      new DominantResourceCalculator();
 
   @Before
   public void setUp() throws Exception {
+    setUpInternal(resourceCalculator);
+  }
+
+  private void setUpWithDominantResourceCalculator() throws Exception {
+    setUpInternal(dominantResourceCalculator);
+  }
+
+  private void setUpInternal(ResourceCalculator rC) throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
+    queues = new HashMap<String, CSQueue>();
     cs = spy(spyCs);
     rmContext = TestUtils.getMockRMContext();
     spyRMContext = spy(rmContext);
@@ -128,6 +146,8 @@
     csConf =
         new CapacitySchedulerConfiguration();
     csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
+    csConf.setBoolean(
+        "yarn.scheduler.capacity.reservations-continue-look-all-nodes", false);
     final String newRoot = "root" + System.currentTimeMillis();
     setupQueueConfiguration(csConf, newRoot);
     YarnConfiguration conf = new YarnConfiguration();
@@ -146,6 +166,7 @@
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
+    when(csContext.getResourceCalculator()).thenReturn(rC);
     when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager =
         new RMContainerTokenSecretManager(conf);
@@ -173,6 +194,7 @@
     when(cs.getNumClusterNodes()).thenReturn(3);
   }
 
+
   private static final String A = "a";
   private static final String B = "b";
   private static final String C = "c";
@@ -601,6 +623,172 @@ public class TestLeafQueue {
     assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
         a.getMetrics().getAvailableMB());
   }
+
+  @Test
+  public void testDRFUsageRatioRounding() throws Exception {
+    CSAssignment assign;
+    setUpWithDominantResourceCalculator();
+    // Mock the queue
+    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(E));
+
+    // Users
+    final String user0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 =
+        new FiCaSchedulerApp(appAttemptId0, user0, b,
+            b.getActiveUsersManager(), spyRMContext);
+    b.submitApplicationAttempt(app0, user0);
+
+    // Setup some nodes
+    String host0 = "127.0.0.1";
+    FiCaSchedulerNode node0 =
+        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 80 * GB, 100);
+
+    // Make cluster relatively large so usageRatios are small
+    int numNodes = 1000;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (80 * GB), numNodes * 100);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Set user-limit. Need a small queue within a large cluster.
+    b.setUserLimit(50);
+    b.setUserLimitFactor(1000000);
+    b.setMaxCapacity(1.0f);
+    b.setAbsoluteCapacity(0.00001f);
+
+    // First allocation is larger than second but is still vcore dominant
+    // so usage ratio will be based on vcores. If consumedRatio doesn't round
+    // in our favor then new limit calculation will actually be less than
+    // what is currently consumed and we will fail to allocate
+    Priority priority = TestUtils.createMockPriority(1);
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 20 * GB, 29, 1, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+    assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 10 * GB, 29, 2, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+    assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertTrue("Still within limits, should assign",
+        assign.getResource().getMemorySize() > 0);
+  }
+
+  @Test
+  public void testDRFUserLimits() throws Exception {
+    setUpWithDominantResourceCalculator();
+
+    // Mock the queue
+    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
+    // unset maxCapacity
+    b.setMaxCapacity(1.0f);
+
+    // Users
+    final String user0 = "user_0";
+    final String user1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 =
+        new FiCaSchedulerApp(appAttemptId0, user0, b,
+            b.getActiveUsersManager(), spyRMContext);
+    b.submitApplicationAttempt(app0, user0);
+
+    final ApplicationAttemptId appAttemptId2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app2 =
+        new FiCaSchedulerApp(appAttemptId2, user1, b,
+            b.getActiveUsersManager(), spyRMContext);
+    b.submitApplicationAttempt(app2, user1);
+
+    // Setup some nodes
+    String host0 = "127.0.0.1";
+    FiCaSchedulerNode node0 =
+        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8 * GB, 100);
+    String host1 = "127.0.0.2";
+    FiCaSchedulerNode node1 =
+        TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100);
+
+    int numNodes = 2;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8 * GB), numNodes * 100);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests so that one application is memory dominant
+    // and other application is vcores dominant
+    Priority priority = TestUtils.createMockPriority(1);
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 40, 10, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+
+    app2.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 10, 10, true,
+            priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+
+    /**
+     * Start testing...
+     */
+
+    // Set user-limit
+    b.setUserLimit(50);
+    b.setUserLimitFactor(2);
+    User queueUser0 = b.getUser(user0);
+    User queueUser1 = b.getUser(user1);
+
+    assertEquals("There should 2 active users!", 2, b
+        .getActiveUsersManager().getNumActiveUsers());
+    // Fill both Nodes as far as we can
+    CSAssignment assign;
+    do {
+      assign =
+          b.assignContainers(clusterResource, node0, new ResourceLimits(
+              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      LOG.info(assign.toString());
+    } while (assign.getResource().getMemorySize() > 0 &&
+        assign.getAssignmentInformation().getNumReservations() == 0);
+    do {
+      assign =
+          b.assignContainers(clusterResource, node1, new ResourceLimits(
+              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    } while (assign.getResource().getMemorySize() > 0 &&
+        assign.getAssignmentInformation().getNumReservations() == 0);
+    //LOG.info("user_0: " + queueUser0.getUsed());
+    //LOG.info("user_1: " + queueUser1.getUsed());
+
+    assertTrue("Verify user_0 got resources ", queueUser0.getUsed()
+        .getMemorySize() > 0);
+    assertTrue("Verify user_1 got resources ", queueUser1.getUsed()
+        .getMemorySize() > 0);
+    assertTrue(
+        "Exepected AbsoluteUsedCapacity > 0.95, got: "
+            + b.getAbsoluteUsedCapacity(), b.getAbsoluteUsedCapacity() > 0.95);
+
+    // Verify consumedRatio is based on dominant resources
+    float expectedRatio =
+        queueUser0.getUsed().getVirtualCores()
+            / (numNodes * 100.0f)
+            + queueUser1.getUsed().getMemorySize()
+            / (numNodes * 8.0f * GB);
+    assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+    // Add another node and make sure consumedRatio is adjusted
+    // accordingly.
+    numNodes = 3;
+    clusterResource =
+        Resources.createResource(numNodes * (8 * GB), numNodes * 100);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
+    expectedRatio =
+        queueUser0.getUsed().getVirtualCores()
+            / (numNodes * 100.0f)
+            + queueUser1.getUsed().getMemorySize()
+            / (numNodes * 8.0f * GB);
+    assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+  }
 
   @Test
   public void testUserLimits() throws Exception {
@@ -160,9 +160,16 @@
   public static ResourceRequest createResourceRequest(
       String resourceName, int memory, int numContainers, boolean relaxLocality,
       Priority priority, RecordFactory recordFactory, String labelExpression) {
+    return createResourceRequest(resourceName, memory, 1, numContainers,
+        relaxLocality, priority, recordFactory, labelExpression);
+  }
+
+  public static ResourceRequest createResourceRequest(String resourceName,
+      int memory, int vcores, int numContainers, boolean relaxLocality,
+      Priority priority, RecordFactory recordFactory, String labelExpression) {
     ResourceRequest request =
         recordFactory.newRecordInstance(ResourceRequest.class);
-    Resource capability = Resources.createResource(memory, 1);
+    Resource capability = Resources.createResource(memory, vcores);
 
     request.setNumContainers(numContainers);
     request.setResourceName(resourceName);
@@ -191,13 +198,18 @@
     return ApplicationAttemptId.newInstance(applicationId, attemptId);
   }
 
-  public static FiCaSchedulerNode getMockNode(
-      String host, String rack, int port, int capability) {
+  public static FiCaSchedulerNode getMockNode(String host, String rack,
+      int port, int memory) {
+    return getMockNode(host, rack, port, memory, 1);
+  }
+
+  public static FiCaSchedulerNode getMockNode(String host, String rack,
+      int port, int memory, int vcores) {
     NodeId nodeId = NodeId.newInstance(host, port);
     RMNode rmNode = mock(RMNode.class);
     when(rmNode.getNodeID()).thenReturn(nodeId);
     when(rmNode.getTotalCapability()).thenReturn(
-        Resources.createResource(capability, 1));
+        Resources.createResource(memory, vcores));
     when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
     when(rmNode.getHostName()).thenReturn(host);
     when(rmNode.getRackName()).thenReturn(rack);