YARN-4311. Removing nodes from include and exclude lists will not remove them from decommissioned nodes list. Contributed by Kuhu Shukla

(cherry picked from commit ee86cef2fe)

Conflicts:

	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
This commit is contained in:
Jason Lowe 2016-05-05 14:40:35 +00:00
parent d6e95ae47b
commit b452ecf4a0
11 changed files with 656 additions and 37 deletions

View File

@ -199,6 +199,15 @@ public class NodeInfo {
public ResourceUtilization getNodeUtilization() {
return null;
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
}
public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -188,4 +188,13 @@ public class RMNodeWrapper implements RMNode {
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
}

View File

@ -645,6 +645,15 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
"NONE";
/**
* Timeout(msec) for an untracked node to remain in shutdown or decommissioned
* state.
*/
public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
RM_PREFIX + "node-removal-untracked.timeout-ms";
public static final int
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
/**
* RM proxy users' prefix
*/

View File

@ -2619,4 +2619,17 @@
<name>yarn.node-labels.fs-store.impl.class</name>
<value>org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore</value>
</property>
<property>
<description>
The least amount of time(msec.) an inactive (decommissioned or shutdown) node can
stay in the nodes list of the resourcemanager after being declared untracked.
A node is marked untracked if and only if it is absent from both include and
exclude nodemanager lists on the RM. All inactive nodes are checked twice per
timeout interval or every 10 minutes, whichever is lesser, and marked appropriately.
The same is done when refreshNodes command (graceful or otherwise) is invoked.
</description>
<name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
<value>60000</value>
</property>
</configuration>

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService implements
private String excludesFile;
private Resolver resolver;
private Timer removalTimer;
private int nodeRemovalCheckInterval;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
@ -104,9 +107,72 @@ public class NodesListManager extends CompositeService implements
} catch (IOException ioe) {
disableHostsFileReader(ioe);
}
final int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
600000));
removalTimer = new Timer("Node Removal Timer");
removalTimer.schedule(new TimerTask() {
@Override
public void run() {
long now = Time.monotonicNow();
for (Map.Entry<NodeId, RMNode> entry :
rmContext.getInactiveRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(rmNode.getHostName())) {
if (rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
} else
if (now - rmNode.getUntrackedTimeStamp() >
nodeRemovalTimeout) {
RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
if (result != null) {
decrInactiveNMMetrics(rmNode);
LOG.info("Removed " +result.getState().toString() + " node "
+ result.getHostName() + " from inactive nodes list");
}
}
} else {
rmNode.setUntrackedTimeStamp(0);
}
}
}
}, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
super.serviceInit(conf);
}
private void decrInactiveNMMetrics(RMNode rmNode) {
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
switch (rmNode.getState()) {
case SHUTDOWN:
clusterMetrics.decrNumShutdownNMs();
break;
case DECOMMISSIONED:
clusterMetrics.decrDecommisionedNMs();
break;
case LOST:
clusterMetrics.decrNumLostNMs();
break;
case REBOOTED:
clusterMetrics.decrNumRebootedNMs();
break;
default:
LOG.debug("Unexpected node state");
}
}
@Override
public void serviceStop() {
removalTimer.cancel();
}
private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) {
return;
@ -130,10 +196,13 @@ public class NodesListManager extends CompositeService implements
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
new RMNodeEvent(nodeId, nodeEventType));
}
}
updateInactiveNodes();
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
@ -170,6 +239,16 @@ public class NodesListManager extends CompositeService implements
}
}
@VisibleForTesting
public int getNodeRemovalCheckInterval() {
return nodeRemovalCheckInterval;
}
@VisibleForTesting
public void setNodeRemovalCheckInterval(int interval) {
this.nodeRemovalCheckInterval = interval;
}
@VisibleForTesting
public Resolver getResolver() {
return resolver;
@ -373,6 +452,33 @@ public class NodesListManager extends CompositeService implements
return hostsReader;
}
private void updateInactiveNodes() {
long now = Time.monotonicNow();
for(Entry<NodeId, RMNode> entry :
rmContext.getInactiveRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(nodeId.getHost()) &&
rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
}
}
}
public boolean isUntrackedNode(String hostName) {
boolean untracked;
String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
untracked = !hostsList.isEmpty() &&
!hostsList.contains(hostName) && !hostsList.contains(ip) &&
!excludeList.contains(hostName) && !excludeList.contains(ip);
}
return untracked;
}
/**
* Refresh the nodes gracefully
*
@ -383,11 +489,13 @@ public class NodesListManager extends CompositeService implements
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@ -396,6 +504,7 @@ public class NodesListManager extends CompositeService implements
}
}
}
updateInactiveNodes();
}
/**
@ -419,8 +528,11 @@ public class NodesListManager extends CompositeService implements
public void refreshNodesForcefully() {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
RMNodeEventType nodeEventType =
isUntrackedNode(entry.getKey().getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
new RMNodeEvent(entry.getKey(), nodeEventType));
}
}
}

View File

@ -87,7 +87,7 @@ public class RMServerUtils {
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {
if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}

View File

@ -168,4 +168,8 @@ public interface RMNode {
NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers();
long getUntrackedTimeStamp();
void setUntrackedTimeStamp(long timeStamp);
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -120,6 +121,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private long lastHealthReportTime;
private String nodeManagerVersion;
private long timeStamp;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
@ -259,6 +261,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@ -346,6 +351,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0;
this.latestNodeHeartBeatResponse.setResponseId(0);
@ -997,7 +1003,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
/**
* Put a node in deactivated (decommissioned) status.
* Put a node in deactivated (decommissioned or shutdown) status.
* @param rmNode
* @param finalState
*/
@ -1014,6 +1020,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
if (rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
rmNode.setUntrackedTimeStamp(Time.monotonicNow());
}
}
/**
@ -1385,4 +1394,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public Resource getOriginalTotalCapability() {
return this.originalTotalCapability;
}
}
@Override
public long getUntrackedTimeStamp() {
return this.timeStamp;
}
@Override
public void setUntrackedTimeStamp(long ts) {
this.timeStamp = ts;
}
}

View File

@ -260,6 +260,15 @@ public class MockNodes {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,

View File

@ -31,6 +31,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
@ -48,8 +50,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
@ -141,12 +141,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++metricCount);
checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
.assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
.assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@ -155,7 +155,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
.getNumDecommisionedNMs());
.getNumShutdownNMs());
rm.stop();
}
/**
@ -226,7 +227,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
int initialMetricCount = metrics.getNumDecommisionedNMs();
int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
NodeAction.NORMAL,
@ -239,16 +240,16 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++initialMetricCount);
checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
"Node should not have been decomissioned.",
"Node should not have been shutdown.",
NodeAction.NORMAL,
nodeHeartbeat.getNodeAction());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertEquals("Node should have been decomissioned but is in state" +
nodeHeartbeat.getNodeAction(),
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
NodeState nodeState =
rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
Assert.assertEquals("Node should have been shutdown but is in state" +
nodeState, NodeState.SHUTDOWN, nodeState);
}
/**
@ -509,7 +510,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
Assert
.assertFalse(
"Node Labels should not accepted by RM If its configured with Central configuration",
"Node Labels should not accepted by RM If its configured with " +
"Central configuration",
response.getAreNodeLabelsAcceptedByRM());
if (rm != null) {
rm.stop();
@ -891,15 +893,15 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
// node unhealthy
nm1.nodeHeartbeat(false);
checkUnealthyNMCount(rm, nm1, true, 1);
checkUnhealthyNMCount(rm, nm1, true, 1);
// node healthy again
nm1.nodeHeartbeat(true);
checkUnealthyNMCount(rm, nm1, false, 0);
checkUnhealthyNMCount(rm, nm1, false, 0);
}
private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health,
int count) throws Exception {
private void checkUnhealthyNMCount(MockRM rm, MockNM nm1, boolean health,
int count) throws Exception {
int waitCount = 0;
while((rm.getRMContext().getRMNodes().get(nm1.getNodeId())
@ -1000,7 +1002,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
rm.drainEvents();
checkUnealthyNMCount(rm, nm2, true, 1);
checkUnhealthyNMCount(rm, nm2, true, 1);
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
// TODO Metrics incorrect in case of the FifoScheduler
@ -1012,7 +1014,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
checkUnhealthyNMCount(rm, nm2, true, 1);
// reconnect of unhealthy node
nm2 = rm.registerNode("host2:5678", 5120);
@ -1020,7 +1022,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
checkUnhealthyNMCount(rm, nm2, true, 1);
// unhealthy node changed back to healthy
nm2 = rm.registerNode("host2:5678", 5120);
@ -1102,7 +1104,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
// node unhealthy
nm1.nodeHeartbeat(false);
checkUnealthyNMCount(rm, nm1, true, 1);
checkUnhealthyNMCount(rm, nm1, true, 1);
UnRegisterNodeManagerRequest request = Records
.newRecord(UnRegisterNodeManagerRequest.class);
request.setNodeId(nm1.getNodeId());
@ -1117,8 +1119,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
int shutdownNMsCount = ClusterMetrics.getMetrics()
.getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
@ -1143,10 +1143,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
@ -1162,8 +1164,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
checkShutdownNMCount(rm, ++shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
rm.stop();
}
@Test(timeout = 30000)
@ -1298,6 +1301,434 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.stop();
}
/**
* Remove a node from all lists and check if its forgotten
*/
@Test
public void testNodeRemovalNormally() throws Exception {
testNodeRemovalUtil(false);
testNodeRemovalUtilLost(false);
testNodeRemovalUtilRebooted(false);
testNodeRemovalUtilUnhealthy(false);
}
@Test
public void testNodeRemovalGracefully() throws Exception {
testNodeRemovalUtil(true);
testNodeRemovalUtilLost(true);
testNodeRemovalUtilRebooted(true);
testNodeRemovalUtilUnhealthy(true);
}
public void refreshNodesOption(boolean doGraceful, Configuration conf)
throws Exception {
if (doGraceful) {
rm.getNodesListManager().refreshNodesGracefully(conf);
} else {
rm.getNodesListManager().refreshNodes(conf);
}
}
public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
CountDownLatch latch = new CountDownLatch(1);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert (metrics != null);
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Remove nm2 from include list, should now be shutdown with timer test
String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host1", ip);
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue("Node should not be in active node list",
!rmContext.getRMNodes().containsKey(nm2.getNodeId()));
RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be in inactive node list",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
//Check node removal and re-addition before timer expires
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
rm.drainEvents();
writeToHostsFile("host1", ip);
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be shutdown",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
//add back the node before timer expires
latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
nodeHeartbeat = nm2.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Decommission this node, check timer doesn't remove it
writeToHostsFile("host1", "host2", ip);
writeToHostsFile(excludeHostFile, "host2");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
//Test decommed/ing node that transitions to untracked,timer should remove
writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
//nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!",
rmNode, null);
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumDecommisionedNMs(), 0);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
rm.stop();
}
private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000);
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "localhost", "host2");
writeToHostsFile(excludeHostFile, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ClusterMetrics metrics = clusterMetrics;
assert (metrics != null);
rm.drainEvents();
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
int waitCount = 0;
while(waitCount ++<20){
synchronized (this) {
wait(200);
}
nm3.nodeHeartbeat(true);
nm1.nodeHeartbeat(true);
}
Assert.assertNotEquals("host2 should be a lost NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a lost NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.LOST);
Assert.assertEquals("There should be 1 Lost NM!",
clusterMetrics.getNumLostNMs(), 1);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
writeToHostsFile(hostFile, "host1", "localhost");
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
rm.drainEvents();
waitCount = 0;
while(rmContext.getInactiveRMNodes().get(
nm2.getNodeId()) != null && waitCount++ < 2){
synchronized (this) {
wait(maxThreadSleeptime);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
}
}
Assert.assertEquals("host2 should have been forgotten!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("There should be no Lost NMs!",
clusterMetrics.getNumLostNMs(), 0);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
rm.stop();
}
private void testNodeRemovalUtilRebooted(boolean doGraceful)
throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "localhost", "host2");
writeToHostsFile(excludeHostFile, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ClusterMetrics metrics = clusterMetrics;
assert (metrics != null);
NodeHeartbeatResponse nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
rm.drainEvents();
rm.drainEvents();
Assert.assertNotEquals("host2 should be a rebooted NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a rebooted NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.REBOOTED);
Assert.assertEquals("There should be 1 Rebooted NM!",
clusterMetrics.getNumRebootedNMs(), 1);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
writeToHostsFile(hostFile, "host1", "localhost");
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
rm.drainEvents();
int waitCount = 0;
while(rmContext.getInactiveRMNodes().get(
nm2.getNodeId()) != null && waitCount++ < 2){
synchronized (this) {
wait(maxThreadSleeptime);
}
}
Assert.assertEquals("host2 should have been forgotten!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("There should be no Rebooted NMs!",
clusterMetrics.getNumRebootedNMs(), 0);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
rm.stop();
}
private void testNodeRemovalUtilUnhealthy(boolean doGraceful)
throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "localhost", "host2");
writeToHostsFile(excludeHostFile, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ClusterMetrics metrics = clusterMetrics;
assert (metrics != null);
rm.drainEvents();
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
// node healthy
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
nm3.nodeHeartbeat(true);
checkUnhealthyNMCount(rm, nm2, true, 1);
writeToHostsFile(hostFile, "host1", "localhost");
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
nm3.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertNotEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.SHUTDOWN);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
Assert.assertEquals("There should be 1 Shutdown NM!",
clusterMetrics.getNumShutdownNMs(), 1);
Assert.assertEquals("There should be 0 Unhealthy NM!",
clusterMetrics.getUnhealthyNMs(), 0);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
int waitCount = 0;
while(rmContext.getInactiveRMNodes().get(
nm2.getNodeId()) != null && waitCount++ < 2){
synchronized (this) {
wait(maxThreadSleeptime);
}
}
Assert.assertEquals("host2 should have been forgotten!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("There should be no Shutdown NMs!",
clusterMetrics.getNumRebootedNMs(), 0);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
rm.stop();
}
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
}

View File

@ -272,8 +272,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
.toString(), info.getString("state"));
if (rmNode != null) {
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
}
}
@ -304,8 +306,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
if (rmNode != null) {
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
}
@Test