diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 59ff9b890e2..0f0cdad6117 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -81,6 +81,9 @@ Release 2.7.3 - UNRELEASED YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is not available (Chang Li via jlowe) + YARN-3102. Decommisioned Nodes not listed in Web UI (Kuhu Shukla via + jlowe) + Release 2.7.2 - 2016-01-25 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index dd504018aa1..b153b0fab43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.Node; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.NodeId; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent. import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import com.google.common.annotations.VisibleForTesting; @@ -78,7 +80,7 @@ public class NodesListManager extends AbstractService implements YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); - setDecomissionedNMsMetrics(); + setDecomissionedNMs(); printConfiguredHosts(); } catch (YarnException ex) { disableHostsFileReader(ex); @@ -135,9 +137,24 @@ public class NodesListManager extends AbstractService implements } } - private void setDecomissionedNMsMetrics() { + private void setDecomissionedNMs() { Set excludeList = hostsReader.getExcludedHosts(); - ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size()); + for (final String host : excludeList) { + UnknownNodeId nodeId = new UnknownNodeId(host); + RMNodeImpl rmNode = new RMNodeImpl(nodeId, + rmContext, host, -1, -1, new UnknownNode(host), null, null); + + RMNode prevRMNode = + rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); + if (prevRMNode != null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(prevRMNode.getNodeID(), + RMNodeEventType.DECOMMISSION)); + } else { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + } + } } public boolean isValidNode(String hostName) { @@ -210,7 +227,7 @@ public class NodesListManager extends AbstractService implements conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); - setDecomissionedNMsMetrics(); + setDecomissionedNMs(); } catch (IOException ioe2) { // Should *never* happen this.hostsReader = null; @@ -240,4 +257,98 @@ public class NodesListManager extends AbstractService implements .getConfigurationInputStream(this.conf, excludesFile)); return hostsReader; } + + /** + * A NodeId instance needed upon startup for populating inactive nodes Map. + * It only knows the hostname/ip and marks the port to -1 or invalid. + */ + public static class UnknownNodeId extends NodeId { + + private String host; + + public UnknownNodeId(String host) { + this.host = host; + } + + @Override + public String getHost() { + return this.host; + } + + @Override + protected void setHost(String hst) { + + } + + @Override + public int getPort() { + return -1; + } + + @Override + protected void setPort(int port) { + + } + + @Override + protected void build() { + + } + } + + /** + * A Node instance needed upon startup for populating RMNode Map. + * It only knows its hostname/ip. + */ + private static class UnknownNode implements Node { + + private String host; + + public UnknownNode(String host) { + this.host = host; + } + + @Override + public String getNetworkLocation() { + return null; + } + + @Override + public void setNetworkLocation(String location) { + + } + + @Override + public String getName() { + return host; + } + + @Override + public Node getParent() { + return null; + } + + @Override + public void setParent(Node parent) { + + } + + @Override + public int getLevel() { + return 0; + } + + @Override + public void setLevel(int i) { + + } + + public String getHost() { + return host; + } + + public void setHost(String hst) { + this.host = hst; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 9e590812dc3..711b69c8a54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -179,6 +179,11 @@ public class ResourceManager extends CompositeService implements Recoverable { clusterTimeStamp = timestamp; } + @VisibleForTesting + Dispatcher getRmDispatcher() { + return rmDispatcher; + } + @Override protected void serviceInit(Configuration conf) throws Exception { this.conf = conf; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 4556dadca58..673264fae29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -141,6 +142,9 @@ public class RMNodeImpl implements RMNode, EventHandler { .addTransition(NodeState.NEW, NodeState.NEW, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED, + RMNodeEventType.DECOMMISSION, + new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) //Transitions from RUNNING state .addTransition(NodeState.RUNNING, @@ -491,6 +495,8 @@ public class RMNodeImpl implements RMNode, EventHandler { case UNHEALTHY: metrics.incrNumUnhealthyNMs(); break; + case NEW: + break; default: LOG.debug("Unexpected final state"); } @@ -531,24 +537,21 @@ public class RMNodeImpl implements RMNode, EventHandler { List containers = null; String host = rmNode.nodeId.getHost(); - if (rmNode.context.getInactiveRMNodes().containsKey(host)) { - // Old node rejoining - RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host); - rmNode.context.getInactiveRMNodes().remove(host); - rmNode.updateMetricsForRejoinedNode(previouRMNode.getState()); + RMNode previousRMNode = rmNode.context.getInactiveRMNodes().remove(host); + if (previousRMNode != null) { + if (previousRMNode.getNodeID().getPort() != -1) { + // Old node rejoining + rmNode.updateMetricsForRejoinedNode(previousRMNode.getState()); + } else { + // An old excluded node rejoining + ClusterMetrics.getMetrics().decrDecommisionedNMs(); + containers = updateNewNodeMetricsAndContainers(rmNode, startEvent); + } } else { // Increment activeNodes explicitly because this is a new node. - ClusterMetrics.getMetrics().incrNumActiveNodes(); - containers = startEvent.getNMContainerStatuses(); - if (containers != null && !containers.isEmpty()) { - for (NMContainerStatus container : containers) { - if (container.getContainerState() == ContainerState.RUNNING) { - rmNode.launchedContainers.add(container.getContainerId()); - } - } - } + containers = updateNewNodeMetricsAndContainers(rmNode, startEvent); } - + if (null != startEvent.getRunningApplications()) { for (ApplicationId appId : startEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); @@ -563,6 +566,21 @@ public class RMNodeImpl implements RMNode, EventHandler { } } + private static List updateNewNodeMetricsAndContainers( + RMNodeImpl rmNode, RMNodeStartedEvent startEvent) { + List containers; + ClusterMetrics.getMetrics().incrNumActiveNodes(); + containers = startEvent.getNMContainerStatuses(); + if (containers != null && !containers.isEmpty()) { + for (NMContainerStatus container : containers) { + if (container.getContainerState() == ContainerState.RUNNING) { + rmNode.launchedContainers.add(container.getContainerId()); + } + } + } + return containers; + } + public static class ReconnectNodeTransition implements SingleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index b1ce0f1f774..5bd952e84ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -132,6 +134,20 @@ public class MockRM extends ResourceManager { } } + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + + public void drainEvents() { + Dispatcher rmDispatcher = getRmDispatcher(); + if (rmDispatcher instanceof DrainDispatcher) { + ((DrainDispatcher) rmDispatcher).await(); + } else { + throw new UnsupportedOperationException("Not a Drain Dispatcher!"); + } + } + public void waitForState(ApplicationId appId, RMAppState finalState) throws Exception { RMApp app = getRMContext().getRMApps().get(appId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index d057498fe00..469135b2018 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1850,15 +1850,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile.getAbsolutePath()); writeToHostsFile(""); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm1 = null, rm2 = null; try { - rm1 = new MockRM(conf) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = rm1.registerNode("localhost:1234", 8000); MockNM nm2 = rm1.registerNode("host2:1234", 8000); @@ -1879,7 +1873,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { Assert.assertTrue("The decommisioned metrics are not updated", NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); - dispatcher.await(); + rm1.drainEvents(); Assert .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); @@ -1892,6 +1886,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // restart RM. rm2 = new MockRM(conf); rm2.start(); + rm2.drainEvents(); Assert .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index a904dc0af4c..83a3934db90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -158,27 +158,21 @@ public class TestResourceTrackerService { .getAbsolutePath()); writeToHostsFile(""); - final DrainDispatcher dispatcher = new DrainDispatcher(); - rm = new MockRM(conf) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + rm = new MockRM(conf); rm.start(); MockNM nm1 = rm.registerNode("host1:1234", 5120); MockNM nm2 = rm.registerNode("host2:5678", 10240); MockNM nm3 = rm.registerNode("localhost:4433", 1024); - dispatcher.await(); + rm.drainEvents(); int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm2.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); - dispatcher.await(); + rm.drainEvents(); // To test that IPs also work String ip = NetUtils.normalizeHostName("localhost"); @@ -197,15 +191,15 @@ public class TestResourceTrackerService { nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue("The decommisioned metrics are not updated", NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); - dispatcher.await(); + rm.drainEvents(); writeToHostsFile(""); rm.getNodesListManager().refreshNodes(conf); nm3 = rm.registerNode("localhost:4433", 1024); - dispatcher.await(); + rm.drainEvents(); nodeHeartbeat = nm3.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); // decommissined node is 1 since 1 node is rejoined after updating exclude // file @@ -563,7 +557,6 @@ public class TestResourceTrackerService { @Test public void testReconnectNode() throws Exception { - final DrainDispatcher dispatcher = new DrainDispatcher(); rm = new MockRM() { @Override protected EventHandler createSchedulerEventDispatcher() { @@ -574,11 +567,6 @@ public class TestResourceTrackerService { } }; } - - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } }; rm.start(); @@ -586,7 +574,7 @@ public class TestResourceTrackerService { MockNM nm2 = rm.registerNode("host2:5678", 5120); nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(false); - dispatcher.await(); + rm.drainEvents(); checkUnealthyNMCount(rm, nm2, true, 1); final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs(); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); @@ -597,7 +585,7 @@ public class TestResourceTrackerService { nm1 = rm.registerNode("host1:1234", 5120); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); checkUnealthyNMCount(rm, nm2, true, 1); @@ -605,23 +593,23 @@ public class TestResourceTrackerService { nm2 = rm.registerNode("host2:5678", 5120); response = nm2.nodeHeartbeat(false); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); checkUnealthyNMCount(rm, nm2, true, 1); // unhealthy node changed back to healthy nm2 = rm.registerNode("host2:5678", 5120); - dispatcher.await(); + rm.drainEvents(); response = nm2.nodeHeartbeat(true); response = nm2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(5120 + 5120, metrics.getAvailableMB()); // reconnect of node with changed capability nm1 = rm.registerNode("host2:5678", 10240); - dispatcher.await(); + rm.drainEvents(); response = nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); @@ -629,9 +617,9 @@ public class TestResourceTrackerService { List runningApps = new ArrayList(); runningApps.add(ApplicationId.newInstance(1, 0)); nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps); - dispatcher.await(); + rm.drainEvents(); response = nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); @@ -639,10 +627,10 @@ public class TestResourceTrackerService { nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); nm1.setHttpPort(3); nm1.registerNode(); - dispatcher.await(); + rm.drainEvents(); response = nm1.nodeHeartbeat(true); response = nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); Assert.assertEquals(3, rmNode.getHttpPort()); Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory()); @@ -650,14 +638,116 @@ public class TestResourceTrackerService { } + @Test(timeout = 30000) + public void testInitDecommMetric() throws Exception { + testInitDecommMetricHelper(true); + testInitDecommMetricHelper(false); + } + + public void testInitDecommMetricHelper(boolean hasIncludeList) + throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + File excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, "host1"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + + if (hasIncludeList) { + writeToHostsFile(hostFile, "host1", "host2"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + } + rm.getNodesListManager().refreshNodes(conf); + rm.drainEvents(); + rm.stop(); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + nm1 = rm1.registerNode("host1:1234", 5120); + nm2 = rm1.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + rm1.drainEvents(); + Assert.assertEquals("Number of Decommissioned nodes should be 1", + 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The inactiveRMNodes should contain an entry for the" + + "decommissioned node", + 1, rm1.getRMContext().getInactiveRMNodes().size()); + excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, ""); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + rm1.getNodesListManager().refreshNodes(conf); + nm1 = rm1.registerNode("host1:1234", 5120); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + rm1.drainEvents(); + Assert.assertEquals("The decommissioned nodes metric should have " + + "decremented to 0", + 0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The active nodes metric should be 2", + 2, ClusterMetrics.getMetrics().getNumActiveNMs()); + Assert.assertEquals("The inactive RMNodes entry should have been removed", + 0, rm1.getRMContext().getInactiveRMNodes().size()); + rm1.drainEvents(); + rm1.stop(); + } + + @Test(timeout = 30000) + public void testInitDecommMetricNoRegistration() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + //host3 will not register or heartbeat + File excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, "host3", "host2"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + writeToHostsFile(hostFile, "host1", "host2"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + rm.getNodesListManager().refreshNodes(conf); + rm.drainEvents(); + Assert.assertEquals("The decommissioned nodes metric should be 1 ", + 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + rm.stop(); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + rm1.getNodesListManager().refreshNodes(conf); + rm1.drainEvents(); + Assert.assertEquals("The decommissioned nodes metric should be 2 ", + 2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + rm1.stop(); + } + private void writeToHostsFile(String... hosts) throws IOException { - if (!hostFile.exists()) { + writeToHostsFile(hostFile, hosts); + } + + private void writeToHostsFile(File file, String... hosts) + throws IOException { + if (!file.exists()) { TEMP_DIR.mkdirs(); - hostFile.createNewFile(); + file.createNewFile(); } FileOutputStream fStream = null; try { - fStream = new FileOutputStream(hostFile); + fStream = new FileOutputStream(file); for (int i = 0; i < hosts.length; i++) { fStream.write(hosts[i].getBytes()); fStream.write("\n".getBytes());