From b452ecf4a087879b99565765759014584b13d335 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 5 May 2016 14:40:35 +0000 Subject: [PATCH] YARN-4311. Removing nodes from include and exclude lists will not remove them from decommissioned nodes list. Contributed by Kuhu Shukla (cherry picked from commit ee86cef2fee992fb773e87c895000ffc2ee788b0) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml --- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 9 + .../yarn/sls/scheduler/RMNodeWrapper.java | 9 + .../hadoop/yarn/conf/YarnConfiguration.java | 9 + .../src/main/resources/yarn-default.xml | 13 + .../resourcemanager/NodesListManager.java | 120 ++++- .../server/resourcemanager/RMServerUtils.java | 2 +- .../server/resourcemanager/rmnode/RMNode.java | 4 + .../resourcemanager/rmnode/RMNodeImpl.java | 23 +- .../server/resourcemanager/MockNodes.java | 9 + .../TestResourceTrackerService.java | 483 +++++++++++++++++- .../webapp/TestRMWebServicesNodes.java | 12 +- 11 files changed, 656 insertions(+), 37 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 92d586bfa37..951f5a850df 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -199,6 +199,15 @@ public class NodeInfo { public ResourceUtilization getNodeUtilization() { return null; } + + @Override + public long getUntrackedTimeStamp() { + return 0; + } + + @Override + public void setUntrackedTimeStamp(long timeStamp) { + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 2e9cccb2778..e5013c43d75 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -188,4 +188,13 @@ public class RMNodeWrapper implements RMNode { public ResourceUtilization getNodeUtilization() { return node.getNodeUtilization(); } + + @Override + public long getUntrackedTimeStamp() { + return 0; + } + + @Override + public void setUntrackedTimeStamp(long timeStamp) { + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c509b66a786..f0428574262 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -645,6 +645,15 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION = "NONE"; + /** + * Timeout(msec) for an untracked node to remain in shutdown or decommissioned + * state. 
+ */ + public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = + RM_PREFIX + "node-removal-untracked.timeout-ms"; + public static final int + DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000; + /** * RM proxy users' prefix */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e251a12ced0..e0e3dadaa90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2619,4 +2619,17 @@ yarn.node-labels.fs-store.impl.class org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore + + + + The least amount of time(msec.) an inactive (decommissioned or shutdown) node can + stay in the nodes list of the resourcemanager after being declared untracked. + A node is marked untracked if and only if it is absent from both include and + exclude nodemanager lists on the RM. All inactive nodes are checked twice per + timeout interval or every 10 minutes, whichever is lesser, and marked appropriately. + The same is done when refreshNodes command (graceful or otherwise) is invoked. 
+ + yarn.resourcemanager.node-removal-untracked.timeout-ms + 60000 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 40b7731405e..a093540c090 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.HostsFileReader; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService implements private String excludesFile; private Resolver resolver; + private Timer removalTimer; + private int nodeRemovalCheckInterval; public NodesListManager(RMContext rmContext) { super(NodesListManager.class.getName()); @@ -104,9 +107,72 @@ public class NodesListManager extends CompositeService implements } catch (IOException ioe) { disableHostsFileReader(ioe); } + + final int nodeRemovalTimeout = + conf.getInt( + YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + YarnConfiguration. 
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); + nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2, + 600000)); + removalTimer = new Timer("Node Removal Timer"); + + removalTimer.schedule(new TimerTask() { + @Override + public void run() { + long now = Time.monotonicNow(); + for (Map.Entry entry : + rmContext.getInactiveRMNodes().entrySet()) { + NodeId nodeId = entry.getKey(); + RMNode rmNode = entry.getValue(); + if (isUntrackedNode(rmNode.getHostName())) { + if (rmNode.getUntrackedTimeStamp() == 0) { + rmNode.setUntrackedTimeStamp(now); + } else + if (now - rmNode.getUntrackedTimeStamp() > + nodeRemovalTimeout) { + RMNode result = rmContext.getInactiveRMNodes().remove(nodeId); + if (result != null) { + decrInactiveNMMetrics(rmNode); + LOG.info("Removed " +result.getState().toString() + " node " + + result.getHostName() + " from inactive nodes list"); + } + } + } else { + rmNode.setUntrackedTimeStamp(0); + } + } + } + }, nodeRemovalCheckInterval, nodeRemovalCheckInterval); + super.serviceInit(conf); } + private void decrInactiveNMMetrics(RMNode rmNode) { + ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); + switch (rmNode.getState()) { + case SHUTDOWN: + clusterMetrics.decrNumShutdownNMs(); + break; + case DECOMMISSIONED: + clusterMetrics.decrDecommisionedNMs(); + break; + case LOST: + clusterMetrics.decrNumLostNMs(); + break; + case REBOOTED: + clusterMetrics.decrNumRebootedNMs(); + break; + default: + LOG.debug("Unexpected node state"); + } + } + + @Override + public void serviceStop() { + removalTimer.cancel(); + } + private void printConfiguredHosts() { if (!LOG.isDebugEnabled()) { return; @@ -130,10 +196,13 @@ public class NodesListManager extends CompositeService implements for (NodeId nodeId: rmContext.getRMNodes().keySet()) { if (!isValidNode(nodeId.getHost())) { + RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ? 
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + new RMNodeEvent(nodeId, nodeEventType)); } } + updateInactiveNodes(); } private void refreshHostsReader(Configuration yarnConf) throws IOException, @@ -170,6 +239,16 @@ public class NodesListManager extends CompositeService implements } } + @VisibleForTesting + public int getNodeRemovalCheckInterval() { + return nodeRemovalCheckInterval; + } + + @VisibleForTesting + public void setNodeRemovalCheckInterval(int interval) { + this.nodeRemovalCheckInterval = interval; + } + @VisibleForTesting public Resolver getResolver() { return resolver; @@ -373,6 +452,33 @@ public class NodesListManager extends CompositeService implements return hostsReader; } + private void updateInactiveNodes() { + long now = Time.monotonicNow(); + for(Entry entry : + rmContext.getInactiveRMNodes().entrySet()) { + NodeId nodeId = entry.getKey(); + RMNode rmNode = entry.getValue(); + if (isUntrackedNode(nodeId.getHost()) && + rmNode.getUntrackedTimeStamp() == 0) { + rmNode.setUntrackedTimeStamp(now); + } + } + } + + public boolean isUntrackedNode(String hostName) { + boolean untracked; + String ip = resolver.resolve(hostName); + + synchronized (hostsReader) { + Set hostsList = hostsReader.getHosts(); + Set excludeList = hostsReader.getExcludedHosts(); + untracked = !hostsList.isEmpty() && + !hostsList.contains(hostName) && !hostsList.contains(ip) && + !excludeList.contains(hostName) && !excludeList.contains(ip); + } + return untracked; + } + /** * Refresh the nodes gracefully * @@ -383,11 +489,13 @@ public class NodesListManager extends CompositeService implements public void refreshNodesGracefully(Configuration conf) throws IOException, YarnException { refreshHostsReader(conf); - for (Entry entry:rmContext.getRMNodes().entrySet()) { + for (Entry entry : rmContext.getRMNodes().entrySet()) { NodeId nodeId = entry.getKey(); if 
(!isValidNode(nodeId.getHost())) { + RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ? + RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION; this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION)); + new RMNodeEvent(nodeId, nodeEventType)); } else { // Recommissioning the nodes if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { @@ -396,6 +504,7 @@ public class NodesListManager extends CompositeService implements } } } + updateInactiveNodes(); } /** @@ -419,8 +528,11 @@ public class NodesListManager extends CompositeService implements public void refreshNodesForcefully() { for (Entry entry : rmContext.getRMNodes().entrySet()) { if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { + RMNodeEventType nodeEventType = + isUntrackedNode(entry.getKey().getHost()) ? + RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION; this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION)); + new RMNodeEvent(entry.getKey(), nodeEventType)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index e19d55ee81d..1318d5814be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -87,7 +87,7 @@ public class RMServerUtils { acceptedStates.contains(NodeState.LOST) || acceptedStates.contains(NodeState.REBOOTED)) { for 
(RMNode rmNode : context.getInactiveRMNodes().values()) { - if (acceptedStates.contains(rmNode.getState())) { + if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) { results.add(rmNode); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index d8df9f16ef8..48df1e836b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -168,4 +168,8 @@ public interface RMNode { NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); + + long getUntrackedTimeStamp(); + + void setUntrackedTimeStamp(long timeStamp); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b02994ace7d..b86dfda0120 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import 
org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -120,6 +121,7 @@ public class RMNodeImpl implements RMNode, EventHandler { private long lastHealthReportTime; private String nodeManagerVersion; + private long timeStamp; /* Aggregated resource utilization for the containers. */ private ResourceUtilization containersUtilization; /* Resource utilization for the node. */ @@ -259,6 +261,9 @@ public class RMNodeImpl implements RMNode, EventHandler { .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) + .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN, + RMNodeEventType.SHUTDOWN, + new DeactivateNodeTransition(NodeState.SHUTDOWN)) // TODO (in YARN-3223) update resource when container finished. .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, @@ -346,6 +351,7 @@ public class RMNodeImpl implements RMNode, EventHandler { this.healthReport = "Healthy"; this.lastHealthReportTime = System.currentTimeMillis(); this.nodeManagerVersion = nodeManagerVersion; + this.timeStamp = 0; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -997,7 +1003,7 @@ public class RMNodeImpl implements RMNode, EventHandler { } /** - * Put a node in deactivated (decommissioned) status. + * Put a node in deactivated (decommissioned or shutdown) status. 
* @param rmNode * @param finalState */ @@ -1014,6 +1020,9 @@ public class RMNodeImpl implements RMNode, EventHandler { LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " + finalState); rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode); + if (rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) { + rmNode.setUntrackedTimeStamp(Time.monotonicNow()); + } } /** @@ -1385,4 +1394,14 @@ public class RMNodeImpl implements RMNode, EventHandler { public Resource getOriginalTotalCapability() { return this.originalTotalCapability; } - } + + @Override + public long getUntrackedTimeStamp() { + return this.timeStamp; + } + + @Override + public void setUntrackedTimeStamp(long ts) { + this.timeStamp = ts; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29b834..921b18eee93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -260,6 +260,15 @@ public class MockNodes { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + @Override + public long getUntrackedTimeStamp() { + return 0; + } + + @Override + public void setUntrackedTimeStamp(long timeStamp) { + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4259e6b5148..3871ae48a18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -31,6 +31,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; @@ -48,8 +50,6 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; @@ -141,12 +141,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm.getNodesListManager().refreshNodes(conf); - checkDecommissionedNMCount(rm, ++metricCount); + checkShutdownNMCount(rm, ++metricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert - .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs()); nodeHeartbeat = nm2.nodeHeartbeat(true); Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN @@ -155,7 +155,8 @@ public class 
TestResourceTrackerService extends NodeLabelTestBase { nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertEquals(metricCount, ClusterMetrics.getMetrics() - .getNumDecommisionedNMs()); + .getNumShutdownNMs()); + rm.stop(); } /** @@ -226,7 +227,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { MockNM nm2 = rm.registerNode("host2:5678", 10240); ClusterMetrics metrics = ClusterMetrics.getMetrics(); assert(metrics != null); - int initialMetricCount = metrics.getNumDecommisionedNMs(); + int initialMetricCount = metrics.getNumShutdownNMs(); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( NodeAction.NORMAL, @@ -239,16 +240,16 @@ public class TestResourceTrackerService extends NodeLabelTestBase { conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); - checkDecommissionedNMCount(rm, ++initialMetricCount); + checkShutdownNMCount(rm, ++initialMetricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( - "Node should not have been decomissioned.", + "Node should not have been shutdown.", NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); - nodeHeartbeat = nm2.nodeHeartbeat(true); - Assert.assertEquals("Node should have been decomissioned but is in state" + - nodeHeartbeat.getNodeAction(), - NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); + NodeState nodeState = + rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState(); + Assert.assertEquals("Node should have been shutdown but is in state" + + nodeState, NodeState.SHUTDOWN, nodeState); } /** @@ -509,7 +510,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase { Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId)); Assert .assertFalse( - "Node Labels should not accepted by RM If its configured with Central configuration", + "Node Labels should not accepted by 
RM If its configured with " + + "Central configuration", response.getAreNodeLabelsAcceptedByRM()); if (rm != null) { rm.stop(); @@ -891,15 +893,15 @@ public class TestResourceTrackerService extends NodeLabelTestBase { // node unhealthy nm1.nodeHeartbeat(false); - checkUnealthyNMCount(rm, nm1, true, 1); + checkUnhealthyNMCount(rm, nm1, true, 1); // node healthy again nm1.nodeHeartbeat(true); - checkUnealthyNMCount(rm, nm1, false, 0); + checkUnhealthyNMCount(rm, nm1, false, 0); } - private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health, - int count) throws Exception { + private void checkUnhealthyNMCount(MockRM rm, MockNM nm1, boolean health, + int count) throws Exception { int waitCount = 0; while((rm.getRMContext().getRMNodes().get(nm1.getNodeId()) @@ -1000,7 +1002,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(false); rm.drainEvents(); - checkUnealthyNMCount(rm, nm2, true, 1); + checkUnhealthyNMCount(rm, nm2, true, 1); final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs(); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); // TODO Metrics incorrect in case of the FifoScheduler @@ -1012,7 +1014,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); rm.drainEvents(); Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); - checkUnealthyNMCount(rm, nm2, true, 1); + checkUnhealthyNMCount(rm, nm2, true, 1); // reconnect of unhealthy node nm2 = rm.registerNode("host2:5678", 5120); @@ -1020,7 +1022,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); rm.drainEvents(); Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); - checkUnealthyNMCount(rm, nm2, true, 1); + checkUnhealthyNMCount(rm, nm2, true, 1); // unhealthy node 
changed back to healthy nm2 = rm.registerNode("host2:5678", 5120); @@ -1102,7 +1104,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { // node unhealthy nm1.nodeHeartbeat(false); - checkUnealthyNMCount(rm, nm1, true, 1); + checkUnhealthyNMCount(rm, nm1, true, 1); UnRegisterNodeManagerRequest request = Records .newRecord(UnRegisterNodeManagerRequest.class); request.setNodeId(nm1.getNodeId()); @@ -1117,8 +1119,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm.start(); ResourceTrackerService resourceTrackerService = rm .getResourceTrackerService(); - int shutdownNMsCount = ClusterMetrics.getMetrics() - .getNumShutdownNMs(); int decommisionedNMsCount = ClusterMetrics.getMetrics() .getNumDecommisionedNMs(); @@ -1143,10 +1143,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm.getNodesListManager().refreshNodes(conf); NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true); Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction()); + int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); checkShutdownNMCount(rm, shutdownNMsCount); - checkDecommissionedNMCount(rm, ++decommisionedNMsCount); + checkDecommissionedNMCount(rm, decommisionedNMsCount); request.setNodeId(nm1.getNodeId()); resourceTrackerService.unRegisterNodeManager(request); + shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); checkShutdownNMCount(rm, shutdownNMsCount); checkDecommissionedNMCount(rm, decommisionedNMsCount); @@ -1162,8 +1164,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm.getNodesListManager().refreshNodes(conf); request.setNodeId(nm2.getNodeId()); resourceTrackerService.unRegisterNodeManager(request); - checkShutdownNMCount(rm, shutdownNMsCount); - checkDecommissionedNMCount(rm, ++decommisionedNMsCount); + checkShutdownNMCount(rm, ++shutdownNMsCount); + checkDecommissionedNMCount(rm, decommisionedNMsCount); + rm.stop(); } @Test(timeout 
= 30000) @@ -1298,6 +1301,434 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm.stop(); } + /** + * Remove a node from all lists and check if its forgotten + */ + @Test + public void testNodeRemovalNormally() throws Exception { + testNodeRemovalUtil(false); + testNodeRemovalUtilLost(false); + testNodeRemovalUtilRebooted(false); + testNodeRemovalUtilUnhealthy(false); + } + + @Test + public void testNodeRemovalGracefully() throws Exception { + testNodeRemovalUtil(true); + testNodeRemovalUtilLost(true); + testNodeRemovalUtilRebooted(true); + testNodeRemovalUtilUnhealthy(true); + } + + public void refreshNodesOption(boolean doGraceful, Configuration conf) + throws Exception { + if (doGraceful) { + rm.getNodesListManager().refreshNodesGracefully(conf); + } else { + rm.getNodesListManager().refreshNodes(conf); + } + } + + public void testNodeRemovalUtil(boolean doGraceful) throws Exception { + Configuration conf = new Configuration(); + int timeoutValue = 500; + File excludeHostFile = new File(TEMP_DIR + File.separator + + "excludeHostFile.txt"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, ""); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, ""); + conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + timeoutValue); + CountDownLatch latch = new CountDownLatch(1); + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + RMContext rmContext = rm.getRMContext(); + refreshNodesOption(doGraceful, conf); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("localhost:4433", 1024); + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + assert (metrics != null); + + //check all 3 nodes joined in as NORMAL + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + 
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm3.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + rm.drainEvents(); + Assert.assertEquals("All 3 nodes should be active", + metrics.getNumActiveNMs(), 3); + + //Remove nm2 from include list, should now be shutdown with timer test + String ip = NetUtils.normalizeHostName("localhost"); + writeToHostsFile("host1", ip); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + refreshNodesOption(doGraceful, conf); + nm1.nodeHeartbeat(true); + rm.drainEvents(); + Assert.assertTrue("Node should not be in active node list", + !rmContext.getRMNodes().containsKey(nm2.getNodeId())); + + RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should be in inactive node list", + rmNode.getState(), NodeState.SHUTDOWN); + Assert.assertEquals("Active nodes should be 2", + metrics.getNumActiveNMs(), 2); + Assert.assertEquals("Shutdown nodes should be 1", + metrics.getNumShutdownNMs(), 1); + + int nodeRemovalTimeout = + conf.getInt( + YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + YarnConfiguration. 
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); + int nodeRemovalInterval = + rmContext.getNodesListManager().getNodeRemovalCheckInterval(); + long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + + rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should have been forgotten!", + rmNode, null); + Assert.assertEquals("Shutdown nodes should be 0 now", + metrics.getNumShutdownNMs(), 0); + + //Check node removal and re-addition before timer expires + writeToHostsFile("host1", ip, "host2"); + refreshNodesOption(doGraceful, conf); + nm2 = rm.registerNode("host2:5678", 10240); + rm.drainEvents(); + writeToHostsFile("host1", ip); + refreshNodesOption(doGraceful, conf); + rm.drainEvents(); + rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should be shutdown", + rmNode.getState(), NodeState.SHUTDOWN); + Assert.assertEquals("Active nodes should be 2", + metrics.getNumActiveNMs(), 2); + Assert.assertEquals("Shutdown nodes should be 1", + metrics.getNumShutdownNMs(), 1); + + //add back the node before timer expires + latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS); + writeToHostsFile("host1", ip, "host2"); + refreshNodesOption(doGraceful, conf); + nm2 = rm.registerNode("host2:5678", 10240); + nodeHeartbeat = nm2.nodeHeartbeat(true); + rm.drainEvents(); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + Assert.assertEquals("Shutdown nodes should be 0 now", + metrics.getNumShutdownNMs(), 0); + Assert.assertEquals("All 3 nodes should be active", + metrics.getNumActiveNMs(), 3); + + //Decommission this node, check timer doesn't remove it + writeToHostsFile("host1", "host2", ip); + writeToHostsFile(excludeHostFile, "host2"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile + .getAbsolutePath()); + refreshNodesOption(doGraceful, conf); + rm.drainEvents(); 
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", + (rmNode.getState() == NodeState.DECOMMISSIONED) || + (rmNode.getState() == NodeState.DECOMMISSIONING)); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + Assert.assertEquals("Decommissioned/ing nodes should be 1 now", + metrics.getNumDecommisionedNMs(), 1); + } + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + + rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", + (rmNode.getState() == NodeState.DECOMMISSIONED) || + (rmNode.getState() == NodeState.DECOMMISSIONING)); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + Assert.assertEquals("Decommissioned/ing nodes should be 1 now", + metrics.getNumDecommisionedNMs(), 1); + } + + //Test decommed/ing node that transitions to untracked,timer should remove + writeToHostsFile("host1", ip, "host2"); + writeToHostsFile(excludeHostFile, "host2"); + refreshNodesOption(doGraceful, conf); + nm1.nodeHeartbeat(true); + //nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertNotEquals("Timer for this node was not canceled!", + rmNode, null); + Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", + (rmNode.getState() == NodeState.DECOMMISSIONED) || + (rmNode.getState() == NodeState.DECOMMISSIONING)); + + writeToHostsFile("host1", ip); + writeToHostsFile(excludeHostFile, ""); + refreshNodesOption(doGraceful, conf); + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + rmNode = doGraceful ? 
rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should have been forgotten!", + rmNode, null); + Assert.assertEquals("Decommissioned nodes should be 0 now", + metrics.getNumDecommisionedNMs(), 0); + Assert.assertEquals("Shutdown nodes should be 0 now", + metrics.getNumShutdownNMs(), 0); + Assert.assertEquals("Active nodes should be 2", + metrics.getNumActiveNMs(), 2); + + rm.stop(); + } + + private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { + Configuration conf = new Configuration(); + conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000); + int timeoutValue = 500; + File excludeHostFile = new File(TEMP_DIR + File.separator + + "excludeHostFile.txt"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + writeToHostsFile(hostFile, "host1", "localhost", "host2"); + writeToHostsFile(excludeHostFile, ""); + conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + timeoutValue); + + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + RMContext rmContext = rm.getRMContext(); + refreshNodesOption(doGraceful, conf); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("localhost:4433", 1024); + ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); + ClusterMetrics metrics = clusterMetrics; + assert (metrics != null); + rm.drainEvents(); + //check all 3 nodes joined in as NORMAL + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm3.nodeHeartbeat(true); + 
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + rm.drainEvents(); + Assert.assertEquals("All 3 nodes should be active", + metrics.getNumActiveNMs(), 3); + int waitCount = 0; + while(waitCount ++<20){ + synchronized (this) { + wait(200); + } + nm3.nodeHeartbeat(true); + nm1.nodeHeartbeat(true); + } + Assert.assertNotEquals("host2 should be a lost NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("host2 should be a lost NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), + NodeState.LOST); + Assert.assertEquals("There should be 1 Lost NM!", + clusterMetrics.getNumLostNMs(), 1); + Assert.assertEquals("There should be 2 Active NM!", + clusterMetrics.getNumActiveNMs(), 2); + int nodeRemovalTimeout = + conf.getInt( + YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + YarnConfiguration. + DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); + int nodeRemovalInterval = + rmContext.getNodesListManager().getNodeRemovalCheckInterval(); + long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; + writeToHostsFile(hostFile, "host1", "localhost"); + writeToHostsFile(excludeHostFile, ""); + refreshNodesOption(doGraceful, conf); + nm1.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + rm.drainEvents(); + waitCount = 0; + while(rmContext.getInactiveRMNodes().get( + nm2.getNodeId()) != null && waitCount++ < 2){ + synchronized (this) { + wait(maxThreadSleeptime); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + } + } + Assert.assertEquals("host2 should have been forgotten!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("There should be no Lost NMs!", + clusterMetrics.getNumLostNMs(), 0); + Assert.assertEquals("There should be 2 Active NM!", + clusterMetrics.getNumActiveNMs(), 2); + rm.stop(); + } + + private void testNodeRemovalUtilRebooted(boolean doGraceful) + throws Exception { + Configuration conf = new 
Configuration(); + int timeoutValue = 500; + File excludeHostFile = new File(TEMP_DIR + File.separator + + "excludeHostFile.txt"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + writeToHostsFile(hostFile, "host1", "localhost", "host2"); + writeToHostsFile(excludeHostFile, ""); + conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + timeoutValue); + + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + RMContext rmContext = rm.getRMContext(); + refreshNodesOption(doGraceful, conf); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("localhost:4433", 1024); + ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); + ClusterMetrics metrics = clusterMetrics; + assert (metrics != null); + NodeHeartbeatResponse nodeHeartbeat = nm2.nodeHeartbeat( + new HashMap<ApplicationAttemptId, List<ContainerStatus>>(), true, -100); + rm.drainEvents(); + rm.drainEvents(); + + Assert.assertNotEquals("host2 should be a rebooted NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("host2 should be a rebooted NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), + NodeState.REBOOTED); + Assert.assertEquals("There should be 1 Rebooted NM!", + clusterMetrics.getNumRebootedNMs(), 1); + Assert.assertEquals("There should be 2 Active NM!", + clusterMetrics.getNumActiveNMs(), 2); + + int nodeRemovalTimeout = + conf.getInt( + YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + YarnConfiguration. 
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); + int nodeRemovalInterval = + rmContext.getNodesListManager().getNodeRemovalCheckInterval(); + long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; + writeToHostsFile(hostFile, "host1", "localhost"); + writeToHostsFile(excludeHostFile, ""); + refreshNodesOption(doGraceful, conf); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + rm.drainEvents(); + int waitCount = 0; + while(rmContext.getInactiveRMNodes().get( + nm2.getNodeId()) != null && waitCount++ < 2){ + synchronized (this) { + wait(maxThreadSleeptime); + } + } + Assert.assertEquals("host2 should have been forgotten!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("There should be no Rebooted NMs!", + clusterMetrics.getNumRebootedNMs(), 0); + Assert.assertEquals("There should be 2 Active NM!", + clusterMetrics.getNumActiveNMs(), 2); + rm.stop(); + } + + private void testNodeRemovalUtilUnhealthy(boolean doGraceful) + throws Exception { + Configuration conf = new Configuration(); + int timeoutValue = 500; + File excludeHostFile = new File(TEMP_DIR + File.separator + + "excludeHostFile.txt"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + writeToHostsFile(hostFile, "host1", "localhost", "host2"); + writeToHostsFile(excludeHostFile, ""); + conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + timeoutValue); + + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + RMContext rmContext = rm.getRMContext(); + refreshNodesOption(doGraceful, conf); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("localhost:4433", 1024); + ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); + ClusterMetrics metrics = clusterMetrics; + 
assert (metrics != null); + rm.drainEvents(); + //check all 3 nodes joined in as NORMAL + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm3.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + rm.drainEvents(); + Assert.assertEquals("All 3 nodes should be active", + metrics.getNumActiveNMs(), 3); + // node healthy + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(false); + nm3.nodeHeartbeat(true); + checkUnhealthyNMCount(rm, nm2, true, 1); + writeToHostsFile(hostFile, "host1", "localhost"); + writeToHostsFile(excludeHostFile, ""); + refreshNodesOption(doGraceful, conf); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(false); + nm3.nodeHeartbeat(true); + rm.drainEvents(); + Assert.assertNotEquals("host2 should be a shutdown NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("host2 should be a shutdown NM!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(), + NodeState.SHUTDOWN); + Assert.assertEquals("There should be 2 Active NM!", + clusterMetrics.getNumActiveNMs(), 2); + Assert.assertEquals("There should be 1 Shutdown NM!", + clusterMetrics.getNumShutdownNMs(), 1); + Assert.assertEquals("There should be 0 Unhealthy NM!", + clusterMetrics.getUnhealthyNMs(), 0); + int nodeRemovalTimeout = + conf.getInt( + YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + YarnConfiguration. 
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); + int nodeRemovalInterval = + rmContext.getNodesListManager().getNodeRemovalCheckInterval(); + long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; + int waitCount = 0; + while(rmContext.getInactiveRMNodes().get( + nm2.getNodeId()) != null && waitCount++ < 2){ + synchronized (this) { + wait(maxThreadSleeptime); + } + } + Assert.assertEquals("host2 should have been forgotten!", + rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); + Assert.assertEquals("There should be no Shutdown NMs!", + clusterMetrics.getNumRebootedNMs(), 0); + Assert.assertEquals("There should be 2 Active NM!", + clusterMetrics.getNumActiveNMs(), 2); + rm.stop(); + } + private void writeToHostsFile(String... hosts) throws IOException { writeToHostsFile(hostFile, hosts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index 3fd1fd5f6f9..4b6ca1244e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -272,8 +272,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase { RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId); WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", info.getString("nodeHTTPAddress")); - WebServicesTestUtils.checkStringMatch("state", rmNode.getState() - .toString(), info.getString("state")); + if (rmNode != null) 
{ + WebServicesTestUtils.checkStringMatch("state", + rmNode.getState().toString(), info.getString("state")); + } } } @@ -304,8 +306,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase { RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId); WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", info.getString("nodeHTTPAddress")); - WebServicesTestUtils.checkStringMatch("state", - rmNode.getState().toString(), info.getString("state")); + if (rmNode != null) { + WebServicesTestUtils.checkStringMatch("state", + rmNode.getState().toString(), info.getString("state")); + } } @Test