YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. Contributed by Wangda Tan.

This commit is contained in:
Sunil G 2017-07-14 08:45:17 +05:30
parent f5cdee6bfd
commit fd6935709f
4 changed files with 125 additions and 7 deletions

View File

@ -43,12 +43,12 @@ public class FifoCandidatesSelector
LogFactory.getLog(FifoCandidatesSelector.class);
private PreemptableResourceCalculator preemptableAmountCalculator;
FifoCandidatesSelector(
CapacitySchedulerPreemptionContext preemptionContext) {
FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
boolean includeReservedResource) {
super(preemptionContext);
preemptableAmountCalculator = new PreemptableResourceCalculator(
preemptionContext, false);
preemptionContext, includeReservedResource);
}
@Override

View File

@ -233,7 +233,27 @@ public class ProportionalCapacityPreemptionPolicy
}
// initialize candidates preemption selection policies
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
// When select candidates for reserved containers is enabled, exclude reserved
// resource in fifo policy (less aggressive). Otherwise include reserved
// resource.
//
// Why doing this? In YARN-4390, we added preemption-based-on-reserved-container
// Support. To reduce unnecessary preemption for large containers. We will
// not include reserved resources while calculating ideal-allocation in
// FifoCandidatesSelector.
//
// Changes in YARN-4390 will significantly reduce number of containers preempted
// When cluster has heterogeneous container requests. (Please check test
// report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf
//
// However, on the other hand, in some corner cases, especially for
// fragmented cluster. It could lead to preemption cannot kick in in some
// cases. Please see YARN-5731.
//
// So to solve the problem, we will include reserved when surgical preemption
// for reserved container, which reverts behavior when YARN-4390 is disabled.
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
!selectCandidatesForResevedContainers));
// Do we need to specially consider intra queue
boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(

View File

@ -131,9 +131,10 @@ public class CapacitySchedulerPreemptionTestBase {
public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
ApplicationAttemptId appId, int expected) throws InterruptedException {
int waitNum = 0;
int total = 0;
while (waitNum < 500) {
int total = 0;
total = 0;
for (RMContainer c : node.getCopiedListOfRunningContainers()) {
if (c.getApplicationAttemptId().equals(appId)) {
total++;
@ -146,7 +147,9 @@ public class CapacitySchedulerPreemptionTestBase {
waitNum++;
}
Assert.fail();
Assert.fail(
"Check #live-container-on-node-from-app, actual=" + total + " expected="
+ expected);
}
public void checkNumberOfPreemptionCandidateFromApp(

View File

@ -36,11 +36,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
public class TestCapacitySchedulerSurgicalPreemption
@ -811,4 +811,99 @@ public class TestCapacitySchedulerSurgicalPreemption
rm1.close();
}
@Test(timeout = 60000)
public void testPreemptionForFragmentatedCluster() throws Exception {
conf.setBoolean(
CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
false);
/**
* Two queues, a/b, each of them are 50/50
* 5 nodes in the cluster, each of them is 30G.
*
* Submit first app, AM = 3G, and 4 * 21G containers.
* Submit second app, AM = 3G, and 4 * 21G containers,
*
* We can get one container preempted from 1st app.
*/
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
this.conf);
conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
1024 * 21);
conf.setQueues("root", new String[] { "a", "b" });
conf.setCapacity("root.a", 50);
conf.setUserLimitFactor("root.a", 100);
conf.setCapacity("root.b", 50);
conf.setUserLimitFactor("root.b", 100);
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
List<MockNM> nms = new ArrayList<>();
for (int i = 0; i < 5; i++) {
nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
}
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
// Do allocation for all nodes
for (int i = 0; i < 10; i++) {
MockNM mockNM = nms.get(i % nms.size());
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
}
// App1 should have 5 containers now
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
// launch an app to queue, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
// Do allocation for all nodes
for (int i = 0; i < 10; i++) {
MockNM mockNM = nms.get(i % nms.size());
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
}
// App2 should have 2 containers now
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
// Call editSchedule twice and allocation once, container should get allocated
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
editPolicy.editSchedule();
editPolicy.editSchedule();
int tick = 0;
while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
// Do allocation for all nodes
for (int i = 0; i < 10; i++) {
MockNM mockNM = nms.get(i % nms.size());
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
}
tick++;
Thread.sleep(100);
}
Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
rm1.close();
}
}