diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3b99b35f2aa..3594bb04cc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -18,17 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -75,9 +65,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @Private @Unstable @@ -116,7 +119,7 @@ public class LeafQueue extends AbstractCSQueue { // cache last cluster resource to compute actual capacity private Resource lastClusterResource = Resources.none(); - + private final QueueResourceLimitsInfo queueResourceLimitsInfo = new QueueResourceLimitsInfo(); @@ -124,6 +127,10 @@ public class LeafQueue extends AbstractCSQueue { private OrderingPolicy orderingPolicy = null; + // Summation of consumed ratios for all users in queue + private float totalUserConsumedRatio = 0; + private UsageRatios qUsageRatios; + // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on private Map> ignorePartitionExclusivityRMContainers = @@ -140,6 +147,8 @@ public LeafQueue(CapacitySchedulerContext cs, // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); + qUsageRatios = new UsageRatios(); + if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); @@ -164,7 +173,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) setQueueResourceLimitsInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - + setOrderingPolicy(conf.getOrderingPolicy(getQueuePath())); userLimit = conf.getUserLimit(getQueuePath()); @@ -1113,6 +1122,9 @@ public boolean getRackLocalityFullReset() { private Resource computeUserLimit(FiCaSchedulerApp application, Resource clusterResource, User user, String nodePartition, SchedulingMode schedulingMode) { + Resource partitionResource = labelManager.getResourceByLabel(nodePartition, + clusterResource); + // What is our current capacity? // * It is equal to the max(required, queue-capacity) if // we're running below capacity. The 'max' ensures that jobs in queues @@ -1121,7 +1133,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, // (usedResources + required) (which extra resources we are allocating) Resource queueCapacity = Resources.multiplyAndNormalizeUp(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), + partitionResource, queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); @@ -1133,15 +1145,30 @@ private Resource computeUserLimit(FiCaSchedulerApp application, // Allow progress for queues with miniscule capacity queueCapacity = Resources.max( - resourceCalculator, clusterResource, + resourceCalculator, partitionResource, queueCapacity, required); + + /* We want to base the userLimit calculation on + * max(queueCapacity, usedResources+required). However, we want + * usedResources to be based on the combined ratios of all the users in the + * queue so we use consumedRatio to calculate such. + * The calculation is dependent on how the resourceCalculator calculates the + * ratio between two Resources. DRF Example: If usedResources is + * greater than queueCapacity and users have the following [mem,cpu] usages: + * User1: [10%,20%] - Dominant resource is 20% + * User2: [30%,10%] - Dominant resource is 30% + * Then total consumedRatio is then 20+30=50%. Yes, this value can be + * larger than 100% but for the purposes of making sure all users are + * getting their fair share, it works. + */ + Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, + partitionResource, qUsageRatios.getUsageRatio(nodePartition), + minimumAllocation); Resource currentCapacity = - Resources.lessThan(resourceCalculator, clusterResource, - queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity - : Resources.add(queueUsage.getUsed(nodePartition), required); - + Resources.lessThan(resourceCalculator, partitionResource, consumed, + queueCapacity) ? queueCapacity : Resources.add(consumed, required); // Never allow a single user to take more than the // queue's configured capacity * user-limit-factor. // Also, the queue's configured capacity should be higher than @@ -1150,9 +1177,10 @@ private Resource computeUserLimit(FiCaSchedulerApp application, final int activeUsers = activeUsersManager.getNumActiveUsers(); // User limit resource is determined by: - // max{currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%) + // max{currentCapacity / #activeUsers, currentCapacity * + // user-limit-percentage%) Resource userLimitResource = Resources.max( - resourceCalculator, clusterResource, + resourceCalculator, partitionResource, Resources.divideAndCeil( resourceCalculator, currentCapacity, activeUsers), Resources.divideAndCeil( @@ -1176,8 +1204,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor); } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - maxUserLimit = - labelManager.getResourceByLabel(nodePartition, clusterResource); + maxUserLimit = partitionResource; } // Cap final user limit with maxUserLimit @@ -1185,7 +1212,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, Resources.roundUp( resourceCalculator, Resources.min( - resourceCalculator, clusterResource, + resourceCalculator, partitionResource, userLimitResource, maxUserLimit ), @@ -1193,18 +1220,22 @@ private Resource computeUserLimit(FiCaSchedulerApp application, if (LOG.isDebugEnabled()) { String userName = application.getUser(); - LOG.debug("User limit computation for " + userName + + LOG.debug("User limit computation for " + userName + " in queue " + getQueueName() + " userLimitPercent=" + userLimit + " userLimitFactor=" + userLimitFactor + - " required: " + required + - " consumed: " + user.getUsed() + + " required: " + required + + " consumed: " + consumed + " user-limit-resource: " + userLimitResource + - " queueCapacity: " + queueCapacity + + " queueCapacity: " + queueCapacity + " qconsumed: " + queueUsage.getUsed() + + " consumedRatio: " + totalUserConsumedRatio + " currentCapacity: " + currentCapacity + " activeUsers: " + activeUsers + - " clusterCapacity: " + clusterResource + " clusterCapacity: " + clusterResource + + " resourceByLabel: " + partitionResource + + " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) + + " Partition: " + nodePartition ); } user.setUserResourceLimit(userLimitResource); @@ -1311,6 +1342,42 @@ private void updateSchedulerHealthForCompletedContainer( } } + private synchronized float calculateUserUsageRatio(Resource clusterResource, + String nodePartition) { + Resource resourceByLabel = + labelManager.getResourceByLabel(nodePartition, clusterResource); + float consumed = 0; + User user; + for (Map.Entry entry : users.entrySet()) { + user = entry.getValue(); + consumed += user.resetAndUpdateUsageRatio(resourceCalculator, + resourceByLabel, nodePartition); + } + return consumed; + } + + private synchronized void recalculateQueueUsageRatio(Resource clusterResource, + String nodePartition) { + ResourceUsage queueResourceUsage = this.getQueueResourceUsage(); + + if (nodePartition == null) { + for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), + queueResourceUsage.getNodePartitionsSet())) { + qUsageRatios.setUsageRatio(partition, + calculateUserUsageRatio(clusterResource, partition)); + } + } else { + qUsageRatios.setUsageRatio(nodePartition, + calculateUserUsageRatio(clusterResource, nodePartition)); + } + } + + private synchronized void updateQueueUsageRatio(String nodePartition, + float delta) { + qUsageRatios.incUsageRatio(nodePartition, delta); + } + + @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, @@ -1348,7 +1415,7 @@ public void completedContainer(Resource clusterResource, removed = application.containerCompleted(rmContainer, containerStatus, event, node.getPartition()); - + node.releaseContainer(container); } @@ -1381,6 +1448,8 @@ synchronized void allocateResource(Resource clusterResource, boolean isIncreasedAllocation) { super.allocateResource(clusterResource, resource, nodePartition, isIncreasedAllocation); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); // handle ignore exclusivity container if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( @@ -1399,6 +1468,12 @@ synchronized void allocateResource(Resource clusterResource, String userName = application.getUser(); User user = getUser(userName); user.assignContainer(resource, nodePartition); + + // Update usage ratios + updateQueueUsageRatio(nodePartition, + user.updateUsageRatio(resourceCalculator, resourceByLabel, + nodePartition)); + // Note this is a bit unconventional since it gets the object and modifies // it here, rather then using set routine Resources.subtractFrom(application.getHeadroom(), resource); // headroom @@ -1419,6 +1494,8 @@ synchronized void releaseResource(Resource clusterResource, RMContainer rmContainer, boolean isChangeResource) { super.releaseResource(clusterResource, resource, nodePartition, isChangeResource); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); // handle ignore exclusivity container if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( @@ -1438,6 +1515,12 @@ synchronized void releaseResource(Resource clusterResource, String userName = application.getUser(); User user = getUser(userName); user.releaseContainer(resource, nodePartition); + + // Update usage ratios + updateQueueUsageRatio(nodePartition, + user.updateUsageRatio(resourceCalculator, resourceByLabel, + nodePartition)); + metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); if (LOG.isDebugEnabled()) { @@ -1477,7 +1560,10 @@ public synchronized void updateClusterResource(Resource clusterResource, // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation setQueueResourceLimitsInfo(clusterResource); - + + // Update user consumedRatios + recalculateQueueUsageRatio(clusterResource, null); + // Update metrics CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, null); @@ -1529,17 +1615,93 @@ public void decAMUsedResource(String nodeLabel, Resource resourceToDec, queueUsage.decAMUsed(nodeLabel, resourceToDec); } + /* + * Usage Ratio + */ + static private class UsageRatios { + private Map usageRatios; + private ReadLock readLock; + private WriteLock writeLock; + + public UsageRatios() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + usageRatios = new HashMap(); + } + + private void incUsageRatio(String label, float delta) { + try { + writeLock.lock(); + Float fl = usageRatios.get(label); + if (null == fl) { + fl = new Float(0.0); + } + fl += delta; + usageRatios.put(label, new Float(fl)); + } finally { + writeLock.unlock(); + } + } + + float getUsageRatio(String label) { + try { + readLock.lock(); + Float f = usageRatios.get(label); + if (null == f) { + return 0.0f; + } + return f; + } finally { + readLock.unlock(); + } + } + + private void setUsageRatio(String label, float ratio) { + try { + writeLock.lock(); + usageRatios.put(label, new Float(ratio)); + } finally { + writeLock.unlock(); + } + } + } + + @VisibleForTesting + public float getUsageRatio(String label) { + return qUsageRatios.getUsageRatio(label); + } + @VisibleForTesting public static class User { ResourceUsage userResourceUsage = new ResourceUsage(); volatile Resource userResourceLimit = Resource.newInstance(0, 0); int pendingApplications = 0; int activeApplications = 0; + private UsageRatios userUsageRatios = new UsageRatios(); public ResourceUsage getResourceUsage() { return userResourceUsage; } + public synchronized float resetAndUpdateUsageRatio( + ResourceCalculator resourceCalculator, + Resource resource, String nodePartition) { + userUsageRatios.setUsageRatio(nodePartition, 0); + return updateUsageRatio(resourceCalculator, resource, nodePartition); + } + + public synchronized float updateUsageRatio( + ResourceCalculator resourceCalculator, + Resource resource, String nodePartition) { + float delta; + float newRatio = + Resources.ratio(resourceCalculator, getUsed(nodePartition), resource); + delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); + userUsageRatios.setUsageRatio(nodePartition, newRatio); + return delta; + } + public Resource getUsed() { return userResourceUsage.getUsed(); } @@ -1677,7 +1839,7 @@ public synchronized void collectSchedulerApplications( .getSchedulableEntities()) { apps.add(pendingApp.getApplicationAttemptId()); } - for (FiCaSchedulerApp app : + for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) { apps.add(app.getApplicationAttemptId()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 274c0631769..b2c53da4746 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -83,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; @@ -97,6 +101,7 @@ public class TestLeafQueue { private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private static final Log LOG = LogFactory.getLog(TestLeafQueue.class); RMContext rmContext; RMContext spyRMContext; @@ -106,16 +111,29 @@ public class TestLeafQueue { CapacitySchedulerContext csContext; CSQueue root; - Map queues = new HashMap(); + Map queues; final static int GB = 1024; final static String DEFAULT_RACK = "/default"; - private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + private final ResourceCalculator resourceCalculator = + new DefaultResourceCalculator(); + private final ResourceCalculator dominantResourceCalculator = + new DominantResourceCalculator(); + @Before public void setUp() throws Exception { + setUpInternal(resourceCalculator); + } + + private void setUpWithDominantResourceCalculator() throws Exception { + setUpInternal(dominantResourceCalculator); + } + + private void setUpInternal(ResourceCalculator rC) throws Exception { CapacityScheduler spyCs = new CapacityScheduler(); + queues = new HashMap(); cs = spy(spyCs); rmContext = TestUtils.getMockRMContext(); spyRMContext = spy(rmContext); @@ -134,6 +152,8 @@ public void setUp() throws Exception { csConf = new CapacitySchedulerConfiguration(); csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true); + csConf.setBoolean( + "yarn.scheduler.capacity.reservations-continue-look-all-nodes", false); final String newRoot = "root" + System.currentTimeMillis(); setupQueueConfiguration(csConf, newRoot); YarnConfiguration conf = new YarnConfiguration(); @@ -153,6 +173,7 @@ public void setUp() throws Exception { when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); + when(csContext.getResourceCalculator()).thenReturn(rC); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); @@ -179,7 +200,8 @@ public void setUp() throws Exception { .thenReturn(new YarnConfiguration()); when(cs.getNumClusterNodes()).thenReturn(3); } - + + private static final String A = "a"; private static final String B = "b"; private static final String C = "c"; @@ -608,14 +630,180 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()), a.getMetrics().getAvailableMB()); } - + @Test + public void testDRFUsageRatioRounding() throws Exception { + CSAssignment assign; + setUpWithDominantResourceCalculator(); + // Mock the queue + LeafQueue b = stubLeafQueue((LeafQueue) queues.get(E)); + + // Users + final String user0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app0 = + new FiCaSchedulerApp(appAttemptId0, user0, b, + b.getActiveUsersManager(), spyRMContext); + b.submitApplicationAttempt(app0, user0); + + // Setup some nodes + String host0 = "127.0.0.1"; + FiCaSchedulerNode node0 = + TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 80 * GB, 100); + + // Make cluster relatively large so usageRatios are small + int numNodes = 1000; + Resource clusterResource = + Resources.createResource(numNodes * (80 * GB), numNodes * 100); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Set user-limit. Need a small queue within a large cluster. + b.setUserLimit(50); + b.setUserLimitFactor(1000000); + b.setMaxCapacity(1.0f); + b.setAbsoluteCapacity(0.00001f); + + // First allocation is larger than second but is still vcore dominant + // so usage ratio will be based on vcores. If consumedRatio doesn't round + // in our favor then new limit calculation will actually be less than + // what is currently consumed and we will fail to allocate + Priority priority = TestUtils.createMockPriority(1); + app0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 20 * GB, 29, 1, true, + priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + assign = b.assignContainers(clusterResource, node0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + app0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 10 * GB, 29, 2, true, + priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + assign = b.assignContainers(clusterResource, node0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertTrue("Still within limits, should assign", + assign.getResource().getMemorySize() > 0); + } + + @Test + public void testDRFUserLimits() throws Exception { + setUpWithDominantResourceCalculator(); + + // Mock the queue + LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B)); + // unset maxCapacity + b.setMaxCapacity(1.0f); + + // Users + final String user0 = "user_0"; + final String user1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app0 = + new FiCaSchedulerApp(appAttemptId0, user0, b, + b.getActiveUsersManager(), spyRMContext); + b.submitApplicationAttempt(app0, user0); + + final ApplicationAttemptId appAttemptId2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app2 = + new FiCaSchedulerApp(appAttemptId2, user1, b, + b.getActiveUsersManager(), spyRMContext); + b.submitApplicationAttempt(app2, user1); + + // Setup some nodes + String host0 = "127.0.0.1"; + FiCaSchedulerNode node0 = + TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8 * GB, 100); + String host1 = "127.0.0.2"; + FiCaSchedulerNode node1 = + TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100); + + int numNodes = 2; + Resource clusterResource = + Resources.createResource(numNodes * (8 * GB), numNodes * 100); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests so that one application is memory dominant + // and other application is vcores dominant + Priority priority = TestUtils.createMockPriority(1); + app0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 1 * GB, 40, 10, true, + priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + + app2.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 10, 10, true, + priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + + /** + * Start testing... + */ + + // Set user-limit + b.setUserLimit(50); + b.setUserLimitFactor(2); + User queueUser0 = b.getUser(user0); + User queueUser1 = b.getUser(user1); + + assertEquals("There should 2 active users!", 2, b + .getActiveUsersManager().getNumActiveUsers()); + // Fill both Nodes as far as we can + CSAssignment assign; + do { + assign = + b.assignContainers(clusterResource, node0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + LOG.info(assign.toString()); + } while (assign.getResource().getMemorySize() > 0 && + assign.getAssignmentInformation().getNumReservations() == 0); + do { + assign = + b.assignContainers(clusterResource, node1, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + } while (assign.getResource().getMemorySize() > 0 && + assign.getAssignmentInformation().getNumReservations() == 0); + //LOG.info("user_0: " + queueUser0.getUsed()); + //LOG.info("user_1: " + queueUser1.getUsed()); + + assertTrue("Verify user_0 got resources ", queueUser0.getUsed() + .getMemorySize() > 0); + assertTrue("Verify user_1 got resources ", queueUser1.getUsed() + .getMemorySize() > 0); + assertTrue( + "Exepected AbsoluteUsedCapacity > 0.95, got: " + + b.getAbsoluteUsedCapacity(), b.getAbsoluteUsedCapacity() > 0.95); + + // Verify consumedRatio is based on dominant resources + float expectedRatio = + queueUser0.getUsed().getVirtualCores() + / (numNodes * 100.0f) + + queueUser1.getUsed().getMemorySize() + / (numNodes * 8.0f * GB); + assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); + // Add another node and make sure consumedRatio is adjusted + // accordingly. + numNodes = 3; + clusterResource = + Resources.createResource(numNodes * (8 * GB), numNodes * 100); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); + expectedRatio = + queueUser0.getUsed().getVirtualCores() + / (numNodes * 100.0f) + + queueUser1.getUsed().getMemorySize() + / (numNodes * 8.0f * GB); + assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); + } + @Test public void testUserLimits() throws Exception { // Mock the queue LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); //unset maxCapacity a.setMaxCapacity(1.0f); - + // Users final String user_0 = "user_0"; final String user_1 = "user_1"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 66e833f1798..a60b7ed4bf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -161,10 +161,17 @@ public static Priority createMockPriority( int priority) { public static ResourceRequest createResourceRequest( String resourceName, int memory, int numContainers, boolean relaxLocality, Priority priority, RecordFactory recordFactory, String labelExpression) { - ResourceRequest request = + return createResourceRequest(resourceName, memory, 1, numContainers, + relaxLocality, priority, recordFactory, labelExpression); + } + + public static ResourceRequest createResourceRequest(String resourceName, + int memory, int vcores, int numContainers, boolean relaxLocality, + Priority priority, RecordFactory recordFactory, String labelExpression) { + ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); - Resource capability = Resources.createResource(memory, 1); - + Resource capability = Resources.createResource(memory, vcores); + request.setNumContainers(numContainers); request.setResourceName(resourceName); request.setCapability(capability); @@ -192,13 +199,18 @@ public static ApplicationId getMockApplicationId(int appId) { return ApplicationAttemptId.newInstance(applicationId, attemptId); } - public static FiCaSchedulerNode getMockNode( - String host, String rack, int port, int capability) { + public static FiCaSchedulerNode getMockNode(String host, String rack, + int port, int memory) { + return getMockNode(host, rack, port, memory, 1); + } + + public static FiCaSchedulerNode getMockNode(String host, String rack, + int port, int memory, int vcores) { NodeId nodeId = NodeId.newInstance(host, port); RMNode rmNode = mock(RMNode.class); when(rmNode.getNodeID()).thenReturn(nodeId); when(rmNode.getTotalCapability()).thenReturn( - Resources.createResource(capability, 1)); + Resources.createResource(memory, vcores)); when(rmNode.getNodeAddress()).thenReturn(host+":"+port); when(rmNode.getHostName()).thenReturn(host); when(rmNode.getRackName()).thenReturn(rack);