YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. (wangda)
Change-Id: Ie8c5145f449582253dcf8c0f6f388e3660ab1c6b
This commit is contained in:
parent
0020a8e7d9
commit
f2f09e2bb0
@ -44,12 +44,12 @@ public class FifoCandidatesSelector
|
||||
LogFactory.getLog(FifoCandidatesSelector.class);
|
||||
private PreemptableResourceCalculator preemptableAmountCalculator;
|
||||
|
||||
FifoCandidatesSelector(
|
||||
CapacitySchedulerPreemptionContext preemptionContext) {
|
||||
FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
|
||||
boolean includeReservedResource) {
|
||||
super(preemptionContext);
|
||||
|
||||
preemptableAmountCalculator = new PreemptableResourceCalculator(
|
||||
preemptionContext, false);
|
||||
preemptionContext, includeReservedResource);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -222,8 +222,13 @@ public void init(Configuration config, RMContext context,
|
||||
.add(new ReservedContainerCandidatesSelector(this));
|
||||
}
|
||||
|
||||
boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean(
|
||||
CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
|
||||
CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS);
|
||||
|
||||
// initialize candidates preemption selection policies
|
||||
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
|
||||
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
|
||||
additionalPreemptionBasedOnReservedResource));
|
||||
|
||||
// Do we need to specially consider intra queue
|
||||
boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
|
||||
|
@ -1102,6 +1102,33 @@ public boolean getLazyPreemptionEnabled() {
|
||||
public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
|
||||
0.2f;
|
||||
|
||||
/**
|
||||
* By default, reserved resource will be excluded while balancing capacities
|
||||
* of queues.
|
||||
*
|
||||
* Why doing this? In YARN-4390, we added preemption-based-on-reserved-container
|
||||
* Support. To reduce unnecessary preemption for large containers. We will
|
||||
* not include reserved resources while calculating ideal-allocation in
|
||||
* FifoCandidatesSelector.
|
||||
*
|
||||
* Changes in YARN-4390 will significantly reduce number of containers preempted
|
||||
* When cluster has heterogeneous container requests. (Please check test
|
||||
* report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf
|
||||
*
|
||||
* However, on the other hand, in some corner cases, especially for
|
||||
* fragmented cluster. It could lead to preemption cannot kick in in some
|
||||
* cases. Please see YARN-5731.
|
||||
*
|
||||
* So to solve the problem, make this change to be configurable, and please
|
||||
* note that it is an experimental option.
|
||||
*/
|
||||
public static final String
|
||||
ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS =
|
||||
PREEMPTION_CONFIG_PREFIX
|
||||
+ "additional_res_balance_based_on_reserved_containers";
|
||||
public static final boolean
|
||||
DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = false;
|
||||
|
||||
/**
|
||||
* When calculating which containers to be preempted, we will try to preempt
|
||||
* containers for reserved containers first. By default is false.
|
||||
|
@ -129,9 +129,10 @@ public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app,
|
||||
public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
|
||||
ApplicationAttemptId appId, int expected) throws InterruptedException {
|
||||
int waitNum = 0;
|
||||
int total = 0;
|
||||
|
||||
while (waitNum < 500) {
|
||||
int total = 0;
|
||||
total = 0;
|
||||
for (RMContainer c : node.getCopiedListOfRunningContainers()) {
|
||||
if (c.getApplicationAttemptId().equals(appId)) {
|
||||
total++;
|
||||
@ -144,6 +145,8 @@ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
|
||||
waitNum++;
|
||||
}
|
||||
|
||||
Assert.fail();
|
||||
Assert.fail(
|
||||
"Check #live-container-on-node-from-app, actual=" + total + " expected="
|
||||
+ expected);
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
@ -38,6 +39,7 @@
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class TestCapacitySchedulerSurgicalPreemption
|
||||
extends CapacitySchedulerPreemptionTestBase {
|
||||
@ -243,4 +245,100 @@ public void testSurgicalPreemptionWithAvailableResource()
|
||||
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPreemptionForFragmentatedCluster() throws Exception {
|
||||
// Set additional_balance_queue_based_on_reserved_res to true to get
|
||||
// additional preemptions.
|
||||
conf.setBoolean(
|
||||
CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
|
||||
true);
|
||||
|
||||
/**
|
||||
* Two queues, a/b, each of them are 50/50
|
||||
* 5 nodes in the cluster, each of them is 30G.
|
||||
*
|
||||
* Submit first app, AM = 3G, and 4 * 21G containers.
|
||||
* Submit second app, AM = 3G, and 4 * 21G containers,
|
||||
*
|
||||
* We can get one container preempted from 1st app.
|
||||
*/
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
|
||||
this.conf);
|
||||
conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
1024 * 21);
|
||||
conf.setQueues("root", new String[] { "a", "b" });
|
||||
conf.setCapacity("root.a", 50);
|
||||
conf.setUserLimitFactor("root.a", 100);
|
||||
conf.setCapacity("root.b", 50);
|
||||
conf.setUserLimitFactor("root.b", 100);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||
rm1.start();
|
||||
|
||||
List<MockNM> nms = new ArrayList<>();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
|
||||
}
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
|
||||
// launch an app to queue, AM container should be launched in nm1
|
||||
RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
|
||||
|
||||
am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
|
||||
|
||||
// Do allocation for all nodes
|
||||
for (int i = 0; i < 10; i++) {
|
||||
MockNM mockNM = nms.get(i % nms.size());
|
||||
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
|
||||
}
|
||||
|
||||
// App1 should have 5 containers now
|
||||
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
|
||||
am1.getApplicationAttemptId());
|
||||
Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
|
||||
|
||||
// launch an app to queue, AM container should be launched in nm1
|
||||
RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b");
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
|
||||
|
||||
am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
|
||||
|
||||
// Do allocation for all nodes
|
||||
for (int i = 0; i < 10; i++) {
|
||||
MockNM mockNM = nms.get(i % nms.size());
|
||||
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
|
||||
}
|
||||
|
||||
// App2 should have 2 containers now
|
||||
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
||||
am2.getApplicationAttemptId());
|
||||
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
||||
|
||||
waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
|
||||
|
||||
// Call editSchedule twice and allocation once, container should get allocated
|
||||
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
|
||||
editPolicy.editSchedule();
|
||||
editPolicy.editSchedule();
|
||||
|
||||
int tick = 0;
|
||||
while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
|
||||
// Do allocation for all nodes
|
||||
for (int i = 0; i < 10; i++) {
|
||||
MockNM mockNM = nms.get(i % nms.size());
|
||||
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
|
||||
}
|
||||
tick++;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
|
||||
|
||||
rm1.close();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user