From 6f4c77409b978e5b05227a9c5bfddb94b845c9f5 Mon Sep 17 00:00:00 2001
From: Jian He
Date: Thu, 2 Oct 2014 15:13:33 -0700
Subject: [PATCH] YARN-2628. Capacity scheduler with DominantResourceCalculator
 carries out reservation even though slots are free. Contributed by Varun
 Vasudev

(cherry picked from commit 054f28552687e9b9859c0126e16a2066e20ead3f)
---
 hadoop-yarn-project/CHANGES.txt               |  3 +
 .../scheduler/capacity/CapacityScheduler.java |  4 +-
 .../capacity/TestCapacityScheduler.java       | 64 +++++++++++++++++++
 3 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6aeb961f744..f6e849c0702 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -497,6 +497,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2527. Fixed the potential NPE in ApplicationACLsManager and added test
     cases for it. (Benoy Antony via zjshen)
 
+    YARN-2628. Capacity scheduler with DominantResourceCalculator carries out
+    reservation even though slots are free. (Varun Vasudev via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d847579b674..e0816b5bdb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -941,8 +941,8 @@ public class CapacityScheduler extends
 
     // Try to schedule more if there are no reservations to fulfill
     if (node.getReservedContainer() == null) {
-      if (Resources.greaterThanOrEqual(calculator, getClusterResource(),
-          node.getAvailableResource(), minimumAllocation)) {
+      if (calculator.computeAvailableContainers(node.getAvailableResource(),
+          minimumAllocation) > 0) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e029749f1db..e5f4df29595 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -115,6 +116,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -1995,4 +1997,66 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  // Test to ensure that we don't carry out reservation on nodes
+  // that have no CPU available when using the DominantResourceCalculator
+  @Test(timeout = 30000)
+  public void testAppReservationWithDominantResourceCalculator() throws Exception {
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);
+
+    // register extra nodes to bump up cluster resource
+    MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
+    rm.registerNode("127.0.0.1:1236", 10 * GB, 4);
+
+    RMApp app1 = rm.submitApp(1024);
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    SchedulerNodeReport report_nm1 =
+        rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+
+    // check node report
+    Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // add request for containers
+    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
+    am1.schedule(); // send the request
+
+    // kick the scheduler, container reservation should not happen
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    AllocateResponse allocResponse = am1.schedule();
+    ApplicationResourceUsageReport report =
+        rm.getResourceScheduler().getAppResourceUsageReport(
+            attempt1.getAppAttemptId());
+    Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
+    Assert.assertEquals(0, report.getNumReservedContainers());
+
+    // container should get allocated on this node
+    nm2.nodeHeartbeat(true);
+
+    while (allocResponse.getAllocatedContainers().size() == 0) {
+      Thread.sleep(100);
+      allocResponse = am1.schedule();
+    }
+    report =
+        rm.getResourceScheduler().getAppResourceUsageReport(
+            attempt1.getAppAttemptId());
+    Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
+    Assert.assertEquals(0, report.getNumReservedContainers());
+    rm.stop();
+  }
 }
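
Why the one-line change in CapacityScheduler.java fixes the bug: under DominantResourceCalculator, Resources.greaterThanOrEqual() compares dominant shares relative to the whole cluster, so a node with ample memory but zero free vcores can still compare "greater than or equal" to the minimum allocation; the scheduler then walks into the allocation path on that node and ends up reserving. computeAvailableContainers() instead divides resource by resource and returns 0 as soon as any dimension is exhausted. The sketch below is a minimal, standalone illustration of that difference, assuming the Hadoop 2.6-era Resource/ResourceCalculator APIs this patch targets; the ReservationCheckDemo class name is hypothetical, and the figures mirror the test cluster (nm1 with 9 GB free but 0 vcores after the 1 GB AM container starts).

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class ReservationCheckDemo {
  public static void main(String[] args) {
    DominantResourceCalculator drc = new DominantResourceCalculator();

    // Cluster as in the test: three 10 GB nodes with 1, 4, and 4 vcores.
    Resource cluster = Resource.newInstance(30 * 1024, 9);
    // nm1 after the 1 GB, 1-vcore AM container: 9 GB free, 0 vcores free.
    Resource available = Resource.newInstance(9 * 1024, 0);
    // Scheduler minimum allocation: 1 GB, 1 vcore.
    Resource minimum = Resource.newInstance(1024, 1);

    // Old check: the dominant share of 'available' (9/30 of cluster memory)
    // exceeds that of 'minimum' (1/9 of cluster vcores), so the node still
    // looks schedulable despite having no vcores left.
    boolean oldCheck =
        Resources.greaterThanOrEqual(drc, cluster, available, minimum);
    System.out.println("greaterThanOrEqual: " + oldCheck);            // true

    // New check: component-wise division, min(9216/1024, 0/1) = 0, so the
    // scheduler skips the node instead of reserving on it.
    int containers = drc.computeAvailableContainers(available, minimum);
    System.out.println("computeAvailableContainers: " + containers);  // 0
  }
}

With the old check the scheduler would leave a reservation behind on nm1; with the new one, nm1 is skipped and the request is satisfied on nm2, which is exactly what the added test asserts.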