YARN-3102. Decommisioned Nodes not listed in Web UI. Contributed by Kuhu Shukla

This commit is contained in:
Jason Lowe 2016-02-01 23:18:44 +00:00
parent 69c61fae0c
commit 6b6167d401
7 changed files with 297 additions and 59 deletions

View File

@ -81,6 +81,9 @@ Release 2.7.3 - UNRELEASED
YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is
not available (Chang Li via jlowe)
YARN-3102. Decommisioned Nodes not listed in Web UI (Kuhu Shukla via
jlowe)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import com.google.common.annotations.VisibleForTesting;
@ -78,7 +80,7 @@ public class NodesListManager extends AbstractService implements
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
this.hostsReader =
createHostsFileReader(this.includesFile, this.excludesFile);
setDecomissionedNMsMetrics();
setDecomissionedNMs();
printConfiguredHosts();
} catch (YarnException ex) {
disableHostsFileReader(ex);
@ -135,9 +137,24 @@ public class NodesListManager extends AbstractService implements
}
}
private void setDecomissionedNMsMetrics() {
private void setDecomissionedNMs() {
Set<String> excludeList = hostsReader.getExcludedHosts();
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
for (final String host : excludeList) {
UnknownNodeId nodeId = new UnknownNodeId(host);
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
rmContext, host, -1, -1, new UnknownNode(host), null, null);
RMNode prevRMNode =
rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (prevRMNode != null) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(prevRMNode.getNodeID(),
RMNodeEventType.DECOMMISSION));
} else {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
}
}
}
public boolean isValidNode(String hostName) {
@ -210,7 +227,7 @@ public class NodesListManager extends AbstractService implements
conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
this.hostsReader =
createHostsFileReader(this.includesFile, this.excludesFile);
setDecomissionedNMsMetrics();
setDecomissionedNMs();
} catch (IOException ioe2) {
// Should *never* happen
this.hostsReader = null;
@ -240,4 +257,98 @@ public class NodesListManager extends AbstractService implements
.getConfigurationInputStream(this.conf, excludesFile));
return hostsReader;
}
/**
* A NodeId instance needed upon startup for populating inactive nodes Map.
* It only knows the hostname/ip and marks the port to -1 or invalid.
*/
public static class UnknownNodeId extends NodeId {
private String host;
public UnknownNodeId(String host) {
this.host = host;
}
@Override
public String getHost() {
return this.host;
}
@Override
protected void setHost(String hst) {
}
@Override
public int getPort() {
return -1;
}
@Override
protected void setPort(int port) {
}
@Override
protected void build() {
}
}
/**
* A Node instance needed upon startup for populating RMNode Map.
* It only knows its hostname/ip.
*/
private static class UnknownNode implements Node {
private String host;
public UnknownNode(String host) {
this.host = host;
}
@Override
public String getNetworkLocation() {
return null;
}
@Override
public void setNetworkLocation(String location) {
}
@Override
public String getName() {
return host;
}
@Override
public Node getParent() {
return null;
}
@Override
public void setParent(Node parent) {
}
@Override
public int getLevel() {
return 0;
}
@Override
public void setLevel(int i) {
}
public String getHost() {
return host;
}
public void setHost(String hst) {
this.host = hst;
}
}
}

View File

