YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1535899 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5829029154
commit
025f171947
|
@ -111,6 +111,9 @@ Release 2.2.1 - UNRELEASED
|
|||
YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp
|
||||
into SchedulerApplication (Sandy Ryza)
|
||||
|
||||
YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via
|
||||
Sandy Ryza)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -57,9 +57,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
|
@ -823,7 +823,7 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
synchronized (application) {
|
||||
// Check if this resource is on the blacklist
|
||||
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
|
||||
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@Private
|
||||
|
@ -155,6 +156,10 @@ public class FSLeafQueue extends FSQueue {
|
|||
Collections.sort(appScheds, comparator);
|
||||
for (AppSchedulable sched : appScheds) {
|
||||
if (sched.getRunnable()) {
|
||||
if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
assigned = sched.assignContainer(node);
|
||||
if (!assigned.equals(Resources.none())) {
|
||||
break;
|
||||
|
|
|
@ -882,6 +882,8 @@ public class FairScheduler implements ResourceScheduler {
|
|||
for (RMContainer container : application.getPreemptionContainers()) {
|
||||
preemptionContainerIds.add(container.getContainerId());
|
||||
}
|
||||
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
|
||||
return new Allocation(application.pullNewlyAllocatedContainers(),
|
||||
application.getHeadroom(), preemptionContainerIds);
|
||||
|
|
|
@ -65,18 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
|
@ -405,7 +396,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
|||
application.showRequests();
|
||||
synchronized (application) {
|
||||
// Check if this resource is on the blacklist
|
||||
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
|
||||
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,11 +22,8 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
|
@ -36,7 +33,6 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -46,8 +42,6 @@ import junit.framework.Assert;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -59,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
@ -80,7 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
|
@ -90,16 +82,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
public class TestFairScheduler {
|
||||
|
@ -2447,4 +2435,55 @@ public class TestFairScheduler {
|
|||
assertEquals(2, jerryQueue.getAppSchedulables().size());
|
||||
assertEquals(2, defaultQueue.getAppSchedulables().size());
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test
|
||||
public void testBlacklistNodes() throws Exception {
|
||||
final int GB = 1024;
|
||||
String host = "127.0.0.1";
|
||||
RMNode node =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(16 * GB, 16),
|
||||
0, host);
|
||||
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
||||
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
||||
scheduler.handle(nodeEvent);
|
||||
|
||||
ApplicationAttemptId appAttemptId =
|
||||
createSchedulingRequest(GB, "root.default", "user", 1);
|
||||
FSSchedulerApp app = scheduler.applications.get(appAttemptId);
|
||||
|
||||
// Verify the blacklist can be updated independent of requesting containers
|
||||
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
|
||||
Collections.<ContainerId>emptyList(),
|
||||
Collections.singletonList(host), null);
|
||||
assertTrue(app.isBlacklisted(host));
|
||||
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
|
||||
Collections.<ContainerId>emptyList(), null,
|
||||
Collections.singletonList(host));
|
||||
assertFalse(scheduler.applications.get(appAttemptId).isBlacklisted(host));
|
||||
|
||||
List<ResourceRequest> update = Arrays.asList(
|
||||
createResourceRequest(GB, node.getHostName(), 1, 0, true));
|
||||
|
||||
// Verify a container does not actually get placed on the blacklisted host
|
||||
scheduler.allocate(appAttemptId, update,
|
||||
Collections.<ContainerId>emptyList(),
|
||||
Collections.singletonList(host), null);
|
||||
assertTrue(app.isBlacklisted(host));
|
||||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
assertEquals("Incorrect number of containers allocated", 0, app
|
||||
.getLiveContainers().size());
|
||||
|
||||
// Verify a container gets placed on the empty blacklist
|
||||
scheduler.allocate(appAttemptId, update,
|
||||
Collections.<ContainerId>emptyList(), null,
|
||||
Collections.singletonList(host));
|
||||
assertFalse(app.isBlacklisted(host));
|
||||
createSchedulingRequest(GB, "root.default", "user", 1);
|
||||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
assertEquals("Incorrect number of containers allocated", 1, app
|
||||
.getLiveContainers().size());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue