YARN-3446. FairScheduler headroom calculation should exclude nodes in the blacklist. (Zhihai Xu via kasha)

(cherry picked from commit 9d04f26d4c)
(cherry picked from commit f0923819c3)
This commit is contained in:
Karthik Kambatla 2016-01-14 08:33:23 -08:00 committed by Vinod Kumar Vavilapalli (I am also known as @tshooter.)
parent 1c4e039ba7
commit 42a88f9c58
5 changed files with 225 additions and 27 deletions

View File

@ -180,6 +180,21 @@ public abstract class AbstractYarnScheduler
return applications;
}
/**
 * Appends to the supplied list the id of every node tracked by this
 * scheduler that is blacklisted for the given application attempt.
 * The list is only appended to; existing elements are left in place.
 *
 * @param app application attempt whose blacklist is consulted.
 * @param blacklistNodeIdList destination list receiving the blacklisted
 *          NodeIds.
 */
public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app,
    List<NodeId> blacklistNodeIdList) {
  for (Map.Entry<NodeId, N> entry : nodes.entrySet()) {
    N candidate = entry.getValue();
    boolean blacklisted = SchedulerAppUtils.isBlacklisted(app, candidate, LOG);
    if (!blacklisted) {
      continue;
    }
    blacklistNodeIdList.add(entry.getKey());
  }
}
@Override
public Resource getClusterResource() {
return clusterResource;

View File

@ -18,6 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -37,19 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* This class keeps track of all the consumption of an application. This also
* keeps track of current running/completed containers for the application.
@ -72,7 +73,7 @@ public class AppSchedulingInfo {
private ActiveUsersManager activeUsersManager;
private boolean pending = true; // whether accepted/allocated by scheduler
private ResourceUsage appResourceUsage;
private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
private final Set<String> amBlacklist = new HashSet<>();
private Set<String> userBlacklist = new HashSet<>();
private Set<String> requestedPartitions = new HashSet<>();
@ -453,8 +454,10 @@ public class AppSchedulingInfo {
*/
public void updateBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
// NOTE(review): this listing is a diff rendered without +/- markers. The
// bare call directly below looks like the removed pre-change statement and
// the if-wrapped call after it looks like the added one - confirm against
// the committed file before relying on both being present.
updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
blacklistRemovals);
// Raise the changed flag only when the helper reports that the user
// blacklist set was actually modified. This method never clears the flag;
// it is reset by getAndResetBlacklistChanged().
if (updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
blacklistRemovals)) {
userBlacklistChanged.set(true);
}
}
/**
@ -468,18 +471,26 @@ public class AppSchedulingInfo {
blacklistRemovals);
}
/**
 * Applies additions and then removals to the given blacklist set while
 * holding the set's own monitor.
 *
 * NOTE(review): this listing is a diff rendered without +/- markers. The
 * "void" signature line and the addAll/removeAll calls whose results are
 * discarded look like the removed pre-change lines; the "boolean" signature
 * and the result-capturing calls look like the added ones - confirm against
 * the committed file.
 *
 * @param blacklist the set to mutate; also used as the lock object.
 * @param blacklistAdditions entries to add; skipped when null.
 * @param blacklistRemovals entries to remove; skipped when null.
 * @return (boolean form) true if the add or the remove step modified the set.
 */
void updateUserOrAMBlacklist(Set<String> blacklist,
boolean updateUserOrAMBlacklist(Set<String> blacklist,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
boolean changed = false;
synchronized (blacklist) {
if (blacklistAdditions != null) {
blacklist.addAll(blacklistAdditions);
// addAll returns true only if at least one element was newly added.
changed = blacklist.addAll(blacklistAdditions);
}
if (blacklistRemovals != null) {
blacklist.removeAll(blacklistRemovals);
// Only ever switch changed to true here so a result from the additions
// step above is not overwritten by a no-op removal.
if (blacklist.removeAll(blacklistRemovals)) {
changed = true;
}
}
}
return changed;
}
/**
 * Reads the user-blacklist changed flag and clears it in one atomic step.
 *
 * @return the value the flag held immediately before it was reset to
 *         {@code false}.
 */
public boolean getAndResetBlacklistChanged() {
  final boolean changedSinceLastRead = userBlacklistChanged.getAndSet(false);
  return changedSinceLastRead;
}
public synchronized Collection<Priority> getPriorities() {
return priorities;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@ -85,6 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Key = RackName, Value = Set of Nodes reserved by app on rack
private Map<String, Set<String>> reservations = new HashMap<>();
private List<NodeId> blacklistNodeIds = new ArrayList<NodeId>();
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To achieve this
@ -179,6 +181,27 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
+ this.attemptResourceUsage.getReserved());
}
/**
 * Reduces the given resource, in place, by the available resource of each
 * node currently cached as blacklisted for this attempt, then clamps the
 * memory and vcore components so neither is negative.
 *
 * @param availableResources the resource to adjust; mutated by this call.
 */
