YARN-877. Support resource blacklisting for FifoScheduler. (Junping Du via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1498021 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Luke Lu 2013-06-29 20:18:57 +00:00
parent 28d5fa1fb1
commit 8eb3be63f5
5 changed files with 175 additions and 28 deletions

View File

@ -408,6 +408,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-750. Allow for black-listing resources in YARN API and Impl in CS YARN-750. Allow for black-listing resources in YARN API and Impl in CS
(acmurthy via bikas) (acmurthy via bikas)
YARN-877. Support resource blacklisting for FifoScheduler.
(Junping Du via llu)
YARN-686. Flatten NodeReport. (sandyr via tucu) YARN-686. Flatten NodeReport. (sandyr via tucu)
YARN-737. Throw some specific exceptions directly instead of wrapping them YARN-737. Throw some specific exceptions directly instead of wrapping them

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock;
@ -816,7 +817,7 @@ public class LeafQueue implements CSQueue {
synchronized (application) { synchronized (application) {
// Check if this resource is on the blacklist // Check if this resource is on the blacklist
if (isBlacklisted(application, node)) { if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
continue; continue;
} }
@ -902,28 +903,6 @@ public class LeafQueue implements CSQueue {
return NULL_ASSIGNMENT; return NULL_ASSIGNMENT;
} }
boolean isBlacklisted(FiCaSchedulerApp application, FiCaSchedulerNode node) {
if (application.isBlacklisted(node.getHostName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'host' " + node.getHostName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
if (application.isBlacklisted(node.getRackName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'rack' " + node.getRackName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
return false;
}
private synchronized CSAssignment private synchronized CSAssignment
assignReservedContainer(FiCaSchedulerApp application, assignReservedContainer(FiCaSchedulerApp application,

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import org.apache.commons.logging.Log;
public class FiCaSchedulerUtils {
public static boolean isBlacklisted(FiCaSchedulerApp application,
FiCaSchedulerNode node, Log LOG) {
if (application.isBlacklisted(node.getHostName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'host' " + node.getHostName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
if (application.isBlacklisted(node.getRackName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'rack' " + node.getRackName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
return false;
}
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@ -290,7 +291,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
application.showRequests(); application.showRequests();
// Update application requests // Update application requests
application.updateResourceRequests(ask, null, null); application.updateResourceRequests(ask, blacklistAdditions, blacklistRemovals);
LOG.debug("allocate: post-update" + LOG.debug("allocate: post-update" +
" applicationId=" + applicationAttemptId + " applicationId=" + applicationAttemptId +
@ -388,6 +389,11 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
LOG.debug("pre-assignContainers"); LOG.debug("pre-assignContainers");
application.showRequests(); application.showRequests();
synchronized (application) { synchronized (application) {
// Check if this resource is on the blacklist
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
continue;
}
for (Priority priority : application.getPriorities()) { for (Priority priority : application.getPriorities()) {
int maxContainers = int maxContainers =
getMaxAllocatableContainers(application, priority, node, getMaxAllocatableContainers(application, priority, node,

View File

@ -19,13 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -243,7 +242,6 @@ public class TestFifoScheduler {
fs.handle(new NodeAddedSchedulerEvent(n1)); fs.handle(new NodeAddedSchedulerEvent(n1));
fs.handle(new NodeAddedSchedulerEvent(n2)); fs.handle(new NodeAddedSchedulerEvent(n2));
List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
fs.handle(new NodeUpdateSchedulerEvent(n1)); fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
@ -257,6 +255,120 @@ public class TestFifoScheduler {
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
} }
@Test (timeout = 50000)
public void testBlackListNodes() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();
int rack_num_0 = 0;
int rack_num_1 = 1;
// Add 4 nodes in 2 racks
// host_0_0 in rack0
String host_0_0 = "127.0.0.1";
RMNode n1 =
MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_0);
fs.handle(new NodeAddedSchedulerEvent(n1));
// host_0_1 in rack0
String host_0_1 = "127.0.0.2";
RMNode n2 =
MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_1);
fs.handle(new NodeAddedSchedulerEvent(n2));
// host_1_0 in rack1
String host_1_0 = "127.0.0.3";
RMNode n3 =
MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_0);
fs.handle(new NodeAddedSchedulerEvent(n3));
// host_1_1 in rack1
String host_1_1 = "127.0.0.4";
RMNode n4 =
MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_1);
fs.handle(new NodeAddedSchedulerEvent(n4));
// Add one application
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
"user");
fs.handle(event1);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
// Allow rack-locality for rack_1, but blacklist host_1_0
// Set up resource requests
// Ask for a 1 GB container for app 1
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
"rack1", BuilderUtils.newResource(GB, 1), 1));
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
fs.allocate(appAttemptId1, ask1, emptyId, Collections.singletonList(host_1_0), null);
// Trigger container assignment
fs.handle(new NodeUpdateSchedulerEvent(n3));
// Get the allocation for the application and verify no allocation on blacklist node
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());
// verify host_1_1 can get allocated as not in blacklist
fs.handle(new NodeUpdateSchedulerEvent(n4));
Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
List<Container> containerList = allocation2.getContainers();
for (Container container : containerList) {
Assert.assertEquals("Container is allocated on n4",
container.getNodeId(), n4.getNodeID());
}
// Ask for a 1 GB container again for app 1
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
// this time, rack0 is also in blacklist, so only host_1_1 is available to
// be assigned
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
fs.allocate(appAttemptId1, ask2, emptyId, Collections.singletonList("rack0"), null);
// verify n1 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n1));
Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());
// verify n2 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n2));
Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());
// verify n3 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n3));
Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());
fs.handle(new NodeUpdateSchedulerEvent(n4));
Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());
containerList = allocation6.getContainers();
for (Container container : containerList) {
Assert.assertEquals("Container is allocated on n4",
container.getNodeId(), n4.getNodeID());
}
rm.stop();
}
@Test (timeout = 50000) @Test (timeout = 50000)
public void testHeadroom() throws Exception { public void testHeadroom() throws Exception {
@ -287,7 +399,6 @@ public class TestFifoScheduler {
"user"); "user");
fs.handle(event2); fs.handle(event2);
List<ContainerStatus> emptyStatus = new ArrayList<ContainerStatus>();
List<ContainerId> emptyId = new ArrayList<ContainerId>(); List<ContainerId> emptyId = new ArrayList<ContainerId>();
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>(); List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();