YARN-4284. condition for AM blacklisting is too narrow. Contributed by Sangjin Lee
(cherry picked from commit 33a03af3c3
)
This commit is contained in:
parent
aeceb2eef6
commit
7d66e2e666
|
@ -956,6 +956,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4223. Fixed findbugs warnings in hadoop-yarn-server-nodemanager project
|
||||
(varun saxena via rohithsharmaks)
|
||||
|
||||
YARN-4284. condition for AM blacklisting is too narrow (Sangjin Lee via
|
||||
jlowe)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1417,7 +1417,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
|
||||
private boolean shouldCountTowardsNodeBlacklisting(int exitStatus) {
|
||||
return exitStatus == ContainerExitStatus.DISKS_FAILED;
|
||||
return !(exitStatus == ContainerExitStatus.SUCCESS
|
||||
|| exitStatus == ContainerExitStatus.PREEMPTED);
|
||||
}
|
||||
|
||||
private static final class UnmanagedAMAttemptSavedTransition
|
||||
|
|
|
@ -383,7 +383,7 @@ public class TestAMRestart {
|
|||
public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
|
||||
testAMBlacklistPreventRestartOnSameNode(conf);
|
||||
testAMBlacklistPreventRestartOnSameNode(false, conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 100000)
|
||||
|
@ -393,11 +393,28 @@ public class TestAMRestart {
|
|||
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
|
||||
true);
|
||||
testAMBlacklistPreventRestartOnSameNode(conf);
|
||||
testAMBlacklistPreventRestartOnSameNode(false, conf);
|
||||
}
|
||||
|
||||
private void testAMBlacklistPreventRestartOnSameNode(YarnConfiguration conf)
|
||||
throws Exception{
|
||||
@Test(timeout = 100000)
|
||||
public void testAMBlacklistPreemption() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
|
||||
// disable the float so it is possible to blacklist the entire cluster
|
||||
conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 1.5f);
|
||||
// since the exit status is PREEMPTED, it should not lead to the node being
|
||||
// blacklisted
|
||||
testAMBlacklistPreventRestartOnSameNode(true, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests AM blacklisting. In the multi-node mode (i.e. singleNode = false),
|
||||
* it tests the blacklisting behavior so that the AM container gets allocated
|
||||
* on the node that is not blacklisted. In the single-node mode, it tests the
|
||||
* PREEMPTED status to see if the AM container can continue to be scheduled.
|
||||
*/
|
||||
private void testAMBlacklistPreventRestartOnSameNode(boolean singleNode,
|
||||
YarnConfiguration conf) throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||
|
@ -424,9 +441,12 @@ public class TestAMRestart {
|
|||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
MockNM nm2 =
|
||||
new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
|
||||
nm2.registerNode();
|
||||
MockNM nm2 = null;
|
||||
if (!singleNode) {
|
||||
nm2 =
|
||||
new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
|
||||
nm2.registerNode();
|
||||
}
|
||||
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
|
||||
|
@ -440,60 +460,84 @@ public class TestAMRestart {
|
|||
NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
|
||||
|
||||
MockNM currentNode, otherNode;
|
||||
if (nodeWhereAMRan == nm1.getNodeId()) {
|
||||
if (singleNode) {
|
||||
Assert.assertEquals(nm1.getNodeId(), nodeWhereAMRan);
|
||||
currentNode = nm1;
|
||||
otherNode = nm2;
|
||||
otherNode = null; // not applicable
|
||||
} else {
|
||||
currentNode = nm2;
|
||||
otherNode = nm1;
|
||||
if (nodeWhereAMRan == nm1.getNodeId()) {
|
||||
currentNode = nm1;
|
||||
otherNode = nm2;
|
||||
} else {
|
||||
currentNode = nm2;
|
||||
otherNode = nm1;
|
||||
}
|
||||
}
|
||||
|
||||
// set the exist status to test
|
||||
// any status other than SUCCESS and PREEMPTED should cause the node to be
|
||||
// blacklisted
|
||||
int exitStatus = singleNode ?
|
||||
ContainerExitStatus.PREEMPTED :
|
||||
ContainerExitStatus.INVALID;
|
||||
ContainerStatus containerStatus =
|
||||
BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
|
||||
"", ContainerExitStatus.DISKS_FAILED, Resources.createResource(200));
|
||||
"", exitStatus, Resources.createResource(200));
|
||||
currentNode.containerStatus(containerStatus);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// restart the am
|
||||
RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1);
|
||||
RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app1, rm1);
|
||||
System.out.println("Launch AM " + attempt.getAppAttemptId());
|
||||
|
||||
|
||||
|
||||
currentNode.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
Assert.assertEquals(
|
||||
"AppAttemptState should still be SCHEDULED if currentNode is " +
|
||||
"blacklisted correctly",
|
||||
RMAppAttemptState.SCHEDULED,
|
||||
attempt.getAppAttemptState());
|
||||
|
||||
otherNode.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
if (!singleNode) {
|
||||
Assert.assertEquals(
|
||||
"AppAttemptState should still be SCHEDULED if currentNode is " +
|
||||
"blacklisted correctly",
|
||||
RMAppAttemptState.SCHEDULED,
|
||||
attempt.getAppAttemptState());
|
||||
|
||||
otherNode.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
}
|
||||
|
||||
MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
|
||||
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||
|
||||
amContainer =
|
||||
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
||||
rmContainer = scheduler.getRMContainer(amContainer);
|
||||
nodeWhereAMRan = rmContainer.getAllocatedNode();
|
||||
Assert.assertEquals(
|
||||
"After blacklisting AM should have run on the other node",
|
||||
otherNode.getNodeId(), nodeWhereAMRan);
|
||||
if (singleNode) {
|
||||
// with preemption, the node should not be blacklisted and should get the
|
||||
// assignment (with a single node)
|
||||
Assert.assertEquals(
|
||||
"AM should still have been able to run on the same node",
|
||||
currentNode.getNodeId(), nodeWhereAMRan);
|
||||
} else {
|
||||
// with a failed status, the other node should receive the assignment
|
||||
Assert.assertEquals(
|
||||
"After blacklisting AM should have run on the other node",
|
||||
otherNode.getNodeId(), nodeWhereAMRan);
|
||||
|
||||
am2.registerAppAttempt();
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
||||
am2.registerAppAttempt();
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
||||
|
||||
List<Container> allocatedContainers =
|
||||
allocateContainers(currentNode, am2, 1);
|
||||
Assert.assertEquals(
|
||||
"Even though AM is blacklisted from the node, application can still " +
|
||||
"allocate containers there",
|
||||
currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
|
||||
List<Container> allocatedContainers =
|
||||
allocateContainers(currentNode, am2, 1);
|
||||
Assert.assertEquals(
|
||||
"Even though AM is blacklisted from the node, application can " +
|
||||
"still allocate containers there",
|
||||
currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// AM container preempted, nm disk failure
|
||||
// should not be counted towards AM max retry count.
|
||||
@Test(timeout = 100000)
|
||||
|
|
Loading…
Reference in New Issue