private void subtractResourcesOnBlacklistedNodes(
    Resource availableResources) {
  // The cached id list is rebuilt only when the scheduling info reports
  // that the blacklist changed since the flag was last read.
  boolean blacklistChanged = appSchedulingInfo.getAndResetBlacklistChanged();
  if (blacklistChanged) {
    blacklistNodeIds.clear();
    scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds);
  }

  for (NodeId blacklistedId : blacklistNodeIds) {
    SchedulerNode blacklistedNode = scheduler.getSchedulerNode(blacklistedId);
    if (blacklistedNode == null) {
      // The scheduler has no node for this cached id; nothing to subtract.
      continue;
    }
    Resources.subtractFrom(availableResources,
        blacklistedNode.getAvailableResource());
  }

  // Clamp each component independently at zero.
  int remainingMemory = availableResources.getMemory();
  if (remainingMemory < 0) {
    availableResources.setMemory(0);
  }
  int remainingVirtualCores = availableResources.getVirtualCores();
  if (remainingVirtualCores < 0) {
    availableResources.setVirtualCores(0);
  }
}
/**
* Headroom depends on resources in the cluster, current usage of the
* queue, queue's fair-share and queue's max-resources.
@ -196,6 +219,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource clusterAvailableResources =
Resources.subtract(clusterResource, clusterUsage);
subtractResourcesOnBlacklistedNodes(clusterAvailableResources);
Resource queueMaxAvailableResources =
Resources.subtract(queue.getMaxShare(), queueUsage);
Resource maxAvailableResource = Resources.componentwiseMin(

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.doReturn;
import java.util.ArrayList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.junit.Assert;
import org.junit.Test;
/**
 * Tests for the blacklist-changed flag exposed by {@link AppSchedulingInfo}.
 */
public class TestAppSchedulingInfo {

  /**
   * Walks the user blacklist through a sequence of updates and checks after
   * each step whether {@code getAndResetBlacklistChanged()} reports a change.
   */
  @Test
  public void testBacklistChanged() {
    ApplicationId applicationId = ApplicationId.newInstance(0, 1);
    ApplicationAttemptId attemptId =
        ApplicationAttemptId.newInstance(applicationId, 1);

    FSLeafQueue leafQueue = mock(FSLeafQueue.class);
    doReturn("test").when(leafQueue).getQueueName();
    AppSchedulingInfo info = new AppSchedulingInfo(
        attemptId, "test", leafQueue, null, 0, new ResourceUsage());

    // Step 1: an update with nothing to add or remove is not a change.
    info.updateBlacklist(new ArrayList<String>(), new ArrayList<String>());
    Assert.assertFalse(info.getAndResetBlacklistChanged());

    // Step 2: adding two new nodes is a change.
    ArrayList<String> additions = new ArrayList<String>();
    additions.add("node1");
    additions.add("node2");
    info.updateBlacklist(additions, new ArrayList<String>());
    Assert.assertTrue(info.getAndResetBlacklistChanged());

    // Step 3: re-adding a node that is already present is not a change.
    additions.clear();
    additions.add("node1");
    info.updateBlacklist(additions, new ArrayList<String>());
    Assert.assertFalse(info.getAndResetBlacklistChanged());

    // Step 4: the same removal is applied twice before the flag is read;
    // the flag is still expected to report a change.
    ArrayList<String> removals = new ArrayList<String>();
    removals.add("node1");
    info.updateBlacklist(new ArrayList<String>(), removals);
    info.updateBlacklist(new ArrayList<String>(), removals);
    Assert.assertTrue(info.getAndResetBlacklistChanged());

    // Step 5: removing the already-removed node again is not a change.
    info.updateBlacklist(new ArrayList<String>(), removals);
    Assert.assertFalse(info.getAndResetBlacklistChanged());
  }
}

