From 5183e881097b37b723f07f4d6af06721a326bea1 Mon Sep 17 00:00:00 2001
From: Arun Murthy
Date: Wed, 14 Sep 2011 22:46:57 +0000
Subject: [PATCH] MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler
 to correctly enforce locality constraints.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170879 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt               |   3 +
 .../scheduler/capacity/LeafQueue.java              |  16 ++-
 .../scheduler/fifo/FifoScheduler.java              |  16 +++
 .../scheduler/capacity/TestLeafQueue.java          | 102 +++++++++++++++++-
 4 files changed, 126 insertions(+), 11 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ffebfe98e6b..8ed6f5e2835 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1324,6 +1324,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2949. Fixed NodeManager to shut-down correctly if a service
     startup fails. (Ravi Teja via vinodkv)
 
+    MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler to correctly
+    enforce locality constraints. (acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2038e2d871e..9a3b1c4da35 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1023,21 +1023,17 @@ boolean canAssign(SchedulerApp application, Priority priority,
     // Check if we need containers on this rack
     ResourceRequest rackLocalRequest = 
       application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+      return false;
+    }
+
+    // If we are here, we do need containers on this rack for RACK_LOCAL req
     if (type == NodeType.RACK_LOCAL) {
-      if (rackLocalRequest == null) {
-        return false;
-      } else {
-        return rackLocalRequest.getNumContainers() > 0;
-      }
+      return true;
     }
 
     // Check if we need containers on this host
     if (type == NodeType.NODE_LOCAL) {
-      // First: Do we need containers on this rack?
-      if (rackLocalRequest != null && rackLocalRequest.getNumContainers() == 0) {
-        return false;
-      }
-
       // Now check if we need containers on this host...
       ResourceRequest nodeLocalRequest = 
         application.getResourceRequest(priority, node.getHostName());
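The LeafQueue hunk above unifies the rack-level demand check and hoists it to the top of canAssign(): a missing rack-level request is now treated the same as one with zero remaining containers, and the veto applies before either the RACK_LOCAL or the NODE_LOCAL branch is taken. In the old code the NODE_LOCAL branch only vetoed when a rack request existed with exactly zero containers, so a null rack request fell through to the host check. Below is a minimal, self-contained sketch of the corrected control flow; the class name, demand map, and required() helper are hypothetical stand-ins for SchedulerApp's ResourceRequest bookkeeping, not YARN's actual API.

import java.util.HashMap;
import java.util.Map;

public class CanAssignSketch {
  enum NodeType { NODE_LOCAL, RACK_LOCAL }

  // Outstanding container counts keyed by resource name (hostname or rack);
  // a missing entry is treated exactly like an entry with zero containers.
  static final Map<String, Integer> outstanding = new HashMap<>();

  static int required(String resourceName) {
    return outstanding.getOrDefault(resourceName, 0);
  }

  // Mirrors the fixed control flow: exhausted (or absent) rack-level demand
  // vetoes both RACK_LOCAL and NODE_LOCAL assignment up front.
  static boolean canAssign(NodeType type, String host, String rack) {
    if (required(rack) <= 0) {
      return false;
    }
    if (type == NodeType.RACK_LOCAL) {
      return true;  // rack demand is known to be positive at this point
    }
    // NODE_LOCAL additionally needs outstanding demand on this exact host.
    return required(host) > 0;
  }

  public static void main(String[] args) {
    outstanding.put("host_0_0", 1);  // stale host-level request
    // No rack_0 entry at all: the old NODE_LOCAL branch let this through.
    System.out.println(canAssign(NodeType.NODE_LOCAL, "host_0_0", "rack_0")); // false
  }
}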
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 9b4b3169ff4..752b81ce5de 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -289,6 +289,7 @@ private SchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
 
+  @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
     // TODO: Fix store
@@ -440,6 +441,14 @@ private int assignNodeLocalContainers(SchedulerNode node,
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
     if (request != null) {
+      // Don't allocate on this node if we don't need containers on this rack
+      ResourceRequest rackRequest =
+          application.getResourceRequest(priority, 
+              node.getRMNode().getRackName());
+      if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+
       int assignableContainers = 
         Math.min(
             getMaxAllocatableContainers(application, priority, node, 
@@ -458,6 +467,13 @@ private int assignRackLocalContainers(SchedulerNode node,
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getRackName());
     if (request != null) {
+      // Don't allocate on this rack if the application doesn't need containers
+      ResourceRequest offSwitchRequest =
+          application.getResourceRequest(priority, SchedulerNode.ANY);
+      if (offSwitchRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+
       int assignableContainers = 
         Math.min(
             getMaxAllocatableContainers(application, priority, node, 
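The two FifoScheduler hunks add the analogous cascading guards: assignNodeLocalContainers() now bails out when the node's rack has no outstanding demand, and assignRackLocalContainers() bails out when the off-switch (SchedulerNode.ANY) request has none. A hedged sketch of that guard chain follows; the demand table and the pretend single free slot per call are hypothetical simplifications of the scheduler's ResourceRequest lookups and capacity math.

import java.util.HashMap;
import java.util.Map;

public class FifoGuardSketch {
  static final String ANY = "*";
  static final Map<String, Integer> demand = new HashMap<>();

  static int assignNodeLocal(String host, String rack) {
    Integer hostDemand = demand.get(host);
    if (hostDemand == null) return 0;
    // Don't allocate on this node if we don't need containers on its rack.
    Integer rackDemand = demand.get(rack);
    if (rackDemand == null || rackDemand <= 0) return 0;
    return Math.min(hostDemand, 1);  // pretend one slot is free
  }

  static int assignRackLocal(String rack) {
    Integer rackDemand = demand.get(rack);
    if (rackDemand == null) return 0;
    // Don't allocate on this rack if there is no outstanding ANY demand.
    Integer anyDemand = demand.get(ANY);
    if (anyDemand == null || anyDemand <= 0) return 0;
    return Math.min(rackDemand, 1);
  }

  public static void main(String[] args) {
    demand.put("host_0_0", 1); // stale host-level request
    demand.put("rack_0", 0);   // rack demand already satisfied
    demand.put(ANY, 0);        // total demand already satisfied
    System.out.println(assignNodeLocal("host_0_0", "rack_0")); // 0: rack veto
    System.out.println(assignRackLocal("rack_0"));             // 0: ANY veto
  }
}

Ordering the guards from coarse to fine keeps the invariant that a container is only handed out while every coarser-grained request above it still wants capacity, which is the locality constraint the commit message refers to.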
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 01acd1162ff..3ea01003320 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -625,7 +625,6 @@ public void testReservationExchange() throws Exception {
 
   }
 
-  
   @Test
   public void testLocalityScheduling() throws Exception {
 
@@ -876,6 +875,107 @@ public void testApplicationPriorityScheduling() throws Exception {
 
   }
 
+  @Test
+  public void testSchedulingConstraints() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 =
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+
+    // Setup some nodes and racks
+    String host_0_0 = "host_0_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+    String host_0_1 = "host_0_1";
+    SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+
+    String host_1_0 = "host_1_0";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start testing...
+
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // NODE_LOCAL - node_0_0
+    a.assignContainers(clusterResource, node_0_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // No allocation on node_1_0 even though it's node/rack local since
+    // required(ANY) == 0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
+                                                                 // since #req=0
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // No allocation on node_0_1 even though it's node/rack local since
+    // required(rack_0) == 0
+    a.assignContainers(clusterResource, node_0_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+
+    // NODE_LOCAL - node_1_0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+  }
+
   @After
   public void tearDown() throws Exception {
   }
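The new testSchedulingConstraints() exercises the invariant both fixes rely on: a successful allocation decrements the host-, rack-, and ANY-level request counts together, so the coarser counts can reach zero while a stale host-level request survives, which is exactly the state the unpatched schedulers mishandled. A small illustrative model of that bookkeeping, using hypothetical names rather than YARN's API:

import java.util.HashMap;
import java.util.Map;

public class DemandBookkeepingSketch {
  static final String ANY = "*";
  static final Map<String, Integer> required = new HashMap<>();

  static void request(String name, int n) {
    required.merge(name, n, Integer::sum);
  }

  static void allocate(String host, String rack) {
    // One NODE_LOCAL allocation consumes demand at every level at once.
    required.merge(host, -1, Integer::sum);
    required.merge(rack, -1, Integer::sum);
    required.merge(ANY, -1, Integer::sum);
  }

  public static void main(String[] args) {
    // Mirror the test's setup: one container wanted per host and rack,
    // but only ONE container wanted overall.
    request("host_0_0", 1); request("host_0_1", 1); request("rack_0", 1);
    request("host_1_0", 1); request("rack_1", 1);   request(ANY, 1);

    allocate("host_0_0", "rack_0"); // the NODE_LOCAL hit on node_0_0

    // host_0_1 still shows demand, but rack_0 and ANY are exhausted:
    // with the fix, neither scheduler will place a container there.
    System.out.println(required);
  }
}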