From 42a88f9c5894265a9bd5584ea12b35a415d25307 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 14 Jan 2016 08:33:23 -0800 Subject: [PATCH] YARN-3446. FairScheduler headroom calculation should exclude nodes in the blacklist. (Zhihai Xu via kasha) (cherry picked from commit 9d04f26d4c42170ee3dab2f6fb09a94bbf72fc65) (cherry picked from commit f0923819c35b8c499a9bcd0c6c78511f3a400fef) --- .../scheduler/AbstractYarnScheduler.java | 15 ++++ .../scheduler/AppSchedulingInfo.java | 51 ++++++----- .../scheduler/fair/FSAppAttempt.java | 25 ++++++ .../scheduler/TestAppSchedulingInfo.java | 73 +++++++++++++++ .../scheduler/fair/TestFSAppAttempt.java | 88 +++++++++++++++++-- 5 files changed, 225 insertions(+), 27 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index cfe2a7a9ce0..5245bb6cb56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -180,6 +180,21 @@ public abstract class AbstractYarnScheduler return applications; } + /** + * Add blacklisted NodeIds to the list that is passed. + * + * @param app application attempt. + * @param blacklistNodeIdList the list to store blacklisted NodeIds. + */ + public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app, + List blacklistNodeIdList) { + for (Map.Entry nodeEntry : nodes.entrySet()) { + if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) { + blacklistNodeIdList.add(nodeEntry.getKey()); + } + } + } + @Override public Resource getClusterResource() { return clusterResource; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 9b2ba14ec70..5952cc2e27e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -18,6 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -37,19 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - /** * This class keeps track of all the consumption of an application. This also * keeps track of current running/completed containers for the application. @@ -72,7 +73,7 @@ public class AppSchedulingInfo { private ActiveUsersManager activeUsersManager; private boolean pending = true; // whether accepted/allocated by scheduler private ResourceUsage appResourceUsage; - + private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); private final Set amBlacklist = new HashSet<>(); private Set userBlacklist = new HashSet<>(); private Set requestedPartitions = new HashSet<>(); @@ -451,10 +452,12 @@ public class AppSchedulingInfo { * @param blacklistAdditions resources to be added to the userBlacklist * @param blacklistRemovals resources to be removed from the userBlacklist */ - public void updateBlacklist( + public void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { - updateUserOrAMBlacklist(userBlacklist, blacklistAdditions, - blacklistRemovals); + if (updateUserOrAMBlacklist(userBlacklist, blacklistAdditions, + blacklistRemovals)) { + userBlacklistChanged.set(true); + } } /** @@ -468,17 +471,25 @@ public class AppSchedulingInfo { blacklistRemovals); } - void updateUserOrAMBlacklist(Set blacklist, + boolean updateUserOrAMBlacklist(Set blacklist, List blacklistAdditions, List blacklistRemovals) { + boolean changed = false; synchronized (blacklist) { if (blacklistAdditions != null) { - blacklist.addAll(blacklistAdditions); + changed = blacklist.addAll(blacklistAdditions); } if (blacklistRemovals != null) { - blacklist.removeAll(blacklistRemovals); + if (blacklist.removeAll(blacklistRemovals)) { + changed = true; + } } } + return changed; + } + + public boolean getAndResetBlacklistChanged() { + return userBlacklistChanged.getAndSet(false); } public synchronized Collection getPriorities() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 25f40952a04..4b21f7fe244 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.Serializable; import java.text.DecimalFormat; import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; @@ -85,6 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Key = RackName, Value = Set of Nodes reserved by app on rack private Map> reservations = new HashMap<>(); + private List blacklistNodeIds = new ArrayList(); /** * Delay scheduling: We often want to prioritize scheduling of node-local * containers over rack-local or off-switch containers. To achieve this @@ -179,6 +181,27 @@ public class FSAppAttempt extends SchedulerApplicationAttempt + this.attemptResourceUsage.getReserved()); } + private void subtractResourcesOnBlacklistedNodes( + Resource availableResources) { + if (appSchedulingInfo.getAndResetBlacklistChanged()) { + blacklistNodeIds.clear(); + scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds); + } + for (NodeId nodeId: blacklistNodeIds) { + SchedulerNode node = scheduler.getSchedulerNode(nodeId); + if (node != null) { + Resources.subtractFrom(availableResources, + node.getAvailableResource()); + } + } + if (availableResources.getMemory() < 0) { + availableResources.setMemory(0); + } + if (availableResources.getVirtualCores() < 0) { + availableResources.setVirtualCores(0); + } + } + /** * Headroom depends on resources in the cluster, current usage of the * queue, queue's fair-share and queue's max-resources. @@ -196,6 +219,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt Resource clusterAvailableResources = Resources.subtract(clusterResource, clusterUsage); + subtractResourcesOnBlacklistedNodes(clusterAvailableResources); + Resource queueMaxAvailableResources = Resources.subtract(queue.getMaxShare(), queueUsage); Resource maxAvailableResource = Resources.componentwiseMin( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java new file mode 100644 index 00000000000..4141a533888 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doReturn; +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.junit.Assert; +import org.junit.Test; + +public class TestAppSchedulingInfo { + + @Test + public void testBacklistChanged() { + ApplicationId appIdImpl = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appIdImpl, 1); + + FSLeafQueue queue = mock(FSLeafQueue.class); + doReturn("test").when(queue).getQueueName(); + AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo( + appAttemptId, "test", queue, null, 0, new ResourceUsage()); + + appSchedulingInfo.updateBlacklist(new ArrayList(), + new ArrayList()); + Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); + + ArrayList blacklistAdditions = new ArrayList(); + blacklistAdditions.add("node1"); + blacklistAdditions.add("node2"); + appSchedulingInfo.updateBlacklist(blacklistAdditions, + new ArrayList()); + Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged()); + + blacklistAdditions.clear(); + blacklistAdditions.add("node1"); + appSchedulingInfo.updateBlacklist(blacklistAdditions, + new ArrayList()); + Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); + + ArrayList blacklistRemovals = new ArrayList(); + blacklistRemovals.add("node1"); + appSchedulingInfo.updateBlacklist(new ArrayList(), + blacklistRemovals); + appSchedulingInfo.updateBlacklist(new ArrayList(), + blacklistRemovals); + Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged()); + + appSchedulingInfo.updateBlacklist(new ArrayList(), + blacklistRemovals); + Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 8d799efe101..b30fafd000c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -18,14 +18,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -35,13 +49,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - public class TestFSAppAttempt extends FairSchedulerTestBase { @Before @@ -260,6 +267,73 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { ); } + @Test + public void testHeadroomWithBlackListedNodes() { + // Add two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + assertEquals("We should have two alive nodes.", + 2, scheduler.getNumClusterNodes()); + Resource clusterResource = scheduler.getClusterResource(); + Resource clusterUsage = scheduler.getRootQueueMetrics() + .getAllocatedResources(); + assertEquals(12 * 1024, clusterResource.getMemory()); + assertEquals(12, clusterResource.getVirtualCores()); + assertEquals(0, clusterUsage.getMemory()); + assertEquals(0, clusterUsage.getVirtualCores()); + ApplicationAttemptId id11 = createAppAttemptId(1, 1); + createMockRMApp(id11); + scheduler.addApplication(id11.getApplicationId(), + "default", "user1", false); + scheduler.addApplicationAttempt(id11, false, false); + assertNotNull(scheduler.getSchedulerApplications().get(id11. + getApplicationId())); + FSAppAttempt app = scheduler.getSchedulerApp(id11); + assertNotNull(app); + Resource queueUsage = app.getQueue().getResourceUsage(); + assertEquals(0, queueUsage.getMemory()); + assertEquals(0, queueUsage.getVirtualCores()); + SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID()); + SchedulerNode n2 = scheduler.getSchedulerNode(node2.getNodeID()); + assertNotNull(n1); + assertNotNull(n2); + List blacklistAdditions = new ArrayList(1); + List blacklistRemovals = new ArrayList(1); + blacklistAdditions.add(n1.getNodeName()); + app.updateBlacklist(blacklistAdditions, blacklistRemovals); + app.getQueue().setFairShare(clusterResource); + FSAppAttempt spyApp = spy(app); + doReturn(false) + .when(spyApp).isWaitingForAMContainer(); + assertTrue(spyApp.isBlacklisted(n1.getNodeName())); + assertFalse(spyApp.isBlacklisted(n2.getNodeName())); + assertEquals(n2.getAvailableResource(), spyApp.getHeadroom()); + + blacklistAdditions.clear(); + blacklistAdditions.add(n2.getNodeName()); + blacklistRemovals.add(n1.getNodeName()); + app.updateBlacklist(blacklistAdditions, blacklistRemovals); + assertFalse(spyApp.isBlacklisted(n1.getNodeName())); + assertTrue(spyApp.isBlacklisted(n2.getNodeName())); + assertEquals(n1.getAvailableResource(), spyApp.getHeadroom()); + + blacklistAdditions.clear(); + blacklistRemovals.clear(); + blacklistRemovals.add(n2.getNodeName()); + app.updateBlacklist(blacklistAdditions, blacklistRemovals); + assertFalse(spyApp.isBlacklisted(n1.getNodeName())); + assertFalse(spyApp.isBlacklisted(n2.getNodeName())); + assertEquals(clusterResource, spyApp.getHeadroom()); + } + private static long min(long value1, long value2, long value3) { return Math.min(Math.min(value1, value2), value3); }