diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 30f4eb96c6c..6ac726ee6c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -131,4 +131,16 @@
+  <property>
+    <name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
+    <value>1</value>
+    <description>
+      Controls the number of OFF_SWITCH assignments allowed
+      during a node's heartbeat. Increasing this value can improve the
+      scheduling rate of OFF_SWITCH containers. Lower values reduce
+      "clumping" of applications on particular nodes. The default is 1.
+      Legal values are 1-MAX_INT. This config is refreshable.
+    </description>
+  </property>
+
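For reference (not part of the patch): a minimal sketch of reading this property through Hadoop's Configuration API. The class name OffSwitchLimitExample and the inline Math.max clamp are illustrative assumptions; in the patch itself the clamping lives in CapacitySchedulerConfiguration.getOffSwitchPerHeartbeatLimit() below.

```java
// Illustrative sketch only. The key and the default of 1 mirror the patch;
// Math.max reproduces the getter's "values < 1 fall back to 1" rule.
import org.apache.hadoop.conf.Configuration;

public class OffSwitchLimitExample {  // hypothetical class name
  static final String KEY =
      "yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments";

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(KEY, 0);                           // an illegal value (< 1)
    int limit = Math.max(1, conf.getInt(KEY, 1));  // effective limit: 1
    System.out.println("effective limit = " + limit);
  }
}
```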
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index c7fe93954d7..0ab908a59e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -190,6 +190,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String RACK_LOCALITY_FULL_RESET =
PREFIX + "rack-locality-full-reset";
+ @Private
+ public static final int DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT = 1;
+
+ @Private
+ public static final String OFFSWITCH_PER_HEARTBEAT_LIMIT =
+ PREFIX + "per-node-heartbeat.maximum-offswitch-assignments";
+
@Private
public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
@@ -707,6 +714,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
}
+ public int getOffSwitchPerHeartbeatLimit() {
+ int limit = getInt(OFFSWITCH_PER_HEARTBEAT_LIMIT,
+ DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT);
+ if (limit < 1) {
+ LOG.warn(OFFSWITCH_PER_HEARTBEAT_LIMIT + " (" + limit + ") < 1. Using 1.");
+ limit = 1;
+ }
+ return limit;
+ }
+
+ public void setOffSwitchPerHeartbeatLimit(int limit) {
+ setInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, limit);
+ }
+
public int getNodeLocalityDelay() {
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
}
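A hedged usage sketch of the accessor pair added above (test-style, not part of the patch); it assumes nothing beyond the two new methods and the class's public no-arg constructor:

```java
// The setter stores any int; the getter replaces values < 1 with 1 and
// logs a warning, per getOffSwitchPerHeartbeatLimit() above.
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setOffSwitchPerHeartbeatLimit(3);
assert csConf.getOffSwitchPerHeartbeatLimit() == 3;
csConf.setOffSwitchPerHeartbeatLimit(-5);            // illegal value
assert csConf.getOffSwitchPerHeartbeatLimit() == 1;  // clamped, warning logged
```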
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index c5f1e11d7ef..29c032b1d6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -76,6 +76,7 @@ public class ParentQueue extends AbstractCSQueue {
volatile int numApplications;
private final CapacitySchedulerContext scheduler;
private boolean needToResortQueuesAtNextAllocation = false;
+ private int offswitchPerHeartbeatLimit;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -123,6 +124,9 @@ public class ParentQueue extends AbstractCSQueue {
}
}
+ offswitchPerHeartbeatLimit =
+ csContext.getConfiguration().getOffSwitchPerHeartbeatLimit();
+
LOG.info(queueName +
", capacity=" + this.queueCapacities.getCapacity() +
", asboluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
@@ -130,7 +134,8 @@ public class ParentQueue extends AbstractCSQueue {
", asboluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() +
", state=" + state +
", acls=" + aclsString +
- ", labels=" + labelStrBuilder.toString() + "\n" +
+ ", labels=" + labelStrBuilder.toString() +
+ ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit() +
", reservationsContinueLooking=" + reservationsContinueLooking);
}
@@ -196,6 +201,11 @@ public class ParentQueue extends AbstractCSQueue {
return queueInfo;
}
+ @Private
+ public int getOffSwitchPerHeartbeatLimit() {
+ return offswitchPerHeartbeatLimit;
+ }
+
private synchronized QueueUserACLInfo getUserAclInfo(
UserGroupInformation user) {
QueueUserACLInfo userAclInfo =
@@ -383,6 +393,8 @@ public class ParentQueue extends AbstractCSQueue {
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits resourceLimits,
SchedulingMode schedulingMode) {
+ int offswitchCount = 0;
+
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
@@ -478,13 +490,18 @@ public class ParentQueue extends AbstractCSQueue {
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
}
- // Do not assign more than one container if this isn't the root queue
- // or if we've already assigned an off-switch container
- if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
+ if (assignment.getType() == NodeType.OFF_SWITCH) {
+ offswitchCount++;
+ }
+
+ // Do not assign more containers if this isn't the root queue
+ // or if we've already assigned enough OFF_SWITCH containers in
+ // this pass
+ if (!rootQueue || offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
if (LOG.isDebugEnabled()) {
- if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
- LOG.debug("Not assigning more than one off-switch container," +
- " assignments so far: " + assignment);
+ if (rootQueue && offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
+ LOG.debug("Assigned maximum number of off-switch containers: " +
+ offswitchCount + ", assignments so far: " + assignment);
}
}
break;
@@ -821,4 +838,4 @@ public class ParentQueue extends AbstractCSQueue {
public synchronized int getNumApplications() {
return numApplications;
}
-}
\ No newline at end of file
+}
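To see the behavioral change in isolation: before the patch, the root queue's allocation loop broke after the first OFF_SWITCH assignment; now it breaks only once the count reaches the configured limit. A self-contained sketch under that reading; OffSwitchLoopSketch and countAssigned are hypothetical stand-ins for the real CSAssignment plumbing:

```java
import java.util.Arrays;
import java.util.List;

public class OffSwitchLoopSketch {
  enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  // Returns how many child-queue assignments happen in one heartbeat.
  static int countAssigned(List<NodeType> results, boolean rootQueue,
      int offswitchPerHeartbeatLimit) {
    int offswitchCount = 0;
    int assigned = 0;
    for (NodeType type : results) {
      assigned++;
      if (type == NodeType.OFF_SWITCH) {
        offswitchCount++;
      }
      // Mirrors the patched condition in assignContainers().
      if (!rootQueue || offswitchCount >= offswitchPerHeartbeatLimit) {
        break;
      }
    }
    return assigned;
  }

  public static void main(String[] args) {
    List<NodeType> offSwitchOnly = Arrays.asList(
        NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH);
    System.out.println(countAssigned(offSwitchOnly, true, 1)); // 1 (old behavior)
    System.out.println(countAssigned(offSwitchOnly, true, 2)); // 2 (limit of 2)
  }
}
```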
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index e3c04f8dd9e..1dc8d130890 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import java.util.HashMap;
@@ -611,6 +612,8 @@ public class TestParentQueue {
public void testOffSwitchScheduling() throws Exception {
// Setup queue configs
setupSingleLevelQueues(csConf);
+ csConf.setOffSwitchPerHeartbeatLimit(2);
+
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
@@ -641,12 +644,18 @@ public class TestParentQueue {
queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
.incPending(Resources.createResource(1 * GB));
- // Simulate B returning a container on node_0
- stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
+ // Simulate A and B each returning an OFF_SWITCH container on node_0 (within the limit of 2)
+ stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
+
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verifyQueueMetrics(a, 0*GB, clusterResource);
+ InOrder allocationOrder = inOrder(a, b);
+ allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+ allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
+ verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
@@ -655,27 +664,28 @@ public class TestParentQueue {
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- InOrder allocationOrder = inOrder(a, b);
- allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ allocationOrder = inOrder(a, b);
+ allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
- allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
- verifyQueueMetrics(a, 2*GB, clusterResource);
+ verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
// Now, B should get the scheduling opportunity
// since A has 2/6G while B has 2/14G,
- // However, since B returns off-switch, A won't get an opportunity
+ // A also gets an opportunity because the off-switch limit (2) is not yet reached
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
+
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
- allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
- allocationOrder.verify(a).assignContainers(eq(clusterResource),
+ allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
- verifyQueueMetrics(a, 2*GB, clusterResource);
+ verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
}
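Aside on the test changes: adding times(1) to the InOrder verifications asserts exact call counts as well as ordering. A toy illustration of that Mockito pattern, with a made-up Greeter interface:

```java
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.mockito.InOrder;

public class InOrderTimesDemo {
  interface Greeter { void greet(String name); }  // hypothetical interface

  public static void main(String[] args) {
    Greeter a = mock(Greeter.class);
    Greeter b = mock(Greeter.class);
    a.greet("first");
    b.greet("second");

    InOrder order = inOrder(a, b);
    order.verify(a, times(1)).greet("first");   // exactly once, and first
    order.verify(b, times(1)).greet("second");  // exactly once, after a
  }
}
```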