YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)

(cherry picked from commit 796638d9bc)
This commit is contained in:
Wangda Tan 2015-11-12 11:09:37 -08:00
parent 5d9212c139
commit 0c1fb15b29
9 changed files with 191 additions and 14 deletions

View File

@ -502,6 +502,8 @@ Release 2.8.0 - UNRELEASED
YARN-4310. FairScheduler: Log skipping reservation messages at DEBUG level (asuresh) YARN-4310. FairScheduler: Log skipping reservation messages at DEBUG level (asuresh)
YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -595,9 +595,11 @@ public class AppSchedulingInfo {
+ " container=" + container.getId() + " container=" + container.getId()
+ " host=" + container.getNodeId().toString() + " host=" + container.getNodeId().toString()
+ " user=" + user + " user=" + user
+ " resource=" + request.getCapability()); + " resource=" + request.getCapability()
+ " type=" + type);
} }
metrics.allocateResources(user, 1, request.getCapability(), true); metrics.allocateResources(user, 1, request.getCapability(), true);
metrics.incrNodeTypeAggregations(user, type);
return resourceRequests; return resourceRequests;
} }

View File

@ -64,6 +64,12 @@ public class QueueMetrics implements MetricsSource {
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers; @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated; @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
// Locality counters: incrNodeTypeAggregations() increments exactly one of
// these per call, chosen by the NodeType it is given.
@Metric("Aggregate # of allocated node-local containers")
MutableCounterLong aggregateNodeLocalContainersAllocated;
@Metric("Aggregate # of allocated rack-local containers")
MutableCounterLong aggregateRackLocalContainersAllocated;
@Metric("Aggregate # of allocated off-switch containers")
MutableCounterLong aggregateOffSwitchContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeInt availableMB; @Metric("Available memory in MB") MutableGaugeInt availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores; @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@ -379,6 +385,25 @@ public class QueueMetrics implements MetricsSource {
pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1)); pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
} }
/**
 * Counts one allocated container against the aggregate counter that matches
 * its locality type, then repeats the update on the user's metrics (when
 * present) and on the parent metrics (when present).
 *
 * A type other than NODE_LOCAL, RACK_LOCAL or OFF_SWITCH is ignored: nothing
 * is counted and nothing is propagated.
 *
 * @param user user the container was allocated for
 * @param type locality of the allocation
 */
public void incrNodeTypeAggregations(String user, NodeType type) {
  final boolean nodeLocal = (type == NodeType.NODE_LOCAL);
  final boolean rackLocal = (type == NodeType.RACK_LOCAL);
  final boolean offSwitch = (type == NodeType.OFF_SWITCH);

  if (!nodeLocal && !rackLocal && !offSwitch) {
    // Unrecognized type: no counter applies, so skip propagation as well.
    return;
  }

  if (nodeLocal) {
    aggregateNodeLocalContainersAllocated.incr();
  } else if (rackLocal) {
    aggregateRackLocalContainersAllocated.incr();
  } else {
    aggregateOffSwitchContainersAllocated.incr();
  }

  final QueueMetrics perUser = getUserMetrics(user);
  if (perUser != null) {
    perUser.incrNodeTypeAggregations(user, type);
  }

  if (parent != null) {
    parent.incrNodeTypeAggregations(user, type);
  }
}
public void allocateResources(String user, int containers, Resource res, public void allocateResources(String user, int containers, Resource res,
boolean decrPending) { boolean decrPending) {
// if #containers = 0, means change container resource // if #containers = 0, means change container resource
@ -562,6 +587,18 @@ public class QueueMetrics implements MetricsSource {
return aggregateContainersAllocated.value(); return aggregateContainersAllocated.value();
} }
/**
 * @return current value of the aggregate node-local allocation counter
 */
public long getAggregateNodeLocalContainersAllocated() {
  final long nodeLocalTotal = aggregateNodeLocalContainersAllocated.value();
  return nodeLocalTotal;
}
/**
 * @return current value of the aggregate rack-local allocation counter
 */
public long getAggregateRackLocalContainersAllocated() {
  final long rackLocalTotal = aggregateRackLocalContainersAllocated.value();
  return rackLocalTotal;
}
/**
 * @return current value of the aggregate off-switch allocation counter
 */
public long getAggregateOffSwitchContainersAllocated() {
  final long offSwitchTotal = aggregateOffSwitchContainersAllocated.value();
  return offSwitchTotal;
}
public long getAggegatedReleasedContainers() { public long getAggegatedReleasedContainers() {
return aggregateContainersReleased.value(); return aggregateContainersReleased.value();
} }

View File

@ -184,6 +184,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private @Private
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40; public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
// Configuration key for the rack-locality reset flag; read by
// getRackLocalityFullReset().
@Private
public static final String RACK_LOCALITY_FULL_RESET =
PREFIX + "rack-locality-full-reset";
// Value used when the key above is not set.
@Private
public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
@Private @Private
public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX = public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
PREFIX + "schedule-asynchronously"; PREFIX + "schedule-asynchronously";
@ -665,6 +672,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY); return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
} }
/**
 * Reads the rack-locality full-reset flag from this configuration.
 *
 * @return the value configured under {@link #RACK_LOCALITY_FULL_RESET}, or
 *         {@link #DEFAULT_RACK_LOCALITY_FULL_RESET} when the key is unset
 */
