From 82de3e1c116573f903a114faa337bba8fbf39151 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 18 Nov 2015 13:58:47 -0800 Subject: [PATCH] YARN-4287. Capacity Scheduler: Rack Locality improvement. (Nathan Roberts via wangda) --- hadoop-yarn-project/CHANGES.txt | 2 + .../scheduler/AppSchedulingInfo.java | 4 +- .../scheduler/QueueMetrics.java | 37 +++++++++ .../CapacitySchedulerConfiguration.java | 14 +++- .../scheduler/capacity/LeafQueue.java | 29 +++++-- .../scheduler/TestQueueMetrics.java | 55 ++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 75 +++++++++++++++++-- 7 files changed, 202 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a078ce79282..32be9b9c205 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -8,6 +8,8 @@ Release 2.7.3 - UNRELEASED IMPROVEMENTS + YARN-4287. Capacity Scheduler: Rack Locality improvement. (Nathan Roberts via wangda) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 22476d8d97a..7358f03c821 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -285,9 +285,11 @@ public class AppSchedulingInfo { + " container=" + container.getId() + " host=" + container.getNodeId().toString() + " user=" + user - + " resource=" + request.getCapability()); + + " resource=" + 
request.getCapability() + + " type=" + type); } metrics.allocateResources(user, 1, request.getCapability(), true); + metrics.incrNodeTypeAggregations(user, type); return resourceRequests; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 507b798a562..db46cc6bbba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -63,6 +63,12 @@ public class QueueMetrics implements MetricsSource { @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; @Metric("# of allocated containers") MutableGaugeInt allocatedContainers; @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated; + @Metric("Aggregate # of allocated node-local containers") + MutableCounterLong aggregateNodeLocalContainersAllocated; + @Metric("Aggregate # of allocated rack-local containers") + MutableCounterLong aggregateRackLocalContainersAllocated; + @Metric("Aggregate # of allocated off-switch containers") + MutableCounterLong aggregateOffSwitchContainersAllocated; @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; @Metric("Available memory in MB") MutableGaugeInt availableMB; @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores; @@ -375,6 +381,25 @@ public class QueueMetrics implements MetricsSource { pendingVCores.decr(res.getVirtualCores() * containers); } + 
public void incrNodeTypeAggregations(String user, NodeType type) { + if (type == NodeType.NODE_LOCAL) { + aggregateNodeLocalContainersAllocated.incr(); + } else if (type == NodeType.RACK_LOCAL) { + aggregateRackLocalContainersAllocated.incr(); + } else if (type == NodeType.OFF_SWITCH) { + aggregateOffSwitchContainersAllocated.incr(); + } else { + return; + } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrNodeTypeAggregations(user, type); + } + if (parent != null) { + parent.incrNodeTypeAggregations(user, type); + } + } + public void allocateResources(String user, int containers, Resource res, boolean decrPending) { allocatedContainers.incr(containers); @@ -546,4 +571,16 @@ public class QueueMetrics implements MetricsSource { public MetricsSystem getMetricsSystem() { return metricsSystem; } + + public long getAggregateNodeLocalContainersAllocated() { + return aggregateNodeLocalContainersAllocated.value(); + } + + public long getAggregateRackLocalContainersAllocated() { + return aggregateRackLocalContainersAllocated.value(); + } + + public long getAggregateOffSwitchContainersAllocated() { + return aggregateOffSwitchContainersAllocated.value(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 58a902ed2e7..d933c89023f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -171,6 +171,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final int DEFAULT_NODE_LOCALITY_DELAY = -1; + @Private + public static final String RACK_LOCALITY_FULL_RESET = + PREFIX + "rack-locality-full-reset"; + + @Private + public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true; + @Private public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX = PREFIX + "schedule-asynchronously"; @@ -621,7 +628,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur int delay = getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY); return (delay == DEFAULT_NODE_LOCALITY_DELAY) ? 0 : delay; } - + + public boolean getRackLocalityFullReset() { + return getBoolean(RACK_LOCALITY_FULL_RESET, + DEFAULT_RACK_LOCALITY_FULL_RESET); + } + public ResourceCalculator getResourceCalculator() { return ReflectionUtils.newInstance( getClass( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 65061ba40e9..59119293cc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -93,6 +93,7 @@ public class LeafQueue extends AbstractCSQueue { private 
float maxAMResourcePerQueuePercent; private int nodeLocalityDelay; + private volatile boolean rackLocalityFullReset; Set<FiCaSchedulerApp> activeApplications; Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = @@ -190,6 +191,7 @@ public class LeafQueue extends AbstractCSQueue { } nodeLocalityDelay = conf.getNodeLocalityDelay(); + rackLocalityFullReset = conf.getRackLocalityFullReset(); // re-init this since max allocation could have changed this.minimumAllocationFactor = @@ -863,7 +865,13 @@ public class LeafQueue extends AbstractCSQueue { if (LOG.isDebugEnabled()) { LOG.debug("Resetting scheduling opportunities"); } - application.resetSchedulingOpportunities(priority); + // Only reset scheduling opportunities for RACK_LOCAL if configured + // to do so. Not resetting means we will continue to schedule + // RACK_LOCAL without delay. + if (assignment.getType() == NodeType.NODE_LOCAL + || getRackLocalityFullReset()) { + application.resetSchedulingOpportunities(priority); + } } // Done @@ -985,7 +993,12 @@ public class LeafQueue extends AbstractCSQueue { return userLimit; } - + + @Lock(NoLock.class) + public boolean getRackLocalityFullReset() { + return rackLocalityFullReset; + } + @Lock(NoLock.class) private Resource computeUserLimit(FiCaSchedulerApp application, Resource clusterResource, Resource required, User user, @@ -1348,8 +1361,12 @@ public class LeafQueue extends AbstractCSQueue { float localityWaitFactor = application.getLocalityWaitFactor(priority, scheduler.getNumClusterNodes()); - - return ((requiredContainers * localityWaitFactor) < missedOpportunities); + + // Cap the delay by the number of nodes in the cluster. Under most conditions + // this means we will consider each node in the cluster before + // accepting an off-switch assignment.
+ return (Math.min(scheduler.getNumClusterNodes(), + (requiredContainers * localityWaitFactor)) < missedOpportunities); } // Check if we need containers on this rack @@ -1512,7 +1529,9 @@ public class LeafQueue extends AbstractCSQueue { " application attempt=" + application.getApplicationAttemptId() + " container=" + container + " queue=" + this + - " clusterResource=" + clusterResource); + " clusterResource=" + clusterResource + + " type=" + type); + createdContainer.setValue(allocatedContainer); return container.getResource(); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 8ad71d231bc..864620f1f54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -198,6 +198,53 @@ public class TestQueueMetrics { checkApps(userSource, 1, 0, 0, 1, 0, 0, true); } + + @Test public void testNodeTypeMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String user = "alice"; + + QueueMetrics parentMetrics = + QueueMetrics.forQueue(ms, parentQueueName, null, true, conf); + Queue parentQueue = make(stub(Queue.class).returning(parentMetrics). 
+ from.getMetrics()); + QueueMetrics metrics = + QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf); + MetricsSource parentQueueSource = queueSource(ms, parentQueueName); + MetricsSource queueSource = queueSource(ms, leafQueueName); + //AppSchedulingInfo app = mockApp(user); + + metrics.submitApp(user); + MetricsSource userSource = userSource(ms, leafQueueName, user); + MetricsSource parentUserSource = userSource(ms, parentQueueName, user); + + metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL); + checkAggregatedNodeTypes(queueSource,1L,0L,0L); + checkAggregatedNodeTypes(parentQueueSource,1L,0L,0L); + checkAggregatedNodeTypes(userSource,1L,0L,0L); + checkAggregatedNodeTypes(parentUserSource,1L,0L,0L); + + metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL); + checkAggregatedNodeTypes(queueSource,1L,1L,0L); + checkAggregatedNodeTypes(parentQueueSource,1L,1L,0L); + checkAggregatedNodeTypes(userSource,1L,1L,0L); + checkAggregatedNodeTypes(parentUserSource,1L,1L,0L); + + metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); + checkAggregatedNodeTypes(queueSource,1L,1L,1L); + checkAggregatedNodeTypes(parentQueueSource,1L,1L,1L); + checkAggregatedNodeTypes(userSource,1L,1L,1L); + checkAggregatedNodeTypes(parentUserSource,1L,1L,1L); + + metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); + checkAggregatedNodeTypes(queueSource,1L,1L,2L); + checkAggregatedNodeTypes(parentQueueSource,1L,1L,2L); + checkAggregatedNodeTypes(userSource,1L,1L,2L); + checkAggregatedNodeTypes(parentUserSource,1L,1L,2L); + + } + + @Test public void testTwoLevelWithUserMetrics() { String parentQueueName = "root"; String leafQueueName = "root.leaf"; @@ -367,6 +414,14 @@ public class TestQueueMetrics { assertGauge("ReservedContainers", reservedCtnrs, rb); } + public static void checkAggregatedNodeTypes(MetricsSource source, + long nodeLocal, long rackLocal, long offSwitch) { + MetricsRecordBuilder rb = getMetrics(source); + 
assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb); + assertCounter("AggregateRackLocalContainersAllocated", rackLocal, rb); + assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb); + } + private static AppSchedulingInfo mockApp(String user) { AppSchedulingInfo app = mock(AppSchedulingInfo.class); when(app.getUser()).thenReturn(user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 6c2aacc7472..a61c838cbe6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1592,8 +1592,8 @@ public class TestLeafQueue { @Test public void testLocalityScheduling() throws Exception { - // Manipulate queue 'a' - LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + // Manipulate queue 'b' + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B)); // User String user_0 = "user_0"; @@ -1708,25 +1708,26 @@ public class TestLeafQueue { TestUtils.createResourceRequest(host_1, 1*GB, 1, true, priority, recordFactory)); app_0_requests_0.add( - TestUtils.createResourceRequest(rack_1, 1*GB, 1, + TestUtils.createResourceRequest(rack_1, 1*GB, 3, true, priority, recordFactory)); app_0_requests_0.add( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra true, priority, recordFactory)); 
app_0.updateResourceRequests(app_0_requests_0); - assertEquals(2, app_0.getTotalRequiredResources(priority)); + assertEquals(4, app_0.getTotalRequiredResources(priority)); String host_3 = "127.0.0.4"; // on rack_1 FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); // Rack-delay + doReturn(true).when(a).getRackLocalityFullReset(); doReturn(1).when(a).getNodeLocalityDelay(); // Shouldn't assign RACK_LOCAL yet assignment = a.assignContainers(clusterResource, node_3, new ResourceLimits(clusterResource)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); - assertEquals(2, app_0.getTotalRequiredResources(priority)); + assertEquals(4, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Should assign RACK_LOCAL now @@ -1735,10 +1736,70 @@ public class TestLeafQueue { verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset + assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.RACK_LOCAL, assignment.getType()); + + // Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset. 
+ assignment = a.assignContainers(clusterResource, node_3, + new ResourceLimits(clusterResource)); + assertEquals(1, app_0.getSchedulingOpportunities(priority)); + assertEquals(3, app_0.getTotalRequiredResources(priority)); + + // Next time we schedule RACK_LOCAL, don't reset + doReturn(false).when(a).getRackLocalityFullReset(); + + // Should assign RACK_LOCAL now + assignment = a.assignContainers(clusterResource, node_3, + new ResourceLimits(clusterResource)); + verify(app_0, Mockito.times(2)).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(2, app_0.getSchedulingOpportunities(priority)); // should NOT reset + assertEquals(2, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.RACK_LOCAL, assignment.getType()); + + // Another RACK_LOCAL since schedulingOpportunities not reset + assignment = a.assignContainers(clusterResource, node_3, + new ResourceLimits(clusterResource)); + verify(app_0, Mockito.times(3)).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(3, app_0.getSchedulingOpportunities(priority)); // should NOT reset assertEquals(1, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.RACK_LOCAL, assignment.getType()); + + // Add a request larger than cluster size to verify + // OFF_SWITCH delay is capped by cluster size + app_0.resetSchedulingOpportunities(priority); + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(host_0, 1*GB, 100, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_0, 1*GB, 100, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 100, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + // Start with off switch. 
3 nodes in cluster so shouldn't allocate first 3 + for (int i = 0; i < numNodes; i++) { + assignment = + a.assignContainers(clusterResource, node_2, new ResourceLimits( + clusterResource)); + verify(app_0, Mockito.times(1)).allocate(any(NodeType.class), eq(node_2), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(i+1, app_0.getSchedulingOpportunities(priority)); + } + // delay should be capped at numNodes so next one should allocate + assignment = a.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource)); + verify(app_0, Mockito.times(2)).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(numNodes+1, app_0.getSchedulingOpportunities(priority)); + assertEquals(NodeType.OFF_SWITCH, assignment.getType()); + } - + @Test public void testApplicationPriorityScheduling() throws Exception { // Manipulate queue 'a'