YARN-2523. ResourceManager UI showing negative value for "Decommissioned Nodes" field. Contributed by Rohith
(cherry picked from commit 8269bfa613
)
This commit is contained in:
parent
87f07e67b1
commit
e4d46e5ff6
|
@ -417,6 +417,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2546. Made REST API for application creation/submission use numeric and
|
YARN-2546. Made REST API for application creation/submission use numeric and
|
||||||
boolean types instead of the string of them. (Varun Vasudev via zjshen)
|
boolean types instead of the string of them. (Varun Vasudev via zjshen)
|
||||||
|
|
||||||
|
YARN-2523. ResourceManager UI showing negative value for "Decommissioned
|
||||||
|
Nodes" field (Rohith via jlowe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -121,7 +121,6 @@ public class NodesListManager extends AbstractService implements
|
||||||
this.conf, includesFile), excludesFile.isEmpty() ? null
|
this.conf, includesFile), excludesFile.isEmpty() ? null
|
||||||
: this.rmContext.getConfigurationProvider()
|
: this.rmContext.getConfigurationProvider()
|
||||||
.getConfigurationInputStream(this.conf, excludesFile));
|
.getConfigurationInputStream(this.conf, excludesFile));
|
||||||
setDecomissionedNMsMetrics();
|
|
||||||
printConfiguredHosts();
|
printConfiguredHosts();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -460,22 +460,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decomissioned NMs equals to the nodes missing in include list (if
|
|
||||||
// include list not empty) or the nodes listed in excluded list.
|
|
||||||
// DecomissionedNMs as per exclude list is set upfront when the
|
|
||||||
// exclude list is read so that RM restart can also reflect the
|
|
||||||
// decomissionedNMs. Note that RM is still not able to know decomissionedNMs
|
|
||||||
// as per include list after it restarts as they are known when those nodes
|
|
||||||
// come for registration.
|
|
||||||
// DecomissionedNMs as per include list is incremented in this transition.
|
|
||||||
switch (finalState) {
|
switch (finalState) {
|
||||||
case DECOMMISSIONED:
|
case DECOMMISSIONED:
|
||||||
Set<String> ecludedHosts =
|
|
||||||
context.getNodesListManager().getHostsReader().getExcludedHosts();
|
|
||||||
if (!ecludedHosts.contains(hostName)
|
|
||||||
&& !ecludedHosts.contains(NetUtils.normalizeHostName(hostName))) {
|
|
||||||
metrics.incrDecommisionedNMs();
|
metrics.incrDecommisionedNMs();
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case LOST:
|
case LOST:
|
||||||
metrics.incrNumLostNMs();
|
metrics.incrNumLostNMs();
|
||||||
|
|
|
@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
@ -1833,10 +1835,16 @@ public class TestRMRestart {
|
||||||
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
||||||
hostFile.getAbsolutePath());
|
hostFile.getAbsolutePath());
|
||||||
writeToHostsFile("");
|
writeToHostsFile("");
|
||||||
MockRM rm1 = new MockRM(conf);
|
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
MockRM rm1 = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
};
|
||||||
rm1.start();
|
rm1.start();
|
||||||
rm1.registerNode("localhost:1234", 8000);
|
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
||||||
rm1.registerNode("host2:1234", 8000);
|
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
String ip = NetUtils.normalizeHostName("localhost");
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
|
@ -1845,15 +1853,25 @@ public class TestRMRestart {
|
||||||
|
|
||||||
// refresh nodes
|
// refresh nodes
|
||||||
rm1.getNodesListManager().refreshNodes(conf);
|
rm1.getNodesListManager().refreshNodes(conf);
|
||||||
|
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
|
Assert
|
||||||
|
.assertTrue(NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
rm1.stop();
|
||||||
|
Assert
|
||||||
|
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
|
||||||
// restart RM.
|
// restart RM.
|
||||||
MockRM rm2 = new MockRM(conf);
|
MockRM rm2 = new MockRM(conf);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -157,25 +157,33 @@ public class TestResourceTrackerService {
|
||||||
.getAbsolutePath());
|
.getAbsolutePath());
|
||||||
|
|
||||||
writeToHostsFile("");
|
writeToHostsFile("");
|
||||||
rm = new MockRM(conf);
|
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
};
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
||||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
// To test that IPs also work
|
// To test that IPs also work
|
||||||
String ip = NetUtils.normalizeHostName("localhost");
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
writeToHostsFile("host2", ip);
|
writeToHostsFile("host2", ip);
|
||||||
|
|
||||||
rm.getNodesListManager().refreshNodes(conf);
|
rm.getNodesListManager().refreshNodes(conf);
|
||||||
checkDecommissionedNMCount(rm, metricCount + 2);
|
|
||||||
|
|
||||||
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
@ -186,6 +194,19 @@ public class TestResourceTrackerService {
|
||||||
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
Assert.assertTrue("The decommisioned metrics are not updated",
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
dispatcher.await();
|
||||||
|
checkDecommissionedNMCount(rm, metricCount + 2);
|
||||||
|
writeToHostsFile("");
|
||||||
|
rm.getNodesListManager().refreshNodes(conf);
|
||||||
|
|
||||||
|
nm3 = rm.registerNode("localhost:4433", 1024);
|
||||||
|
dispatcher.await();
|
||||||
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
||||||
|
dispatcher.await();
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
// decommissined node is 1 since 1 node is rejoined after updating exclude
|
||||||
|
// file
|
||||||
|
checkDecommissionedNMCount(rm, metricCount + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue