From ec06957941367930c855b5e05e6a84ba676fd46a Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 8 Apr 2016 15:33:04 -0700 Subject: [PATCH] YARN-3215. Respect labels in CapacityScheduler when computing headroom. (Naganarasimha G R via wangda) --- .../scheduler/AppSchedulingInfo.java | 12 +- .../scheduler/capacity/AbstractCSQueue.java | 14 +- .../capacity/CapacityHeadroomProvider.java | 28 ++- .../scheduler/capacity/CapacityScheduler.java | 6 + .../capacity/CapacitySchedulerContext.java | 12 +- .../scheduler/capacity/LeafQueue.java | 53 +++-- .../capacity/TestApplicationLimits.java | 22 +- .../TestApplicationLimitsByPartition.java | 200 +++++++++++++++++- .../scheduler/capacity/TestLeafQueue.java | 10 +- .../scheduler/capacity/TestReservations.java | 5 + .../scheduler/capacity/TestUtils.java | 12 +- 11 files changed, 328 insertions(+), 46 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index a61001e8bea..5952cc2e27e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -29,8 +29,8 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; @@ -75,6 +76,7 @@ public class AppSchedulingInfo { private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); private final Set amBlacklist = new HashSet<>(); private Set userBlacklist = new HashSet<>(); + private Set requestedPartitions = new HashSet<>(); final Set priorities = new TreeSet<>(COMPARATOR); final Map> resourceRequestMap = @@ -119,6 +121,10 @@ public class AppSchedulingInfo { return pending; } + public Set getRequestedPartitions() { + return requestedPartitions; + } + /** * Clear any pending requests from this application. */ @@ -340,6 +346,10 @@ public class AppSchedulingInfo { asks.put(resourceName, request); if (resourceName.equals(ResourceRequest.ANY)) { + //update the applications requested labels set + requestedPartitions.add(request.getNodeLabelExpression() == null + ? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression()); + anyResourcesUpdated = true; // Activate application. Metrics activation is done here. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index c7d6d028983..dc90c5b0071 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -439,9 +439,8 @@ public abstract class AbstractCSQueue implements CSQueue { * limit-set-by-parent) */ Resource queueMaxResource = - Resources.multiplyAndNormalizeDown(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), - queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); + getQueueMaxResource(nodePartition, clusterResource); + return Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit()); } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { @@ -452,7 +451,14 @@ public abstract class AbstractCSQueue implements CSQueue { return Resources.none(); } - + + Resource getQueueMaxResource(String nodePartition, Resource clusterResource) { + return Resources.multiplyAndNormalizeDown(resourceCalculator, + labelManager.getResourceByLabel(nodePartition, clusterResource), + queueCapacities.getAbsoluteMaximumCapacity(nodePartition), + minimumAllocation); + } + synchronized boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java index a3adf9a91a3..95a12dc9399 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import java.util.Set; + import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; public class CapacityHeadroomProvider { @@ -38,22 +42,32 @@ public class CapacityHeadroomProvider { } public Resource getHeadroom() { - + Resource queueCurrentLimit; Resource clusterResource; synchronized (queueResourceLimitsInfo) { queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); clusterResource = queueResourceLimitsInfo.getClusterResource(); } - Resource headroom = queue.getHeadroom(user, queueCurrentLimit, - clusterResource, application); - + Set requestedPartitions = + application.getAppSchedulingInfo().getRequestedPartitions(); + Resource headroom; + if (requestedPartitions.isEmpty() || (requestedPartitions.size() == 1 + && requestedPartitions.contains(RMNodeLabelsManager.NO_LABEL))) { + headroom = queue.getHeadroom(user, queueCurrentLimit, clusterResource, + application); + } else { + headroom = Resource.newInstance(0, 0); + for (String partition : requestedPartitions) { + Resource partitionHeadRoom = queue.getHeadroom(user, queueCurrentLimit, + clusterResource, application, partition); + Resources.addTo(headroom, partitionHeadRoom); + } + } // Corner case to deal with applications being slightly over-limit if (headroom.getMemory() < 0) { headroom.setMemory(0); } return headroom; - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cf5c3b52b74..34a9829018a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -2124,4 +2125,9 @@ public class CapacityScheduler extends public PreemptionManager getPreemptionManager() { return preemptionManager; } + + @Override + public ResourceUsage getClusterResourceUsage() { + return root.getQueueResourceUsage(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 120327221d6..b39b289d049 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -18,11 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.util.Comparator; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -30,8 +33,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import java.util.Comparator; - /** * Read-only interface to {@link CapacityScheduler} context. */ @@ -72,4 +73,11 @@ public interface CapacitySchedulerContext { SchedulerHealth getSchedulerHealth(); long getLastNodeUpdateTime(); + + /** + * @return QueueCapacities root queue of the Capacity Scheduler Queue, root + * queue used capacities for different labels are same as that of the + * cluster. + */ + ResourceUsage getClusterResourceUsage(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9a74c2288c2..aabdf9c286b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -986,14 +986,21 @@ public class LeafQueue extends AbstractCSQueue { protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application) { - return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application, clusterResource, user, - RMNodeLabelsManager.NO_LABEL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + return getHeadroom(user, queueCurrentLimit, clusterResource, application, + RMNodeLabelsManager.NO_LABEL); } - - private Resource getHeadroom(User user, Resource currentResourceLimit, - Resource clusterResource, Resource userLimit) { + + protected Resource getHeadroom(User user, Resource queueCurrentLimit, + Resource clusterResource, FiCaSchedulerApp application, + String partition) { + return getHeadroom(user, queueCurrentLimit, clusterResource, + computeUserLimit(application, clusterResource, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); + } + + private Resource getHeadroom(User user, + Resource currentPartitionResourceLimit, Resource clusterResource, + Resource userLimitResource, String partition) { /** * Headroom is: * min( @@ -1010,15 +1017,33 @@ public class LeafQueue extends AbstractCSQueue { * * >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) << * + * sum of queue max capacities of multiple queue's will be greater than the + * actual capacity of a given partition, hence we need to ensure that the + * headroom is not greater than the available resource for a given partition + * + * headroom = min (unused resourcelimit of a label, calculated headroom ) */ - Resource headroom = - Resources.componentwiseMin( - Resources.subtract(userLimit, user.getUsed()), - Resources.subtract(currentResourceLimit, queueUsage.getUsed()) - ); + currentPartitionResourceLimit = + partition.equals(RMNodeLabelsManager.NO_LABEL) + ? currentPartitionResourceLimit + : getQueueMaxResource(partition, clusterResource); + + Resource headroom = Resources.componentwiseMin( + Resources.subtract(userLimitResource, user.getUsed(partition)), + Resources.subtract(currentPartitionResourceLimit, + queueUsage.getUsed(partition))); // Normalize it before return headroom = Resources.roundDown(resourceCalculator, headroom, minimumAllocation); + + //headroom = min (unused resourcelimit of a label, calculated headroom ) + Resource clusterPartitionResource = + labelManager.getResourceByLabel(partition, clusterResource); + Resource clusterFreePartitionResource = + Resources.subtract(clusterPartitionResource, + csContext.getClusterResourceUsage().getUsed(partition)); + headroom = Resources.min(resourceCalculator, clusterPartitionResource, + clusterFreePartitionResource, headroom); return headroom; } @@ -1045,10 +1070,10 @@ public class LeafQueue extends AbstractCSQueue { nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); - + Resource headroom = getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), - clusterResource, userLimit); + clusterResource, userLimit, nodePartition); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 171196fec8b..e668d94ecd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -579,8 +580,12 @@ public class TestApplicationLimits { when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, + "root", queues, queues, TestUtils.spyHook); + + ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueCapacities); // Manipulate queue 'a' LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A)); @@ -657,8 +662,7 @@ public class TestApplicationLimits { queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute assertEquals(expectedHeadroom, app_0_0.getHeadroom()); - // TODO, need fix headroom in future patch - // assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change + assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = @@ -679,9 +683,8 @@ public class TestApplicationLimits { clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); - // TODO, need fix headroom in future patch -// assertEquals(expectedHeadroom, app_0_1.getHeadroom()); -// assertEquals(expectedHeadroom, app_1_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); @@ -689,9 +692,8 @@ public class TestApplicationLimits { clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); - // TODO, need fix headroom in future patch -// assertEquals(expectedHeadroom, app_0_1.getHeadroom()); -// assertEquals(expectedHeadroom, app_1_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index 89fcb166272..d33555265d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -18,12 +18,29 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -31,11 +48,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -47,7 +74,8 @@ public class TestApplicationLimitsByPartition { RMNodeLabelsManager mgr; private YarnConfiguration conf; - RMContext rmContext = null; + private final ResourceCalculator resourceCalculator = + new DefaultResourceCalculator(); @Before public void setUp() throws IOException { @@ -538,4 +566,174 @@ public class TestApplicationLimitsByPartition { rm1.close(); } + + @Test + public void testHeadroom() throws Exception { + /* + * Test Case: Verify Headroom calculated is sum of headrooms for each + * partition requested. So submit a app with requests for default partition + * and 'x' partition, so the total headroom for the user should be sum of + * the head room for both labels. + */ + + simpleNodeLabelMappingToManager(); + CapacitySchedulerConfiguration csConf = + (CapacitySchedulerConfiguration) TestUtils + .getComplexConfigurationWithQueueLabels(conf); + final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1"; + final String B2 = CapacitySchedulerConfiguration.ROOT + ".b" + ".b2"; + csConf.setUserLimit(A1, 25); + csConf.setUserLimit(B2, 25); + + YarnConfiguration conf = new YarnConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()) + .thenReturn(Resources.createResource(GB)); + when(csContext.getMaximumResourceCapability()) + .thenReturn(Resources.createResource(16 * GB)); + when(csContext.getNonPartitionedQueueComparator()) + .thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + RMContext rmContext = TestUtils.getMockRMContext(); + RMContext spyRMContext = spy(rmContext); + when(spyRMContext.getNodeLabelManager()).thenReturn(mgr); + when(csContext.getRMContext()).thenReturn(spyRMContext); + + mgr.activateNode(NodeId.newInstance("h0", 0), + Resource.newInstance(160 * GB, 16)); // default Label + mgr.activateNode(NodeId.newInstance("h1", 0), + Resource.newInstance(160 * GB, 16)); // label x + mgr.activateNode(NodeId.newInstance("h2", 0), + Resource.newInstance(160 * GB, 16)); // label y + + // Say cluster has 100 nodes of 16G each + Resource clusterResource = Resources.createResource(160 * GB); + when(csContext.getClusterResource()).thenReturn(clusterResource); + + Map queues = new HashMap(); + CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, + "root", queues, queues, TestUtils.spyHook); + + ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueResUsage); + + // Manipulate queue 'a' + LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue) queues.get("b2")); + queue.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode("h0", rack_0, 0, 160 * GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode("h1", rack_0, 0, 160 * GB); + + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + ResourceRequest amResourceRequest = mock(ResourceRequest.class); + Resource amResource = Resources.createResource(0, 0); + when(amResourceRequest.getCapability()).thenReturn(amResource); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); + when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any())) + .thenReturn(rmAppAttempt); + when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); + Mockito.doReturn(true).when(spyApps) + .containsKey((ApplicationId) Matchers.any()); + + Priority priority_1 = TestUtils.createMockPriority(1); + + // Submit first application with some resource-requests from user_0, + // and check headroom + final ApplicationAttemptId appAttemptId_0_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0, + queue, queue.getActiveUsersManager(), spyRMContext); + queue.submitApplicationAttempt(app_0_0, user_0); + + List app_0_0_requests = new ArrayList(); + app_0_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory)); + app_0_0.updateResourceRequests(app_0_0_requests); + + // Schedule to compute + queue.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + //head room = queue capacity = 50 % 90% 160 GB + Resource expectedHeadroom = + Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + + // Submit second application from user_0, check headroom + final ApplicationAttemptId appAttemptId_0_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0, + queue, queue.getActiveUsersManager(), spyRMContext); + queue.submitApplicationAttempt(app_0_1, user_0); + + List app_0_1_requests = new ArrayList(); + app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory)); + app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory, "y")); + app_0_1.updateResourceRequests(app_0_1_requests); + + // Schedule to compute + queue.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute + queue.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute + assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change + //head room for default label + head room for y partition + //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB + Resource expectedHeadroomWithReqInY = + Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom); + assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); + + // Submit first application from user_1, check for new headroom + final ApplicationAttemptId appAttemptId_1_0 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1, + queue, queue.getActiveUsersManager(), spyRMContext); + queue.submitApplicationAttempt(app_1_0, user_1); + + List app_1_0_requests = new ArrayList(); + app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory)); + app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory, "y")); + app_1_0.updateResourceRequests(app_1_0_requests); + + // Schedule to compute + queue.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute + //head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users) + expectedHeadroom = + Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1); + //head room for default label + head room for y partition + //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB + expectedHeadroomWithReqInY = + Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1), + expectedHeadroom); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); + assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom()); + + + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 87a3d512122..263b95bdd1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -23,13 +23,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -47,7 +44,6 @@ import java.util.concurrent.CyclicBarrier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -71,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -91,7 +88,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; -import org.mortbay.log.Log; public class TestLeafQueue { private final RecordFactory recordFactory = @@ -165,6 +161,10 @@ public class TestLeafQueue { queues, queues, TestUtils.spyHook); + ResourceUsage queueResUsage = root.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueResUsage); + cs.setRMContext(spyRMContext); cs.init(csConf); cs.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 56facee37bf..632b54705c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -138,6 +139,10 @@ public class TestReservations { root = CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + ResourceUsage queueResUsage = root.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueResUsage); + spyRMContext = spy(rmContext); when(spyRMContext.getScheduler()).thenReturn(cs); when(spyRMContext.getYarnConfiguration()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 4441c6b4afc..621c5c52d6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -159,7 +159,7 @@ public class TestUtils { public static ResourceRequest createResourceRequest( String resourceName, int memory, int numContainers, boolean relaxLocality, - Priority priority, RecordFactory recordFactory) { + Priority priority, RecordFactory recordFactory, String labelExpression) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); Resource capability = Resources.createResource(memory, 1); @@ -169,10 +169,18 @@ public class TestUtils { request.setCapability(capability); request.setRelaxLocality(relaxLocality); request.setPriority(priority); - request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + request.setNodeLabelExpression(labelExpression); return request; } + public static ResourceRequest createResourceRequest( + String resourceName, int memory, int numContainers, boolean relaxLocality, + Priority priority, + RecordFactory recordFactory) { + return createResourceRequest(resourceName, memory, numContainers, + relaxLocality, priority, recordFactory, RMNodeLabelsManager.NO_LABEL); + } + public static ApplicationId getMockApplicationId(int appId) { return ApplicationId.newInstance(0L, appId); }