public boolean getRackLocalityFullReset() {
  final boolean fullReset =
      getBoolean(RACK_LOCALITY_FULL_RESET, DEFAULT_RACK_LOCALITY_FULL_RESET);
  return fullReset;
}
public ResourceCalculator getResourceCalculator() { public ResourceCalculator getResourceCalculator() {
return ReflectionUtils.newInstance( return ReflectionUtils.newInstance(
getClass( getClass(

View File

@ -86,6 +86,7 @@ public class LeafQueue extends AbstractCSQueue {
private float maxAMResourcePerQueuePercent; private float maxAMResourcePerQueuePercent;
private volatile int nodeLocalityDelay; private volatile int nodeLocalityDelay;
private volatile boolean rackLocalityFullReset;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>(); new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
@ -190,6 +191,7 @@ public class LeafQueue extends AbstractCSQueue {
} }
nodeLocalityDelay = conf.getNodeLocalityDelay(); nodeLocalityDelay = conf.getNodeLocalityDelay();
rackLocalityFullReset = conf.getRackLocalityFullReset();
// re-init this since max allocation could have changed // re-init this since max allocation could have changed
this.minimumAllocationFactor = this.minimumAllocationFactor =
@ -1002,6 +1004,11 @@ public class LeafQueue extends AbstractCSQueue {
return nodeLocalityDelay; return nodeLocalityDelay;
} }
/**
 * @return the rack-locality full-reset flag currently held by this queue
 */
@Lock(NoLock.class)
public boolean getRackLocalityFullReset() {
  return this.rackLocalityFullReset;
}
@Lock(NoLock.class) @Lock(NoLock.class)
private Resource computeUserLimit(FiCaSchedulerApp application, private Resource computeUserLimit(FiCaSchedulerApp application,
Resource clusterResource, User user, Resource clusterResource, User user,

View File

@ -89,7 +89,7 @@ public abstract class AbstractContainerAllocator {
LOG.info("assignedContainer" + " application attempt=" LOG.info("assignedContainer" + " application attempt="
+ application.getApplicationAttemptId() + " container=" + application.getApplicationAttemptId() + " container="
+ updatedContainer.getId() + " queue=" + this + " clusterResource=" + updatedContainer.getId() + " queue=" + this + " clusterResource="
+ clusterResource); + clusterResource + " type=" + assignment.getType());
application application
.getCSLeafQueue() .getCSLeafQueue()

View File

@ -246,8 +246,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
float localityWaitFactor = float localityWaitFactor =
getLocalityWaitFactor(priority, rmContext.getScheduler() getLocalityWaitFactor(priority, rmContext.getScheduler()
.getNumClusterNodes()); .getNumClusterNodes());
// Cap the delay by the number of nodes in the cluster. Under most conditions
return ((requiredContainers * localityWaitFactor) < missedOpportunities); // this means we will consider each node in the cluster before
// accepting an off-switch assignment.
return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
(requiredContainers * localityWaitFactor)) < missedOpportunities);
} }
// Check if we need containers on this rack // Check if we need containers on this rack
@ -643,8 +646,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Resetting scheduling opportunities"); LOG.debug("Resetting scheduling opportunities");
} }
// Only reset scheduling opportunities for RACK_LOCAL if configured
// to do so. Not resetting means we will continue to schedule
// RACK_LOCAL without delay.
if (allocationResult.containerNodeType == NodeType.NODE_LOCAL
|| application.getCSLeafQueue().getRackLocalityFullReset()) {
application.resetSchedulingOpportunities(priority); application.resetSchedulingOpportunities(priority);
} }
}
// Non-exclusive scheduling opportunity is different: we need reset // Non-exclusive scheduling opportunity is different: we need reset
// it every time to make sure non-labeled resource request will be // it every time to make sure non-labeled resource request will be

View File

@ -198,6 +198,53 @@ public class TestQueueMetrics {
checkApps(userSource, 1, 0, 0, 1, 0, 0, true); checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
} }
/**
 * Checks that each locality increment on a leaf queue is reflected in the
 * leaf queue, the parent queue, and the per-user metrics of both.
 */
