diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c74e777d400..48822a8e36c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -89,6 +89,9 @@ Release 2.5.0 - UNRELEASED YARN-2059. Added admin ACLs support to Timeline Server. (Zhijie Shen via vinodkv) + + YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting + resources when cluster is free (Karthik Kambatla via Sandy Ryza) OPTIMIZATIONS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5eeda64af0e..830f6f75099 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -148,7 +148,11 @@ public class FairScheduler extends // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // How often tasks are preempted + // Preemption related variables + protected boolean preemptionEnabled; + protected float preemptionUtilizationThreshold; + + // How often tasks are preempted protected long preemptionInterval; // ms to wait before force killing stuff (must be longer than a couple @@ -158,7 +162,6 @@ public class FairScheduler extends // Containers whose AMs have been warned that they will be preempted soon. private List warnedContainers = new ArrayList(); - protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not @@ -318,7 +321,7 @@ public class FairScheduler extends * and then select the right ones using preemptTasks. */ protected synchronized void preemptTasksIfNecessary() { - if (!preemptionEnabled) { + if (!shouldAttemptPreemption()) { return; } @@ -328,10 +331,9 @@ public class FairScheduler extends } lastPreemptCheckTime = curTime; - Resource resToPreempt = Resources.none(); - + Resource resToPreempt = Resources.clone(Resources.none()); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); + Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); } if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { @@ -1067,6 +1069,22 @@ public class FairScheduler extends clusterResource, rootMetrics.getAllocatedResources())); } + /** + * Check if preemption is enabled and the utilization threshold for + * preemption is met. + * + * @return true if preemption should be attempted, false otherwise. + */ + private boolean shouldAttemptPreemption() { + if (preemptionEnabled) { + return (preemptionUtilizationThreshold < Math.max( + (float) rootMetrics.getAvailableMB() / clusterResource.getMemory(), + (float) rootMetrics.getAvailableVirtualCores() / + clusterResource.getVirtualCores())); + } + return false; + } + @Override public QueueMetrics getRootQueueMetrics() { return rootMetrics; @@ -1172,6 +1190,8 @@ public class FairScheduler extends nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); preemptionEnabled = this.conf.getPreemptionEnabled(); + preemptionUtilizationThreshold = + this.conf.getPreemptionUtilizationThreshold(); assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); sizeBasedWeight = this.conf.getSizeBasedWeight(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index e271b053737..0fd242d662d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -101,6 +101,10 @@ public class FairSchedulerConfiguration extends Configuration { /** Whether preemption is enabled. */ protected static final String PREEMPTION = CONF_PREFIX + "preemption"; protected static final boolean DEFAULT_PREEMPTION = false; + + protected static final String PREEMPTION_THRESHOLD = + CONF_PREFIX + "preemption.cluster-utilization-threshold"; + protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f; protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval"; protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000; @@ -185,6 +189,10 @@ public class FairSchedulerConfiguration extends Configuration { return getBoolean(PREEMPTION, DEFAULT_PREEMPTION); } + public float getPreemptionUtilizationThreshold() { + return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD); + } + public boolean getAssignMultiple() { return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java new file mode 100644 index 00000000000..5f926763d9b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class FairSchedulerTestBase { + protected static class MockClock implements Clock { + private long time = 0; + @Override + public long getTime() { + return time; + } + + public void tick(int seconds) { + time = time + seconds * 1000; + } + } + + protected final static String TEST_DIR = + new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); + + private static RecordFactory + recordFactory = RecordFactoryProvider.getRecordFactory(null); + + protected int APP_ID = 1; // Incrementing counter for scheduling apps + protected int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts + + protected Configuration conf; + protected FairScheduler scheduler; + protected ResourceManager resourceManager; + + // Helper methods + protected Configuration createConfiguration() { + Configuration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + 1024); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); + conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + return conf; + } + + protected ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { + ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); + return ApplicationAttemptId.newInstance(appIdImpl, attemptId); + } + + protected ResourceRequest createResourceRequest( + int memory, String host, int priority, int numContainers, + boolean relaxLocality) { + return createResourceRequest(memory, 1, host, priority, numContainers, + relaxLocality); + } + + protected ResourceRequest createResourceRequest( + int memory, int vcores, String host, int priority, int numContainers, + boolean relaxLocality) { + ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); + request.setCapability(BuilderUtils.newResource(memory, vcores)); + request.setResourceName(host); + request.setNumContainers(numContainers); + Priority prio = recordFactory.newRecordInstance(Priority.class); + prio.setPriority(priority); + request.setPriority(prio); + request.setRelaxLocality(relaxLocality); + return request; + } + + /** + * Creates a single container priority-1 request and submits to + * scheduler. + */ + protected ApplicationAttemptId createSchedulingRequest( + int memory, String queueId, String userId) { + return createSchedulingRequest(memory, queueId, userId, 1); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId) { + return createSchedulingRequest(memory, vcores, queueId, userId, 1); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, String queueId, String userId, int numContainers) { + return createSchedulingRequest(memory, queueId, userId, numContainers, 1); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers) { + return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, String queueId, String userId, int numContainers, int priority) { + return createSchedulingRequest(memory, 1, queueId, userId, numContainers, + priority); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers, + int priority) { + ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + scheduler.addApplication(id.getApplicationId(), queueId, userId); + // This conditional is for testAclSubmitApplication where app is rejected + // and no app is added. + if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { + scheduler.addApplicationAttempt(id, false); + } + List ask = new ArrayList(); + ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, + priority, numContainers, true); + ask.add(request); + scheduler.allocate(id, ask, new ArrayList(), null, null); + return id; + } + + protected void createSchedulingRequestExistingApplication( + int memory, int priority, ApplicationAttemptId attId) { + ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY, + priority, 1, true); + createSchedulingRequestExistingApplication(request, attId); + } + + protected void createSchedulingRequestExistingApplication( + int memory, int vcores, int priority, ApplicationAttemptId attId) { + ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, + priority, 1, true); + createSchedulingRequestExistingApplication(request, attId); + } + + protected void createSchedulingRequestExistingApplication( + ResourceRequest request, ApplicationAttemptId attId) { + List ask = new ArrayList(); + ask.add(request); + scheduler.allocate(attId, ask, new ArrayList(), null, null); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 0c4953e0ddc..32f838713c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; @@ -63,8 +62,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -94,7 +91,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; @@ -105,46 +101,14 @@ import org.xml.sax.SAXException; import com.google.common.collect.Sets; @SuppressWarnings("unchecked") -public class TestFairScheduler { +public class TestFairScheduler extends FairSchedulerTestBase { + private final static String ALLOC_FILE = + new File(TEST_DIR, "test-queues").getAbsolutePath(); - static class MockClock implements Clock { - private long time = 0; - @Override - public long getTime() { - return time; - } - - public void tick(int seconds) { - time = time + seconds * 1000; - } - - } - - final static String TEST_DIR = new File(System.getProperty("test.build.data", - "/tmp")).getAbsolutePath(); - - final static String ALLOC_FILE = new File(TEST_DIR, - "test-queues").getAbsolutePath(); - - private FairScheduler scheduler; - private ResourceManager resourceManager; - private Configuration conf; - private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - - private int APP_ID = 1; // Incrementing counter for schedling apps - private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts - - // HELPER METHODS @Before public void setUp() throws IOException { scheduler = new FairScheduler(); conf = createConfiguration(); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); - conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, - 1024); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); - // All tests assume only one assignment per node update - conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); resourceManager = new ResourceManager(); resourceManager.init(conf); @@ -198,107 +162,6 @@ public class TestFairScheduler { } } - private Configuration createConfiguration() { - Configuration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, - ResourceScheduler.class); - return conf; - } - - private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { - ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); - ApplicationAttemptId attId = - ApplicationAttemptId.newInstance(appIdImpl, attemptId); - return attId; - } - - private ResourceRequest createResourceRequest(int memory, String host, - int priority, int numContainers, boolean relaxLocality) { - return createResourceRequest(memory, 1, host, priority, numContainers, - relaxLocality); - } - - private ResourceRequest createResourceRequest(int memory, int vcores, String host, - int priority, int numContainers, boolean relaxLocality) { - ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); - request.setCapability(BuilderUtils.newResource(memory, vcores)); - request.setResourceName(host); - request.setNumContainers(numContainers); - Priority prio = recordFactory.newRecordInstance(Priority.class); - prio.setPriority(priority); - request.setPriority(prio); - request.setRelaxLocality(relaxLocality); - return request; - } - - /** - * Creates a single container priority-1 request and submits to - * scheduler. - */ - private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, - String userId) { - return createSchedulingRequest(memory, queueId, userId, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, - String queueId, String userId) { - return createSchedulingRequest(memory, vcores, queueId, userId, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, - String userId, int numContainers) { - return createSchedulingRequest(memory, queueId, userId, numContainers, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, - String queueId, String userId, int numContainers) { - return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, - String userId, int numContainers, int priority) { - return createSchedulingRequest(memory, 1, queueId, userId, numContainers, - priority); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, - String queueId, String userId, int numContainers, int priority) { - ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId); - // This conditional is for testAclSubmitApplication where app is rejected - // and no app is added. - if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); - } - List ask = new ArrayList(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); - ask.add(request); - scheduler.allocate(id, ask, new ArrayList(), null, null); - return id; - } - - private void createSchedulingRequestExistingApplication(int memory, int priority, - ApplicationAttemptId attId) { - ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY, - priority, 1, true); - createSchedulingRequestExistingApplication(request, attId); - } - - private void createSchedulingRequestExistingApplication(int memory, int vcores, - int priority, ApplicationAttemptId attId) { - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, 1, true); - createSchedulingRequestExistingApplication(request, attId); - } - - private void createSchedulingRequestExistingApplication(ResourceRequest request, - ApplicationAttemptId attId) { - List ask = new ArrayList(); - ask.add(request); - scheduler.allocate(attId, ask, new ArrayList(), null, null); - } - // TESTS @Test(timeout=2000) @@ -1455,7 +1318,7 @@ public class TestFairScheduler { assertEquals( 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); } - + @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { scheduler.reinitialize(conf, resourceManager.getRMContext()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java new file mode 100644 index 00000000000..b3ab299ea88 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class TestFairSchedulerPreemption extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath(); + + private MockClock clock; + + private static class StubbedFairScheduler extends FairScheduler { + public int lastPreemptMemory = -1; + + @Override + protected void preemptResources( + Collection scheds, Resource toPreempt) { + lastPreemptMemory = toPreempt.getMemory(); + } + + public void resetLastPreemptResources() { + lastPreemptMemory = -1; + } + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class, + ResourceScheduler.class); + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + return conf; + } + + @Before + public void setup() throws IOException { + conf = createConfiguration(); + clock = new MockClock(); + } + + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + conf = null; + } + + private void startResourceManager(float utilizationThreshold) { + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, + utilizationThreshold); + resourceManager = new MockRM(conf); + resourceManager.start(); + + assertTrue( + resourceManager.getResourceScheduler() instanceof StubbedFairScheduler); + scheduler = (FairScheduler)resourceManager.getResourceScheduler(); + + scheduler.setClock(clock); + scheduler.UPDATE_INTERVAL = 60 * 1000; + } + + private void registerNodeAndSubmitApp( + int memory, int vcores, int appContainers, int appMemory) { + RMNode node1 = MockNodes.newNodeInfo( + 1, Resources.createResource(memory, vcores), 1, "node1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + assertEquals("Incorrect amount of resources in the cluster", + memory, scheduler.rootMetrics.getAvailableMB()); + assertEquals("Incorrect amount of resources in the cluster", + vcores, scheduler.rootMetrics.getAvailableVirtualCores()); + + createSchedulingRequest(appMemory, "queueA", "user1", appContainers); + scheduler.update(); + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + } + assertEquals("app1's request is not met", + memory - appContainers * appMemory, + scheduler.rootMetrics.getAvailableMB()); + } + + @Test + public void testPreemptionWithFreeResources() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); + out.println(""); + out.println("1"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println("1"); + out.println("1024mb,0vcores"); + out.println(""); + out.print("5"); + out.print("10"); + out.println(""); + out.close(); + + startResourceManager(0f); + // Create node with 4GB memory and 4 vcores + registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024); + + // Verify submitting another request doesn't trigger preemption + createSchedulingRequest(1024, "queueB", "user1", 1, 1); + scheduler.update(); + clock.tick(6); + + ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); + scheduler.preemptTasksIfNecessary(); + assertEquals("preemptResources() should have been called", 1024, + ((StubbedFairScheduler) scheduler).lastPreemptMemory); + + resourceManager.stop(); + + startResourceManager(0.8f); + // Create node with 4GB memory and 4 vcores + registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024); + + // Verify submitting another request doesn't trigger preemption + createSchedulingRequest(1024, "queueB", "user1", 1, 1); + scheduler.update(); + clock.tick(6); + + ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); + scheduler.preemptTasksIfNecessary(); + assertEquals("preemptResources() should not have been called", -1, + ((StubbedFairScheduler) scheduler).lastPreemptMemory); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 611c390a820..48011cbe50e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -156,6 +156,12 @@ Properties that can be placed in yarn-site.xml * Whether to use preemption. Note that preemption is experimental in the current version. Defaults to false. + * <<>> + + * The utilization threshold after which preemption kicks in. The + utilization is computed as the maximum ratio of usage to capacity among + all resources. Defaults to 0.8f. + * <<>> * Whether to assign shares to individual apps based on their size, rather than