From 9862879e277f5183ca25debb89c8131a0a779022 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Mon, 1 Feb 2016 23:15:26 +0000
Subject: [PATCH] YARN-3102. Decommisioned Nodes not listed in Web UI.
 Contributed by Kuhu Shukla

(cherry picked from commit ed55950164a66e08fa34e30dba1030c5a986d1f1)
---
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/NodesListManager.java       | 119 +++++++++++++-
 .../resourcemanager/ResourceManager.java        |   5 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  24 ++-
 .../yarn/server/resourcemanager/MockRM.java     |  16 ++
 .../server/resourcemanager/TestRMRestart.java   |  11 +-
 .../TestResourceTrackerService.java             | 154 ++++++++++++++----
 7 files changed, 282 insertions(+), 50 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4075c5e8d8a..be726fb1a9f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1262,6 +1262,9 @@ Release 2.7.3 - UNRELEASED
     YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is
     not available (Chang Li via jlowe)
 
+    YARN-3102. Decommisioned Nodes not listed in Web UI (Kuhu Shukla via
+    jlowe)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES
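The NodesListManager change that follows seeds the RM's node map from the excludes file at startup, so hosts that never register still show up as decommissioned in the web UI. A minimal standalone sketch of that idea, using hypothetical stand-ins (inactiveNodes, decommissionedNMs, seedFromExcludeList) rather than the actual YARN classes:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DecommissionedSeedSketch {
  // Stand-in for the RM's inactive-node map, keyed by a host-only id.
  static final Map<String, String> inactiveNodes = new ConcurrentHashMap<>();
  // Stand-in for the cluster-wide decommissioned-node counter.
  static final AtomicInteger decommissionedNMs = new AtomicInteger();

  static void seedFromExcludeList(List<String> excludedHosts) {
    for (String host : excludedHosts) {
      // putIfAbsent mirrors the patch: only create a placeholder when the
      // host is not already tracked.
      String prev = inactiveNodes.putIfAbsent(host, "placeholder:" + host + ":-1");
      if (prev == null) {
        decommissionedNMs.incrementAndGet();  // effect of the DECOMMISSION event
      }
    }
  }

  public static void main(String[] args) {
    seedFromExcludeList(Arrays.asList("host1", "host2"));
    seedFromExcludeList(Arrays.asList("host2", "host3")); // host2 already seeded
    System.out.println("inactive=" + inactiveNodes.keySet()
        + " decommissioned=" + decommissionedNMs.get()); // 3 hosts, count 3
  }
}

The actual patch instead creates an RMNodeImpl keyed by an UnknownNodeId and dispatches a DECOMMISSION event; it is that event which moves the node into the inactive list and bumps the metric.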
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 4d9922b70d3..0be5e5b325c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.util.Clock;
@@ -95,7 +97,7 @@ public class NodesListManager extends CompositeService implements
           YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
-      setDecomissionedNMsMetrics();
+      setDecomissionedNMs();
       printConfiguredHosts();
     } catch (YarnException ex) {
       disableHostsFileReader(ex);
@@ -157,9 +159,24 @@ public class NodesListManager extends CompositeService implements
     }
   }
 
-  private void setDecomissionedNMsMetrics() {
+  private void setDecomissionedNMs() {
     Set<String> excludeList = hostsReader.getExcludedHosts();
-    ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
+    for (final String host : excludeList) {
+      UnknownNodeId nodeId = new UnknownNodeId(host);
+      RMNodeImpl rmNode = new RMNodeImpl(nodeId,
+          rmContext, host, -1, -1, new UnknownNode(host), null, null);
+
+      RMNode prevRMNode =
+          rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
+      if (prevRMNode != null) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(prevRMNode.getNodeID(),
+                RMNodeEventType.DECOMMISSION));
+      } else {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+      }
+    }
   }
 
   @VisibleForTesting
@@ -334,7 +351,7 @@ public class NodesListManager extends CompositeService implements
           conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
-      setDecomissionedNMsMetrics();
+      setDecomissionedNMs();
     } catch (IOException ioe2) {
       // Should *never* happen
       this.hostsReader = null;
@@ -417,4 +434,98 @@ public class NodesListManager extends CompositeService implements
       }
     }
   }
+
+  /**
+   * A NodeId instance needed upon startup for populating inactive nodes Map.
+   * It only knows the hostname/ip and marks the port to -1 or invalid.
+   */
+  public static class UnknownNodeId extends NodeId {
+
+    private String host;
+
+    public UnknownNodeId(String host) {
+      this.host = host;
+    }
+
+    @Override
+    public String getHost() {
+      return this.host;
+    }
+
+    @Override
+    protected void setHost(String hst) {
+
+    }
+
+    @Override
+    public int getPort() {
+      return -1;
+    }
+
+    @Override
+    protected void setPort(int port) {
+
+    }
+
+    @Override
+    protected void build() {
+
+    }
+  }
+
+  /**
+   * A Node instance needed upon startup for populating inactive nodes Map.
+   * It only knows its hostname/ip.
+   */
+  private static class UnknownNode implements Node {
+
+    private String host;
+
+    public UnknownNode(String host) {
+      this.host = host;
+    }
+
+    @Override
+    public String getNetworkLocation() {
+      return null;
+    }
+
+    @Override
+    public void setNetworkLocation(String location) {
+
+    }
+
+    @Override
+    public String getName() {
+      return host;
+    }
+
+    @Override
+    public Node getParent() {
+      return null;
+    }
+
+    @Override
+    public void setParent(Node parent) {
+
+    }
+
+    @Override
+    public int getLevel() {
+      return 0;
+    }
+
+    @Override
+    public void setLevel(int i) {
+
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public void setHost(String hst) {
+      this.host = hst;
+    }
+  }
 }
\ No newline at end of file
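UnknownNodeId above keys the placeholder entry purely by hostname, with the port pinned to -1, so a lookup built later from just the host can find it again. A small self-contained sketch of that host-only identity idea (HostOnlyNodeId is a hypothetical stand-in, not the patch's NodeId subclass):

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class HostOnlyNodeId {
  private final String host;

  public HostOnlyNodeId(String host) {
    this.host = Objects.requireNonNull(host);
  }

  public String getHost() { return host; }

  public int getPort() { return -1; }  // unknown until the node registers

  // Host-based equality (the port is always -1) lets a later lookup with the
  // same hostname find and remove the placeholder entry.
  @Override public boolean equals(Object o) {
    return o instanceof HostOnlyNodeId
        && host.equals(((HostOnlyNodeId) o).host);
  }

  @Override public int hashCode() { return host.hashCode(); }

  @Override public String toString() { return host + ":-1"; }

  public static void main(String[] args) {
    ConcurrentMap<HostOnlyNodeId, String> inactive = new ConcurrentHashMap<>();
    inactive.putIfAbsent(new HostOnlyNodeId("host1"), "DECOMMISSIONED");
    // A second id built from the same hostname locates the placeholder.
    System.out.println(inactive.remove(new HostOnlyNodeId("host1")));
  }
}

Since NodeId equality is based on host and port, two UnknownNodeId instances for the same host compare equal, which is what lets RMNodeImpl remove the placeholder when the real node finally registers.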
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 01a1c8f0fff..bd8a9685c07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -184,6 +184,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
     clusterTimeStamp = timestamp;
   }
 
+  @VisibleForTesting
+  Dispatcher getRmDispatcher() {
+    return rmDispatcher;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     this.conf = conf;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 8448287fe41..b87a0428294 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -172,6 +173,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
      .addTransition(NodeState.NEW, NodeState.NEW,
          RMNodeEventType.RESOURCE_UPDATE,
          new UpdateNodeResourceWhenUnusableTransition())
+     .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
+         RMNodeEventType.DECOMMISSION,
+         new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
 
      //Transitions from RUNNING state
      .addTransition(NodeState.RUNNING,
@@ -691,6 +695,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     case UNHEALTHY:
       metrics.decrNumUnhealthyNMs();
       break;
+    case NEW:
+      break;
     default:
       LOG.warn("Unexpected initial state");
     }
@@ -754,12 +760,18 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       List<NMContainerStatus> containers = null;
 
       NodeId nodeId = rmNode.nodeId;
-      if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
-        // Old node rejoining
-        RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
-        rmNode.context.getInactiveRMNodes().remove(nodeId);
-        rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
+      RMNode previousRMNode =
+          rmNode.context.getInactiveRMNodes().remove(nodeId);
+      if (previousRMNode != null) {
+        rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
       } else {
+        NodesListManager.UnknownNodeId unknownNodeId =
+            new NodesListManager.UnknownNodeId(nodeId.getHost());
+        previousRMNode =
+            rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
+        if (previousRMNode != null) {
+          ClusterMetrics.getMetrics().decrDecommisionedNMs();
+        }
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
         containers = startEvent.getNMContainerStatuses();
@@ -771,7 +783,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         }
       }
     }
-    
+
     if (null != startEvent.getRunningApplications()) {
       for (ApplicationId appId : startEvent.getRunningApplications()) {
         handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
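Two behaviors are added above: a node in the NEW state can move straight to DECOMMISSIONED, and a reconnecting node first looks for an inactive entry under its real id and then under the host-only placeholder, correcting the metrics either way. A compact standalone sketch of that reconnect bookkeeping, with string keys and plain counters as stand-ins for NodeId/UnknownNodeId and ClusterMetrics:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectSketch {
  // Keys: "host:port" for real ids, "host:-1" for host-only placeholders.
  static final Map<String, String> inactiveNodes = new ConcurrentHashMap<>();
  static final AtomicInteger decommissionedNMs = new AtomicInteger();
  static final AtomicInteger activeNMs = new AtomicInteger();

  static void onNodeStarted(String host, int port) {
    String previous = inactiveNodes.remove(host + ":" + port);
    if (previous == null) {
      // No old entry under the real id; check for a placeholder seeded from
      // the excludes file at startup.
      previous = inactiveNodes.remove(host + ":-1");
      if (previous != null) {
        decommissionedNMs.decrementAndGet();
      }
      activeNMs.incrementAndGet(); // brand-new (or placeholder-only) node
    }
  }

  public static void main(String[] args) {
    // Seeded at startup from the excludes file:
    inactiveNodes.put("host1:-1", "DECOMMISSIONED");
    decommissionedNMs.set(1);

    onNodeStarted("host1", 1234);
    System.out.println("decommissioned=" + decommissionedNMs.get()
        + " active=" + activeNMs.get()); // decommissioned=0 active=1
  }
}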
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index a5d14c3b55c..e0da2636e30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -142,6 +144,20 @@ public class MockRM extends ResourceManager {
     }
   }
 
+  @Override
+  protected Dispatcher createDispatcher() {
+    return new DrainDispatcher();
+  }
+
+  public void drainEvents() {
+    Dispatcher rmDispatcher = getRmDispatcher();
+    if (rmDispatcher instanceof DrainDispatcher) {
+      ((DrainDispatcher) rmDispatcher).await();
+    } else {
+      throw new UnsupportedOperationException("Not a Drain Dispatcher!");
+    }
+  }
+
   public void waitForState(ApplicationId appId, RMAppState finalState)
       throws Exception {
     RMApp app = getRMContext().getRMApps().get(appId);
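MockRM now always uses a DrainDispatcher and exposes drainEvents(), so tests can wait for the asynchronous dispatcher queue to empty before asserting on metrics. A rough standalone illustration of that "dispatch asynchronously, then await until drained" pattern (MiniDrainDispatcher is hypothetical, not YARN's DrainDispatcher):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniDrainDispatcher {
  private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
  // Number of dispatched events that have not finished running yet.
  private final AtomicInteger pending = new AtomicInteger();

  private final Thread worker = new Thread(() -> {
    try {
      while (true) {
        Runnable event = queue.take();
        event.run();
        pending.decrementAndGet();
      }
    } catch (InterruptedException e) {
      // worker shut down
    }
  });

  public MiniDrainDispatcher() {
    worker.setDaemon(true);
    worker.start();
  }

  public void dispatch(Runnable event) {
    pending.incrementAndGet();
    queue.add(event);
  }

  // Same spirit as DrainDispatcher#await / MockRM#drainEvents: block until
  // every event dispatched so far has been handled.
  public void await() throws InterruptedException {
    while (pending.get() > 0) {
      Thread.sleep(10);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    MiniDrainDispatcher dispatcher = new MiniDrainDispatcher();
    int[] handled = new int[1];
    for (int i = 0; i < 100; i++) {
      dispatcher.dispatch(() -> handled[0]++);
    }
    dispatcher.await();
    System.out.println("handled=" + handled[0]); // always 100 after await()
  }
}

Without the await step, an assertion on handled (or, in the real tests, on ClusterMetrics) could run before the queued events are processed, which is exactly the flakiness the dispatcher.await()/drainEvents() calls in the tests below are guarding against.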
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index bad68f4f9e8..84b1a9c8f42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1882,15 +1882,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
         hostFile.getAbsolutePath());
     writeToHostsFile("");
-    final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm1 = null, rm2 = null;
     try {
-      rm1 = new MockRM(conf) {
-        @Override
-        protected Dispatcher createDispatcher() {
-          return dispatcher;
-        }
-      };
+      rm1 = new MockRM(conf);
       rm1.start();
       MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
       MockNM nm2 = rm1.registerNode("host2:1234", 8000);
@@ -1911,7 +1905,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       Assert.assertTrue("The decommisioned metrics are not updated",
           NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
 
-      dispatcher.await();
+      rm1.drainEvents();
 
       Assert
          .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
@@ -1924,6 +1918,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
       // restart RM.
       rm2 = new MockRM(conf);
      rm2.start();
+      rm2.drainEvents();
       Assert
          .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
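The TestResourceTrackerService changes that follow repeatedly rewrite an excludes file and then call refreshNodes, via a writeToHostsFile helper generalized to take the target file. A plain-Java sketch of that write-then-reread pattern, using hypothetical helper names and java.nio rather than the test's FileOutputStream code:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class HostsFileSketch {
  static void writeHostsFile(Path file, String... hosts) throws IOException {
    // Truncates and rewrites, one host per line, like writeToHostsFile(file, hosts).
    Files.write(file, Arrays.asList(hosts), StandardCharsets.UTF_8);
  }

  static List<String> readHostsFile(Path file) throws IOException {
    return Files.readAllLines(file, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    Path exclude = Files.createTempFile("excludeHostFile", ".txt");
    writeHostsFile(exclude, "host1");
    System.out.println("excluded=" + readHostsFile(exclude)); // [host1]

    // "refreshNodes": clear the excludes file and re-read it.
    writeHostsFile(exclude, "");
    System.out.println("excluded=" + readHostsFile(exclude)); // one blank entry, effectively empty
  }
}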
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e42ed91a8f6..e0fd9ab186a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -168,27 +168,21 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         .getAbsolutePath());
 
     writeToHostsFile("");
-    final DrainDispatcher dispatcher = new DrainDispatcher();
-    rm = new MockRM(conf) {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
-    };
+    rm = new MockRM(conf);
     rm.start();
 
     MockNM nm1 = rm.registerNode("host1:1234", 5120);
     MockNM nm2 = rm.registerNode("host2:5678", 10240);
     MockNM nm3 = rm.registerNode("localhost:4433", 1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
     NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
 
     // To test that IPs also work
     String ip = NetUtils.normalizeHostName("localhost");
@@ -207,15 +201,15 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     nodeHeartbeat = nm3.nodeHeartbeat(true);
     Assert.assertTrue("The decommisioned metrics are not updated",
         NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
 
     writeToHostsFile("");
     rm.getNodesListManager().refreshNodes(conf);
     nm3 = rm.registerNode("localhost:4433", 1024);
-    dispatcher.await();
+    rm.drainEvents();
     nodeHeartbeat = nm3.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     // decommissined node is 1 since 1 node is rejoined after updating exclude
     // file
@@ -990,7 +984,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
 
   @Test
   public void testReconnectNode() throws Exception {
-    final DrainDispatcher dispatcher = new DrainDispatcher();
     rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
@@ -1001,11 +994,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
           }
         };
       }
-
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
     };
     rm.start();
 
@@ -1013,7 +1001,7 @@
     MockNM nm2 = rm.registerNode("host2:5678", 5120);
     nm1.nodeHeartbeat(true);
     nm2.nodeHeartbeat(false);
-    dispatcher.await();
+    rm.drainEvents();
     checkUnealthyNMCount(rm, nm2, true, 1);
     final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
     QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@@ -1024,7 +1012,7 @@
     nm1 = rm.registerNode("host1:1234", 5120);
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(expectedNMs,
         ClusterMetrics.getMetrics().getNumActiveNMs());
     checkUnealthyNMCount(rm, nm2, true, 1);
@@ -1032,23 +1020,23 @@
     nm2 = rm.registerNode("host2:5678", 5120);
     response = nm2.nodeHeartbeat(false);
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(expectedNMs,
         ClusterMetrics.getMetrics().getNumActiveNMs());
     checkUnealthyNMCount(rm, nm2, true, 1);
 
     // unhealthy node changed back to healthy
     nm2 = rm.registerNode("host2:5678", 5120);
-    dispatcher.await();
+    rm.drainEvents();
     response = nm2.nodeHeartbeat(true);
     response = nm2.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
 
     // reconnect of node with changed capability
     nm1 = rm.registerNode("host2:5678", 10240);
-    dispatcher.await();
+    rm.drainEvents();
     response = nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
 
@@ -1056,9 +1044,9 @@
     List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
     runningApps.add(ApplicationId.newInstance(1, 0));
     nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
-    dispatcher.await();
+    rm.drainEvents();
     response = nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
 
@@ -1066,10 +1054,10 @@
     nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
     nm1.setHttpPort(3);
     nm1.registerNode();
-    dispatcher.await();
+    rm.drainEvents();
     response = nm1.nodeHeartbeat(true);
     response = nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
     Assert.assertEquals(3, rmNode.getHttpPort());
     Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
MockNM("host1:1234", 5120, rm.getResourceTrackerService()); nm1.setHttpPort(3); nm1.registerNode(); - dispatcher.await(); + rm.drainEvents(); response = nm1.nodeHeartbeat(true); response = nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); Assert.assertEquals(3, rmNode.getHttpPort()); Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory()); @@ -1184,14 +1172,116 @@ public class TestResourceTrackerService extends NodeLabelTestBase { checkDecommissionedNMCount(rm, ++decommisionedNMsCount); } + @Test(timeout = 30000) + public void testInitDecommMetric() throws Exception { + testInitDecommMetricHelper(true); + testInitDecommMetricHelper(false); + } + + public void testInitDecommMetricHelper(boolean hasIncludeList) + throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + File excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, "host1"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + + if (hasIncludeList) { + writeToHostsFile(hostFile, "host1", "host2"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + } + rm.getNodesListManager().refreshNodes(conf); + rm.drainEvents(); + rm.stop(); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + nm1 = rm1.registerNode("host1:1234", 5120); + nm2 = rm1.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + rm1.drainEvents(); + Assert.assertEquals("Number of Decommissioned nodes should be 1", + 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The inactiveRMNodes should contain an entry for the" + + "decommissioned node", + 1, rm1.getRMContext().getInactiveRMNodes().size()); + excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, ""); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + rm1.getNodesListManager().refreshNodes(conf); + nm1 = rm1.registerNode("host1:1234", 5120); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + rm1.drainEvents(); + Assert.assertEquals("The decommissioned nodes metric should have " + + "decremented to 0", + 0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + Assert.assertEquals("The active nodes metric should be 2", + 2, ClusterMetrics.getMetrics().getNumActiveNMs()); + Assert.assertEquals("The inactive RMNodes entry should have been removed", + 0, rm1.getRMContext().getInactiveRMNodes().size()); + rm1.drainEvents(); + rm1.stop(); + } + + @Test(timeout = 30000) + public void testInitDecommMetricNoRegistration() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + //host3 will not register or heartbeat + File excludeHostFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + writeToHostsFile(excludeHostFile, "host3", "host2"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeHostFile.getAbsolutePath()); + writeToHostsFile(hostFile, "host1", "host2"); + 
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    rm.getNodesListManager().refreshNodes(conf);
+    rm.drainEvents();
+    Assert.assertEquals("The decommissioned nodes metric should be 1 ",
+        1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    rm.stop();
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    rm1.getNodesListManager().refreshNodes(conf);
+    rm1.drainEvents();
+    Assert.assertEquals("The decommissioned nodes metric should be 2 ",
+        2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    rm1.stop();
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
-    if (!hostFile.exists()) {
+    writeToHostsFile(hostFile, hosts);
+  }
+
+  private void writeToHostsFile(File file, String... hosts)
+      throws IOException {
+    if (!file.exists()) {
       TEMP_DIR.mkdirs();
-      hostFile.createNewFile();
+      file.createNewFile();
     }
     FileOutputStream fStream = null;
     try {
-      fStream = new FileOutputStream(hostFile);
+      fStream = new FileOutputStream(file);
       for (int i = 0; i < hosts.length; i++) {
         fStream.write(hosts[i].getBytes());
         fStream.write("\n".getBytes());
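The two new tests above exercise how the include and exclude lists interact: with this patch, excluded hosts are seeded as decommissioned at RM startup even if they never registered, while the include list (when present) only restricts which hosts may join at all; a live refresh still only decommissions hosts that are actually registered. A small illustrative helper for the allowed/excluded computation, not the actual NodesListManager logic:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HostListsSketch {
  static boolean isAllowed(String host, Set<String> includes, Set<String> excludes) {
    // An empty include list means "everyone is allowed unless excluded".
    boolean inIncludes = includes.isEmpty() || includes.contains(host);
    return inIncludes && !excludes.contains(host);
  }

  public static void main(String[] args) {
    Set<String> includes = new HashSet<>(Arrays.asList("host1", "host2"));
    Set<String> excludes = new HashSet<>(Arrays.asList("host3", "host2"));

    List<String> hosts = Arrays.asList("host1", "host2", "host3");
    for (String h : hosts) {
      System.out.println(h + " allowed=" + isAllowed(h, includes, excludes)
          + " excluded=" + excludes.contains(h));
    }
    // host1 allowed=true,  excluded=false
    // host2 allowed=false, excluded=true
    // host3 allowed=false, excluded=true
  }
}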