From b70c7b0e517f575602ba96cd566a96d2f2690f60 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Fri, 18 Sep 2015 14:00:49 -0700 Subject: [PATCH] YARN-3920. FairScheduler container reservation on a node should be configurable to limit it to large containers (adhoot via asuresh) (cherry picked from commit 94dec5a9164cd9bc573fbf74e76bcff9e7c5c637) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FSAppAttempt.java | 19 +++- .../scheduler/fair/FairScheduler.java | 21 ++++- .../fair/FairSchedulerConfiguration.java | 22 ++++- .../scheduler/fair/FairSchedulerTestBase.java | 6 ++ .../scheduler/fair/TestFairScheduler.java | 89 ++++++++++++++++++- 6 files changed, 148 insertions(+), 12 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d06b5e0ac12..79249a05786 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -402,6 +402,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup failure during commitJob. (Junping Du via wangda) + YARN-3920. FairScheduler container reservation on a node should be + configurable to limit it to large containers (adhoot via asuresh) + OPTIMIZATIONS YARN-3339. 
TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index cfec9157f8c..7af1891e854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -543,10 +543,23 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return container.getResource(); } - // The desired container won't fit here, so reserve - reserve(request.getPriority(), node, container, reserved); + if (isReservable(container)) { + // The desired container won't fit here, so reserve + reserve(request.getPriority(), node, container, reserved); - return FairScheduler.CONTAINER_RESERVED; + return FairScheduler.CONTAINER_RESERVED; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Not creating reservation as container " + container.getId() + + " is not reservable"); + } + return Resources.none(); + } + } + + private boolean isReservable(Container container) { + return scheduler.isAtLeastReservationThreshold( + getQueue().getPolicy().getResourceCalculator(), container.getResource()); } private boolean hasNodeOrRackLocalRequests(Priority priority) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3a397990656..a083272eb26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -194,7 +194,11 @@ public class FairScheduler extends private AllocationFileLoaderService allocsLoader; @VisibleForTesting AllocationConfiguration allocConf; - + + // Container size threshold for making a reservation. + @VisibleForTesting + Resource reservationThreshold; + public FairScheduler() { super(FairScheduler.class.getName()); clock = new SystemClock(); @@ -203,6 +207,12 @@ public class FairScheduler extends maxRunningEnforcer = new MaxRunningAppsEnforcer(this); } + public boolean isAtLeastReservationThreshold( + ResourceCalculator resourceCalculator, Resource resource) { + return Resources.greaterThanOrEqual( + resourceCalculator, clusterResource, resource, reservationThreshold); + } + private void validateConf(Configuration conf) { // validate scheduler memory allocation setting int minMem = conf.getInt( @@ -1325,6 +1335,7 @@ public class FairScheduler extends minimumAllocation = this.conf.getMinimumAllocation(); initMaximumResourceCapability(this.conf.getMaximumAllocation()); incrAllocation = this.conf.getIncrementAllocation(); + updateReservationThreshold(); continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs(); @@ -1391,6 +1402,14 @@ public class FairScheduler extends } } + private void updateReservationThreshold() { + Resource newThreshold = Resources.multiply( + 
getIncrementResourceCapability(), + this.conf.getReservationThresholdIncrementMultiple()); + + reservationThreshold = newThreshold; + } + private synchronized void startSchedulerThreads() { Preconditions.checkNotNull(updateThread, "updateThread is null"); Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index e477e6e4e2e..892484d55c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.File; -import java.util.ArrayList; -import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -49,7 +47,17 @@ public class FairSchedulerConfiguration extends Configuration { public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores"; public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1; - + + /** Threshold for container size for making a container reservation as a + * multiple of increment allocation. 
Only container sizes at or above this are + * allowed to reserve a node */ + public static final String + RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE = + YarnConfiguration.YARN_PREFIX + + "scheduler.reservation-threshold.increment-multiple"; + public static final float + DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f; + private static final String CONF_PREFIX = "yarn.scheduler.fair."; public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file"; @@ -166,7 +174,13 @@ public class FairSchedulerConfiguration extends Configuration { DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES); return Resources.createResource(incrementMemory, incrementCores); } - + + public float getReservationThresholdIncrementMultiple() { + return getFloat( + RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE); + } + public float getLocalityThresholdNode() { return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 1c9801d7631..dd7ed41669a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -64,6 +64,7 @@ public class FairSchedulerTestBase { protected Configuration conf; protected FairScheduler scheduler; protected ResourceManager
resourceManager; + public static final float TEST_RESERVATION_THRESHOLD = 0.09f; // Helper methods public Configuration createConfiguration() { @@ -76,6 +77,11 @@ public class FairSchedulerTestBase { conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + + conf.setFloat( + FairSchedulerConfiguration + .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + TEST_RESERVATION_THRESHOLD); return conf; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a15533b9cde..e68714f6c22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -710,9 +710,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(updateEvent); // Asked for less than increment allocation. - assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + assertEquals( + FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). 
- getResourceUsage().getMemory()); + getResourceUsage().getMemory()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); @@ -764,7 +765,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity @@ -939,8 +940,88 @@ public class TestFairScheduler extends FairSchedulerTestBase { getResourceUsage().getMemory()); } - + @Test + public void testReservationThresholdGatesReservations() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + // Set threshold to 2 * 1024 ==> 2048 MB & 2 * 1 ==> 2 vcores (test will + // use vcores) + conf.setFloat(FairSchedulerConfiguration. + RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + 2f); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(4096, 4), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue 1 requests full capacity of node + createSchedulingRequest(4096, 4, "queue1", "user1", 1, 1); + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + + scheduler.handle(updateEvent); + + // Make sure queue 1 is allocated app capacity + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). 
+ getResourceUsage().getMemory()); + + // Now queue 2 requests below threshold + ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.update(); + scheduler.handle(updateEvent); + + // Make sure queue 2 has no reservation + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getMemory()); + assertEquals(0, + scheduler.getSchedulerApp(attId).getReservedContainers().size()); + + // Now queue requests CPU above threshold + createSchedulingRequestExistingApplication(1024, 3, 1, attId); + scheduler.update(); + scheduler.handle(updateEvent); + + // Make sure queue 2 is waiting with a reservation + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getMemory()); + assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() + .getVirtualCores()); + + // Now another node checks in with capacity + RMNode node2 = + MockNodes + .newNodeInfo(1, Resources.createResource(1024, 4), 2, "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + scheduler.handle(updateEvent2); + + // Make sure this goes to queue 2 + assertEquals(3, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getVirtualCores()); + + // The old reservation should still be there... + assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation() + .getVirtualCores()); + // ... but it should disappear when we update the first node. + scheduler.handle(updateEvent); + assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation() + .getVirtualCores()); + } @Test public void testEmptyQueueName() throws Exception {