@Test public void testNodeTypeMetrics() {
  final String parentQueueName = "root";
  final String leafQueueName = "root.leaf";
  final String user = "alice";

  QueueMetrics parentMetrics =
      QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
  Queue parentQueue = make(stub(Queue.class).returning(parentMetrics).
      from.getMetrics());
  QueueMetrics metrics =
      QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
  MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
  MetricsSource queueSource = queueSource(ms, leafQueueName);

  metrics.submitApp(user);
  MetricsSource userSource = userSource(ms, leafQueueName, user);
  MetricsSource parentUserSource = userSource(ms, parentQueueName, user);

  // Every source is verified after every increment, in this order.
  final MetricsSource[] sources = {
      queueSource, parentQueueSource, userSource, parentUserSource
  };
  // Increment applied at each step ...
  final NodeType[] allocations = {
      NodeType.NODE_LOCAL, NodeType.RACK_LOCAL,
      NodeType.OFF_SWITCH, NodeType.OFF_SWITCH
  };
  // ... and the expected {nodeLocal, rackLocal, offSwitch} totals after it.
  final long[][] expected = {
      {1L, 0L, 0L},
      {1L, 1L, 0L},
      {1L, 1L, 1L},
      {1L, 1L, 2L}
  };

  for (int step = 0; step < allocations.length; step++) {
    metrics.incrNodeTypeAggregations(user, allocations[step]);
    for (MetricsSource source : sources) {
      checkAggregatedNodeTypes(source,
          expected[step][0], expected[step][1], expected[step][2]);
    }
  }
}
@Test public void testTwoLevelWithUserMetrics() { @Test public void testTwoLevelWithUserMetrics() {
String parentQueueName = "root"; String parentQueueName = "root";
String leafQueueName = "root.leaf"; String leafQueueName = "root.leaf";
@ -367,6 +414,14 @@ public class TestQueueMetrics {
assertGauge("ReservedContainers", reservedCtnrs, rb); assertGauge("ReservedContainers", reservedCtnrs, rb);
} }
/**
 * Asserts the three per-locality aggregate allocation counters published by
 * a metrics source.
 *
 * @param source metrics source to inspect
 * @param nodeLocal expected AggregateNodeLocalContainersAllocated value
 * @param rackLocal expected AggregateRackLocalContainersAllocated value
 * @param offSwitch expected AggregateOffSwitchContainersAllocated value
 */
public static void checkAggregatedNodeTypes(MetricsSource source,
    long nodeLocal, long rackLocal, long offSwitch) {
  final MetricsRecordBuilder snapshot = getMetrics(source);

  assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, snapshot);
  assertCounter("AggregateRackLocalContainersAllocated", rackLocal, snapshot);
  assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, snapshot);
}
private static AppSchedulingInfo mockApp(String user) { private static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class); AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user); when(app.getUser()).thenReturn(user);

View File

@ -1506,8 +1506,8 @@ public class TestLeafQueue {
@Test @Test
public void testLocalityScheduling() throws Exception { public void testLocalityScheduling() throws Exception {
// Manipulate queue 'a' // Manipulate queue 'b'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
// User // User
String user_0 = "user_0"; String user_0 = "user_0";
@ -1614,32 +1614,85 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(host_1, 1*GB, 1, TestUtils.createResourceRequest(host_1, 1*GB, 1,
true, priority, recordFactory)); true, priority, recordFactory));
app_0_requests_0.add( app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1, TestUtils.createResourceRequest(rack_1, 1*GB, 3,
true, priority, recordFactory)); true, priority, recordFactory));
app_0_requests_0.add( app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra
true, priority, recordFactory)); true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0); app_0.updateResourceRequests(app_0_requests_0);
assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(4, app_0.getTotalRequiredResources(priority));
String host_3 = "127.0.0.4"; // on rack_1 String host_3 = "127.0.0.4"; // on rack_1
FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
// Rack-delay // Rack-delay
doReturn(true).when(a).getRackLocalityFullReset();
doReturn(1).when(a).getNodeLocalityDelay(); doReturn(1).when(a).getNodeLocalityDelay();
// Shouldn't assign RACK_LOCAL yet // Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3, assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(4, app_0.getTotalRequiredResources(priority));
// Should assign RACK_LOCAL now // Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3, assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL); verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(3, app_0.getTotalRequiredResources(priority));
// Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset.
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
// Next time we schedule RACK_LOCAL, don't reset
doReturn(false).when(a).getRackLocalityFullReset();
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
assertEquals(2, app_0.getSchedulingOpportunities(priority)); // should NOT reset
assertEquals(2, app_0.getTotalRequiredResources(priority));
// Another RACK_LOCAL since schedulingOpportunities not reset
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
assertEquals(3, app_0.getSchedulingOpportunities(priority)); // should NOT reset
assertEquals(1, app_0.getTotalRequiredResources(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority));
// Add a request larger than cluster size to verify
// OFF_SWITCH delay is capped by cluster size
app_0.resetSchedulingOpportunities(priority);
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 100,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 100,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 100,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start with off switch. 3 nodes in cluster so shouldn't allocate first 3
for (int i = 0; i < numNodes; i++) {
assignment =
a.assignContainers(clusterResource, node_2, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(i+1, app_0.getSchedulingOpportunities(priority));
}
// delay should be capped at numNodes so next one should allocate
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
assertEquals(numNodes+1, app_0.getSchedulingOpportunities(priority));
} }
@Test @Test