YARN-4963. capacity scheduler: Make number of OFF_SWITCH assignments per heartbeat configurable. Contributed by Nathan Roberts

This commit is contained in:
Jason Lowe 2016-10-28 17:30:15 +00:00
parent 1b79c417dc
commit 1eae719bce
4 changed files with 77 additions and 18 deletions

View File

@ -131,4 +131,16 @@
</description> </description>
</property> </property>
<property>
<name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
<value>1</value>
<description>
Controls the number of OFF_SWITCH assignments allowed
during a node's heartbeat. Increasing this value can improve
scheduling rate for OFF_SWITCH containers. Lower values reduce
"clumping" of applications on particular nodes. The default is 1.
Legal values are 1-MAX_INT. This config is refreshable.
</description>
</property>
</configuration> </configuration>

View File

@ -190,6 +190,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String RACK_LOCALITY_FULL_RESET = public static final String RACK_LOCALITY_FULL_RESET =
PREFIX + "rack-locality-full-reset"; PREFIX + "rack-locality-full-reset";
@Private
// Default cap on OFF_SWITCH container assignments per node heartbeat;
// 1 preserves the scheduler's historical behavior.
public static final int DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT = 1;
@Private
// Config key: yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments
// (refreshable; legal values are >= 1 — see getOffSwitchPerHeartbeatLimit()).
public static final String OFFSWITCH_PER_HEARTBEAT_LIMIT =
PREFIX + "per-node-heartbeat.maximum-offswitch-assignments";
@Private @Private
public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true; public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
@ -713,6 +720,20 @@ public boolean getEnableUserMetrics() {
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS); return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
} }
/**
 * Returns the maximum number of OFF_SWITCH container assignments allowed
 * during a single node heartbeat.
 *
 * Reads {@code OFFSWITCH_PER_HEARTBEAT_LIMIT} from the configuration,
 * falling back to {@code DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT}. Values
 * below 1 are invalid; they are logged and clamped to 1 rather than
 * rejected, so a bad config cannot stall OFF_SWITCH scheduling entirely.
 *
 * @return the per-heartbeat OFF_SWITCH assignment limit, always >= 1
 */
public int getOffSwitchPerHeartbeatLimit() {
int limit = getInt(OFFSWITCH_PER_HEARTBEAT_LIMIT,
DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT);
if (limit < 1) {
// Clamp instead of failing: keep the scheduler running with the
// legacy one-per-heartbeat behavior and warn the operator.
LOG.warn(OFFSWITCH_PER_HEARTBEAT_LIMIT + "(" + limit + ") < 1. Using 1.");
limit = 1;
}
return limit;
}
/**
 * Sets the maximum number of OFF_SWITCH container assignments allowed per
 * node heartbeat. Note no validation happens here; out-of-range values are
 * clamped on read by {@code getOffSwitchPerHeartbeatLimit()}.
 *
 * @param limit the per-heartbeat OFF_SWITCH assignment limit to store
 */
public void setOffSwitchPerHeartbeatLimit(int limit) {
setInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, limit);
}
public int getNodeLocalityDelay() { public int getNodeLocalityDelay() {
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY); return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
} }

View File

@ -76,6 +76,7 @@ public class ParentQueue extends AbstractCSQueue {
volatile int numApplications; volatile int numApplications;
private final CapacitySchedulerContext scheduler; private final CapacitySchedulerContext scheduler;
private boolean needToResortQueuesAtNextAllocation = false; private boolean needToResortQueuesAtNextAllocation = false;
private int offswitchPerHeartbeatLimit;
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
@ -125,12 +126,16 @@ void setupQueueConfigs(Resource clusterResource)
} }
} }
offswitchPerHeartbeatLimit =
csContext.getConfiguration().getOffSwitchPerHeartbeatLimit();
LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities + ", absoluteMaxCapacity=" + this.queueCapacities
.getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit()
+ ", reservationsContinueLooking=" + reservationsContinueLooking); + ", reservationsContinueLooking=" + reservationsContinueLooking);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -210,6 +215,11 @@ public QueueInfo getQueueInfo(
} }
@Private
/**
 * Returns the cached maximum number of OFF_SWITCH assignments this queue
 * may make in one node heartbeat (set from configuration at queue setup).
 *
 * @return the per-heartbeat OFF_SWITCH assignment limit
 */
public int getOffSwitchPerHeartbeatLimit() {
return offswitchPerHeartbeatLimit;
}
private QueueUserACLInfo getUserAclInfo( private QueueUserACLInfo getUserAclInfo(
UserGroupInformation user) { UserGroupInformation user) {
try { try {
@ -427,6 +437,7 @@ private String getParentName() {
public CSAssignment assignContainers(Resource clusterResource, public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits resourceLimits, FiCaSchedulerNode node, ResourceLimits resourceLimits,
SchedulingMode schedulingMode) { SchedulingMode schedulingMode) {
int offswitchCount = 0;
try { try {
writeLock.lock(); writeLock.lock();
// if our queue cannot access this node, just return // if our queue cannot access this node, just return
@ -582,13 +593,18 @@ public CSAssignment assignContainers(Resource clusterResource,
+ getAbsoluteUsedCapacity()); + getAbsoluteUsedCapacity());
} }
// Do not assign more than one container if this isn't the root queue if (assignment.getType() == NodeType.OFF_SWITCH) {
// or if we've already assigned an off-switch container offswitchCount++;
if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) { }
// Do not assign more containers if this isn't the root queue
// or if we've already assigned enough OFF_SWITCH containers in
// this pass
if (!rootQueue || offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) { if (rootQueue && offswitchCount >= getOffSwitchPerHeartbeatLimit()) {
LOG.debug("Not assigning more than one off-switch container," LOG.debug("Assigned maximum number of off-switch containers: " +
+ " assignments so far: " + assignment); offswitchCount + ", assignments so far: " + assignment);
} }
} }
break; break;

View File

@ -29,6 +29,7 @@
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.HashMap; import java.util.HashMap;
@ -613,6 +614,8 @@ public void testQueueCapacityZero() throws Exception {
public void testOffSwitchScheduling() throws Exception { public void testOffSwitchScheduling() throws Exception {
// Setup queue configs // Setup queue configs
setupSingleLevelQueues(csConf); setupSingleLevelQueues(csConf);
csConf.setOffSwitchPerHeartbeatLimit(2);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root = CSQueue root =
@ -643,12 +646,18 @@ public void testOffSwitchScheduling() throws Exception {
queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
.incPending(Resources.createResource(1 * GB)); .incPending(Resources.createResource(1 * GB));
// Simulate B returning a container on node_0 // Simulate returning 2 containers on node_0 before offswitch limit
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource); InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource);
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
@ -657,27 +666,28 @@ public void testOffSwitchScheduling() throws Exception {
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1, root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, b); allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource), allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource), allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource);
// Now, B should get the scheduling opportunity // Now, B should get the scheduling opportunity
// since A has 2/6G while B has 2/14G, // since A has 2/6G while B has 2/14G,
// However, since B returns off-switch, A won't get an opportunity // A also gets an opportunity because offswitchlimit not reached
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a); allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource), allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource), allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource);
} }