diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3c6e715dc21..37404342bf9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -20,6 +20,9 @@ Release 2.7.0 - UNRELEASED YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run with both CS and FS. (Wei Yan and kasha via kasha) + YARN-2641. Decommission nodes on -refreshNodes instead of next + NM-RM heartbeat. (Zhihai Xu via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 90d7b51bc62..786bf8c7866 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.HostsFileReader; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import com.google.common.annotations.VisibleForTesting; @@ -123,6 +126,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException, .getConfigurationInputStream(this.conf, excludesFile)); printConfiguredHosts(); } + + for (NodeId nodeId: rmContext.getRMNodes().keySet()) { + if (!isValidNode(nodeId.getHost())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } } private void setDecomissionedNMsMetrics() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 42228882721..f5583bcfcc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -349,15 +349,25 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeStatus remoteNodeStatus = request.getNodeStatus(); /** * Here is the node heartbeat sequence... - * 1. Check if it's a registered node - * 2. Check if it's a valid (i.e. not excluded) node - * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat + * 1. Check if it's a valid (i.e. not excluded) node + * 2. Check if it's a registered node + * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat * 4. Send healthStatus to RMNode */ NodeId nodeId = remoteNodeStatus.getNodeId(); - // 1. Check if it's a registered node + // 1. Check if it's a valid (i.e. not excluded) node + if (!this.nodesListManager.isValidNode(nodeId.getHost())) { + String message = + "Disallowed NodeManager nodeId: " + nodeId + " hostname: " + + nodeId.getHost(); + LOG.info(message); + shutDown.setDiagnosticsMessage(message); + return shutDown; + } + + // 2. Check if it's a registered node RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { /* node does not exist */ @@ -370,18 +380,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); - // 2. Check if it's a valid (i.e. not excluded) node - if (!this.nodesListManager.isValidNode(rmNode.getHostName())) { - String message = - "Disallowed NodeManager nodeId: " + nodeId + " hostname: " - + rmNode.getNodeAddress(); - LOG.info(message); - shutDown.setDiagnosticsMessage(message); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); - return shutDown; - } - // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 077f4647c15..28d1d6383d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -130,17 +130,17 @@ public void testDecommissionWithIncludeHosts() throws Exception { rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, ++metricCount); + nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert - .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); nodeHeartbeat = nm2.nodeHeartbeat(true); Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN .equals(nodeHeartbeat.getNodeAction())); - checkDecommissionedNMCount(rm, ++metricCount); - nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertEquals(metricCount, ClusterMetrics.getMetrics() @@ -185,6 +185,8 @@ protected Dispatcher createDispatcher() { rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, metricCount + 2); + nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm2.nodeHeartbeat(true); @@ -195,7 +197,7 @@ protected Dispatcher createDispatcher() { Assert.assertTrue("The decommisioned metrics are not updated", NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); dispatcher.await(); - checkDecommissionedNMCount(rm, metricCount + 2); + writeToHostsFile(""); rm.getNodesListManager().refreshNodes(conf); @@ -234,6 +236,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception { conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, ++initialMetricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( "Node should not have been decomissioned.", @@ -243,7 +246,6 @@ public void testAddNewIncludePathToConfiguration() throws Exception { Assert.assertEquals("Node should have been decomissioned but is in state" + nodeHeartbeat.getNodeAction(), NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); - checkDecommissionedNMCount(rm, ++initialMetricCount); } /** @@ -271,6 +273,7 @@ public void testAddNewExcludePathToConfiguration() throws Exception { conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); + checkDecommissionedNMCount(rm, ++initialMetricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( "Node should not have been decomissioned.", @@ -280,7 +283,6 @@ public void testAddNewExcludePathToConfiguration() throws Exception { Assert.assertEquals("Node should have been decomissioned but is in state" + nodeHeartbeat.getNodeAction(), NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); - checkDecommissionedNMCount(rm, ++initialMetricCount); } @Test