From 358b54d0635a28aa66dde4f12f81936004102b8c Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 22 Apr 2016 15:59:49 -0700 Subject: [PATCH] YARN-3215. Respect labels in CapacityScheduler when computing headroom. (Naganarasimha G R via wangda) --- .../scheduler/AppSchedulingInfo.java | 36 ++-- .../scheduler/capacity/AbstractCSQueue.java | 15 +- .../capacity/CapacityHeadroomProvider.java | 28 ++- .../scheduler/capacity/CapacityScheduler.java | 45 ++-- .../capacity/CapacitySchedulerContext.java | 18 +- .../scheduler/capacity/LeafQueue.java | 53 +++-- .../capacity/TestApplicationLimits.java | 56 ++--- .../TestApplicationLimitsByPartition.java | 200 +++++++++++++++++- .../scheduler/capacity/TestLeafQueue.java | 56 ++--- .../scheduler/capacity/TestReservations.java | 5 + .../scheduler/capacity/TestUtils.java | 12 +- 11 files changed, 405 insertions(+), 119 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 4df1f7c7341..9b2ba14ec70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -18,19 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -45,10 +32,24 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * This class keeps track of all the consumption of an application. This also * keeps track of current running/completed containers for the application. @@ -74,6 +75,7 @@ public class AppSchedulingInfo { private final Set amBlacklist = new HashSet<>(); private Set userBlacklist = new HashSet<>(); + private Set requestedPartitions = new HashSet<>(); final Set priorities = new TreeSet<>(COMPARATOR); final Map> resourceRequestMap = @@ -118,6 +120,10 @@ public class AppSchedulingInfo { return pending; } + public Set getRequestedPartitions() { + return requestedPartitions; + } + /** * Clear any pending requests from this application. */ @@ -339,6 +345,10 @@ public class AppSchedulingInfo { asks.put(resourceName, request); if (resourceName.equals(ResourceRequest.ANY)) { + //update the applications requested labels set + requestedPartitions.add(request.getNodeLabelExpression() == null + ? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression()); + anyResourcesUpdated = true; // Activate application. Metrics activation is done here. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 549fa3485a7..0921a24d7b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -434,14 +434,12 @@ public abstract class AbstractCSQueue implements CSQueue { * limit-set-by-parent) */ Resource queueMaxResource = - Resources.multiplyAndNormalizeDown(resourceCalculator, - labelManager.getResourceByLabel(nodePartition, clusterResource), - queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation); + getQueueMaxResource(nodePartition, clusterResource); if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { return Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit()); } - return queueMaxResource; + return queueMaxResource; } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { // When we doing non-exclusive resource allocation, maximum capacity of // all queues on this label equals to total resource with the label. @@ -450,7 +448,14 @@ public abstract class AbstractCSQueue implements CSQueue { return Resources.none(); } - + + Resource getQueueMaxResource(String nodePartition, Resource clusterResource) { + return Resources.multiplyAndNormalizeDown(resourceCalculator, + labelManager.getResourceByLabel(nodePartition, clusterResource), + queueCapacities.getAbsoluteMaximumCapacity(nodePartition), + minimumAllocation); + } + synchronized boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java index a3adf9a91a3..95a12dc9399 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import java.util.Set; + import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; public class CapacityHeadroomProvider { @@ -38,22 +42,32 @@ public class CapacityHeadroomProvider { } public Resource getHeadroom() { - + Resource queueCurrentLimit; Resource clusterResource; synchronized (queueResourceLimitsInfo) { queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); clusterResource = queueResourceLimitsInfo.getClusterResource(); } - Resource headroom = queue.getHeadroom(user, queueCurrentLimit, - clusterResource, application); - + Set requestedPartitions = + application.getAppSchedulingInfo().getRequestedPartitions(); + Resource headroom; + if (requestedPartitions.isEmpty() || (requestedPartitions.size() == 1 + && requestedPartitions.contains(RMNodeLabelsManager.NO_LABEL))) { + headroom = queue.getHeadroom(user, queueCurrentLimit, clusterResource, + application); + } else { + headroom = Resource.newInstance(0, 0); + for (String partition : requestedPartitions) { + Resource partitionHeadRoom = queue.getHeadroom(user, queueCurrentLimit, + clusterResource, application, partition); + Resources.addTo(headroom, partitionHeadRoom); + } + } // Corner case to deal with applications being slightly over-limit if (headroom.getMemory() < 0) { headroom.setMemory(0); } return headroom; - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1235946b297..be3ab5fd001 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,24 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -131,8 +116,23 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; @LimitedPrivate("yarn") @Evolving @@ -2057,4 +2057,9 @@ public class CapacityScheduler extends + rmApp.getQueue() + " for application: " + applicationId + " for the user: " + rmApp.getUser()); } + + @Override + public ResourceUsage getClusterResourceUsage() { + return root.getQueueResourceUsage(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 2a0dd0da981..0069453e961 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -18,17 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.Comparator; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import java.util.Comparator; + /** * Read-only interface to {@link CapacityScheduler} context. */ @@ -61,4 +64,15 @@ public interface CapacitySchedulerContext { PartitionedQueueComparator getPartitionedQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); + + FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId); + + SchedulerHealth getSchedulerHealth(); + + /** + * @return QueueCapacities root queue of the Capacity Scheduler Queue, root + * queue used capacities for different labels are same as that of the + * cluster. + */ + ResourceUsage getClusterResourceUsage(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a29f14e742b..16c1468c0c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -945,14 +945,21 @@ public class LeafQueue extends AbstractCSQueue { protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application) { - return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application, clusterResource, user, - RMNodeLabelsManager.NO_LABEL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + return getHeadroom(user, queueCurrentLimit, clusterResource, application, + RMNodeLabelsManager.NO_LABEL); } - - private Resource getHeadroom(User user, Resource currentResourceLimit, - Resource clusterResource, Resource userLimit) { + + protected Resource getHeadroom(User user, Resource queueCurrentLimit, + Resource clusterResource, FiCaSchedulerApp application, + String partition) { + return getHeadroom(user, queueCurrentLimit, clusterResource, + computeUserLimit(application, clusterResource, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); + } + + private Resource getHeadroom(User user, + Resource currentPartitionResourceLimit, Resource clusterResource, + Resource userLimitResource, String partition) { /** * Headroom is: * min( @@ -969,15 +976,33 @@ public class LeafQueue extends AbstractCSQueue { * * >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) << * + * sum of queue max capacities of multiple queue's will be greater than the + * actual capacity of a given partition, hence we need to ensure that the + * headroom is not greater than the available resource for a given partition + * + * headroom = min (unused resourcelimit of a label, calculated headroom ) */ - Resource headroom = - Resources.componentwiseMin( - Resources.subtract(userLimit, user.getUsed()), - Resources.subtract(currentResourceLimit, queueUsage.getUsed()) - ); + currentPartitionResourceLimit = + partition.equals(RMNodeLabelsManager.NO_LABEL) + ? currentPartitionResourceLimit + : getQueueMaxResource(partition, clusterResource); + + Resource headroom = Resources.componentwiseMin( + Resources.subtract(userLimitResource, user.getUsed(partition)), + Resources.subtract(currentPartitionResourceLimit, + queueUsage.getUsed(partition))); // Normalize it before return headroom = Resources.roundDown(resourceCalculator, headroom, minimumAllocation); + + //headroom = min (unused resourcelimit of a label, calculated headroom ) + Resource clusterPartitionResource = + labelManager.getResourceByLabel(partition, clusterResource); + Resource clusterFreePartitionResource = + Resources.subtract(clusterPartitionResource, + csContext.getClusterResourceUsage().getUsed(partition)); + headroom = Resources.min(resourceCalculator, clusterPartitionResource, + clusterFreePartitionResource, headroom); return headroom; } @@ -1004,10 +1029,10 @@ public class LeafQueue extends AbstractCSQueue { nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); - + Resource headroom = getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), - clusterResource, userLimit); + clusterResource, userLimit, nodePartition); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0b32676a2eb..fb48dec4a21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -17,23 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -52,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -64,6 +48,23 @@ import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + public class TestApplicationLimits { private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class); @@ -577,8 +578,12 @@ public class TestApplicationLimits { when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, + "root", queues, queues, TestUtils.spyHook); + + ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueCapacities); // Manipulate queue 'a' LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A)); @@ -655,8 +660,7 @@ public class TestApplicationLimits { queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute assertEquals(expectedHeadroom, app_0_0.getHeadroom()); - // TODO, need fix headroom in future patch - // assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change + assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = @@ -677,9 +681,8 @@ public class TestApplicationLimits { clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); - // TODO, need fix headroom in future patch -// assertEquals(expectedHeadroom, app_0_1.getHeadroom()); -// assertEquals(expectedHeadroom, app_1_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); @@ -687,9 +690,8 @@ public class TestApplicationLimits { clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); - // TODO, need fix headroom in future patch -// assertEquals(expectedHeadroom, app_0_1.getHeadroom()); -// assertEquals(expectedHeadroom, app_1_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index 89fcb166272..d33555265d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -18,12 +18,29 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -31,11 +48,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -47,7 +74,8 @@ public class TestApplicationLimitsByPartition { RMNodeLabelsManager mgr; private YarnConfiguration conf; - RMContext rmContext = null; + private final ResourceCalculator resourceCalculator = + new DefaultResourceCalculator(); @Before public void setUp() throws IOException { @@ -538,4 +566,174 @@ public class TestApplicationLimitsByPartition { rm1.close(); } + + @Test + public void testHeadroom() throws Exception { + /* + * Test Case: Verify Headroom calculated is sum of headrooms for each + * partition requested. So submit a app with requests for default partition + * and 'x' partition, so the total headroom for the user should be sum of + * the head room for both labels. + */ + + simpleNodeLabelMappingToManager(); + CapacitySchedulerConfiguration csConf = + (CapacitySchedulerConfiguration) TestUtils + .getComplexConfigurationWithQueueLabels(conf); + final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1"; + final String B2 = CapacitySchedulerConfiguration.ROOT + ".b" + ".b2"; + csConf.setUserLimit(A1, 25); + csConf.setUserLimit(B2, 25); + + YarnConfiguration conf = new YarnConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()) + .thenReturn(Resources.createResource(GB)); + when(csContext.getMaximumResourceCapability()) + .thenReturn(Resources.createResource(16 * GB)); + when(csContext.getNonPartitionedQueueComparator()) + .thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + RMContext rmContext = TestUtils.getMockRMContext(); + RMContext spyRMContext = spy(rmContext); + when(spyRMContext.getNodeLabelManager()).thenReturn(mgr); + when(csContext.getRMContext()).thenReturn(spyRMContext); + + mgr.activateNode(NodeId.newInstance("h0", 0), + Resource.newInstance(160 * GB, 16)); // default Label + mgr.activateNode(NodeId.newInstance("h1", 0), + Resource.newInstance(160 * GB, 16)); // label x + mgr.activateNode(NodeId.newInstance("h2", 0), + Resource.newInstance(160 * GB, 16)); // label y + + // Say cluster has 100 nodes of 16G each + Resource clusterResource = Resources.createResource(160 * GB); + when(csContext.getClusterResource()).thenReturn(clusterResource); + + Map queues = new HashMap(); + CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, + "root", queues, queues, TestUtils.spyHook); + + ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueResUsage); + + // Manipulate queue 'a' + LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue) queues.get("b2")); + queue.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode("h0", rack_0, 0, 160 * GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode("h1", rack_0, 0, 160 * GB); + + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + ResourceRequest amResourceRequest = mock(ResourceRequest.class); + Resource amResource = Resources.createResource(0, 0); + when(amResourceRequest.getCapability()).thenReturn(amResource); + when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); + when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any())) + .thenReturn(rmAppAttempt); + when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); + Mockito.doReturn(true).when(spyApps) + .containsKey((ApplicationId) Matchers.any()); + + Priority priority_1 = TestUtils.createMockPriority(1); + + // Submit first application with some resource-requests from user_0, + // and check headroom + final ApplicationAttemptId appAttemptId_0_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0, + queue, queue.getActiveUsersManager(), spyRMContext); + queue.submitApplicationAttempt(app_0_0, user_0); + + List app_0_0_requests = new ArrayList(); + app_0_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory)); + app_0_0.updateResourceRequests(app_0_0_requests); + + // Schedule to compute + queue.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + //head room = queue capacity = 50 % 90% 160 GB + Resource expectedHeadroom = + Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + + // Submit second application from user_0, check headroom + final ApplicationAttemptId appAttemptId_0_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0, + queue, queue.getActiveUsersManager(), spyRMContext); + queue.submitApplicationAttempt(app_0_1, user_0); + + List app_0_1_requests = new ArrayList(); + app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory)); + app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory, "y")); + app_0_1.updateResourceRequests(app_0_1_requests); + + // Schedule to compute + queue.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute + queue.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute + assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change + //head room for default label + head room for y partition + //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB + Resource expectedHeadroomWithReqInY = + Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom); + assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); + + // Submit first application from user_1, check for new headroom + final ApplicationAttemptId appAttemptId_1_0 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1, + queue, queue.getActiveUsersManager(), spyRMContext); + queue.submitApplicationAttempt(app_1_0, user_1); + + List app_1_0_requests = new ArrayList(); + app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory)); + app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 2, true, priority_1, recordFactory, "y")); + app_1_0.updateResourceRequests(app_1_0_requests); + + // Schedule to compute + queue.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute + //head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users) + expectedHeadroom = + Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1); + //head room for default label + head room for y partition + //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB + expectedHeadroomWithReqInY = + Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1), + expectedHeadroom); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); + assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom()); + + + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 42dcd6de5e3..383088a7766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -18,36 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.ConcurrentModificationException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CyclicBarrier; - import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -71,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -90,7 +64,29 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; -import org.mortbay.log.Log; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class TestLeafQueue { private final RecordFactory recordFactory = @@ -163,6 +159,10 @@ public class TestLeafQueue { queues, queues, TestUtils.spyHook); + ResourceUsage queueResUsage = root.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueResUsage); + cs.setRMContext(spyRMContext); cs.init(csConf); cs.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 9b920d08a57..db28d2d2d6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -136,6 +137,10 @@ public class TestReservations { root = CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + ResourceUsage queueResUsage = root.getQueueResourceUsage(); + when(csContext.getClusterResourceUsage()) + .thenReturn(queueResUsage); + spyRMContext = spy(rmContext); when(spyRMContext.getScheduler()).thenReturn(cs); when(spyRMContext.getYarnConfiguration()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 17860690b9b..d2c51743633 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -159,7 +159,7 @@ public class TestUtils { public static ResourceRequest createResourceRequest( String resourceName, int memory, int numContainers, boolean relaxLocality, - Priority priority, RecordFactory recordFactory) { + Priority priority, RecordFactory recordFactory, String labelExpression) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); Resource capability = Resources.createResource(memory, 1); @@ -169,10 +169,18 @@ public class TestUtils { request.setCapability(capability); request.setRelaxLocality(relaxLocality); request.setPriority(priority); - request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + request.setNodeLabelExpression(labelExpression); return request; } + public static ResourceRequest createResourceRequest( + String resourceName, int memory, int numContainers, boolean relaxLocality, + Priority priority, + RecordFactory recordFactory) { + return createResourceRequest(resourceName, memory, numContainers, + relaxLocality, priority, recordFactory, RMNodeLabelsManager.NO_LABEL); + } + public static ApplicationId getMockApplicationId(int appId) { return ApplicationId.newInstance(0L, appId); }