View File

@ -18,14 +18,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@ -35,13 +49,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class TestFSAppAttempt extends FairSchedulerTestBase {
@Before
@ -260,6 +267,73 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
);
}
/**
 * Checks that the headroom reported by an FSAppAttempt follows the
 * attempt's blacklist: with one of two nodes blacklisted the headroom is
 * expected to equal the other node's available resource, and with no node
 * blacklisted it is expected to equal the whole cluster resource.
 */
@Test
public void testHeadroomWithBlackListedNodes() {
// Register two nodes with the scheduler: node1 with 8 GB / 8 vcores and
// node2 with 4 GB / 4 vcores.
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
assertEquals("We should have two alive nodes.",
2, scheduler.getNumClusterNodes());
// Sanity check: cluster capacity is the sum of both nodes and nothing is
// allocated yet.
Resource clusterResource = scheduler.getClusterResource();
Resource clusterUsage = scheduler.getRootQueueMetrics()
.getAllocatedResources();
assertEquals(12 * 1024, clusterResource.getMemory());
assertEquals(12, clusterResource.getVirtualCores());
assertEquals(0, clusterUsage.getMemory());
assertEquals(0, clusterUsage.getVirtualCores());
// Submit one application attempt to the "default" queue as "user1".
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
scheduler.addApplication(id11.getApplicationId(),
"default", "user1", false);
scheduler.addApplicationAttempt(id11, false, false);
assertNotNull(scheduler.getSchedulerApplications().get(id11.
getApplicationId()));
FSAppAttempt app = scheduler.getSchedulerApp(id11);
assertNotNull(app);
// The attempt's queue must start with zero usage.
Resource queueUsage = app.getQueue().getResourceUsage();
assertEquals(0, queueUsage.getMemory());
assertEquals(0, queueUsage.getVirtualCores());
SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID());
SchedulerNode n2 = scheduler.getSchedulerNode(node2.getNodeID());
assertNotNull(n1);
assertNotNull(n2);
// Phase 1: blacklist n1 only.
List<String> blacklistAdditions = new ArrayList<String>(1);
List<String> blacklistRemovals = new ArrayList<String>(1);
blacklistAdditions.add(n1.getNodeName());
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
// Give the queue the whole cluster as fair share.
// NOTE(review): presumably so that fair share does not cap the headroom
// below the values asserted later - confirm against getHeadroom().
app.getQueue().setFairShare(clusterResource);
// Assertions go through a spy whose isWaitingForAMContainer() is stubbed
// to false, while blacklist updates are still issued on the original app.
// NOTE(review): presumably the stub makes isBlacklisted() consult the user
// blacklist updated above - confirm in SchedulerApplicationAttempt.
FSAppAttempt spyApp = spy(app);
doReturn(false)
.when(spyApp).isWaitingForAMContainer();
assertTrue(spyApp.isBlacklisted(n1.getNodeName()));
assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
assertEquals(n2.getAvailableResource(), spyApp.getHeadroom());
// Phase 2: swap the blacklist in a single update - add n2, remove n1.
blacklistAdditions.clear();
blacklistAdditions.add(n2.getNodeName());
blacklistRemovals.add(n1.getNodeName());
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
assertTrue(spyApp.isBlacklisted(n2.getNodeName()));
assertEquals(n1.getAvailableResource(), spyApp.getHeadroom());
// Phase 3: remove n2 as well, leaving no node blacklisted.
blacklistAdditions.clear();
blacklistRemovals.clear();
blacklistRemovals.add(n2.getNodeName());
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
assertEquals(clusterResource, spyApp.getHeadroom());
}
/**
 * Returns the smallest of three {@code long} values.
 *
 * @param value1 first candidate.
 * @param value2 second candidate.
 * @param value3 third candidate.
 * @return the minimum of the three arguments.
 */
private static long min(long value1, long value2, long value3) {
  long smallest = value1;
  if (value2 < smallest) {
    smallest = value2;
  }
  if (value3 < smallest) {
    smallest = value3;
  }
  return smallest;
}