YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting resources when cluster is free (Karthik Kambatla via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1597210 13f79535-47bb-0310-9956-ffa450edef68
parent 1f47d35523
commit 87bc454e86
@@ -89,6 +89,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2059. Added admin ACLs support to Timeline Server. (Zhijie Shen via
     vinodkv)

+    YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting
+    resources when cluster is free (Karthik Kambatla via Sandy Ryza)
+
   OPTIMIZATIONS

@@ -148,7 +148,11 @@ public class FairScheduler extends
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;

-  // How often tasks are preempted
+  // Preemption related variables
+  protected boolean preemptionEnabled;
+  protected float preemptionUtilizationThreshold;
+
+  // How often tasks are preempted
   protected long preemptionInterval;

   // ms to wait before force killing stuff (must be longer than a couple
@@ -158,7 +162,6 @@ public class FairScheduler extends
   // Containers whose AMs have been warned that they will be preempted soon.
   private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();

-  protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -318,7 +321,7 @@ public class FairScheduler extends
    * and then select the right ones using preemptTasks.
    */
  protected synchronized void preemptTasksIfNecessary() {
-    if (!preemptionEnabled) {
+    if (!shouldAttemptPreemption()) {
      return;
    }

@@ -328,10 +331,9 @@ public class FairScheduler extends
    }
    lastPreemptCheckTime = curTime;

-    Resource resToPreempt = Resources.none();
-
+    Resource resToPreempt = Resources.clone(Resources.none());
    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
+      Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
    }
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
        Resources.none())) {
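A brief aside on the hunk above: Resources.none() returns a shared singleton, so the accumulator is first copied with Resources.clone() and then updated in place with the mutating Resources.addTo(), rather than creating a fresh Resource per queue with Resources.add(). A minimal, self-contained sketch of that pattern (assuming hadoop-yarn-common on the classpath; the class name is illustrative):

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

public class ResourceAccumulationSketch {
  public static void main(String[] args) {
    // Start from a mutable copy; the shared Resources.none() singleton itself
    // should never be modified.
    Resource total = Resources.clone(Resources.none());

    // addTo() mutates its first argument in place; add() would instead return
    // a brand-new Resource object on every call.
    Resources.addTo(total, Resources.createResource(1024, 1));
    Resources.addTo(total, Resources.createResource(2048, 2));

    System.out.println(total); // e.g. <memory:3072, vCores:3>
  }
}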
@@ -1067,6 +1069,22 @@ public class FairScheduler extends
        clusterResource, rootMetrics.getAllocatedResources()));
  }

+  /**
+   * Check if preemption is enabled and the utilization threshold for
+   * preemption is met.
+   *
+   * @return true if preemption should be attempted, false otherwise.
+   */
+  private boolean shouldAttemptPreemption() {
+    if (preemptionEnabled) {
+      return (preemptionUtilizationThreshold < Math.max(
+          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+          (float) rootMetrics.getAllocatedVirtualCores() /
+              clusterResource.getVirtualCores()));
+    }
+    return false;
+  }
+
  @Override
  public QueueMetrics getRootQueueMetrics() {
    return rootMetrics;
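To make the new threshold check concrete: utilization is the larger of the memory and vcore usage ratios, and preemption is only attempted once that ratio exceeds the configured threshold. A small standalone sketch with made-up numbers (plain variables, not the QueueMetrics API):

public class UtilizationThresholdSketch {
  public static void main(String[] args) {
    // Hypothetical cluster capacity and current allocation.
    int clusterMB = 16 * 1024, clusterVcores = 16;
    int allocatedMB = 14 * 1024, allocatedVcores = 6;
    float threshold = 0.8f; // default cluster-utilization-threshold

    // Utilization = maximum ratio of usage to capacity across resource types.
    float utilization = Math.max(
        (float) allocatedMB / clusterMB,          // 0.875
        (float) allocatedVcores / clusterVcores); // 0.375

    // 0.875 > 0.8, so preemption would be attempted in this scenario.
    System.out.println("utilization = " + utilization
        + ", attempt preemption = " + (threshold < utilization));
  }
}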
@@ -1172,6 +1190,8 @@ public class FairScheduler extends
    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
    preemptionEnabled = this.conf.getPreemptionEnabled();
+    preemptionUtilizationThreshold =
+        this.conf.getPreemptionUtilizationThreshold();
    assignMultiple = this.conf.getAssignMultiple();
    maxAssign = this.conf.getMaxAssign();
    sizeBasedWeight = this.conf.getSizeBasedWeight();
@@ -101,6 +101,10 @@ public class FairSchedulerConfiguration extends Configuration {
  /** Whether preemption is enabled. */
  protected static final String PREEMPTION = CONF_PREFIX + "preemption";
  protected static final boolean DEFAULT_PREEMPTION = false;
+
+  protected static final String PREEMPTION_THRESHOLD =
+      CONF_PREFIX + "preemption.cluster-utilization-threshold";
+  protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;

  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
@@ -185,6 +189,10 @@ public class FairSchedulerConfiguration extends Configuration {
    return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
  }

+  public float getPreemptionUtilizationThreshold() {
+    return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
+  }
+
  public boolean getAssignMultiple() {
    return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
  }
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FairSchedulerTestBase {
+  protected static class MockClock implements Clock {
+    private long time = 0;
+    @Override
+    public long getTime() {
+      return time;
+    }
+
+    public void tick(int seconds) {
+      time = time + seconds * 1000;
+    }
+  }
+
+  protected final static String TEST_DIR =
+      new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
+
+  private static RecordFactory
+      recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+  protected int APP_ID = 1; // Incrementing counter for scheduling apps
+  protected int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
+
+  protected Configuration conf;
+  protected FairScheduler scheduler;
+  protected ResourceManager resourceManager;
+
+  // Helper methods
+  protected Configuration createConfiguration() {
+    Configuration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        1024);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+    return conf;
+  }
+
+  protected ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+    ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
+    return ApplicationAttemptId.newInstance(appIdImpl, attemptId);
+  }
+
+  protected ResourceRequest createResourceRequest(
+      int memory, String host, int priority, int numContainers,
+      boolean relaxLocality) {
+    return createResourceRequest(memory, 1, host, priority, numContainers,
+        relaxLocality);
+  }
+
+  protected ResourceRequest createResourceRequest(
+      int memory, int vcores, String host, int priority, int numContainers,
+      boolean relaxLocality) {
+    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
+    request.setCapability(BuilderUtils.newResource(memory, vcores));
+    request.setResourceName(host);
+    request.setNumContainers(numContainers);
+    Priority prio = recordFactory.newRecordInstance(Priority.class);
+    prio.setPriority(priority);
+    request.setPriority(prio);
+    request.setRelaxLocality(relaxLocality);
+    return request;
+  }
+
+  /**
+   * Creates a single container priority-1 request and submits to
+   * scheduler.
+   */
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId) {
+    return createSchedulingRequest(memory, queueId, userId, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId, int numContainers, int priority) {
+    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+        priority);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers,
+      int priority) {
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    // This conditional is for testAclSubmitApplication where app is rejected
+    // and no app is added.
+    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+      scheduler.addApplicationAttempt(id, false);
+    }
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
+        priority, numContainers, true);
+    ask.add(request);
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+    return id;
+  }
+
+  protected void createSchedulingRequestExistingApplication(
+      int memory, int priority, ApplicationAttemptId attId) {
+    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
+        priority, 1, true);
+    createSchedulingRequestExistingApplication(request, attId);
+  }
+
+  protected void createSchedulingRequestExistingApplication(
+      int memory, int vcores, int priority, ApplicationAttemptId attId) {
+    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
+        priority, 1, true);
+    createSchedulingRequestExistingApplication(request, attId);
+  }
+
+  protected void createSchedulingRequestExistingApplication(
+      ResourceRequest request, ApplicationAttemptId attId) {
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ask.add(request);
+    scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
+  }
+}
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -63,8 +62,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -94,7 +91,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -105,46 +101,14 @@ import org.xml.sax.SAXException;
 import com.google.common.collect.Sets;

 @SuppressWarnings("unchecked")
-public class TestFairScheduler {
+public class TestFairScheduler extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE =
+      new File(TEST_DIR, "test-queues").getAbsolutePath();

-  static class MockClock implements Clock {
-    private long time = 0;
-    @Override
-    public long getTime() {
-      return time;
-    }
-
-    public void tick(int seconds) {
-      time = time + seconds * 1000;
-    }
-
-  }
-
-  final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "/tmp")).getAbsolutePath();
-
-  final static String ALLOC_FILE = new File(TEST_DIR,
-      "test-queues").getAbsolutePath();
-
-  private FairScheduler scheduler;
-  private ResourceManager resourceManager;
-  private Configuration conf;
-  private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-  private int APP_ID = 1; // Incrementing counter for schedling apps
-  private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
-
-  // HELPER METHODS
  @Before
  public void setUp() throws IOException {
    scheduler = new FairScheduler();
    conf = createConfiguration();
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
-    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
-        1024);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
-    // All tests assume only one assignment per node update
-    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
    resourceManager = new ResourceManager();
    resourceManager.init(conf);
@@ -198,107 +162,6 @@ public class TestFairScheduler {
    }
  }

-  private Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
-        ResourceScheduler.class);
-    return conf;
-  }
-
-  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
-    ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
-    ApplicationAttemptId attId =
-        ApplicationAttemptId.newInstance(appIdImpl, attemptId);
-    return attId;
-  }
-
-  private ResourceRequest createResourceRequest(int memory, String host,
-      int priority, int numContainers, boolean relaxLocality) {
-    return createResourceRequest(memory, 1, host, priority, numContainers,
-        relaxLocality);
-  }
-
-  private ResourceRequest createResourceRequest(int memory, int vcores, String host,
-      int priority, int numContainers, boolean relaxLocality) {
-    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
-    request.setCapability(BuilderUtils.newResource(memory, vcores));
-    request.setResourceName(host);
-    request.setNumContainers(numContainers);
-    Priority prio = recordFactory.newRecordInstance(Priority.class);
-    prio.setPriority(priority);
-    request.setPriority(prio);
-    request.setRelaxLocality(relaxLocality);
-    return request;
-  }
-
-  /**
-   * Creates a single container priority-1 request and submits to
-   * scheduler.
-   */
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId) {
-    return createSchedulingRequest(memory, queueId, userId, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId) {
-    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId, int numContainers) {
-    return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId, int numContainers) {
-    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId, int numContainers, int priority) {
-    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
-        priority);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId, int numContainers, int priority) {
-    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId);
-    // This conditional is for testAclSubmitApplication where app is rejected
-    // and no app is added.
-    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
-      scheduler.addApplicationAttempt(id, false);
-    }
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, numContainers, true);
-    ask.add(request);
-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
-    return id;
-  }
-
-  private void createSchedulingRequestExistingApplication(int memory, int priority,
-      ApplicationAttemptId attId) {
-    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
-        priority, 1, true);
-    createSchedulingRequestExistingApplication(request, attId);
-  }
-
-  private void createSchedulingRequestExistingApplication(int memory, int vcores,
-      int priority, ApplicationAttemptId attId) {
-    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, 1, true);
-    createSchedulingRequestExistingApplication(request, attId);
-  }
-
-  private void createSchedulingRequestExistingApplication(ResourceRequest request,
-      ApplicationAttemptId attId) {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ask.add(request);
-    scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
-  }
-
  // TESTS

  @Test(timeout=2000)
@@ -1455,7 +1318,7 @@ public class TestFairScheduler {
    assertEquals(
        1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
  }

  @Test (timeout = 5000)
  public void testMultipleContainersWaitingForReservation() throws IOException {
    scheduler.reinitialize(conf, resourceManager.getRMContext());
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE = new File(TEST_DIR,
+      TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
+
+  private MockClock clock;
+
+  private static class StubbedFairScheduler extends FairScheduler {
+    public int lastPreemptMemory = -1;
+
+    @Override
+    protected void preemptResources(
+        Collection<FSLeafQueue> scheds, Resource toPreempt) {
+      lastPreemptMemory = toPreempt.getMemory();
+    }
+
+    public void resetLastPreemptResources() {
+      lastPreemptMemory = -1;
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    return conf;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf = createConfiguration();
+    clock = new MockClock();
+  }
+
+  @After
+  public void teardown() {
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    conf = null;
+  }
+
+  private void startResourceManager(float utilizationThreshold) {
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
+        utilizationThreshold);
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+
+    assertTrue(
+        resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
+    scheduler = (FairScheduler)resourceManager.getResourceScheduler();
+
+    scheduler.setClock(clock);
+    scheduler.UPDATE_INTERVAL = 60 * 1000;
+  }
+
+  private void registerNodeAndSubmitApp(
+      int memory, int vcores, int appContainers, int appMemory) {
+    RMNode node1 = MockNodes.newNodeInfo(
+        1, Resources.createResource(memory, vcores), 1, "node1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    assertEquals("Incorrect amount of resources in the cluster",
+        memory, scheduler.rootMetrics.getAvailableMB());
+    assertEquals("Incorrect amount of resources in the cluster",
+        vcores, scheduler.rootMetrics.getAvailableVirtualCores());
+
+    createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
+    scheduler.update();
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+    }
+    assertEquals("app1's request is not met",
+        memory - appContainers * appMemory,
+        scheduler.rootMetrics.getAvailableMB());
+  }
+
+  @Test
+  public void testPreemptionWithFreeResources() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    startResourceManager(0f);
+    // Create node with 4GB memory and 4 vcores
+    registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
+
+    // Verify submitting another request triggers preemption
+    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    scheduler.update();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    assertEquals("preemptResources() should have been called", 1024,
+        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+
+    resourceManager.stop();
+
+    startResourceManager(0.8f);
+    // Create node with 4GB memory and 4 vcores
+    registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
+
+    // Verify submitting another request doesn't trigger preemption
+    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    scheduler.update();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    assertEquals("preemptResources() should not have been called", -1,
+        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+  }
+}
@@ -156,6 +156,12 @@ Properties that can be placed in yarn-site.xml
  * Whether to use preemption. Note that preemption is experimental in the current
    version. Defaults to false.

+ * <<<yarn.scheduler.fair.preemption.cluster-utilization-threshold>>>
+
+ * The utilization threshold after which preemption kicks in. The
+   utilization is computed as the maximum ratio of usage to capacity among
+   all resources. Defaults to 0.8f.
+
  * <<<yarn.scheduler.fair.sizebasedweight>>>

  * Whether to assign shares to individual apps based on their size, rather than
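For completeness, one way to exercise the documented properties from Java (the values are examples only; in a real deployment they would normally be set in yarn-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class PreemptionConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Turn preemption on and only preempt once the cluster is more than
    // 90% utilized (the default threshold is 0.8).
    conf.setBoolean("yarn.scheduler.fair.preemption", true);
    conf.setFloat(
        "yarn.scheduler.fair.preemption.cluster-utilization-threshold", 0.9f);

    System.out.println(conf.getFloat(
        "yarn.scheduler.fair.preemption.cluster-utilization-threshold", 0.8f));
  }
}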