diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 30f4eb96c6c..6ac726ee6c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -131,4 +131,16 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
+    <value>1</value>
+    <description>
+      Controls the number of OFF_SWITCH assignments allowed
+      during a node's heartbeat. Increasing this value can improve
+      scheduling rate for OFF_SWITCH containers. Lower values reduce
+      "clumping" of applications on particular nodes. The default is 1.
+      Legal values are 1-MAX_INT. This config is refreshable.
+    </description>
+  </property>
+
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index d5d1374d6c8..6db5074f233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -190,6 +190,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final String RACK_LOCALITY_FULL_RESET =
       PREFIX + "rack-locality-full-reset";
 
+  @Private
+  public static final int DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT = 1;
+
+  @Private
+  public static final String OFFSWITCH_PER_HEARTBEAT_LIMIT =
+      PREFIX + "per-node-heartbeat.maximum-offswitch-assignments";
+
   @Private
   public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
 
@@ -713,6 +720,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
   }
 
+  public int getOffSwitchPerHeartbeatLimit() {
+    int limit = getInt(OFFSWITCH_PER_HEARTBEAT_LIMIT,
+        DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT);
+    if (limit < 1) {
+      LOG.warn(OFFSWITCH_PER_HEARTBEAT_LIMIT + "(" + limit + ") < 1. Using 1.");
+      limit = 1;
+    }
+    return limit;
+  }
+
+  public void setOffSwitchPerHeartbeatLimit(int limit) {
+    setInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, limit);
+  }
+
   public int getNodeLocalityDelay() {
     return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index ffb68928ec9..a69af6ecdf4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -76,6 +76,7 @@ public class ParentQueue extends AbstractCSQueue {
   volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
   private boolean needToResortQueuesAtNextAllocation = false;
+  private int offswitchPerHeartbeatLimit;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -125,12 +126,16 @@ public class ParentQueue extends AbstractCSQueue {
         }
       }
 
+      offswitchPerHeartbeatLimit =
+          csContext.getConfiguration().getOffSwitchPerHeartbeatLimit();
+
       LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
           + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
           + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
           + ", absoluteMaxCapacity=" + this.queueCapacities
           .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
           + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+          + ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit()
           + ", reservationsContinueLooking=" + reservationsContinueLooking);
     } finally {
       writeLock.unlock();
@@ -210,6 +215,11 @@ public class ParentQueue extends AbstractCSQueue {
 
   }
 
+  @Private
+  public int getOffSwitchPerHeartbeatLimit() {
+    return offswitchPerHeartbeatLimit;
+  }
+
   private QueueUserACLInfo getUserAclInfo(
       UserGroupInformation user) {
     try {
@@ -427,6 +437,7 @@
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
       SchedulingMode schedulingMode) {
+    int offswitchCount = 0;
     try {
       writeLock.lock();
       // if our queue cannot access this node, just return
@@ -582,13 +593,18 @@
               + getAbsoluteUsedCapacity());
         }
 
-        // Do not assign more than one container if this isn't the root queue
-        // or if we've already assigned an off-switch container
-        if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
+        if (assignment.getType() == NodeType.OFF_SWITCH) {
+          offswitchCount++;
+        }
+
+        // Do not assign more containers if this isn't the root queue
+        // or if we've already assigned enough OFF_SWITCH containers in
+        // this pass
+        if (!rootQueue || offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
           if (LOG.isDebugEnabled()) {
-            if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
-              LOG.debug("Not assigning more than one off-switch container,"
-                  + " assignments so far: " + assignment);
+            if (rootQueue && offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
+              LOG.debug("Assigned maximum number of off-switch containers: "
+                  + offswitchCount + ", assignments so far: " + assignment);
             }
           }
           break;
@@ -1046,4 +1062,4 @@ public class ParentQueue extends AbstractCSQueue {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 890e998ea97..42a88724cec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
@@ -613,6 +614,8 @@ public class TestParentQueue {
   public void testOffSwitchScheduling() throws Exception {
     // Setup queue configs
     setupSingleLevelQueues(csConf);
+    csConf.setOffSwitchPerHeartbeatLimit(2);
+
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root =
         CapacityScheduler.parseQueue(csContext, csConf, null,
@@ -643,12 +646,18 @@
     queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
         .incPending(Resources.createResource(1 * GB));
 
-    // Simulate B returning a container on node_0
-    stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
+    // Simulate returning 2 containers on node_0 before hitting the offswitch limit
+    stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
+
     root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verifyQueueMetrics(a, 0*GB, clusterResource);
+    InOrder allocationOrder = inOrder(a, b);
+    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+    verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
 
     // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
@@ -657,27 +666,28 @@
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    InOrder allocationOrder = inOrder(a, b);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+    allocationOrder = inOrder(a, b);
+    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    verifyQueueMetrics(a, 2*GB, clusterResource);
+    verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
     // Now, B should get the scheduling opportunity
     // since A has 2/6G while B has 2/14G,
-    // However, since B returns off-switch, A won't get an opportunity
+    // A also gets an opportunity because the offswitch limit is not reached
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
+
     root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+    allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+    allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
         any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
-    verifyQueueMetrics(a, 2*GB, clusterResource);
+    verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
   }
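
Note: since the description above marks the property as refreshable, operators can tune it without restarting the ResourceManager. A minimal usage sketch (assuming a stock capacity-scheduler.xml; the value 3 is only an illustration, the patch's default is 1):

  <property>
    <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
    <value>3</value>
  </property>

After editing the file, running "yarn rmadmin -refreshQueues" should apply the new limit; getOffSwitchPerHeartbeatLimit() clamps any configured value below 1 back to 1 and logs a warning.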