YARN-9041. Performance Optimization of method FSPreemptionThread#identifyContainersToPreempt. Contributed by Wanqiang Ji.

This commit is contained in:
Yufei Gu 2018-12-04 10:08:45 -08:00
parent db2d8b01c6
commit e89941fdbb
2 changed files with 140 additions and 70 deletions

View File

@ -97,12 +97,7 @@ class FSPreemptionThread extends Thread {
* Mechanics:
* 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
* starvation.
* 2. For each {@link ResourceRequest}, iterate through matching
* nodes and identify containers to preempt all on one node, also
* optimizing for least number of AM container preemptions. Only nodes
* that match the locality level specified in the {@link ResourceRequest}
* are considered. However, if this would lead to AM preemption, and locality
* relaxation is allowed, then the search space is expanded to all nodes.
* 2. For each {@link ResourceRequest}, get the best preemptable containers.
*
* @param starvedApp starved application for which we are identifying
* preemption targets
@ -118,18 +113,7 @@ class FSPreemptionThread extends Thread {
.getNodesByResourceName(rr.getResourceName());
for (int i = 0; i < rr.getNumContainers(); i++) {
PreemptableContainers bestContainers =
identifyContainersToPreemptForOneContainer(potentialNodes, rr);
// Don't preempt AM containers just to satisfy local requests if relax
// locality is enabled.
if (bestContainers != null
&& bestContainers.numAMContainers > 0
&& !ResourceRequest.isAnyLocation(rr.getResourceName())
&& rr.getRelaxLocality()) {
bestContainers = identifyContainersToPreemptForOneContainer(
scheduler.getNodeTracker().getAllNodes(), rr);
}
getBestPreemptableContainers(rr, potentialNodes);
if (bestContainers != null) {
List<RMContainer> containers = bestContainers.getAllContainers();
if (containers.size() > 0) {
@ -240,6 +224,41 @@ class FSPreemptionThread extends Thread {
new PreemptContainersTask(containers), warnTimeBeforeKill);
}
/**
* Iterate through matching nodes and identify containers to preempt all on
* one node, also optimizing for least number of AM container preemptions.
* Only nodes that match the locality level specified in the
* {@link ResourceRequest} are considered. However, if this would lead to
* AM preemption, and locality relaxation is allowed, then the search space
* is expanded to the remaining nodes.
*
* @param rr resource request
* @param potentialNodes list of {@link FSSchedulerNode}
* @return the list of best preemptable containers for the resource request
*/
private PreemptableContainers getBestPreemptableContainers(ResourceRequest rr,
List<FSSchedulerNode> potentialNodes) {
PreemptableContainers bestContainers =
identifyContainersToPreemptForOneContainer(potentialNodes, rr);
if (rr.getRelaxLocality()
&& !ResourceRequest.isAnyLocation(rr.getResourceName())
&& bestContainers != null
&& bestContainers.numAMContainers > 0) {
List<FSSchedulerNode> remainingNodes =
scheduler.getNodeTracker().getAllNodes();
remainingNodes.removeAll(potentialNodes);
PreemptableContainers spareContainers =
identifyContainersToPreemptForOneContainer(remainingNodes, rr);
if (spareContainers != null && spareContainers.numAMContainers
< bestContainers.numAMContainers) {
bestContainers = spareContainers;
}
}
return bestContainers;
}
private class PreemptContainersTask extends TimerTask {
private final List<RMContainer> containers;

View File

@ -387,13 +387,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
}
}
private void setAllAMContainersOnNode(NodeId nodeId) {
SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
for (RMContainer container: node.getCopiedListOfRunningContainers()) {
((RMContainerImpl) container).setAMContainer(true);
}
}
@Test
public void testPreemptionSelectNonAMContainer() throws Exception {
takeAllResources("root.preemptable.child-1");
@ -412,51 +405,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+ "nodes.", !host0.equals(host1));
}
@Test
public void testRelaxLocalityToNotPreemptAM() throws Exception {
takeAllResources("root.preemptable.child-1");
RMNode node1 = rmNodes.get(0);
setAllAMContainersOnNode(node1.getNodeID());
SchedulerNode node = scheduler.getNodeTracker().getNode(node1.getNodeID());
ApplicationAttemptId greedyAppAttemptId =
node.getCopiedListOfRunningContainers().get(0)
.getApplicationAttemptId();
// Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
// satisfied. This forces the RR that we consider for preemption to be the
// NODE_LOCAL one.
ResourceRequest nodeRequest =
createResourceRequest(GB, node1.getHostName(), 1, 4, true);
ResourceRequest rackRequest =
createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
ResourceRequest anyRequest =
createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
List<ResourceRequest> resourceRequests =
Arrays.asList(nodeRequest, rackRequest, anyRequest);
ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
"root.preemptable.child-2", "default", resourceRequests);
starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
// Move clock enough to identify starvation
clock.tickSec(1);
scheduler.update();
// Make sure 4 containers were preempted from the greedy app, but also that
// none were preempted on our all-AM node, even though the NODE_LOCAL RR
// asked for resources on it.
// TODO (YARN-7655) The starved app should be allocated 4 containers.
// It should be possible to modify the RRs such that this is true
// after YARN-7903.
verifyPreemption(0, 4);
for (RMContainer container : node.getCopiedListOfRunningContainers()) {
assert (container.isAMContainer());
assert (container.getApplicationAttemptId().equals(greedyAppAttemptId));
}
}
@Test
public void testAppNotPreemptedBelowFairShare() throws Exception {
takeAllResources("root.preemptable.child-1");
@ -492,4 +440,107 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
preemptHalfResources("root.preemptable.child-2");
verifyPreemption(1, 2);
}
/* It tests the case that there is less-AM-container solution in the
* remaining nodes.
*/
@Test
public void testRelaxLocalityPreemptionWithLessAMInRemainingNodes()
throws Exception {
takeAllResources("root.preemptable.child-1");
RMNode node1 = rmNodes.get(0);
setAllAMContainersOnNode(node1.getNodeID());
ApplicationAttemptId greedyAppAttemptId =
getGreedyAppAttemptIdOnNode(node1.getNodeID());
updateRelaxLocalityRequestSchedule(node1, GB, 4);
verifyRelaxLocalityPreemption(node1.getNodeID(), greedyAppAttemptId, 4);
}
/* It tests the case that there is no less-AM-container solution in the
* remaining nodes.
*/
@Test
public void testRelaxLocalityPreemptionWithNoLessAMInRemainingNodes()
throws Exception {
takeAllResources("root.preemptable.child-1");
RMNode node1 = rmNodes.get(0);
setNumAMContainersOnNode(3, node1.getNodeID());
RMNode node2 = rmNodes.get(1);
setAllAMContainersOnNode(node2.getNodeID());
ApplicationAttemptId greedyAppAttemptId =
getGreedyAppAttemptIdOnNode(node2.getNodeID());
updateRelaxLocalityRequestSchedule(node1, GB * 2, 1);
verifyRelaxLocalityPreemption(node2.getNodeID(), greedyAppAttemptId, 6);
}
private void setAllAMContainersOnNode(NodeId nodeId) {
setNumAMContainersOnNode(Integer.MAX_VALUE, nodeId);
}
private void setNumAMContainersOnNode(int num, NodeId nodeId) {
int count = 0;
SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
for (RMContainer container: node.getCopiedListOfRunningContainers()) {
count++;
if (count <= num) {
((RMContainerImpl) container).setAMContainer(true);
} else {
break;
}
}
}
private ApplicationAttemptId getGreedyAppAttemptIdOnNode(NodeId nodeId) {
SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
return node.getCopiedListOfRunningContainers().get(0)
.getApplicationAttemptId();
}
/*
* Send the resource requests allowed relax locality to scheduler. The
* params node/nodeMemory/numNodeContainers used for NODE_LOCAL request.
*/
private void updateRelaxLocalityRequestSchedule(RMNode node, int nodeMemory,
int numNodeContainers) {
// Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
// satisfied. This forces the RR that we consider for preemption to be the
// NODE_LOCAL one.
ResourceRequest nodeRequest = createResourceRequest(nodeMemory,
node.getHostName(), 1, numNodeContainers, true);
ResourceRequest rackRequest =
createResourceRequest(GB * 10, node.getRackName(), 1, 1, true);
ResourceRequest anyRequest =
createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
List<ResourceRequest> resourceRequests =
Arrays.asList(nodeRequest, rackRequest, anyRequest);
ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
"root.preemptable.child-2", "default", resourceRequests);
starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
// Move clock enough to identify starvation
clock.tickSec(1);
scheduler.update();
}
private void verifyRelaxLocalityPreemption(NodeId notBePreemptedNodeId,
ApplicationAttemptId greedyAttemptId, int numGreedyAppContainers)
throws Exception {
// Make sure 4 containers were preempted from the greedy app, but also that
// none were preempted on our all-AM node, even though the NODE_LOCAL RR
// asked for resources on it.
// TODO (YARN-7655) The starved app should be allocated 4 containers.
// It should be possible to modify the RRs such that this is true
// after YARN-7903.
verifyPreemption(0, numGreedyAppContainers);
SchedulerNode node = scheduler.getNodeTracker()
.getNode(notBePreemptedNodeId);
for (RMContainer container : node.getCopiedListOfRunningContainers()) {
assert(container.isAMContainer());
assert(container.getApplicationAttemptId().equals(greedyAttemptId));
}
}
}