YARN-3446. FairScheduler headroom calculation should exclude nodes in the blacklist. (Zhihai Xu via kasha)
This commit is contained in:
parent
5cc44d18aa
commit
9d04f26d4c
|
@ -124,6 +124,9 @@ Release 2.9.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-4567. javadoc failing on java 8. (Steve Loughran via aajisaka)
|
YARN-4567. javadoc failing on java 8. (Steve Loughran via aajisaka)
|
||||||
|
|
||||||
|
YARN-3446. FairScheduler headroom calculation should exclude nodes in the
|
||||||
|
blacklist. (Zhihai Xu via kasha)
|
||||||
|
|
||||||
Release 2.8.0 - UNRELEASED
|
Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -181,6 +181,21 @@ public abstract class AbstractYarnScheduler
|
||||||
return applications;
|
return applications;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add blacklisted NodeIds to the list that is passed.
|
||||||
|
*
|
||||||
|
* @param app application attempt.
|
||||||
|
* @param blacklistNodeIdList the list to store blacklisted NodeIds.
|
||||||
|
*/
|
||||||
|
public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app,
|
||||||
|
List<NodeId> blacklistNodeIdList) {
|
||||||
|
for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
|
||||||
|
if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) {
|
||||||
|
blacklistNodeIdList.add(nodeEntry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource getClusterResource() {
|
public Resource getClusterResource() {
|
||||||
return clusterResource;
|
return clusterResource;
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -69,7 +70,7 @@ public class AppSchedulingInfo {
|
||||||
private ActiveUsersManager activeUsersManager;
|
private ActiveUsersManager activeUsersManager;
|
||||||
private boolean pending = true; // whether accepted/allocated by scheduler
|
private boolean pending = true; // whether accepted/allocated by scheduler
|
||||||
private ResourceUsage appResourceUsage;
|
private ResourceUsage appResourceUsage;
|
||||||
|
private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
|
||||||
private final Set<String> amBlacklist = new HashSet<>();
|
private final Set<String> amBlacklist = new HashSet<>();
|
||||||
private Set<String> userBlacklist = new HashSet<>();
|
private Set<String> userBlacklist = new HashSet<>();
|
||||||
|
|
||||||
|
@ -424,10 +425,12 @@ public class AppSchedulingInfo {
|
||||||
* @param blacklistAdditions resources to be added to the userBlacklist
|
* @param blacklistAdditions resources to be added to the userBlacklist
|
||||||
* @param blacklistRemovals resources to be removed from the userBlacklist
|
* @param blacklistRemovals resources to be removed from the userBlacklist
|
||||||
*/
|
*/
|
||||||
public void updateBlacklist(
|
public void updateBlacklist(
|
||||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||||
updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
|
if (updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
|
||||||
blacklistRemovals);
|
blacklistRemovals)) {
|
||||||
|
userBlacklistChanged.set(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -441,17 +444,25 @@ public class AppSchedulingInfo {
|
||||||
blacklistRemovals);
|
blacklistRemovals);
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateUserOrAMBlacklist(Set<String> blacklist,
|
boolean updateUserOrAMBlacklist(Set<String> blacklist,
|
||||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||||
|
boolean changed = false;
|
||||||
synchronized (blacklist) {
|
synchronized (blacklist) {
|
||||||
if (blacklistAdditions != null) {
|
if (blacklistAdditions != null) {
|
||||||
blacklist.addAll(blacklistAdditions);
|
changed = blacklist.addAll(blacklistAdditions);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (blacklistRemovals != null) {
|
if (blacklistRemovals != null) {
|
||||||
blacklist.removeAll(blacklistRemovals);
|
if (blacklist.removeAll(blacklistRemovals)) {
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return changed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getAndResetBlacklistChanged() {
|
||||||
|
return userBlacklistChanged.getAndSet(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized Collection<Priority> getPriorities() {
|
public synchronized Collection<Priority> getPriorities() {
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.text.DecimalFormat;
|
import java.text.DecimalFormat;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -85,6 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
// Key = RackName, Value = Set of Nodes reserved by app on rack
|
// Key = RackName, Value = Set of Nodes reserved by app on rack
|
||||||
private Map<String, Set<String>> reservations = new HashMap<>();
|
private Map<String, Set<String>> reservations = new HashMap<>();
|
||||||
|
|
||||||
|
private List<NodeId> blacklistNodeIds = new ArrayList<NodeId>();
|
||||||
/**
|
/**
|
||||||
* Delay scheduling: We often want to prioritize scheduling of node-local
|
* Delay scheduling: We often want to prioritize scheduling of node-local
|
||||||
* containers over rack-local or off-switch containers. To achieve this
|
* containers over rack-local or off-switch containers. To achieve this
|
||||||
|
@ -179,6 +181,27 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
+ this.attemptResourceUsage.getReserved());
|
+ this.attemptResourceUsage.getReserved());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void subtractResourcesOnBlacklistedNodes(
|
||||||
|
Resource availableResources) {
|
||||||
|
if (appSchedulingInfo.getAndResetBlacklistChanged()) {
|
||||||
|
blacklistNodeIds.clear();
|
||||||
|
scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds);
|
||||||
|
}
|
||||||
|
for (NodeId nodeId: blacklistNodeIds) {
|
||||||
|
SchedulerNode node = scheduler.getSchedulerNode(nodeId);
|
||||||
|
if (node != null) {
|
||||||
|
Resources.subtractFrom(availableResources,
|
||||||
|
node.getAvailableResource());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (availableResources.getMemory() < 0) {
|
||||||
|
availableResources.setMemory(0);
|
||||||
|
}
|
||||||
|
if (availableResources.getVirtualCores() < 0) {
|
||||||
|
availableResources.setVirtualCores(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Headroom depends on resources in the cluster, current usage of the
|
* Headroom depends on resources in the cluster, current usage of the
|
||||||
* queue, queue's fair-share and queue's max-resources.
|
* queue, queue's fair-share and queue's max-resources.
|
||||||
|
@ -196,6 +219,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
|
|
||||||
Resource clusterAvailableResources =
|
Resource clusterAvailableResources =
|
||||||
Resources.subtract(clusterResource, clusterUsage);
|
Resources.subtract(clusterResource, clusterUsage);
|
||||||
|
subtractResourcesOnBlacklistedNodes(clusterAvailableResources);
|
||||||
|
|
||||||
Resource queueMaxAvailableResources =
|
Resource queueMaxAvailableResources =
|
||||||
Resources.subtract(queue.getMaxShare(), queueUsage);
|
Resources.subtract(queue.getMaxShare(), queueUsage);
|
||||||
Resource maxAvailableResource = Resources.componentwiseMin(
|
Resource maxAvailableResource = Resources.componentwiseMin(
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestAppSchedulingInfo {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBacklistChanged() {
|
||||||
|
ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appIdImpl, 1);
|
||||||
|
|
||||||
|
FSLeafQueue queue = mock(FSLeafQueue.class);
|
||||||
|
doReturn("test").when(queue).getQueueName();
|
||||||
|
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
|
||||||
|
appAttemptId, "test", queue, null, 0, new ResourceUsage());
|
||||||
|
|
||||||
|
appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
|
||||||
|
new ArrayList<String>());
|
||||||
|
Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
|
||||||
|
|
||||||
|
ArrayList<String> blacklistAdditions = new ArrayList<String>();
|
||||||
|
blacklistAdditions.add("node1");
|
||||||
|
blacklistAdditions.add("node2");
|
||||||
|
appSchedulingInfo.updateBlacklist(blacklistAdditions,
|
||||||
|
new ArrayList<String>());
|
||||||
|
Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged());
|
||||||
|
|
||||||
|
blacklistAdditions.clear();
|
||||||
|
blacklistAdditions.add("node1");
|
||||||
|
appSchedulingInfo.updateBlacklist(blacklistAdditions,
|
||||||
|
new ArrayList<String>());
|
||||||
|
Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
|
||||||
|
|
||||||
|
ArrayList<String> blacklistRemovals = new ArrayList<String>();
|
||||||
|
blacklistRemovals.add("node1");
|
||||||
|
appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
|
||||||
|
blacklistRemovals);
|
||||||
|
appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
|
||||||
|
blacklistRemovals);
|
||||||
|
Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged());
|
||||||
|
|
||||||
|
appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
|
||||||
|
blacklistRemovals);
|
||||||
|
Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,16 +18,30 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
|
@ -256,6 +270,73 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHeadroomWithBlackListedNodes() {
|
||||||
|
// Add two nodes
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||||
|
"127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
RMNode node2 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
|
||||||
|
"127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
assertEquals("We should have two alive nodes.",
|
||||||
|
2, scheduler.getNumClusterNodes());
|
||||||
|
Resource clusterResource = scheduler.getClusterResource();
|
||||||
|
Resource clusterUsage = scheduler.getRootQueueMetrics()
|
||||||
|
.getAllocatedResources();
|
||||||
|
assertEquals(12 * 1024, clusterResource.getMemory());
|
||||||
|
assertEquals(12, clusterResource.getVirtualCores());
|
||||||
|
assertEquals(0, clusterUsage.getMemory());
|
||||||
|
assertEquals(0, clusterUsage.getVirtualCores());
|
||||||
|
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
||||||
|
createMockRMApp(id11);
|
||||||
|
scheduler.addApplication(id11.getApplicationId(),
|
||||||
|
"default", "user1", false);
|
||||||
|
scheduler.addApplicationAttempt(id11, false, false);
|
||||||
|
assertNotNull(scheduler.getSchedulerApplications().get(id11.
|
||||||
|
getApplicationId()));
|
||||||
|
FSAppAttempt app = scheduler.getSchedulerApp(id11);
|
||||||
|
assertNotNull(app);
|
||||||
|
Resource queueUsage = app.getQueue().getResourceUsage();
|
||||||
|
assertEquals(0, queueUsage.getMemory());
|
||||||
|
assertEquals(0, queueUsage.getVirtualCores());
|
||||||
|
SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID());
|
||||||
|
SchedulerNode n2 = scheduler.getSchedulerNode(node2.getNodeID());
|
||||||
|
assertNotNull(n1);
|
||||||
|
assertNotNull(n2);
|
||||||
|
List<String> blacklistAdditions = new ArrayList<String>(1);
|
||||||
|
List<String> blacklistRemovals = new ArrayList<String>(1);
|
||||||
|
blacklistAdditions.add(n1.getNodeName());
|
||||||
|
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||||
|
app.getQueue().setFairShare(clusterResource);
|
||||||
|
FSAppAttempt spyApp = spy(app);
|
||||||
|
doReturn(false)
|
||||||
|
.when(spyApp).isWaitingForAMContainer();
|
||||||
|
assertTrue(spyApp.isBlacklisted(n1.getNodeName()));
|
||||||
|
assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
|
||||||
|
assertEquals(n2.getAvailableResource(), spyApp.getHeadroom());
|
||||||
|
|
||||||
|
blacklistAdditions.clear();
|
||||||
|
blacklistAdditions.add(n2.getNodeName());
|
||||||
|
blacklistRemovals.add(n1.getNodeName());
|
||||||
|
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||||
|
assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
|
||||||
|
assertTrue(spyApp.isBlacklisted(n2.getNodeName()));
|
||||||
|
assertEquals(n1.getAvailableResource(), spyApp.getHeadroom());
|
||||||
|
|
||||||
|
blacklistAdditions.clear();
|
||||||
|
blacklistRemovals.clear();
|
||||||
|
blacklistRemovals.add(n2.getNodeName());
|
||||||
|
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||||
|
assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
|
||||||
|
assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
|
||||||
|
assertEquals(clusterResource, spyApp.getHeadroom());
|
||||||
|
}
|
||||||
|
|
||||||
private static int min(int value1, int value2, int value3) {
|
private static int min(int value1, int value2, int value3) {
|
||||||
return Math.min(Math.min(value1, value2), value3);
|
return Math.min(Math.min(value1, value2), value3);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue