YARN-7655. Avoid AM preemption caused by RRs for specific nodes or racks. Contributed by Steven Rand.

This commit is contained in:
Yufei Gu 2018-02-08 12:32:43 -08:00
parent eb2449d539
commit 1bc03ddf97
2 changed files with 95 additions and 20 deletions

View File

@ -99,7 +99,10 @@ class FSPreemptionThread extends Thread {
* starvation. * starvation.
* 2. For each {@link ResourceRequest}, iterate through matching * 2. For each {@link ResourceRequest}, iterate through matching
* nodes and identify containers to preempt all on one node, also * nodes and identify containers to preempt all on one node, also
* optimizing for least number of AM container preemptions. * optimizing for least number of AM container preemptions. Only nodes
* that match the locality level specified in the {@link ResourceRequest}
* are considered. However, if this would lead to AM preemption, and locality
* relaxation is allowed, then the search space is expanded to all nodes.
* *
* @param starvedApp starved application for which we are identifying * @param starvedApp starved application for which we are identifying
* preemption targets * preemption targets
@ -111,27 +114,21 @@ class FSPreemptionThread extends Thread {
// Iterate through enough RRs to address app's starvation // Iterate through enough RRs to address app's starvation
for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) { for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
.getNodesByResourceName(rr.getResourceName());
for (int i = 0; i < rr.getNumContainers(); i++) { for (int i = 0; i < rr.getNumContainers(); i++) {
PreemptableContainers bestContainers = null; PreemptableContainers bestContainers =
List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker() identifyContainersToPreemptForOneContainer(potentialNodes, rr);
.getNodesByResourceName(rr.getResourceName());
int maxAMContainers = Integer.MAX_VALUE;
for (FSSchedulerNode node : potentialNodes) { // Don't preempt AM containers just to satisfy local requests if relax
PreemptableContainers preemptableContainers = // locality is enabled.
identifyContainersToPreemptOnNode( if (bestContainers != null
rr.getCapability(), node, maxAMContainers); && bestContainers.numAMContainers > 0
&& !ResourceRequest.isAnyLocation(rr.getResourceName())
if (preemptableContainers != null) { && rr.getRelaxLocality()) {
// This set is better than any previously identified set. bestContainers = identifyContainersToPreemptForOneContainer(
bestContainers = preemptableContainers; scheduler.getNodeTracker().getAllNodes(), rr);
maxAMContainers = bestContainers.numAMContainers; }
if (maxAMContainers == 0) {
break;
}
}
} // End of iteration through nodes for one RR
if (bestContainers != null) { if (bestContainers != null) {
List<RMContainer> containers = bestContainers.getAllContainers(); List<RMContainer> containers = bestContainers.getAllContainers();
@ -154,6 +151,29 @@ class FSPreemptionThread extends Thread {
return containersToPreempt; return containersToPreempt;
} }
private PreemptableContainers identifyContainersToPreemptForOneContainer(
    List<FSSchedulerNode> potentialNodes, ResourceRequest rr) {
  // Best candidate set found so far; stays null if no node can satisfy rr.
  PreemptableContainers best = null;
  // A candidate must preempt fewer AM containers than this bound to replace
  // the current best; starts unbounded.
  int amLimit = Integer.MAX_VALUE;
  for (FSSchedulerNode candidateNode : potentialNodes) {
    PreemptableContainers found = identifyContainersToPreemptOnNode(
        rr.getCapability(), candidateNode, amLimit);
    if (found == null) {
      // Node cannot satisfy the request within the current AM limit.
      continue;
    }
    best = found;
    amLimit = found.numAMContainers;
    if (amLimit == 0) {
      // Cannot do better than zero AM container preemptions; stop searching.
      break;
    }
  }
  return best;
}
/** /**
* Identify containers to preempt on a given node. Try to find a list with * Identify containers to preempt on a given node. Try to find a list with
* least AM containers to avoid preempting AM containers. This method returns * least AM containers to avoid preempting AM containers. This method returns

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@ -384,6 +387,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
} }
} }
private void setAllAMContainersOnNode(NodeId nodeId) {
  // Mark every container currently running on the given node as an AM
  // container, so preemption tests can exercise the AM-avoidance logic.
  SchedulerNode trackedNode = scheduler.getNodeTracker().getNode(nodeId);
  for (RMContainer running : trackedNode.getCopiedListOfRunningContainers()) {
    ((RMContainerImpl) running).setAMContainer(true);
  }
}
@Test @Test
public void testPreemptionSelectNonAMContainer() throws Exception { public void testPreemptionSelectNonAMContainer() throws Exception {
takeAllResources("root.preemptable.child-1"); takeAllResources("root.preemptable.child-1");
@ -402,6 +412,51 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+ "nodes.", !host0.equals(host1)); + "nodes.", !host0.equals(host1));
} }
@Test
public void testRelaxLocalityToNotPreemptAM() throws Exception {
  takeAllResources("root.preemptable.child-1");
  RMNode node1 = rmNodes.get(0);
  setAllAMContainersOnNode(node1.getNodeID());
  SchedulerNode node = scheduler.getNodeTracker().getNode(node1.getNodeID());
  ApplicationAttemptId greedyAppAttemptId =
      node.getCopiedListOfRunningContainers().get(0)
          .getApplicationAttemptId();

  // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
  // satisfied. This forces the RR that we consider for preemption to be the
  // NODE_LOCAL one.
  ResourceRequest nodeRequest =
      createResourceRequest(GB, node1.getHostName(), 1, 4, true);
  ResourceRequest rackRequest =
      createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
  ResourceRequest anyRequest =
      createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
  List<ResourceRequest> resourceRequests =
      Arrays.asList(nodeRequest, rackRequest, anyRequest);

  ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
      "root.preemptable.child-2", "default", resourceRequests);
  starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);

  // Move clock enough to identify starvation
  clock.tickSec(1);
  scheduler.update();

  // Make sure 4 containers were preempted from the greedy app, but also that
  // none were preempted on our all-AM node, even though the NODE_LOCAL RR
  // asked for resources on it.
  // TODO (YARN-7655) The starved app should be allocated 4 containers.
  // It should be possible to modify the RRs such that this is true
  // after YARN-7903.
  verifyPreemption(0, 4);
  // Use JUnit assertions instead of the `assert` keyword: Java asserts are
  // disabled unless the JVM runs with -ea, so bare asserts would silently
  // pass even if an AM container had been preempted.
  for (RMContainer container : node.getCopiedListOfRunningContainers()) {
    assertTrue("Container on the all-AM node should still be an AM "
        + "container.", container.isAMContainer());
    assertTrue("Container on the all-AM node should still belong to the "
        + "greedy app.",
        container.getApplicationAttemptId().equals(greedyAppAttemptId));
  }
}
@Test @Test
public void testAppNotPreemptedBelowFairShare() throws Exception { public void testAppNotPreemptedBelowFairShare() throws Exception {
takeAllResources("root.preemptable.child-1"); takeAllResources("root.preemptable.child-1");