YARN-10873: Account for scheduled AM containers before deactivating node (#3287)

* Account for scheduled AM containers before deactivating node

* Move AM container check to separate method.

* Fix UTs

* Fix UTs

* Remove unnecessary import

* Add timeout for UT
This commit is contained in:
srinivasst 2021-08-17 14:18:55 +05:30 committed by GitHub
parent 53a2c65694
commit 4f3f26ce09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 2 deletions

View File

@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -1354,8 +1355,23 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
initialState.equals(NodeState.DECOMMISSIONING);
if (isNodeDecommissioning) {
List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
// hasScheduledAMContainers solves the following race condition -
// 1. launch AM container on a node with 0 containers.
// 2. gracefully decommission this node.
// 3. Node heartbeats to RM. In StatusUpdateWhenHealthyTransition,
// rmNode.runningApplications will be empty as it is updated after
// call to RMNodeImpl.deactivateNode. This will cause the node to be
// deactivated even though container is running on it and hence kill
// all containers running on it.
// In order to avoid such race conditions the ground truth is retrieved
// from the scheduler before deactivating a DECOMMISSIONING node.
// Only AM containers are considered as AM container reattempts can
// cause application failures if max attempts is set to 1.
if (rmNode.runningApplications.isEmpty() &&
(keepAliveApps == null || keepAliveApps.isEmpty())) {
(keepAliveApps == null || keepAliveApps.isEmpty()) &&
!hasScheduledAMContainers(rmNode)) {
LOG.info("No containers running on " + rmNode.nodeId + ". "
+ "Attempting to deactivate decommissioning node.");
RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
return NodeState.DECOMMISSIONED;
}
@ -1401,6 +1417,17 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
return initialState;
}
/**
* Checks if the scheduler has scheduled any AMs on the given node.
* @return true if node has any AM scheduled on it.
*/
private boolean hasScheduledAMContainers(RMNodeImpl rmNode) {
return rmNode.context.getScheduler()
.getSchedulerNode(rmNode.getNodeID())
.getCopiedListOfRunningContainers()
.stream().anyMatch(RMContainer::isAMContainer);
}
}
public static class StatusUpdateWhenUnHealthyTransition implements

View File

@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@ -81,6 +83,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -125,7 +128,7 @@ public class TestRMNodeTransitions {
rmContext =
new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
null, null, mock(DelegationTokenRenewer.class), null, null, null,
null, null);
null, getMockResourceScheduler());
NodesListManager nodesListManager = mock(NodesListManager.class);
HostsFileReader reader = mock(HostsFileReader.class);
when(nodesListManager.getHostsReader()).thenReturn(reader);
@ -193,6 +196,16 @@ public class TestRMNodeTransitions {
return event;
}
private ResourceScheduler getMockResourceScheduler() {
ResourceScheduler resourceScheduler = mock(ResourceScheduler.class);
SchedulerNode schedulerNode = mock(SchedulerNode.class);
when(schedulerNode.getCopiedListOfRunningContainers())
.thenReturn(Collections.emptyList());
when(resourceScheduler.getSchedulerNode(ArgumentMatchers.any()))
.thenReturn(schedulerNode);
return resourceScheduler;
}
private List<ApplicationId> getAppIdList() {
List<ApplicationId> appIdList = new ArrayList<ApplicationId>();
appIdList.add(BuilderUtils.newApplicationId(0, 0));

View File

@ -463,6 +463,64 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.waitForState(id1, NodeState.DECOMMISSIONED);
}
/**
* Test graceful decommission of node when an AM container is scheduled on a
* node just before it is gracefully decommissioned.
*/
@Test (timeout = 60000)
public void testGracefulDecommissionAfterAMContainerAlloc() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
writeToHostsFile("");
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 10240);
MockNM nm2 = rm.registerNode("host2:5678", 20480);
MockNM nm3 = rm.registerNode("host3:4433", 10240);
NodeId id1 = nm1.getNodeId();
NodeId id2 = nm2.getNodeId();
NodeId id3 = nm3.getNodeId();
rm.waitForState(id1, NodeState.RUNNING);
rm.waitForState(id2, NodeState.RUNNING);
rm.waitForState(id3, NodeState.RUNNING);
// Create an app and schedule AM on host1.
RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm);
MockAM am = MockRM.launchAM(app, rm, nm1);
// Before sending heartbeat we gracefully decommission the node on which AM
// is scheduled to simulate race condition.
writeToHostsFile("host1", "host3");
rm.getNodesListManager().refreshNodes(conf, true);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
rm.waitForState(id3, NodeState.DECOMMISSIONING);
// Heartbeat after the node is in DECOMMISSIONING state. This will be the
// first heartbeat containing information about the AM container since the
// application was submitted.
ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
nm1.nodeHeartbeat(aaid, 1, ContainerState.RUNNING);
nm3.nodeHeartbeat(true);
// host1 should stay in DECOMMISSIONING as it has container running on it.
rm.waitForState(id1, NodeState.DECOMMISSIONING);
rm.waitForState(id3, NodeState.DECOMMISSIONED);
// Go through the normal application flow and wait for it to finish.
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
nm1.nodeHeartbeat(aaid, 1, ContainerState.COMPLETE);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
rm.waitForState(id1, NodeState.DECOMMISSIONED);
}
/**
* Decommissioning using a post-configured include hosts file
*/