From ca7a6a73653ae3bbe086987aeecc28aab50159f7 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 22 Feb 2017 15:45:45 -0800 Subject: [PATCH] YARN-6210. FairScheduler: Node reservations can interfere with preemption. (kasha) (cherry picked from commit 718ad9f6ee93d4145f2bb19b7582ce4e1174feaf) --- .../resource/DefaultResourceCalculator.java | 3 +- .../resource/DominantResourceCalculator.java | 13 +- .../util/resource/ResourceCalculator.java | 32 ++++- .../scheduler/fair/FSAppAttempt.java | 55 +++++--- .../DominantResourceFairnessPolicy.java | 8 +- .../fair/policies/FairSharePolicy.java | 3 +- .../scheduler/fair/TestFairScheduler.java | 128 ++++++++---------- .../fair/TestFairSchedulerPreemption.java | 44 ++++-- 8 files changed, 178 insertions(+), 108 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index 42c45adbde5..ef7229c6227 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -30,7 +30,8 @@ public class DefaultResourceCalculator extends ResourceCalculator { LogFactory.getLog(DefaultResourceCalculator.class); @Override - public int compare(Resource unused, Resource lhs, Resource rhs) { + public int compare(Resource unused, Resource lhs, Resource rhs, + boolean singleType) { // Only consider memory return Long.compare(lhs.getMemorySize(), rhs.getMemorySize()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 9f1c8d73dd4..032aa020fd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -51,17 +51,18 @@ public class DominantResourceCalculator extends ResourceCalculator { LogFactory.getLog(DominantResourceCalculator.class); @Override - public int compare(Resource clusterResource, Resource lhs, Resource rhs) { + public int compare(Resource clusterResource, Resource lhs, Resource rhs, + boolean singleType) { if (lhs.equals(rhs)) { return 0; } if (isInvalidDivisor(clusterResource)) { - if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs - .getVirtualCores()) - || (lhs.getMemorySize() > rhs.getMemorySize() && lhs.getVirtualCores() < rhs - .getVirtualCores())) { + if ((lhs.getMemorySize() < rhs.getMemorySize() && + lhs.getVirtualCores() > rhs.getVirtualCores()) || + (lhs.getMemorySize() > rhs.getMemorySize() && + lhs.getVirtualCores() < rhs.getVirtualCores())) { return 0; } else if (lhs.getMemorySize() > rhs.getMemorySize() || lhs.getVirtualCores() > rhs.getVirtualCores()) { @@ -79,7 +80,7 @@ public class DominantResourceCalculator extends ResourceCalculator { return -1; } else if (l > r) { return 1; - } else { + } else if (!singleType) { l = getResourceAsValue(clusterResource, lhs, false); r = getResourceAsValue(clusterResource, rhs, false); if (l < r) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 50ce04cc2f1..a2f85b314cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -28,8 +28,36 @@ import org.apache.hadoop.yarn.api.records.Resource; @Unstable public abstract class ResourceCalculator { - public abstract int - compare(Resource clusterResource, Resource lhs, Resource rhs); + /** + * On a cluster with capacity {@code clusterResource}, compare {@code lhs} + * and {@code rhs}. Consider all resources unless {@code singleType} is set + * to true. When {@code singleType} is set to true, consider only one + * resource as per the {@link ResourceCalculator} implementation; the + * {@link DefaultResourceCalculator} considers memory and + * {@link DominantResourceCalculator} considers the dominant resource. + * + * @param clusterResource cluster capacity + * @param lhs First {@link Resource} to compare + * @param rhs Second {@link Resource} to compare + * @param singleType Whether to consider a single resource type or all + * resource types + * @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger + */ + public abstract int compare( + Resource clusterResource, Resource lhs, Resource rhs, boolean singleType); + + /** + * On a cluster with capacity {@code clusterResource}, compare {@code lhs} + * and {@code rhs} considering all resources. + * + * @param clusterResource cluster capacity + * @param lhs First {@link Resource} to compare + * @param rhs Second {@link Resource} to compare + * @return -1 if {@code lhs} is smaller, 0 if equal and 1 if it is larger + */ + public int compare(Resource clusterResource, Resource lhs, Resource rhs) { + return compare(clusterResource, lhs, rhs, false); + } public static int divideAndCeil(int a, int b) { if (b == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 6d5307bfd01..62fb8fca84f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -604,8 +603,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt Resource usageAfterPreemption = Resources.subtract( getResourceUsage(), container.getAllocatedResource()); - return !Resources.lessThan(fsQueue.getPolicy().getResourceCalculator(), - scheduler.getClusterResource(), usageAfterPreemption, getFairShare()); + return !isUsageBelowShare(usageAfterPreemption, getFairShare()); } /** @@ -855,8 +853,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } private boolean isReservable(Resource capacity) { - return scheduler.isAtLeastReservationThreshold( - getQueue().getPolicy().getResourceCalculator(), capacity); + // Reserve only when the app is starved and the requested container size + // is larger than the configured threshold + return isStarved() && + scheduler.isAtLeastReservationThreshold( + getQueue().getPolicy().getResourceCalculator(), capacity); } private boolean hasNodeOrRackLocalRequests(SchedulerRequestKey schedulerKey) { @@ -1078,25 +1079,36 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * @return freshly computed fairshare starvation */ Resource fairShareStarvation() { + long now = scheduler.getClock().getTime(); Resource threshold = Resources.multiply( getFairShare(), fsQueue.getFairSharePreemptionThreshold()); - Resource starvation = Resources.componentwiseMin(threshold, demand); - Resources.subtractFromNonNegative(starvation, getResourceUsage()); + Resource fairDemand = Resources.componentwiseMin(threshold, demand); - long now = scheduler.getClock().getTime(); - boolean starved = !Resources.isNone(starvation); + // Check if the queue is starved for fairshare + boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand); if (!starved) { lastTimeAtFairShare = now; } - if (starved && - (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) { - this.fairshareStarvation = starvation; + if (!starved || + now - lastTimeAtFairShare < fsQueue.getFairSharePreemptionTimeout()) { + fairshareStarvation = Resources.none(); } else { - this.fairshareStarvation = Resources.none(); + // The app has been starved for longer than preemption-timeout. + fairshareStarvation = + Resources.subtractFromNonNegative(fairDemand, getResourceUsage()); } - return this.fairshareStarvation; + return fairshareStarvation; + } + + /** + * Helper method that checks if {@code usage} is strictly less than + * {@code share}. + */ + private boolean isUsageBelowShare(Resource usage, Resource share) { + return fsQueue.getPolicy().getResourceCalculator().compare( + scheduler.getClusterResource(), usage, share, true) < 0; } ResourceRequest getNextResourceRequest() { @@ -1107,9 +1119,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * Helper method that captures if this app is identified to be starved. * @return true if the app is starved for fairshare, false otherwise */ - @VisibleForTesting boolean isStarvedForFairShare() { - return !Resources.isNone(fairshareStarvation); + return isUsageBelowShare(getResourceUsage(), getFairShare()); + } + + /** + * Is application starved for fairshare or minshare + */ + private boolean isStarved() { + return isStarvedForFairShare() || !Resources.isNone(minshareStarvation); } /** @@ -1324,6 +1342,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return super.equals(o); } + @Override + public String toString() { + return getApplicationAttemptId() + " Alloc: " + getCurrentConsumption(); + } + @Override public boolean isPreemptable() { return getQueue().isPreemptable(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 6f04cb78ac4..369b8a17633 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -155,8 +155,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { resourceOrder1, resourceOrder2); } if (res == 0) { - // Apps are tied in fairness ratio. Break the tie by submit time. - res = (int)(s1.getStartTime() - s2.getStartTime()); + // Apps are tied in fairness ratio. Break the tie by submit time and job + // name to get a deterministic ordering, which is useful for unit tests. + res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + if (res == 0) { + res = s1.getName().compareTo(s2.getName()); + } } return res; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 9036a03c096..f8cdb45929f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -131,8 +131,9 @@ public class FairSharePolicy extends SchedulingPolicy { // Apps are tied in fairness ratio. Break the tie by submit time and job // name to get a deterministic ordering, which is useful for unit tests. res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); - if (res == 0) + if (res == 0) { res = s1.getName().compareTo(s2.getName()); + } } return res; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5ed0128804c..4873079c0d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -116,6 +116,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; import org.mockito.Mockito; import org.xml.sax.SAXException; @@ -2619,71 +2620,58 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); } + /** + * Reserve at a lower priority and verify the lower priority request gets + * allocated + */ @Test (timeout = 5000) - public void testReservationWhileMultiplePriorities() throws IOException { + public void testReservationWithMultiplePriorities() throws IOException { scheduler.init(conf); scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node - RMNode node1 = - MockNodes - .newNodeInfo(1, Resources.createResource(1024, 4), 1, "127.0.0.1"); + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2)); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - - ApplicationAttemptId attId = createSchedulingRequest(1024, 4, "queue1", - "user1", 1, 2); - scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(updateEvent); - - FSAppAttempt app = scheduler.getSchedulerApp(attId); - assertEquals(1, app.getLiveContainers().size()); - - ContainerId containerId = scheduler.getSchedulerApp(attId) - .getLiveContainers().iterator().next().getContainerId(); - // Cause reservation to be created - createSchedulingRequestExistingApplication(1024, 4, 2, attId); + // Create first app and take up half resources so the second app that asks + // for the entire node won't have enough. + FSAppAttempt app1 = scheduler.getSchedulerApp( + createSchedulingRequest(1024, 1, "queue", "user", 1)); scheduler.update(); scheduler.handle(updateEvent); + assertEquals("Basic allocation failed", 1, app1.getLiveContainers().size()); - assertEquals(1, app.getLiveContainers().size()); - assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); - assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); - - // Create request at higher priority - createSchedulingRequestExistingApplication(1024, 4, 1, attId); + // Create another app and reserve at a lower priority first + ApplicationAttemptId attId = + createSchedulingRequest(2048, 2, "queue1", "user1", 1, 2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId); scheduler.update(); scheduler.handle(updateEvent); - - assertEquals(1, app.getLiveContainers().size()); - // Reserved container should still be at lower priority - for (RMContainer container : app.getReservedContainers()) { - assertEquals(2, - container.getReservedSchedulerKey().getPriority().getPriority()); - } - - // Complete container - scheduler.allocate(attId, new ArrayList(), + assertEquals("Reservation at lower priority failed", + 1, app2.getReservedContainers().size()); + + // Request container on the second app at a higher priority + createSchedulingRequestExistingApplication(2048, 2, 1, attId); + + // Complete the first container so we can trigger allocation for app2 + ContainerId containerId = + app1.getLiveContainers().iterator().next().getContainerId(); + scheduler.allocate(app1.getApplicationAttemptId(), + new ArrayList(), Arrays.asList(containerId), null, null, null, null); - assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); - assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); - - // Schedule at opening - scheduler.update(); + + // Trigger allocation for app2 scheduler.handle(updateEvent); - + // Reserved container (at lower priority) should be run - Collection liveContainers = app.getLiveContainers(); - assertEquals(1, liveContainers.size()); - for (RMContainer liveContainer : liveContainers) { - Assert.assertEquals(2, liveContainer.getContainer().getPriority() - .getPriority()); - } - assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); - assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); + Collection liveContainers = app2.getLiveContainers(); + assertEquals("Allocation post completion failed", 1, liveContainers.size()); + assertEquals("High prio container allocated against low prio reservation", + 2, liveContainers.iterator().next().getContainer(). + getPriority().getPriority()); } @Test @@ -3213,8 +3201,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { } /** - * If we update our ask to strictly request a node, it doesn't make sense to keep - * a reservation on another. + * Strict locality requests shouldn't reserve resources on another node. */ @Test public void testReservationsStrictLocality() throws IOException { @@ -3222,40 +3209,39 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); - RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2"); + // Add two nodes + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1)); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1)); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); - ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", - "user1", 0); + // Submit application without container requests + ApplicationAttemptId attId = + createSchedulingRequest(1024, "queue1", "user1", 0); FSAppAttempt app = scheduler.getSchedulerApp(attId); - - ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); - ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); - ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY, - 1, 2, false); + + // Request a container on node2 + ResourceRequest nodeRequest = + createResourceRequest(1024, node2.getHostName(), 1, 1, true); + ResourceRequest rackRequest = + createResourceRequest(1024, "rack1", 1, 1, false); + ResourceRequest anyRequest = + createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false); createSchedulingRequestExistingApplication(nodeRequest, attId); createSchedulingRequestExistingApplication(rackRequest, attId); createSchedulingRequestExistingApplication(anyRequest, attId); - scheduler.update(); + // Heartbeat from node1. App shouldn't get an allocation or reservation NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeUpdateEvent); - assertEquals(1, app.getLiveContainers().size()); + assertEquals("App assigned a container on the wrong node", + 0, app.getLiveContainers().size()); scheduler.handle(nodeUpdateEvent); - assertEquals(1, app.getReservedContainers().size()); - - // now, make our request node-specific (on a different node) - rackRequest = createResourceRequest(1024, "rack1", 1, 1, false); - anyRequest = createResourceRequest(1024, ResourceRequest.ANY, - 1, 1, false); - scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest), - new ArrayList(), null, null, null, null); - - scheduler.handle(nodeUpdateEvent); - assertEquals(0, app.getReservedContainers().size()); + assertEquals("App reserved a container on the wrong node", + 0, app.getReservedContainers().size()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 480a329771c..322ad5b3f58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -72,7 +72,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { {"MinSharePreemptionWithDRF", 1}, {"FairSharePreemption", 2}, {"FairSharePreemptionWithDRF", 3} - }); + }); } public TestFairSchedulerPreemption(String name, int mode) @@ -110,6 +110,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { * |--- preemptable * |--- child-1 * |--- child-2 + * |--- preemptable-sibling * |--- nonpreemptible * |--- child-1 * |--- child-2 @@ -133,6 +134,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { out.println(""); // end of preemptable queue + out.println(""); + writePreemptionParams(out); + out.println(""); + // Queue with preemption disallowed out.println(""); out.println("false" + @@ -269,10 +274,11 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { preemptHalfResources(queue2); } - private void verifyPreemption() throws InterruptedException { + private void verifyPreemption(int numStarvedAppContainers) + throws InterruptedException { // Sleep long enough for four containers to be preempted. for (int i = 0; i < 1000; i++) { - if (greedyApp.getLiveContainers().size() == 4) { + if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) { break; } Thread.sleep(10); @@ -280,13 +286,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { // Verify the right amount of containers are preempted from greedyApp assertEquals("Incorrect number of containers on the greedy app", - 4, greedyApp.getLiveContainers().size()); + 2 * numStarvedAppContainers, greedyApp.getLiveContainers().size()); sendEnoughNodeUpdatesToAssignFully(); // Verify the preempted containers are assigned to starvingApp assertEquals("Starved app is not assigned the right number of containers", - 2, starvingApp.getLiveContainers().size()); + numStarvedAppContainers, starvingApp.getLiveContainers().size()); } private void verifyNoPreemption() throws InterruptedException { @@ -305,7 +311,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { String queue = "root.preemptable.child-1"; submitApps(queue, queue); if (fairsharePreemption) { - verifyPreemption(); + verifyPreemption(2); } else { verifyNoPreemption(); } @@ -314,13 +320,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { @Test public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception { submitApps("root.preemptable.child-1", "root.preemptable.child-2"); - verifyPreemption(); + verifyPreemption(2); } @Test public void testPreemptionBetweenNonSiblingQueues() throws Exception { submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1"); - verifyPreemption(); + verifyPreemption(2); } @Test @@ -354,7 +360,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { setNumAMContainersPerNode(2); preemptHalfResources("root.preemptable.child-2"); - verifyPreemption(); + verifyPreemption(2); ArrayList containers = (ArrayList) starvingApp.getLiveContainers(); @@ -365,4 +371,24 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertTrue("Preempted containers should come from two different " + "nodes.", !host0.equals(host1)); } + + @Test + public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare() + throws InterruptedException { + // Run this test only for fairshare preemption + if (!fairsharePreemption) { + return; + } + + // Let one of the child queues take over the entire cluster + takeAllResources("root.preemptable.child-1"); + + // Submit a job so half the resources go to parent's sibling + preemptHalfResources("root.preemptable-sibling"); + verifyPreemption(2); + + // Submit a job to the child's sibling to force preemption from the child + preemptHalfResources("root.preemptable.child-2"); + verifyPreemption(1); + } }