YARN-877. Support resource blacklisting for FifoScheduler. (Junping Du via llu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1498024 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
805fa42025
commit
8a93677f93
|
@ -391,6 +391,9 @@ Release 2.1.0-beta - 2013-07-02
|
||||||
YARN-750. Allow for black-listing resources in YARN API and Impl in CS
|
YARN-750. Allow for black-listing resources in YARN API and Impl in CS
|
||||||
(acmurthy via bikas)
|
(acmurthy via bikas)
|
||||||
|
|
||||||
|
YARN-877. Support resource blacklisting for FifoScheduler.
|
||||||
|
(Junping Du via llu)
|
||||||
|
|
||||||
YARN-686. Flatten NodeReport. (sandyr via tucu)
|
YARN-686. Flatten NodeReport. (sandyr via tucu)
|
||||||
|
|
||||||
YARN-737. Throw some specific exceptions directly instead of wrapping them
|
YARN-737. Throw some specific exceptions directly instead of wrapping them
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||||
|
@ -816,7 +817,7 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
// Check if this resource is on the blacklist
|
// Check if this resource is on the blacklist
|
||||||
if (isBlacklisted(application, node)) {
|
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -903,28 +904,6 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isBlacklisted(FiCaSchedulerApp application, FiCaSchedulerNode node) {
|
|
||||||
if (application.isBlacklisted(node.getHostName())) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Skipping 'host' " + node.getHostName() +
|
|
||||||
" for " + application.getApplicationId() +
|
|
||||||
" since it has been blacklisted");
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (application.isBlacklisted(node.getRackName())) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Skipping 'rack' " + node.getRackName() +
|
|
||||||
" for " + application.getApplicationId() +
|
|
||||||
" since it has been blacklisted");
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized CSAssignment
|
private synchronized CSAssignment
|
||||||
assignReservedContainer(FiCaSchedulerApp application,
|
assignReservedContainer(FiCaSchedulerApp application,
|
||||||
FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
|
FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
|
||||||
|
public class FiCaSchedulerUtils {
|
||||||
|
|
||||||
|
public static boolean isBlacklisted(FiCaSchedulerApp application,
|
||||||
|
FiCaSchedulerNode node, Log LOG) {
|
||||||
|
if (application.isBlacklisted(node.getHostName())) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Skipping 'host' " + node.getHostName() +
|
||||||
|
" for " + application.getApplicationId() +
|
||||||
|
" since it has been blacklisted");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (application.isBlacklisted(node.getRackName())) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Skipping 'rack' " + node.getRackName() +
|
||||||
|
" for " + application.getApplicationId() +
|
||||||
|
" since it has been blacklisted");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||||
|
@ -290,7 +291,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
application.showRequests();
|
application.showRequests();
|
||||||
|
|
||||||
// Update application requests
|
// Update application requests
|
||||||
application.updateResourceRequests(ask, null, null);
|
application.updateResourceRequests(ask, blacklistAdditions, blacklistRemovals);
|
||||||
|
|
||||||
LOG.debug("allocate: post-update" +
|
LOG.debug("allocate: post-update" +
|
||||||
" applicationId=" + applicationAttemptId +
|
" applicationId=" + applicationAttemptId +
|
||||||
|
@ -388,6 +389,11 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
LOG.debug("pre-assignContainers");
|
LOG.debug("pre-assignContainers");
|
||||||
application.showRequests();
|
application.showRequests();
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
|
// Check if this resource is on the blacklist
|
||||||
|
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
for (Priority priority : application.getPriorities()) {
|
for (Priority priority : application.getPriorities()) {
|
||||||
int maxContainers =
|
int maxContainers =
|
||||||
getMaxAllocatableContainers(application, priority, node,
|
getMaxAllocatableContainers(application, priority, node,
|
||||||
|
|
|
@ -19,13 +19,13 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configurable;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
|
@ -243,7 +242,6 @@ public class TestFifoScheduler {
|
||||||
|
|
||||||
fs.handle(new NodeAddedSchedulerEvent(n1));
|
fs.handle(new NodeAddedSchedulerEvent(n1));
|
||||||
fs.handle(new NodeAddedSchedulerEvent(n2));
|
fs.handle(new NodeAddedSchedulerEvent(n2));
|
||||||
List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
|
|
||||||
fs.handle(new NodeUpdateSchedulerEvent(n1));
|
fs.handle(new NodeUpdateSchedulerEvent(n1));
|
||||||
Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
|
Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
|
||||||
|
|
||||||
|
@ -257,6 +255,120 @@ public class TestFifoScheduler {
|
||||||
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
|
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 50000)
|
||||||
|
public void testBlackListNodes() throws Exception {
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
int rack_num_0 = 0;
|
||||||
|
int rack_num_1 = 1;
|
||||||
|
// Add 4 nodes in 2 racks
|
||||||
|
|
||||||
|
// host_0_0 in rack0
|
||||||
|
String host_0_0 = "127.0.0.1";
|
||||||
|
RMNode n1 =
|
||||||
|
MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_0);
|
||||||
|
fs.handle(new NodeAddedSchedulerEvent(n1));
|
||||||
|
|
||||||
|
// host_0_1 in rack0
|
||||||
|
String host_0_1 = "127.0.0.2";
|
||||||
|
RMNode n2 =
|
||||||
|
MockNodes.newNodeInfo(rack_num_0, MockNodes.newResource(4 * GB), 1, host_0_1);
|
||||||
|
fs.handle(new NodeAddedSchedulerEvent(n2));
|
||||||
|
|
||||||
|
// host_1_0 in rack1
|
||||||
|
String host_1_0 = "127.0.0.3";
|
||||||
|
RMNode n3 =
|
||||||
|
MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_0);
|
||||||
|
fs.handle(new NodeAddedSchedulerEvent(n3));
|
||||||
|
|
||||||
|
// host_1_1 in rack1
|
||||||
|
String host_1_1 = "127.0.0.4";
|
||||||
|
RMNode n4 =
|
||||||
|
MockNodes.newNodeInfo(rack_num_1, MockNodes.newResource(4 * GB), 1, host_1_1);
|
||||||
|
fs.handle(new NodeAddedSchedulerEvent(n4));
|
||||||
|
|
||||||
|
// Add one application
|
||||||
|
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
||||||
|
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
||||||
|
appId1, 1);
|
||||||
|
SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
|
||||||
|
"user");
|
||||||
|
fs.handle(event1);
|
||||||
|
|
||||||
|
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
||||||
|
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
|
||||||
|
|
||||||
|
// Allow rack-locality for rack_1, but blacklist host_1_0
|
||||||
|
|
||||||
|
// Set up resource requests
|
||||||
|
// Ask for a 1 GB container for app 1
|
||||||
|
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
|
||||||
|
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
|
||||||
|
"rack1", BuilderUtils.newResource(GB, 1), 1));
|
||||||
|
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
|
||||||
|
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
|
||||||
|
fs.allocate(appAttemptId1, ask1, emptyId, Collections.singletonList(host_1_0), null);
|
||||||
|
|
||||||
|
// Trigger container assignment
|
||||||
|
fs.handle(new NodeUpdateSchedulerEvent(n3));
|
||||||
|
|
||||||
|
// Get the allocation for the application and verify no allocation on blacklist node
|
||||||
|
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
|
||||||
|
|
||||||
|
Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());
|
||||||
|
|
||||||
|
// verify host_1_1 can get allocated as not in blacklist
|
||||||
|
fs.handle(new NodeUpdateSchedulerEvent(n4));
|
||||||
|
Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
|
||||||
|
Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
|
||||||
|
List<Container> containerList = allocation2.getContainers();
|
||||||
|
for (Container container : containerList) {
|
||||||
|
Assert.assertEquals("Container is allocated on n4",
|
||||||
|
container.getNodeId(), n4.getNodeID());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ask for a 1 GB container again for app 1
|
||||||
|
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
|
||||||
|
// this time, rack0 is also in blacklist, so only host_1_1 is available to
|
||||||
|
// be assigned
|
||||||
|
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
|
||||||
|
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
|
||||||
|
fs.allocate(appAttemptId1, ask2, emptyId, Collections.singletonList("rack0"), null);
|
||||||
|
|
||||||
|
// verify n1 is not qualified to be allocated
|
||||||
|
fs.handle(new NodeUpdateSchedulerEvent(n1));
|
||||||
|
Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
|
||||||
|
Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());
|
||||||
|
|
||||||
|
// verify n2 is not qualified to be allocated
|
||||||
|
fs.handle(new NodeUpdateSchedulerEvent(n2));
|
||||||
|
Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
|
||||||
|
Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());
|
||||||
|
|
||||||
|
// verify n3 is not qualified to be allocated
|
||||||
|
fs.handle(new NodeUpdateSchedulerEvent(n3));
|
||||||
|
Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
|
||||||
|
Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());
|
||||||
|
|
||||||
|
fs.handle(new NodeUpdateSchedulerEvent(n4));
|
||||||
|
Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
|
||||||
|
Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());
|
||||||
|
|
||||||
|
containerList = allocation6.getContainers();
|
||||||
|
for (Container container : containerList) {
|
||||||
|
Assert.assertEquals("Container is allocated on n4",
|
||||||
|
container.getNodeId(), n4.getNodeID());
|
||||||
|
}
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 50000)
|
@Test (timeout = 50000)
|
||||||
public void testHeadroom() throws Exception {
|
public void testHeadroom() throws Exception {
|
||||||
|
|
||||||
|
@ -287,7 +399,6 @@ public class TestFifoScheduler {
|
||||||
"user");
|
"user");
|
||||||
fs.handle(event2);
|
fs.handle(event2);
|
||||||
|
|
||||||
List<ContainerStatus> emptyStatus = new ArrayList<ContainerStatus>();
|
|
||||||
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
||||||
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue