YARN-6193. FairScheduler might not trigger preemption when using DRF. (kasha)

This commit is contained in:
Karthik Kambatla 2017-02-17 14:07:31 -08:00
parent c7a36e6130
commit dbbfcf74ab
2 changed files with 30 additions and 15 deletions

View File

@ -602,12 +602,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Check if the app's allocation will be over its fairshare even // Check if the app's allocation will be over its fairshare even
// after preempting this container // after preempting this container
Resource currentUsage = getResourceUsage(); Resource usageAfterPreemption = Resources.subtract(
Resource fairshare = getFairShare(); getResourceUsage(), container.getAllocatedResource());
Resource overFairShareBy = Resources.subtract(currentUsage, fairshare);
return (Resources.fitsIn(container.getAllocatedResource(), return !Resources.lessThan(fsQueue.getPolicy().getResourceCalculator(),
overFairShareBy)); scheduler.getClusterResource(), usageAfterPreemption, getFairShare());
} }
/** /**

View File

@ -57,6 +57,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private static final int NODE_CAPACITY_MULTIPLE = 4; private static final int NODE_CAPACITY_MULTIPLE = 4;
private final boolean fairsharePreemption; private final boolean fairsharePreemption;
private final boolean drf;
// App that takes up the entire cluster // App that takes up the entire cluster
private FSAppAttempt greedyApp; private FSAppAttempt greedyApp;
@ -67,13 +68,17 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> getParameters() { public static Collection<Object[]> getParameters() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
{"FairSharePreemption", true}, {"MinSharePreemption", 0},
{"MinSharePreemption", false}}); {"MinSharePreemptionWithDRF", 1},
{"FairSharePreemption", 2},
{"FairSharePreemptionWithDRF", 3}
});
} }
public TestFairSchedulerPreemption(String name, boolean fairshare) public TestFairSchedulerPreemption(String name, int mode)
throws IOException { throws IOException {
fairsharePreemption = fairshare; fairsharePreemption = (mode > 1); // 2 and 3
drf = (mode % 2 == 1); // 1 and 3
writeAllocFile(); writeAllocFile();
} }
@ -146,6 +151,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
out.println("</queue>"); // end of nonpreemptable queue out.println("</queue>"); // end of nonpreemptable queue
if (drf) {
out.println("<defaultQueueSchedulingPolicy>drf" +
"</defaultQueueSchedulingPolicy>");
}
out.println("</allocations>"); out.println("</allocations>");
out.close(); out.close();
@ -177,9 +186,14 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
scheduler.setClock(clock); scheduler.setClock(clock);
resourceManager.start(); resourceManager.start();
// Create and add two nodes to the cluster // Create and add two nodes to the cluster, with capacities
addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); // disproportional to the container requests.
addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE);
// Reinitialize the scheduler so DRF policy picks up cluster capacity
// TODO (YARN-6194): One shouldn't need to call this
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Verify if child-1 and child-2 are preemptable // Verify if child-1 and child-2 are preemptable
FSQueue child1 = FSQueue child1 =
@ -257,7 +271,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private void verifyPreemption() throws InterruptedException { private void verifyPreemption() throws InterruptedException {
// Sleep long enough for four containers to be preempted. // Sleep long enough for four containers to be preempted.
for (int i = 0; i < 100; i++) { for (int i = 0; i < 1000; i++) {
if (greedyApp.getLiveContainers().size() == 4) { if (greedyApp.getLiveContainers().size() == 4) {
break; break;
} }
@ -265,12 +279,14 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
} }
// Verify the right amount of containers are preempted from greedyApp // Verify the right amount of containers are preempted from greedyApp
assertEquals(4, greedyApp.getLiveContainers().size()); assertEquals("Incorrect number of containers on the greedy app",
4, greedyApp.getLiveContainers().size());
sendEnoughNodeUpdatesToAssignFully(); sendEnoughNodeUpdatesToAssignFully();
// Verify the preempted containers are assigned to starvingApp // Verify the preempted containers are assigned to starvingApp
assertEquals(2, starvingApp.getLiveContainers().size()); assertEquals("Starved app is not assigned the right number of containers",
2, starvingApp.getLiveContainers().size());
} }
private void verifyNoPreemption() throws InterruptedException { private void verifyNoPreemption() throws InterruptedException {