@ -179,6 +179,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
clusterTimeStamp = timestamp;
}
@VisibleForTesting
Dispatcher getRmDispatcher() {
return rmDispatcher;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -141,6 +142,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
//Transitions from RUNNING state
.addTransition(NodeState.RUNNING,
@ -491,6 +495,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
case UNHEALTHY:
metrics.incrNumUnhealthyNMs();
break;
case NEW:
break;
default:
LOG.debug("Unexpected final state");
}
@ -531,24 +537,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
List<NMContainerStatus> containers = null;
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
// Old node rejoining
RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host);
rmNode.context.getInactiveRMNodes().remove(host);
rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
RMNode previousRMNode = rmNode.context.getInactiveRMNodes().remove(host);
if (previousRMNode != null) {
if (previousRMNode.getNodeID().getPort() != -1) {
// Old node rejoining
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else {
// An old excluded node rejoining
ClusterMetrics.getMetrics().decrDecommisionedNMs();
containers = updateNewNodeMetricsAndContainers(rmNode, startEvent);
}
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
containers = updateNewNodeMetricsAndContainers(rmNode, startEvent);
}
if (null != startEvent.getRunningApplications()) {
for (ApplicationId appId : startEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
@ -563,6 +566,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
private static List<NMContainerStatus> updateNewNodeMetricsAndContainers(
RMNodeImpl rmNode, RMNodeStartedEvent startEvent) {
List<NMContainerStatus> containers;
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
return containers;
}
public static class ReconnectNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {

View File

@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -132,6 +134,20 @@ public class MockRM extends ResourceManager {
}
}
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
public void drainEvents() {
Dispatcher rmDispatcher = getRmDispatcher();
if (rmDispatcher instanceof DrainDispatcher) {
((DrainDispatcher) rmDispatcher).await();
} else {
throw new UnsupportedOperationException("Not a Drain Dispatcher!");
}
}
public void waitForState(ApplicationId appId, RMAppState finalState)
throws Exception {
RMApp app = getRMContext().getRMApps().get(appId);

View File

@ -1850,15 +1850,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
writeToHostsFile("");
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = null, rm2 = null;
try {
rm1 = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
@ -1879,7 +1873,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
dispatcher.await();
rm1.drainEvents();
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
@ -1892,6 +1886,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// restart RM.
rm2 = new MockRM(conf);
rm2.start();
rm2.drainEvents();
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());

View File

@ -158,27 +158,21 @@ public class TestResourceTrackerService {
.getAbsolutePath());
writeToHostsFile("");
final DrainDispatcher dispatcher = new DrainDispatcher();
rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
dispatcher.await();
rm.drainEvents();
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
dispatcher.await();
rm.drainEvents();
// To test that IPs also work
String ip = NetUtils.normalizeHostName("localhost");
@ -197,15 +191,15 @@ public class TestResourceTrackerService {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
dispatcher.await();
rm.drainEvents();
writeToHostsFile("");
rm.getNodesListManager().refreshNodes(conf);
nm3 = rm.registerNode("localhost:4433", 1024);
dispatcher.await();
rm.drainEvents();
nodeHeartbeat = nm3.nodeHeartbeat(true);
dispatcher.await();
rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
// decommissined node is 1 since 1 node is rejoined after updating exclude
// file
@ -563,7 +557,6 @@ public class TestResourceTrackerService {
@Test
public void testReconnectNode() throws Exception {
final DrainDispatcher dispatcher = new DrainDispatcher();
rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
@ -574,11 +567,6 @@ public class TestResourceTrackerService {
}
};
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
@ -586,7 +574,7 @@ public class TestResourceTrackerService {
MockNM nm2 = rm.registerNode("host2:5678", 5120);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
dispatcher.await();
rm.drainEvents();
checkUnealthyNMCount(rm, nm2, true, 1);
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@ -597,7 +585,7 @@ public class TestResourceTrackerService {
nm1 = rm.registerNode("host1:1234", 5120);
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
dispatcher.await();
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
@ -605,23 +593,23 @@ public class TestResourceTrackerService {
nm2 = rm.registerNode("host2:5678", 5120);
response = nm2.nodeHeartbeat(false);
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
dispatcher.await();
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
// unhealthy node changed back to healthy
nm2 = rm.registerNode("host2:5678", 5120);
dispatcher.await();
rm.drainEvents();
response = nm2.nodeHeartbeat(true);
response = nm2.nodeHeartbeat(true);
dispatcher.await();
rm.drainEvents();
Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
// reconnect of node with changed capability
nm1 = rm.registerNode("host2:5678", 10240);
dispatcher.await();
rm.drainEvents();
response = nm1.nodeHeartbeat(true);
dispatcher.await();
rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
@ -629,9 +617,9 @@ public class TestResourceTrackerService {
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
runningApps.add(ApplicationId.newInstance(1, 0));
nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
dispatcher.await();
rm.drainEvents();
response = nm1.nodeHeartbeat(true);
dispatcher.await();
rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
@ -639,10 +627,10 @@ public class TestResourceTrackerService {
nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
nm1.setHttpPort(3);
nm1.registerNode();
dispatcher.await();
rm.drainEvents();
response = nm1.nodeHeartbeat(true);
response = nm1.nodeHeartbeat(true);
dispatcher.await();
rm.drainEvents();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
Assert.assertEquals(3, rmNode.getHttpPort());
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
@ -650,14 +638,116 @@ public class TestResourceTrackerService {
}
@Test(timeout = 30000)
public void testInitDecommMetric() throws Exception {
testInitDecommMetricHelper(true);
testInitDecommMetricHelper(false);
}
public void testInitDecommMetricHelper(boolean hasIncludeList)
throws Exception {
Configuration conf = new Configuration();
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
File excludeHostFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
writeToHostsFile(excludeHostFile, "host1");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
if (hasIncludeList) {
writeToHostsFile(hostFile, "host1", "host2");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
}
rm.getNodesListManager().refreshNodes(conf);
rm.drainEvents();
rm.stop();
MockRM rm1 = new MockRM(conf);
rm1.start();
nm1 = rm1.registerNode("host1:1234", 5120);
nm2 = rm1.registerNode("host2:5678", 10240);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
rm1.drainEvents();
Assert.assertEquals("Number of Decommissioned nodes should be 1",
1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
Assert.assertEquals("The inactiveRMNodes should contain an entry for the" +
"decommissioned node",
1, rm1.getRMContext().getInactiveRMNodes().size());
excludeHostFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
writeToHostsFile(excludeHostFile, "");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
rm1.getNodesListManager().refreshNodes(conf);
nm1 = rm1.registerNode("host1:1234", 5120);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
rm1.drainEvents();
Assert.assertEquals("The decommissioned nodes metric should have " +
"decremented to 0",
0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
Assert.assertEquals("The active nodes metric should be 2",
2, ClusterMetrics.getMetrics().getNumActiveNMs());
Assert.assertEquals("The inactive RMNodes entry should have been removed",
0, rm1.getRMContext().getInactiveRMNodes().size());
rm1.drainEvents();
rm1.stop();
}
@Test(timeout = 30000)
public void testInitDecommMetricNoRegistration() throws Exception {
Configuration conf = new Configuration();
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
//host3 will not register or heartbeat
File excludeHostFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
writeToHostsFile(excludeHostFile, "host3", "host2");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "host2");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
rm.drainEvents();
Assert.assertEquals("The decommissioned nodes metric should be 1 ",
1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
rm.stop();
MockRM rm1 = new MockRM(conf);
rm1.start();
rm1.getNodesListManager().refreshNodes(conf);
rm1.drainEvents();
Assert.assertEquals("The decommissioned nodes metric should be 2 ",
2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
rm1.stop();
}
private void writeToHostsFile(String... hosts) throws IOException {
if (!hostFile.exists()) {
writeToHostsFile(hostFile, hosts);
}
private void writeToHostsFile(File file, String... hosts)
throws IOException {
if (!file.exists()) {
TEMP_DIR.mkdirs();
hostFile.createNewFile();
file.createNewFile();
}
FileOutputStream fStream = null;
try {
fStream = new FileOutputStream(hostFile);
fStream = new FileOutputStream(file);
for (int i = 0; i < hosts.length; i++) {
fStream.write(hosts[i].getBytes());
fStream.write("\n".getBytes());