YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1535902 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-10-26 00:34:14 +00:00
parent 9c78f870d8
commit 2301e3a163
7 changed files with 114 additions and 26 deletions

View File

@ -93,6 +93,9 @@ Release 2.2.1 - UNRELEASED
YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp
into SchedulerApplication (Sandy Ryza)
YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via
Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.commons.logging.Log;
public class SchedulerAppUtils {
public static boolean isBlacklisted(SchedulerApplication application,
SchedulerNode node, Log LOG) {
if (application.isBlacklisted(node.getNodeName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'host' " + node.getNodeName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
if (application.isBlacklisted(node.getRackName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping 'rack' " + node.getRackName() +
" for " + application.getApplicationId() +
" since it has been blacklisted");
}
return true;
}
return false;
}
}

View File

@ -57,9 +57,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
@ -823,7 +823,7 @@ public class LeafQueue implements CSQueue {
synchronized (application) {
// Check if this resource is on the blacklist
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
continue;
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@ -155,6 +156,10 @@ public class FSLeafQueue extends FSQueue {
Collections.sort(appScheds, comparator);
for (AppSchedulable sched : appScheds) {
if (sched.getRunnable()) {
if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
continue;
}
assigned = sched.assignContainer(node);
if (!assigned.equals(Resources.none())) {
break;

View File

@ -882,6 +882,8 @@ public class FairScheduler implements ResourceScheduler {
for (RMContainer container : application.getPreemptionContainers()) {
preemptionContainerIds.add(container.getContainerId());
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom(), preemptionContainerIds);

View File

@ -65,18 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@ -405,7 +396,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
application.showRequests();
synchronized (application) {
// Check if this resource is on the blacklist
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
continue;
}

View File

@ -22,11 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileWriter;
@ -36,7 +33,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -46,8 +42,6 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -59,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -80,7 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@ -90,16 +82,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.xml.sax.SAXException;
public class TestFairScheduler {
@ -2447,4 +2435,55 @@ public class TestFairScheduler {
assertEquals(2, jerryQueue.getAppSchedulables().size());
assertEquals(2, defaultQueue.getAppSchedulables().size());
}
@SuppressWarnings("resource")
@Test
public void testBlacklistNodes() throws Exception {
final int GB = 1024;
String host = "127.0.0.1";
RMNode node =
MockNodes.newNodeInfo(1, Resources.createResource(16 * GB, 16),
0, host);
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(nodeEvent);
ApplicationAttemptId appAttemptId =
createSchedulingRequest(GB, "root.default", "user", 1);
FSSchedulerApp app = scheduler.applications.get(appAttemptId);
// Verify the blacklist can be updated independent of requesting containers
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
assertTrue(app.isBlacklisted(host));
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
assertFalse(scheduler.applications.get(appAttemptId).isBlacklisted(host));
List<ResourceRequest> update = Arrays.asList(
createResourceRequest(GB, node.getHostName(), 1, 0, true));
// Verify a container does not actually get placed on the blacklisted host
scheduler.allocate(appAttemptId, update,
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
assertTrue(app.isBlacklisted(host));
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Incorrect number of containers allocated", 0, app
.getLiveContainers().size());
// Verify a container gets placed on the empty blacklist
scheduler.allocate(appAttemptId, update,
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
assertFalse(app.isBlacklisted(host));
createSchedulingRequest(GB, "root.default", "user", 1);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Incorrect number of containers allocated", 1, app
.getLiveContainers().size());
}
}