From 06a0772d62288b5f1b0a57af75feca5cd11dede3 Mon Sep 17 00:00:00 2001 From: Jian He Date: Thu, 7 Aug 2014 20:01:14 +0000 Subject: [PATCH] Merge r1616580 from trunk. YARN-2008. Fixed CapacityScheduler to calculate headroom based on max available capacity instead of configured max capacity. Contributed by Craig Welch git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616581 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../resource/DefaultResourceCalculator.java | 7 + .../resource/DominantResourceCalculator.java | 8 + .../util/resource/ResourceCalculator.java | 9 + .../hadoop/yarn/util/resource/Resources.java | 5 + .../scheduler/capacity/CSQueueUtils.java | 53 ++++ .../scheduler/capacity/LeafQueue.java | 9 +- .../scheduler/capacity/TestCSQueueUtils.java | 262 ++++++++++++++++++ 8 files changed, 354 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ef92bc662cd..c6eb8df27f5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -127,6 +127,9 @@ Release 2.6.0 - UNRELEASED YARN-2388. Fixed TestTimelineWebServices failure due to HADOOP-10791. (zjshen) + YARN-2008. Fixed CapacityScheduler to calculate headroom based on max available + capacity instead of configured max capacity. (Craig Welch via jianhe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index 294e6215efb..c2fc1f0e73a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -42,6 +42,13 @@ public float divide(Resource unused, Resource numerator, Resource denominator) { return ratio(numerator, denominator); } + + public boolean isInvalidDivisor(Resource r) { + if (r.getMemory() == 0.0f) { + return true; + } + return false; + } @Override public float ratio(Resource a, Resource b) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index d0f58331230..6f5b40eebfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -109,6 +109,14 @@ public float divide(Resource clusterResource, getResourceAsValue(clusterResource, numerator, true) / getResourceAsValue(clusterResource, denominator, true); } + + @Override + public boolean isInvalidDivisor(Resource r) { + if (r.getMemory() == 0.0f || r.getVirtualCores() == 0.0f) { + return true; + } + return false; + } @Override public float ratio(Resource a, Resource b) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 490cd38c14c..cb076997d24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -149,6 +149,15 @@ public abstract Resource normalize(Resource r, Resource minimumResource, public abstract float divide( Resource clusterResource, Resource numerator, Resource denominator); + /** + * Determine if a resource is not suitable for use as a divisor + * (will result in divide by 0, etc) + * + * @param r resource + * @return true if divisor is invalid (should not be used), false else + */ + public abstract boolean isInvalidDivisor(Resource r); + /** * Ratio of resource a to resource b. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 800fa34e570..077c9628831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -184,6 +184,11 @@ public static Resource roundDown( return calculator.roundDown(lhs, factor); } + public static boolean isInvalidDivisor( + ResourceCalculator resourceCalculator, Resource divisor) { + return resourceCalculator.isInvalidDivisor(divisor); + } + public static float ratio( ResourceCalculator resourceCalculator, Resource lhs, Resource rhs) { return resourceCalculator.ratio(lhs, rhs); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 1dd55862b9c..737062bbcdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -24,6 +27,8 @@ class CSQueueUtils { + private static final Log LOG = LogFactory.getLog(CSQueueUtils.class); + final static float EPSILON = 0.0001f; public static void checkMaxCapacity(String queueName, @@ -113,4 +118,52 @@ public static void updateQueueStatistics( ) ); } + + public static float getAbsoluteMaxAvailCapacity( + ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) { + CSQueue parent = queue.getParent(); + if (parent == null) { + return queue.getAbsoluteMaximumCapacity(); + } + + //Get my parent's max avail, needed to determine my own + float parentMaxAvail = getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, parent); + //...and as a resource + Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail); + + //check for no resources parent before dividing, if so, max avail is none + if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) { + return 0.0f; + } + //sibling used is parent used - my used... + float siblingUsedCapacity = Resources.ratio( + resourceCalculator, + Resources.subtract(parent.getUsedResources(), queue.getUsedResources()), + parentResource); + //my max avail is the lesser of my max capacity and what is unused from my parent + //by my siblings (if they are beyond their base capacity) + float maxAvail = Math.min( + queue.getMaximumCapacity(), + 1.0f - siblingUsedCapacity); + //and, mutiply by parent to get absolute (cluster relative) value + float absoluteMaxAvail = maxAvail * parentMaxAvail; + + if (LOG.isDebugEnabled()) { + LOG.debug("qpath " + queue.getQueuePath()); + LOG.debug("parentMaxAvail " + parentMaxAvail); + LOG.debug("siblingUsedCapacity " + siblingUsedCapacity); + LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity()); + LOG.debug("maxAvail " + maxAvail); + LOG.debug("absoluteMaxAvail " + absoluteMaxAvail); + } + + if (absoluteMaxAvail < 0.0f) { + absoluteMaxAvail = 0.0f; + } else if (absoluteMaxAvail > 1.0f) { + absoluteMaxAvail = 1.0f; + } + + return absoluteMaxAvail; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 65938aab8dc..4fd8b490577 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -976,13 +976,18 @@ private Resource computeUserLimitAndSetHeadroom( Resource userLimit = // User limit computeUserLimit(application, clusterResource, required); - + + //Max avail capacity needs to take into account usage by ancestor-siblings + //which are greater than their base capacity, so we are interested in "max avail" + //capacity + float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, this); Resource queueMaxCap = // Queue Max-Capacity Resources.multiplyAndNormalizeDown( resourceCalculator, clusterResource, - absoluteMaxCapacity, + absoluteMaxAvailCapacity, minimumAllocation); Resource userConsumed = getUser(user).getConsumedResources(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java new file mode 100644 index 00000000000..7260afd9bcc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestCSQueueUtils { + + private static final Log LOG = LogFactory.getLog(TestCSQueueUtils.class); + + final static int GB = 1024; + + @Test + public void testAbsoluteMaxAvailCapacityInvalidDivisor() throws Exception { + runInvalidDivisorTest(false); + runInvalidDivisorTest(true); + } + + public void runInvalidDivisorTest(boolean useDominant) throws Exception { + + ResourceCalculator resourceCalculator; + Resource clusterResource; + if (useDominant) { + resourceCalculator = new DominantResourceCalculator(); + clusterResource = Resources.createResource(10, 0); + } else { + resourceCalculator = new DefaultResourceCalculator(); + clusterResource = Resources.createResource(0, 99); + } + + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getClusterResource()).thenReturn(clusterResource); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(0, 0)); + + final String L1Q1 = "L1Q1"; + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); + + final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; + csConf.setCapacity(L1Q1P, 90); + csConf.setMaximumCapacity(L1Q1P, 90); + + ParentQueue root = new ParentQueue(csContext, + CapacitySchedulerConfiguration.ROOT, null, null); + LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); + + LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, root)); + + LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l1q1)); + + assertEquals(0.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l1q1), 0.000001f); + + } + + @Test + public void testAbsoluteMaxAvailCapacityNoUse() throws Exception { + + ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); + + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getClusterResource()).thenReturn(clusterResource); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(16*GB, 32)); + + final String L1Q1 = "L1Q1"; + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); + + final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; + csConf.setCapacity(L1Q1P, 90); + csConf.setMaximumCapacity(L1Q1P, 90); + + ParentQueue root = new ParentQueue(csContext, + CapacitySchedulerConfiguration.ROOT, null, null); + LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); + + LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, root)); + + LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l1q1)); + + assertEquals(1.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, root), 0.000001f); + + assertEquals(0.9f, CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l1q1), 0.000001f); + + } + + @Test + public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { + + ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); + + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getClusterResource()).thenReturn(clusterResource); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(16*GB, 32)); + + final String L1Q1 = "L1Q1"; + final String L1Q2 = "L1Q2"; + final String L2Q1 = "L2Q1"; + final String L2Q2 = "L2Q2"; + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1, L1Q2, + L2Q1, L2Q2}); + + final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; + csConf.setCapacity(L1Q1P, 80); + csConf.setMaximumCapacity(L1Q1P, 80); + + final String L1Q2P = CapacitySchedulerConfiguration.ROOT + "." + L1Q2; + csConf.setCapacity(L1Q2P, 20); + csConf.setMaximumCapacity(L1Q2P, 100); + + final String L2Q1P = L1Q1P + "." + L2Q1; + csConf.setCapacity(L2Q1P, 50); + csConf.setMaximumCapacity(L2Q1P, 50); + + final String L2Q2P = L1Q1P + "." + L2Q2; + csConf.setCapacity(L2Q2P, 50); + csConf.setMaximumCapacity(L2Q2P, 50); + + float result; + + ParentQueue root = new ParentQueue(csContext, + CapacitySchedulerConfiguration.ROOT, null, null); + + LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); + LeafQueue l1q2 = new LeafQueue(csContext, L1Q2, root, null); + LeafQueue l2q2 = new LeafQueue(csContext, L2Q2, l1q1, null); + LeafQueue l2q1 = new LeafQueue(csContext, L2Q1, l1q1, null); + + //no usage, all based on maxCapacity (prior behavior) + result = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l2q2); + assertEquals( 0.4f, result, 0.000001f); + LOG.info("t2 l2q2 " + result); + + //some usage, but below the base capacity + Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + result = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l2q2); + assertEquals( 0.4f, result, 0.000001f); + LOG.info("t2 l2q2 " + result); + + //usage gt base on parent sibling + Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); + Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); + result = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l2q2); + assertEquals( 0.3f, result, 0.000001f); + LOG.info("t2 l2q2 " + result); + + //same as last, but with usage also on direct parent + Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + result = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l2q2); + assertEquals( 0.3f, result, 0.000001f); + LOG.info("t2 l2q2 " + result); + + //add to direct sibling, below the threshold of effect at present + Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + result = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l2q2); + assertEquals( 0.3f, result, 0.000001f); + LOG.info("t2 l2q2 " + result); + + //add to direct sibling, now above the threshold of effect + //(it's cumulative with prior tests) + Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + result = CSQueueUtils.getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, l2q2); + assertEquals( 0.1f, result, 0.000001f); + LOG.info("t2 l2q2 " + result); + + + } + +}