diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d1ccecb34d8..375b4cf4101 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1170,12 +1170,21 @@ public class RMNodeImpl implements RMNode, EventHandler { NodeState initialState = rmNode.getState(); boolean isNodeDecommissioning = initialState.equals(NodeState.DECOMMISSIONING); + if (isNodeDecommissioning) { + List keepAliveApps = statusEvent.getKeepAliveAppIds(); + if (rmNode.runningApplications.isEmpty() && + (keepAliveApps == null || keepAliveApps.isEmpty())) { + RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); + return NodeState.DECOMMISSIONED; + } + } + if (!remoteNodeHealthStatus.getIsNodeHealthy()) { LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " + remoteNodeHealthStatus.getHealthReport()); // if a node in decommissioning receives an unhealthy report, - // it will keep decommissioning. + // it will stay in decommissioning. if (isNodeDecommissioning) { return NodeState.DECOMMISSIONING; } else { @@ -1349,7 +1358,7 @@ public class RMNodeImpl implements RMNode, EventHandler { + " is the first container get launched for application " + containerAppId); } - runningApplications.add(containerAppId); + handleRunningAppOnNode(this, context, containerAppId, nodeId); } // Process running containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index e82b93cc3a5..6038b317fd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer .AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -73,6 +74,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -279,16 +281,17 @@ public class TestRMNodeTransitions { NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); node2.handle(new RMNodeStartedEvent(null, null, null)); - + + ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); + ApplicationId app1 = BuilderUtils.newApplicationId(1, 1); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(0, 0), 0), 0); + BuilderUtils.newApplicationAttemptId(app0, 0), 0); ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(1, 1), 1), 1); + BuilderUtils.newApplicationAttemptId(app1, 1), 1); ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(1, 1), 1), 2); + BuilderUtils.newApplicationAttemptId(app1, 1), 2); + rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class)); + rmContext.getRMApps().put(app1, Mockito.mock(RMApp.class)); RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null); RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null); @@ -652,6 +655,7 @@ public class TestRMNodeTransitions { NodeId nodeId = node.getNodeID(); ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1); + rmContext.getRMApps().put(runningAppId, Mockito.mock(RMApp.class)); // Create a running container ContainerId runningContainerId = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -919,16 +923,22 @@ public class TestRMNodeTransitions { } // Test unhealthy report on a decommissioning node will make it - // keep decommissioning. + // keep decommissioning as long as there's a running or keep alive app. + // Otherwise, it will go to decommissioned @Test public void testDecommissioningUnhealthy() { RMNodeImpl node = getDecommissioningNode(); NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); + List keepAliveApps = new ArrayList<>(); + keepAliveApps.add(BuilderUtils.newApplicationId(1, 1)); NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, - new ArrayList(), null, status, null, null, null); + null, keepAliveApps, status, null, null, null); node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + nodeStatus.setKeepAliveApplications(null); + node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); + Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); } @Test @@ -951,6 +961,7 @@ public class TestRMNodeTransitions { ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + rmContext.getRMApps().put(appId, Mockito.mock(RMApp.class)); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L); ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L); AllocationExpirationInfo expirationInfo1 = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index c2a20a1af2d..916c9422c0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -299,6 +299,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase { RMApp app = rm.submitApp(2000); MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); + nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + nm3.nodeHeartbeat(true); // Graceful decommission host1 and host3 writeToHostsFile("host1", "host3"); @@ -308,7 +310,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { // host1 should be DECOMMISSIONING due to running containers. // host3 should become DECOMMISSIONED. - nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING); + nm1.nodeHeartbeat(true); nm3.nodeHeartbeat(true); rm.waitForState(id1, NodeState.DECOMMISSIONING); rm.waitForState(id3, NodeState.DECOMMISSIONED);