diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f82727b35cc..2b17669391a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -391,6 +391,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-750. Allow for black-listing resources in YARN API and Impl in CS (acmurthy via bikas) + YARN-877. Support resource blacklisting for FifoScheduler. + (Junping Du via llu) + YARN-686. Flatten NodeReport. (sandyr via tucu) YARN-737. Throw some specific exceptions directly instead of wrapping them diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index bf60baba8d3..0e7469561d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; @@ -816,7 +817,7 @@ public class LeafQueue implements CSQueue { synchronized (application) { // Check if this resource is on the blacklist - if (isBlacklisted(application, node)) { + if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) { continue; } @@ -902,28 +903,6 @@ public class LeafQueue implements CSQueue { return NULL_ASSIGNMENT; } - - boolean isBlacklisted(FiCaSchedulerApp application, FiCaSchedulerNode node) { - if (application.isBlacklisted(node.getHostName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping 'host' " + node.getHostName() + - " for " + application.getApplicationId() + - " since it has been blacklisted"); - } - return true; - } - - if (application.isBlacklisted(node.getRackName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping 'rack' " + node.getRackName() + - " for " + application.getApplicationId() + - " since it has been blacklisted"); - } - return true; - } - - return false; - } private synchronized CSAssignment assignReservedContainer(FiCaSchedulerApp application, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java new file mode 100644 index 00000000000..1e96949787c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; + +import org.apache.commons.logging.Log; + +public class FiCaSchedulerUtils { + + public static boolean isBlacklisted(FiCaSchedulerApp application, + FiCaSchedulerNode node, Log LOG) { + if (application.isBlacklisted(node.getHostName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping 'host' " + node.getHostName() + + " for " + application.getApplicationId() + + " since it has been blacklisted"); + } + return true; + } + + if (application.isBlacklisted(node.getRackName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping 'rack' " + node.getRackName() + + " for " + application.getApplicationId() + + " since it has been blacklisted"); + } + return true; + } + + return false; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index af9a4951663..d971f3b4496 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; @@ -290,7 +291,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { application.showRequests(); // Update application requests - application.updateResourceRequests(ask, null, null); + application.updateResourceRequests(ask, blacklistAdditions, blacklistRemovals); LOG.debug("allocate: post-update" + " applicationId=" + applicationAttemptId + @@ -388,6 +389,11 @@ public class FifoScheduler implements ResourceScheduler, Configurable { LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { + // Check if this resource is on the blacklist + if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) { + continue; + } + for (Priority priority : application.getPriorities()) { int maxContainers = getMaxAllocatableContainers(application, priority, node, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 54eb0c53a5b..c57a7529a3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -19,13 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -243,7 +242,6 @@ public class TestFifoScheduler { fs.handle(new NodeAddedSchedulerEvent(n1)); fs.handle(new NodeAddedSchedulerEvent(n2)); - List emptyList = new ArrayList(); fs.handle(new NodeUpdateSchedulerEvent(n1)); Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); @@ -257,6 +255,120 @@ public class TestFifoScheduler { Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); } + @Test (timeout = 50000) + public void testBlackListNodes() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); + + int rack_num_0 = 0; + int rack_num_1 = 1; + // Add 4 nodes in 2 racks + + // host_0_0 in rack0 + String host_0_0 = "127.0.0.1"; + RMNode n1 = + MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_0); + fs.handle(new NodeAddedSchedulerEvent(n1)); + + // host_0_1 in rack0 + String host_0_1 = "127.0.0.2"; + RMNode n2 = + MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_1); + fs.handle(new NodeAddedSchedulerEvent(n2)); + + // host_1_0 in rack1 + String host_1_0 = "127.0.0.3"; + RMNode n3 = + MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_0); + fs.handle(new NodeAddedSchedulerEvent(n3)); + + // host_1_1 in rack1 + String host_1_1 = "127.0.0.4"; + RMNode n4 = + MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_1); + fs.handle(new NodeAddedSchedulerEvent(n4)); + + // Add one application + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, 1); + SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue", + "user"); + fs.handle(event1); + + List emptyId = new ArrayList(); + List emptyAsk = new ArrayList(); + + // Allow rack-locality for rack_1, but blacklist host_1_0 + + // Set up resource requests + // Ask for a 1 GB container for app 1 + List ask1 = new ArrayList(); + ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + "rack1", BuilderUtils.newResource(GB, 1), 1)); + ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); + fs.allocate(appAttemptId1, ask1, emptyId, Collections.singletonList(host_1_0), null); + + // Trigger container assignment + fs.handle(new NodeUpdateSchedulerEvent(n3)); + + // Get the allocation for the application and verify no allocation on blacklist node + Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + + Assert.assertEquals("allocation1", 0, allocation1.getContainers().size()); + + // verify host_1_1 can get allocated as not in blacklist + fs.handle(new NodeUpdateSchedulerEvent(n4)); + Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation2", 1, allocation2.getContainers().size()); + List containerList = allocation2.getContainers(); + for (Container container : containerList) { + Assert.assertEquals("Container is allocated on n4", + container.getNodeId(), n4.getNodeID()); + } + + // Ask for a 1 GB container again for app 1 + List ask2 = new ArrayList(); + // this time, rack0 is also in blacklist, so only host_1_1 is available to + // be assigned + ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), + ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); + fs.allocate(appAttemptId1, ask2, emptyId, Collections.singletonList("rack0"), null); + + // verify n1 is not qualified to be allocated + fs.handle(new NodeUpdateSchedulerEvent(n1)); + Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation3", 0, allocation3.getContainers().size()); + + // verify n2 is not qualified to be allocated + fs.handle(new NodeUpdateSchedulerEvent(n2)); + Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation4", 0, allocation4.getContainers().size()); + + // verify n3 is not qualified to be allocated + fs.handle(new NodeUpdateSchedulerEvent(n3)); + Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation5", 0, allocation5.getContainers().size()); + + fs.handle(new NodeUpdateSchedulerEvent(n4)); + Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null); + Assert.assertEquals("allocation6", 1, allocation6.getContainers().size()); + + containerList = allocation6.getContainers(); + for (Container container : containerList) { + Assert.assertEquals("Container is allocated on n4", + container.getNodeId(), n4.getNodeID()); + } + + rm.stop(); + } + @Test (timeout = 50000) public void testHeadroom() throws Exception { @@ -287,7 +399,6 @@ public class TestFifoScheduler { "user"); fs.handle(event2); - List emptyStatus = new ArrayList(); List emptyId = new ArrayList(); List emptyAsk = new ArrayList();