From f9016dfec33f1d6486c03a54f0a479ed08aff34f Mon Sep 17 00:00:00 2001
From: Karthik Kambatla
Date: Tue, 6 Sep 2016 16:23:06 -0700
Subject: [PATCH] YARN-5566. Client-side NM graceful decom is not triggered
 when jobs finish. (Robert Kanter via kasha)

---
 .../resourcemanager/rmnode/RMNodeImpl.java |  31 ++---
 .../TestRMNodeTransitions.java             |  29 +++--
 .../TestResourceTrackerService.java        | 112 ++++++++++++++++++
 3 files changed, 143 insertions(+), 29 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index b716247be35..830a6a906f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1133,12 +1133,21 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       NodeState initialState = rmNode.getState();
       boolean isNodeDecommissioning =
           initialState.equals(NodeState.DECOMMISSIONING);
+      if (isNodeDecommissioning) {
+        List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
+        if (rmNode.runningApplications.isEmpty() &&
+            (keepAliveApps == null || keepAliveApps.isEmpty())) {
+          RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
+          return NodeState.DECOMMISSIONED;
+        }
+      }
+
       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
         LOG.info("Node " + rmNode.nodeId
             + " reported UNHEALTHY with details: "
            + remoteNodeHealthStatus.getHealthReport());
         // if a node in decommissioning receives an unhealthy report,
-        // it will keep decommissioning.
+        // it will stay in decommissioning.
         if (isNodeDecommissioning) {
           return NodeState.DECOMMISSIONING;
         } else {
@@ -1146,24 +1155,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
           return NodeState.UNHEALTHY;
         }
       }
-      if (isNodeDecommissioning) {
-        List<ApplicationId> runningApps = rmNode.getRunningApps();
-
-        List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
-
-        // no running (and keeping alive) app on this node, get it
-        // decommissioned.
-        // TODO may need to check no container is being scheduled on this node
-        // as well.
-        if ((runningApps == null || runningApps.size() == 0)
-            && (keepAliveApps == null || keepAliveApps.size() == 0)) {
-          RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
-          return NodeState.DECOMMISSIONED;
-        }
-
-        // TODO (in YARN-3223) if node in decommissioning, get node resource
-        // updated if container get finished (keep available resource to be 0)
-      }
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
       rmNode.handleReportedIncreasedContainers(
@@ -1337,7 +1328,7 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
               + " is the first container get launched for application "
               + containerAppId);
         }
-        runningApplications.add(containerAppId);
+        handleRunningAppOnNode(this, context, containerAppId, nodeId);
       }
 
       // Process running containers
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 83a7c73cfbf..5a462ea6fc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
     .AllocationExpirationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -73,6 +74,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -290,16 +292,17 @@ public void testContainerUpdate() throws InterruptedException{
     NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
     RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
         null, null);
     node2.handle(new RMNodeStartedEvent(null, null, null));
-
+
+    ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
+    ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
     ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
-        BuilderUtils.newApplicationAttemptId(
-            BuilderUtils.newApplicationId(0, 0), 0), 0);
+        BuilderUtils.newApplicationAttemptId(app0, 0), 0);
     ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
-        BuilderUtils.newApplicationAttemptId(
-            BuilderUtils.newApplicationId(1, 1), 1), 1);
+        BuilderUtils.newApplicationAttemptId(app1, 1), 1);
     ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
-        BuilderUtils.newApplicationAttemptId(
-            BuilderUtils.newApplicationId(1, 1), 1), 2);
+        BuilderUtils.newApplicationAttemptId(app1, 1), 2);
+    rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));
+    rmContext.getRMApps().put(app1, Mockito.mock(RMApp.class));
     RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
     RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
@@ -663,6 +666,7 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() {
     NodeId nodeId = node.getNodeID();
 
     ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
+    rmContext.getRMApps().put(runningAppId, Mockito.mock(RMApp.class));
     // Create a running container
     ContainerId runningContainerId = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
@@ -930,16 +934,22 @@ public void testResourceUpdateOnRebootedNode() {
   }
 
   // Test unhealthy report on a decommissioning node will make it
-  // keep decommissioning.
+  // keep decommissioning as long as there's a running or keep alive app.
+  // Otherwise, it will go to decommissioned.
   @Test
   public void testDecommissioningUnhealthy() {
     RMNodeImpl node = getDecommissioningNode();
     NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
         System.currentTimeMillis());
+    List<ApplicationId> keepAliveApps = new ArrayList<>();
+    keepAliveApps.add(BuilderUtils.newApplicationId(1, 1));
     NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
-        new ArrayList<ContainerStatus>(), null, status, null, null, null);
+        null, keepAliveApps, status, null, null, null);
     node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
     Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    nodeStatus.setKeepAliveApplications(null);
+    node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
   }
 
   @Test
@@ -962,6 +972,7 @@ public void testContainerExpire() throws Exception {
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);
+    rmContext.getRMApps().put(appId, Mockito.mock(RMApp.class));
     ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
     ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
     AllocationExpirationInfo expirationInfo1 =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 96efcfda5f5..f3f86443cec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -65,6 +65,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -215,6 +216,117 @@ public void testDecommissionWithExcludeHosts() throws Exception {
     checkDecommissionedNMCount(rm, metricCount + 1);
   }
 
+  /**
+   * Graceful decommission node with no running application.
+   */
+  @Test
+  public void testGracefulDecommissionNoApp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("host3:4433", 5120);
+
+    int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
+
+    rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
+    rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
+
+    // Graceful decommission both host2 and host3.
+    writeToHostsFile("host2", "host3");
+    rm.getNodesListManager().refreshNodesGracefully(conf);
+
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
+
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED);
+    nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED);
+
+    checkDecommissionedNMCount(rm, metricCount + 2);
+
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction());
+    nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
+  }
+
+  /**
+   * Graceful decommission node with running application.
+   */
+  @Test
+  public void testGracefulDecommissionWithApp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    MockNM nm2 = rm.registerNode("host2:5678", 20480);
+    MockNM nm3 = rm.registerNode("host3:4433", 10240);
+    NodeId id1 = nm1.getNodeId();
+    NodeId id3 = nm3.getNodeId();
+    rm.waitForState(id1, NodeState.RUNNING);
+    rm.waitForState(id3, NodeState.RUNNING);
+
+    // Create an app and launch two containers on host1.
+    RMApp app = rm.submitApp(2000);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+    nm3.nodeHeartbeat(true);
+
+    // Graceful decommission host1 and host3
+    writeToHostsFile("host1", "host3");
+    rm.getNodesListManager().refreshNodesGracefully(conf);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    rm.waitForState(id3, NodeState.DECOMMISSIONING);
+
+    // host1 should be DECOMMISSIONING due to running containers.
+    // host3 should become DECOMMISSIONED.
+    nm1.nodeHeartbeat(true);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    nm3.nodeHeartbeat(true);
+    rm.waitForState(id3, NodeState.DECOMMISSIONED);
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+
+    // Complete containers on host1.
+    // Since the app is still RUNNING, expect NodeAction.NORMAL.
+    NodeHeartbeatResponse nodeHeartbeat1 =
+        nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
+
+    // Finish the app and verify DECOMMISSIONED.
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
+    rm.waitForState(id1, NodeState.DECOMMISSIONED);
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction());
+  }
+
   /**
    * Decommissioning using a post-configured include hosts file
    */