YARN-3388. Allocation in LeafQueue could get stuck because DRF calculator isn't well supported when computing user-limit. (Nathan Roberts via wangda)

(cherry picked from commit 444b2ea7af)
Author: Wangda Tan
Date: 2016-08-19 16:28:32 -07:00
parent 65822e55fd
commit c7d782d2f6
3 changed files with 407 additions and 45 deletions
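Before the per-file diffs, here is a minimal standalone sketch (illustrative only, not code from this patch; class and method names are hypothetical) of the idea the change is built on: each user's usage is reduced to a dominant-resource share, and the queue-level "consumed ratio" is the sum of those shares, which is why it can exceed 100%.

// Hypothetical example mirroring the DRF example in the comment added to
// LeafQueue#computeUserLimit(); not part of the patch itself.
public class DrfConsumedRatioSketch {

  // Dominant share of one user's usage against the partition total.
  static float dominantShare(long usedMem, long usedVcores,
      long totalMem, long totalVcores) {
    return Math.max((float) usedMem / totalMem,
        (float) usedVcores / totalVcores);
  }

  public static void main(String[] args) {
    long totalMem = 100, totalVcores = 100;                     // partition capacity
    float user1 = dominantShare(10, 20, totalMem, totalVcores); // 0.20
    float user2 = dominantShare(30, 10, totalMem, totalVcores); // 0.30
    // Queue-level consumed ratio = sum of per-user dominant shares.
    // It may legitimately exceed 1.0; it is only used to size the
    // user-limit base, not as a true utilization percentage.
    System.out.println("consumedRatio = " + (user1 + user2));   // 0.5
  }
}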

LeafQueue.java (org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity)

@@ -18,17 +18,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -60,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -75,9 +65,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 @Private
 @Unstable
@@ -116,7 +119,7 @@ public class LeafQueue extends AbstractCSQueue {
 // cache last cluster resource to compute actual capacity
 private Resource lastClusterResource = Resources.none();
 private final QueueResourceLimitsInfo queueResourceLimitsInfo =
 new QueueResourceLimitsInfo();
@@ -124,6 +127,10 @@
 private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
+// Summation of consumed ratios for all users in queue
+private float totalUserConsumedRatio = 0;
+private UsageRatios qUsageRatios;
 // record all ignore partition exclusivityRMContainer, this will be used to do
 // preemption, key is the partition of the RMContainer allocated on
 private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -140,6 +147,8 @@
 // One time initialization is enough since it is static ordering policy
 this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
+qUsageRatios = new UsageRatios();
 if(LOG.isDebugEnabled()) {
 LOG.debug("LeafQueue:" + " name=" + queueName
 + ", fullname=" + getQueuePath());
@@ -164,7 +173,7 @@
 setQueueResourceLimitsInfo(clusterResource);
 CapacitySchedulerConfiguration conf = csContext.getConfiguration();
 setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
 userLimit = conf.getUserLimit(getQueuePath());
@@ -1113,6 +1122,9 @@
 private Resource computeUserLimit(FiCaSchedulerApp application,
 Resource clusterResource, User user,
 String nodePartition, SchedulingMode schedulingMode) {
+Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
+clusterResource);
 // What is our current capacity?
 // * It is equal to the max(required, queue-capacity) if
 // we're running below capacity. The 'max' ensures that jobs in queues
@@ -1121,7 +1133,7 @@
 // (usedResources + required) (which extra resources we are allocating)
 Resource queueCapacity =
 Resources.multiplyAndNormalizeUp(resourceCalculator,
-labelManager.getResourceByLabel(nodePartition, clusterResource),
+partitionResource,
 queueCapacities.getAbsoluteCapacity(nodePartition),
 minimumAllocation);
@@ -1133,15 +1145,30 @@
 // Allow progress for queues with miniscule capacity
 queueCapacity =
 Resources.max(
-resourceCalculator, clusterResource,
+resourceCalculator, partitionResource,
 queueCapacity,
 required);
+/* We want to base the userLimit calculation on
+* max(queueCapacity, usedResources+required). However, we want
+* usedResources to be based on the combined ratios of all the users in the
+* queue so we use consumedRatio to calculate such.
+* The calculation is dependent on how the resourceCalculator calculates the
+* ratio between two Resources. DRF Example: If usedResources is
+* greater than queueCapacity and users have the following [mem,cpu] usages:
+* User1: [10%,20%] - Dominant resource is 20%
+* User2: [30%,10%] - Dominant resource is 30%
+* Then total consumedRatio is then 20+30=50%. Yes, this value can be
+* larger than 100% but for the purposes of making sure all users are
+* getting their fair share, it works.
+*/
+Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
+partitionResource, qUsageRatios.getUsageRatio(nodePartition),
+minimumAllocation);
 Resource currentCapacity =
-Resources.lessThan(resourceCalculator, clusterResource,
-queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity
-: Resources.add(queueUsage.getUsed(nodePartition), required);
+Resources.lessThan(resourceCalculator, partitionResource, consumed,
+queueCapacity) ? queueCapacity : Resources.add(consumed, required);
 // Never allow a single user to take more than the
 // queue's configured capacity * user-limit-factor.
 // Also, the queue's configured capacity should be higher than
@@ -1150,9 +1177,10 @@
 final int activeUsers = activeUsersManager.getNumActiveUsers();
 // User limit resource is determined by:
-// max{currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%)
+// max{currentCapacity / #activeUsers, currentCapacity *
+// user-limit-percentage%)
 Resource userLimitResource = Resources.max(
-resourceCalculator, clusterResource,
+resourceCalculator, partitionResource,
 Resources.divideAndCeil(
 resourceCalculator, currentCapacity, activeUsers),
 Resources.divideAndCeil(
@@ -1176,8 +1204,7 @@
 maxUserLimit =
 Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
 } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-maxUserLimit =
-labelManager.getResourceByLabel(nodePartition, clusterResource);
+maxUserLimit = partitionResource;
 }
 // Cap final user limit with maxUserLimit
@@ -1185,7 +1212,7 @@
 Resources.roundUp(
 resourceCalculator,
 Resources.min(
-resourceCalculator, clusterResource,
+resourceCalculator, partitionResource,
 userLimitResource,
 maxUserLimit
 ),
@@ -1193,18 +1220,22 @@
 if (LOG.isDebugEnabled()) {
 String userName = application.getUser();
 LOG.debug("User limit computation for " + userName +
 " in queue " + getQueueName() +
 " userLimitPercent=" + userLimit +
 " userLimitFactor=" + userLimitFactor +
 " required: " + required +
-" consumed: " + user.getUsed() +
+" consumed: " + consumed +
 " user-limit-resource: " + userLimitResource +
 " queueCapacity: " + queueCapacity +
 " qconsumed: " + queueUsage.getUsed() +
+" consumedRatio: " + totalUserConsumedRatio +
 " currentCapacity: " + currentCapacity +
 " activeUsers: " + activeUsers +
-" clusterCapacity: " + clusterResource
+" clusterCapacity: " + clusterResource +
+" resourceByLabel: " + partitionResource +
+" usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
+" Partition: " + nodePartition
 );
 }
 user.setUserResourceLimit(userLimitResource);
@@ -1311,6 +1342,42 @@
 }
 }
+private synchronized float calculateUserUsageRatio(Resource clusterResource,
+String nodePartition) {
+Resource resourceByLabel =
+labelManager.getResourceByLabel(nodePartition, clusterResource);
+float consumed = 0;
+User user;
+for (Map.Entry<String, User> entry : users.entrySet()) {
+user = entry.getValue();
+consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
+resourceByLabel, nodePartition);
+}
+return consumed;
+}
+private synchronized void recalculateQueueUsageRatio(Resource clusterResource,
+String nodePartition) {
+ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
+if (nodePartition == null) {
+for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
+queueResourceUsage.getNodePartitionsSet())) {
+qUsageRatios.setUsageRatio(partition,
+calculateUserUsageRatio(clusterResource, partition));
+}
+} else {
+qUsageRatios.setUsageRatio(nodePartition,
+calculateUserUsageRatio(clusterResource, nodePartition));
+}
+}
+private synchronized void updateQueueUsageRatio(String nodePartition,
+float delta) {
+qUsageRatios.incUsageRatio(nodePartition, delta);
+}
 @Override
 public void completedContainer(Resource clusterResource,
 FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
@@ -1348,7 +1415,7 @@
 removed =
 application.containerCompleted(rmContainer, containerStatus,
 event, node.getPartition());
 node.releaseContainer(container);
 }
@@ -1381,6 +1448,8 @@
 boolean isIncreasedAllocation) {
 super.allocateResource(clusterResource, resource, nodePartition,
 isIncreasedAllocation);
+Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+clusterResource);
 // handle ignore exclusivity container
 if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1399,6 +1468,12 @@
 String userName = application.getUser();
 User user = getUser(userName);
 user.assignContainer(resource, nodePartition);
+// Update usage ratios
+updateQueueUsageRatio(nodePartition,
+user.updateUsageRatio(resourceCalculator, resourceByLabel,
+nodePartition));
 // Note this is a bit unconventional since it gets the object and modifies
 // it here, rather then using set routine
 Resources.subtractFrom(application.getHeadroom(), resource); // headroom
@@ -1419,6 +1494,8 @@
 RMContainer rmContainer, boolean isChangeResource) {
 super.releaseResource(clusterResource, resource, nodePartition,
 isChangeResource);
+Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+clusterResource);
 // handle ignore exclusivity container
 if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1438,6 +1515,12 @@
 String userName = application.getUser();
 User user = getUser(userName);
 user.releaseContainer(resource, nodePartition);
+// Update usage ratios
+updateQueueUsageRatio(nodePartition,
+user.updateUsageRatio(resourceCalculator, resourceByLabel,
+nodePartition));
 metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
 if (LOG.isDebugEnabled()) {
@@ -1477,7 +1560,10 @@
 // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
 // during allocation
 setQueueResourceLimitsInfo(clusterResource);
+// Update user consumedRatios
+recalculateQueueUsageRatio(clusterResource, null);
 // Update metrics
 CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
 minimumAllocation, this, labelManager, null);
@@ -1529,17 +1615,93 @@
 queueUsage.decAMUsed(nodeLabel, resourceToDec);
 }
+/*
+* Usage Ratio
+*/
+static private class UsageRatios {
+private Map<String, Float> usageRatios;
+private ReadLock readLock;
+private WriteLock writeLock;
+public UsageRatios() {
+ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+readLock = lock.readLock();
+writeLock = lock.writeLock();
+usageRatios = new HashMap<String, Float>();
+}
+private void incUsageRatio(String label, float delta) {
+try {
+writeLock.lock();
+Float fl = usageRatios.get(label);
+if (null == fl) {
+fl = new Float(0.0);
+}
+fl += delta;
+usageRatios.put(label, new Float(fl));
+} finally {
+writeLock.unlock();
+}
+}
+float getUsageRatio(String label) {
+try {
+readLock.lock();
+Float f = usageRatios.get(label);
+if (null == f) {
+return 0.0f;
+}
+return f;
+} finally {
+readLock.unlock();
+}
+}
+private void setUsageRatio(String label, float ratio) {
+try {
+writeLock.lock();
+usageRatios.put(label, new Float(ratio));
+} finally {
+writeLock.unlock();
+}
+}
+}
+@VisibleForTesting
+public float getUsageRatio(String label) {
+return qUsageRatios.getUsageRatio(label);
+}
 @VisibleForTesting
 public static class User {
 ResourceUsage userResourceUsage = new ResourceUsage();
 volatile Resource userResourceLimit = Resource.newInstance(0, 0);
 int pendingApplications = 0;
 int activeApplications = 0;
+private UsageRatios userUsageRatios = new UsageRatios();
 public ResourceUsage getResourceUsage() {
 return userResourceUsage;
 }
+public synchronized float resetAndUpdateUsageRatio(
+ResourceCalculator resourceCalculator,
+Resource resource, String nodePartition) {
+userUsageRatios.setUsageRatio(nodePartition, 0);
+return updateUsageRatio(resourceCalculator, resource, nodePartition);
+}
+public synchronized float updateUsageRatio(
+ResourceCalculator resourceCalculator,
+Resource resource, String nodePartition) {
+float delta;
+float newRatio =
+Resources.ratio(resourceCalculator, getUsed(nodePartition), resource);
+delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
+userUsageRatios.setUsageRatio(nodePartition, newRatio);
+return delta;
+}
 public Resource getUsed() {
 return userResourceUsage.getUsed();
 }
@@ -1677,7 +1839,7 @@
 .getSchedulableEntities()) {
 apps.add(pendingApp.getApplicationAttemptId());
 }
 for (FiCaSchedulerApp app :
 orderingPolicy.getSchedulableEntities()) {
 apps.add(app.getApplicationAttemptId());
 }
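As a reading aid for the computeUserLimit() changes above, here is a rough single-resource sketch (hypothetical names and numbers, not code from the patch) of how the summed usage ratio is turned back into a resource and used as the base for the per-user limit:

// Illustrative only; real code works on Resource objects via ResourceCalculator.
public class UserLimitBaseSketch {
  public static void main(String[] args) {
    long partitionMem = 16000;   // total memory of the partition, in MB
    float usageRatio = 0.5f;     // summed per-user dominant shares
    long queueCapacity = 4000;   // queue's guaranteed capacity, in MB
    long required = 1000;        // size of the next container request, in MB
    int activeUsers = 2;
    int userLimitPercent = 50;   // queue's minimum-user-limit-percent

    // Re-derive "consumed" from the ratio (rounding up) instead of reading
    // queueUsage directly, so DRF dominance is reflected consistently.
    long consumed = (long) Math.ceil(partitionMem * usageRatio);

    // Base for the user limit: the larger of the queue capacity and what
    // users have effectively consumed plus the current request.
    long currentCapacity =
        consumed < queueCapacity ? queueCapacity : consumed + required;

    // max{currentCapacity / #activeUsers, currentCapacity * user-limit-%}
    long userLimit = Math.max(
        (currentCapacity + activeUsers - 1) / activeUsers,
        (long) Math.ceil(currentCapacity * (userLimitPercent / 100.0)));
    System.out.println("user limit = " + userLimit + " MB");
  }
}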

TestLeafQueue.java

@@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -83,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
@@ -97,6 +101,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.T
 public class TestLeafQueue {
 private final RecordFactory recordFactory =
 RecordFactoryProvider.getRecordFactory(null);
+private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
 RMContext rmContext;
 RMContext spyRMContext;
@@ -106,16 +111,29 @@
 CapacitySchedulerContext csContext;
 CSQueue root;
-Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+Map<String, CSQueue> queues;
 final static int GB = 1024;
 final static String DEFAULT_RACK = "/default";
-private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+private final ResourceCalculator resourceCalculator =
+new DefaultResourceCalculator();
+private final ResourceCalculator dominantResourceCalculator =
+new DominantResourceCalculator();
 @Before
 public void setUp() throws Exception {
+setUpInternal(resourceCalculator);
+}
+private void setUpWithDominantResourceCalculator() throws Exception {
+setUpInternal(dominantResourceCalculator);
+}
+private void setUpInternal(ResourceCalculator rC) throws Exception {
 CapacityScheduler spyCs = new CapacityScheduler();
+queues = new HashMap<String, CSQueue>();
 cs = spy(spyCs);
 rmContext = TestUtils.getMockRMContext();
 spyRMContext = spy(rmContext);
@@ -134,6 +152,8 @@
 csConf =
 new CapacitySchedulerConfiguration();
 csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
+csConf.setBoolean(
+"yarn.scheduler.capacity.reservations-continue-look-all-nodes", false);
 final String newRoot = "root" + System.currentTimeMillis();
 setupQueueConfiguration(csConf, newRoot);
 YarnConfiguration conf = new YarnConfiguration();
@@ -153,6 +173,7 @@
 when(csContext.getResourceCalculator()).
 thenReturn(resourceCalculator);
 when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
+when(csContext.getResourceCalculator()).thenReturn(rC);
 when(csContext.getRMContext()).thenReturn(rmContext);
 RMContainerTokenSecretManager containerTokenSecretManager =
 new RMContainerTokenSecretManager(conf);
@@ -179,7 +200,8 @@
 .thenReturn(new YarnConfiguration());
 when(cs.getNumClusterNodes()).thenReturn(3);
 }
 private static final String A = "a";
 private static final String B = "b";
 private static final String C = "c";
@@ -608,14 +630,180 @@
 assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
 a.getMetrics().getAvailableMB());
 }
+@Test
+public void testDRFUsageRatioRounding() throws Exception {
+CSAssignment assign;
+setUpWithDominantResourceCalculator();
+// Mock the queue
+LeafQueue b = stubLeafQueue((LeafQueue) queues.get(E));
+// Users
+final String user0 = "user_0";
+// Submit applications
+final ApplicationAttemptId appAttemptId0 =
+TestUtils.getMockApplicationAttemptId(0, 0);
+FiCaSchedulerApp app0 =
+new FiCaSchedulerApp(appAttemptId0, user0, b,
+b.getActiveUsersManager(), spyRMContext);
+b.submitApplicationAttempt(app0, user0);
+// Setup some nodes
+String host0 = "127.0.0.1";
+FiCaSchedulerNode node0 =
+TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 80 * GB, 100);
+// Make cluster relatively large so usageRatios are small
+int numNodes = 1000;
+Resource clusterResource =
+Resources.createResource(numNodes * (80 * GB), numNodes * 100);
+when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+// Set user-limit. Need a small queue within a large cluster.
+b.setUserLimit(50);
+b.setUserLimitFactor(1000000);
+b.setMaxCapacity(1.0f);
+b.setAbsoluteCapacity(0.00001f);
+// First allocation is larger than second but is still vcore dominant
+// so usage ratio will be based on vcores. If consumedRatio doesn't round
+// in our favor then new limit calculation will actually be less than
+// what is currently consumed and we will fail to allocate
+Priority priority = TestUtils.createMockPriority(1);
+app0.updateResourceRequests(Collections.singletonList(TestUtils
+.createResourceRequest(ResourceRequest.ANY, 20 * GB, 29, 1, true,
+priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
+clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+app0.updateResourceRequests(Collections.singletonList(TestUtils
+.createResourceRequest(ResourceRequest.ANY, 10 * GB, 29, 2, true,
+priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
+clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+assertTrue("Still within limits, should assign",
+assign.getResource().getMemorySize() > 0);
+}
+@Test
+public void testDRFUserLimits() throws Exception {
+setUpWithDominantResourceCalculator();
+// Mock the queue
+LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
+// unset maxCapacity
+b.setMaxCapacity(1.0f);
+// Users
+final String user0 = "user_0";
+final String user1 = "user_1";
+// Submit applications
+final ApplicationAttemptId appAttemptId0 =
+TestUtils.getMockApplicationAttemptId(0, 0);
+FiCaSchedulerApp app0 =
+new FiCaSchedulerApp(appAttemptId0, user0, b,
+b.getActiveUsersManager(), spyRMContext);
+b.submitApplicationAttempt(app0, user0);
+final ApplicationAttemptId appAttemptId2 =
+TestUtils.getMockApplicationAttemptId(2, 0);
+FiCaSchedulerApp app2 =
+new FiCaSchedulerApp(appAttemptId2, user1, b,
+b.getActiveUsersManager(), spyRMContext);
+b.submitApplicationAttempt(app2, user1);
+// Setup some nodes
+String host0 = "127.0.0.1";
+FiCaSchedulerNode node0 =
+TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8 * GB, 100);
+String host1 = "127.0.0.2";
+FiCaSchedulerNode node1 =
+TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100);
+int numNodes = 2;
+Resource clusterResource =
+Resources.createResource(numNodes * (8 * GB), numNodes * 100);
+when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+// Setup resource-requests so that one application is memory dominant
+// and other application is vcores dominant
+Priority priority = TestUtils.createMockPriority(1);
+app0.updateResourceRequests(Collections.singletonList(TestUtils
+.createResourceRequest(ResourceRequest.ANY, 1 * GB, 40, 10, true,
+priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+app2.updateResourceRequests(Collections.singletonList(TestUtils
+.createResourceRequest(ResourceRequest.ANY, 2 * GB, 10, 10, true,
+priority, recordFactory, RMNodeLabelsManager.NO_LABEL)));
+/**
+* Start testing...
+*/
+// Set user-limit
+b.setUserLimit(50);
+b.setUserLimitFactor(2);
+User queueUser0 = b.getUser(user0);
+User queueUser1 = b.getUser(user1);
+assertEquals("There should 2 active users!", 2, b
+.getActiveUsersManager().getNumActiveUsers());
+// Fill both Nodes as far as we can
+CSAssignment assign;
+do {
+assign =
+b.assignContainers(clusterResource, node0, new ResourceLimits(
+clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+LOG.info(assign.toString());
+} while (assign.getResource().getMemorySize() > 0 &&
+assign.getAssignmentInformation().getNumReservations() == 0);
+do {
+assign =
+b.assignContainers(clusterResource, node1, new ResourceLimits(
+clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+} while (assign.getResource().getMemorySize() > 0 &&
+assign.getAssignmentInformation().getNumReservations() == 0);
+//LOG.info("user_0: " + queueUser0.getUsed());
+//LOG.info("user_1: " + queueUser1.getUsed());
+assertTrue("Verify user_0 got resources ", queueUser0.getUsed()
+.getMemorySize() > 0);
+assertTrue("Verify user_1 got resources ", queueUser1.getUsed()
+.getMemorySize() > 0);
+assertTrue(
+"Exepected AbsoluteUsedCapacity > 0.95, got: "
++ b.getAbsoluteUsedCapacity(), b.getAbsoluteUsedCapacity() > 0.95);
+// Verify consumedRatio is based on dominant resources
+float expectedRatio =
+queueUser0.getUsed().getVirtualCores()
+/ (numNodes * 100.0f)
++ queueUser1.getUsed().getMemorySize()
+/ (numNodes * 8.0f * GB);
+assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+// Add another node and make sure consumedRatio is adjusted
+// accordingly.
+numNodes = 3;
+clusterResource =
+Resources.createResource(numNodes * (8 * GB), numNodes * 100);
+when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+root.updateClusterResource(clusterResource, new ResourceLimits(
+clusterResource));
+expectedRatio =
+queueUser0.getUsed().getVirtualCores()
+/ (numNodes * 100.0f)
++ queueUser1.getUsed().getMemorySize()
+/ (numNodes * 8.0f * GB);
+assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+}
 @Test
 public void testUserLimits() throws Exception {
 // Mock the queue
 LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
 //unset maxCapacity
 a.setMaxCapacity(1.0f);
 // Users
 final String user_0 = "user_0";
 final String user_1 = "user_1";

TestUtils.java

@@ -161,10 +161,17 @@ public class TestUtils {
 public static ResourceRequest createResourceRequest(
 String resourceName, int memory, int numContainers, boolean relaxLocality,
 Priority priority, RecordFactory recordFactory, String labelExpression) {
-ResourceRequest request =
+return createResourceRequest(resourceName, memory, 1, numContainers,
+relaxLocality, priority, recordFactory, labelExpression);
+}
+public static ResourceRequest createResourceRequest(String resourceName,
+int memory, int vcores, int numContainers, boolean relaxLocality,
+Priority priority, RecordFactory recordFactory, String labelExpression) {
+ResourceRequest request =
 recordFactory.newRecordInstance(ResourceRequest.class);
-Resource capability = Resources.createResource(memory, 1);
+Resource capability = Resources.createResource(memory, vcores);
 request.setNumContainers(numContainers);
 request.setResourceName(resourceName);
 request.setCapability(capability);
@@ -192,13 +199,18 @@
 return ApplicationAttemptId.newInstance(applicationId, attemptId);
 }
-public static FiCaSchedulerNode getMockNode(
-String host, String rack, int port, int capability) {
+public static FiCaSchedulerNode getMockNode(String host, String rack,
+int port, int memory) {
+return getMockNode(host, rack, port, memory, 1);
+}
+public static FiCaSchedulerNode getMockNode(String host, String rack,
+int port, int memory, int vcores) {
 NodeId nodeId = NodeId.newInstance(host, port);
 RMNode rmNode = mock(RMNode.class);
 when(rmNode.getNodeID()).thenReturn(nodeId);
 when(rmNode.getTotalCapability()).thenReturn(
-Resources.createResource(capability, 1));
+Resources.createResource(memory, vcores));
 when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
 when(rmNode.getHostName()).thenReturn(host);
 when(rmNode.getRackName()).thenReturn(rack);