YARN-6151. FS preemption does not consider child queues over fairshare if the parent is under. (Yufei Gu via kasha)
parent 6b3443fbf5
commit 5d0ec2e24d
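The gist of the change: preemption should be allowed to descend into a parent queue that is under its own fair share whenever some descendant queue is over its share. Below is a minimal, self-contained sketch of that recursive check; the Queue class, field names, and numbers are invented for illustration and are not the actual FSQueue code (which appears in the diff that follows), and the "usage > fair share" comparison is a simplification of the scheduler's policy check.

// Standalone illustration (hypothetical classes, not Hadoop code): a queue is
// treated as preemptable if its own usage exceeds its fair share, or if any
// descendant's usage does, even when the parent itself is under its share.
import java.util.ArrayList;
import java.util.List;

public class PreemptionSketch {
  static class Queue {
    final String name;
    final long usageMb;
    final long fairShareMb;
    final List<Queue> children = new ArrayList<>();

    Queue(String name, long usageMb, long fairShareMb) {
      this.name = name;
      this.usageMb = usageMb;
      this.fairShareMb = fairShareMb;
    }

    Queue addChild(Queue child) {
      children.add(child);
      return this;
    }

    // Simplified analogue of the recursive check introduced in FSQueue below:
    // check this queue first, then recurse into child queues.
    boolean canBePreempted() {
      if (usageMb > fairShareMb) {
        return true;
      }
      for (Queue child : children) {
        if (child.canBePreempted()) {
          return true;
        }
      }
      return false;
    }
  }

  public static void main(String[] args) {
    // Parent queueA is under its fair share (3 GB used of 4 GB), but its
    // child queueA1 is over its own share (3 GB used of 2 GB).
    Queue queueA1 = new Queue("root.queueA.queueA1", 3072, 2048);
    Queue queueA2 = new Queue("root.queueA.queueA2", 0, 2048);
    Queue queueA = new Queue("root.queueA", 3072, 4096)
        .addChild(queueA1).addChild(queueA2);

    // Before the fix, selection stopped at queueA because it is under its
    // fair share; the recursive check still reaches queueA1.
    System.out.println("queueA preemptable?  " + queueA.canBePreempted());   // true
    System.out.println("queueA1 preemptable? " + queueA1.canBePreempted());  // true
    System.out.println("queueA2 preemptable? " + queueA2.canBePreempted());  // false
  }
}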
@@ -351,7 +351,7 @@ public class FSLeafQueue extends FSQueue {
     RMContainer toBePreempted = null;
 
     // If this queue is not over its fair share, reject
-    if (!preemptContainerPreCheck()) {
+    if (!canBePreempted()) {
       return toBePreempted;
     }
 
@@ -533,16 +533,6 @@ public class FSLeafQueue extends FSQueue {
         new ResourceWeights(weight));
   }
 
-  /**
-   * Helper method to check if the queue should preempt containers
-   *
-   * @return true if check passes (can preempt) or false otherwise
-   */
-  private boolean preemptContainerPreCheck() {
-    return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
-        getFairShare());
-  }
-
   /**
    * Is a queue being starved for its min share.
    */
@@ -260,6 +260,16 @@ public class FSParentQueue extends FSQueue {
     readLock.lock();
     try {
       for (FSQueue queue : childQueues) {
+        // Skip selection for non-preemptable queue
+        if (!queue.canBePreempted()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("skipping from queue=" + getName()
+                + " because it's a non-preemptable queue or there is no"
+                + " sub-queues whose resource usage exceeds fair share.");
+          }
+          continue;
+        }
+
         if (candidateQueue == null ||
             comparator.compare(queue, candidateQueue) > 0) {
           candidateQueue = queue;
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.base.Preconditions;
+
 @Private
 @Unstable
 public abstract class FSQueue implements Queue, Schedulable {
@@ -235,6 +237,28 @@ public abstract class FSQueue implements Queue, Schedulable {
     this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
   }
 
+  /**
+   * Recursively check if the queue can be preempted based on whether the
+   * resource usage is greater than fair share.
+   *
+   * @return true if the queue can be preempted
+   */
+  public boolean canBePreempted() {
+    if (parent == null || parent.policy.checkIfUsageOverFairShare(
+        getResourceUsage(), getFairShare())) {
+      return true;
+    } else {
+      // recursively find one queue which can be preempted
+      for (FSQueue queue: getChildQueues()) {
+        if (queue.canBePreempted()) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
   /**
    * Recomputes the shares for all child queues and applications based on this
    * queue's current share
@@ -2036,10 +2036,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         .getLeafQueue("queueA.queueA2", false), clock.getTime());
     assertEquals(3277, toPreempt.getMemorySize());
 
-    // verify if the 3 containers required by queueA2 are preempted in the same
+    // verify if the 4 containers required by queueA2 are preempted in the same
     // round
     scheduler.preemptResources(toPreempt);
-    assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
+    assertEquals(4, scheduler.getSchedulerApp(app1).getPreemptionContainers()
         .size());
   }
 
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -39,9 +40,9 @@ import java.io.IOException;
 import java.io.PrintWriter;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+  private final int GB = 1024;
   private final static String ALLOC_FILE = new File(TEST_DIR,
       TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
 
@@ -90,8 +91,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     resourceManager = new MockRM(conf);
     resourceManager.start();
 
-    assertTrue(
-        resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
     scheduler = (FairScheduler)resourceManager.getResourceScheduler();
 
     scheduler.setClock(clock);
@@ -189,4 +188,67 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     assertEquals("preemptResources() should have been called", 1024,
         ((StubbedFairScheduler) scheduler).lastPreemptMemory);
   }
+
+  @Test
+  public void testPreemptionFilterOutNonPreemptableQueues() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("  <queue name=\"queueA1\" />");
+    out.println("  <queue name=\"queueA2\" />");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("</queue>");
+    out.println("<defaultFairSharePreemptionTimeout>5</defaultFairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+    startResourceManager(0.8f);
+
+    // Add a node of 8 GB
+    RMNode node1 = MockNodes.newNodeInfo(1,
+        Resources.createResource(8 * GB, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Run apps in queueA.A1 and queueB
+    ApplicationAttemptId app1 = createSchedulingRequest(1 * GB, 1,
+        "queueA.queueA1", "user1", 4, 1);
+    ApplicationAttemptId app2 = createSchedulingRequest(1 * GB, 1, "queueB",
+        "user2", 4, 1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < 8; i++) {
+      scheduler.handle(nodeUpdate1);
+    }
+
+    // verify if the apps got the containers they requested
+    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+
+    // Now submit an app in queueA.queueA2
+    createSchedulingRequest(GB, 1, "queueA.queueA2", "user3", 2, 1);
+    scheduler.update();
+
+    // Let 6 sec pass
+    clock.tickSec(6);
+
+    scheduler.update();
+    Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
+        .getLeafQueue("queueA.queueA2", false), clock.getTime());
+    assertEquals(2 * GB, toPreempt.getMemorySize());
+
+    // Verify if containers required by queueA2 are preempted from queueA1
+    // instead of queueB
+    scheduler.preemptResources(toPreempt);
+    assertEquals(2, scheduler.getSchedulerApp(app1).getPreemptionContainers()
+        .size());
+    assertEquals(0, scheduler.getSchedulerApp(app2).getPreemptionContainers()
+        .size());
+  }
 }