YARN-4311. Removing nodes from include and exclude lists will not remove them from decommissioned nodes list. Contributed by Kuhu Shukla

(cherry picked from commit 1cbcd4a491)
This commit is contained in:
Jason Lowe 2016-04-05 13:40:19 +00:00
parent 13a4e25f26
commit 814ceeb489
12 changed files with 387 additions and 30 deletions

View File

@ -199,6 +199,15 @@ public class NodeInfo {
public ResourceUtilization getNodeUtilization() { public ResourceUtilization getNodeUtilization() {
return null; return null;
} }
/**
 * Untracked time stamps are not recorded by this implementation.
 *
 * @return always {@code 0}
 */
@Override
public long getUntrackedTimeStamp() {
return 0;
}
/**
 * No-op: the supplied time stamp is discarded.
 *
 * @param timeStamp ignored
 */
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
} }
public static RMNode newNodeInfo(String rackName, String hostName, public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -188,4 +188,13 @@ public class RMNodeWrapper implements RMNode {
public ResourceUtilization getNodeUtilization() { public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization(); return node.getNodeUtilization();
} }
/**
 * Always reports {@code 0}; the call is not forwarded to the wrapped node.
 * NOTE(review): confirm that not delegating here is intentional.
 *
 * @return always {@code 0}
 */
@Override
public long getUntrackedTimeStamp() {
return 0;
}
/**
 * No-op: the supplied time stamp is discarded and is not forwarded to the
 * wrapped node.
 *
 * @param timeStamp ignored
 */
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
} }

View File

@ -647,6 +647,15 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION = public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
"NONE"; "NONE";
/**
 * Configuration key for the timeout, in milliseconds, that an untracked node
 * may remain in the shutdown or decommissioned state.
 */
public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
RM_PREFIX + "node-removal-untracked.timeout-ms";
/** Default untracked-node removal timeout: 60000 milliseconds. */
public static final int
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
/** /**
* RM proxy users' prefix * RM proxy users' prefix
*/ */

View File

@ -2722,4 +2722,17 @@
<name>yarn.timeline-service.webapp.rest-csrf.methods-to-ignore</name> <name>yarn.timeline-service.webapp.rest-csrf.methods-to-ignore</name>
<value>GET,OPTIONS,HEAD</value> <value>GET,OPTIONS,HEAD</value>
</property> </property>
<property>
<description>
The minimum amount of time (in milliseconds) that an inactive (decommissioned
or shutdown) node stays in the nodes list of the ResourceManager after being
declared untracked. A node is marked untracked if and only if it is absent from
both the include and exclude NodeManager lists on the RM. All inactive nodes are
checked twice per timeout interval or every 10 minutes, whichever is shorter,
and marked appropriately. The same is done when the refreshNodes command
(graceful or otherwise) is invoked.
</description>
<name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
<value>60000</value>
</property>
</configuration> </configuration>

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService implements
private String excludesFile; private String excludesFile;
private Resolver resolver; private Resolver resolver;
// Timer that periodically scans the inactive nodes for untracked entries.
private Timer removalTimer;
// Delay and period, in milliseconds, used when scheduling removalTimer.
private int nodeRemovalCheckInterval;
public NodesListManager(RMContext rmContext) { public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName()); super(NodesListManager.class.getName());
@ -105,9 +108,56 @@ public class NodesListManager extends CompositeService implements
} catch (IOException ioe) { } catch (IOException ioe) {
disableHostsFileReader(ioe); disableHostsFileReader(ioe);
} }
final int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
600000));
removalTimer = new Timer("Node Removal Timer");
removalTimer.schedule(new TimerTask() {
@Override
public void run() {
long now = Time.monotonicNow();
for (Map.Entry<NodeId, RMNode> entry :
rmContext.getInactiveRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(rmNode.getHostName())) {
if (rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
} else if (now - rmNode.getUntrackedTimeStamp() >
nodeRemovalTimeout) {
RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
if (result != null) {
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
if (rmNode.getState() == NodeState.SHUTDOWN) {
clusterMetrics.decrNumShutdownNMs();
} else {
clusterMetrics.decrDecommisionedNMs();
}
LOG.info("Removed "+result.getHostName() +
" from inactive nodes list");
}
}
} else {
rmNode.setUntrackedTimeStamp(0);
}
}
}
}, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
super.serviceInit(conf); super.serviceInit(conf);
} }
/**
 * Stops this service by cancelling the untracked-node removal timer.
 *
 * <p>The timer is created during service initialization, so it can still be
 * {@code null} when a stop is requested before initialization ran or after
 * initialization failed part-way. That case is tolerated instead of raising
 * a {@link NullPointerException}.
 */
@Override
public void serviceStop() {
  // NOTE(review): the superclass serviceStop() is not invoked here; confirm
  // whether any child services of this composite also need to be stopped.
  if (removalTimer != null) {
    removalTimer.cancel();
  }
}
private void printConfiguredHosts() { private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) { if (!LOG.isDebugEnabled()) {
return; return;
@ -131,10 +181,13 @@ public class NodesListManager extends CompositeService implements
for (NodeId nodeId: rmContext.getRMNodes().keySet()) { for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) { if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); new RMNodeEvent(nodeId, nodeEventType));
} }
} }
updateInactiveNodes();
} }
private void refreshHostsReader(Configuration yarnConf) throws IOException, private void refreshHostsReader(Configuration yarnConf) throws IOException,
@ -171,6 +224,16 @@ public class NodesListManager extends CompositeService implements
} }
} }
/**
 * Test hook exposing the interval used for the untracked-node removal check.
 *
 * @return the current value of the check interval, in milliseconds
 */
@VisibleForTesting
public int getNodeRemovalCheckInterval() {
return nodeRemovalCheckInterval;
}
/**
 * Test hook that overwrites the stored check interval.
 * Only the field is assigned; this method does not reschedule the removal
 * timer itself.
 *
 * @param interval new interval value, in milliseconds
 */
@VisibleForTesting
public void setNodeRemovalCheckInterval(int interval) {
this.nodeRemovalCheckInterval = interval;
}
@VisibleForTesting @VisibleForTesting
public Resolver getResolver() { public Resolver getResolver() {
return resolver; return resolver;
@ -374,6 +437,33 @@ public class NodesListManager extends CompositeService implements
return hostsReader; return hostsReader;
} }
/**
 * Stamps every inactive node that is currently untracked and has no
 * untracked time stamp yet (time stamp equal to 0) with the current
 * monotonic time. Nodes that already carry a time stamp keep it.
 */
private void updateInactiveNodes() {
  final long stampTime = Time.monotonicNow();
  for (Entry<NodeId, RMNode> inactive :
      rmContext.getInactiveRMNodes().entrySet()) {
    final NodeId id = inactive.getKey();
    final RMNode node = inactive.getValue();
    if (!isUntrackedNode(id.getHost())) {
      continue;
    }
    if (node.getUntrackedTimeStamp() != 0) {
      // Already stamped on an earlier pass; keep the original time.
      continue;
    }
    node.setUntrackedTimeStamp(stampTime);
  }
}
/**
 * Tells whether a host is untracked, i.e. the include list is non-empty and
 * neither the host name nor its resolved address appears in the include list
 * or in the exclude list.
 *
 * @param hostName host name to look up
 * @return {@code true} if the host is untracked, {@code false} otherwise
 */
public boolean isUntrackedNode(String hostName) {
  final String resolved = resolver.resolve(hostName);
  synchronized (hostsReader) {
    final Set<String> included = hostsReader.getHosts();
    final Set<String> excluded = hostsReader.getExcludedHosts();
    if (included.isEmpty()) {
      // With an empty include list no host is considered untracked.
      return false;
    }
    if (included.contains(hostName) || included.contains(resolved)) {
      return false;
    }
    return !(excluded.contains(hostName) || excluded.contains(resolved));
  }
}
/** /**
* Refresh the nodes gracefully * Refresh the nodes gracefully
* *
@ -384,11 +474,13 @@ public class NodesListManager extends CompositeService implements
public void refreshNodesGracefully(Configuration conf) throws IOException, public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException { YarnException {
refreshHostsReader(conf); refreshHostsReader(conf);
for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) { for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey(); NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) { if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION)); new RMNodeEvent(nodeId, nodeEventType));
} else { } else {
// Recommissioning the nodes // Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@ -397,6 +489,7 @@ public class NodesListManager extends CompositeService implements
} }
} }
} }
updateInactiveNodes();
} }
/** /**
@ -420,8 +513,11 @@ public class NodesListManager extends CompositeService implements
public void refreshNodesForcefully() { public void refreshNodesForcefully() {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) { for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) { if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
RMNodeEventType nodeEventType =
isUntrackedNode(entry.getKey().getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION)); new RMNodeEvent(entry.getKey(), nodeEventType));
} }
} }
} }

View File

@ -87,7 +87,7 @@ public class RMServerUtils {
acceptedStates.contains(NodeState.LOST) || acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) { acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) { for (RMNode rmNode : context.getInactiveRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) { if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode); results.add(rmNode);
} }
} }

View File

@ -320,7 +320,8 @@ public class ResourceTrackerService extends AbstractService implements
} }
// Check if this node is a 'valid' node // Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) { if (!this.nodesListManager.isValidNode(host) ||
this.nodesListManager.isUntrackedNode(host)) {
String message = String message =
"Disallowed NodeManager from " + host "Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager."; + ", Sending SHUTDOWN signal to the NodeManager.";
@ -451,8 +452,9 @@ public class ResourceTrackerService extends AbstractService implements
// 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
// in decommissioning. // in decommissioning.
if (!this.nodesListManager.isValidNode(nodeId.getHost()) if ((!this.nodesListManager.isValidNode(nodeId.getHost()) &&
&& !isNodeInDecommissioning(nodeId)) { !isNodeInDecommissioning(nodeId)) ||
this.nodesListManager.isUntrackedNode(nodeId.getHost())) {
String message = String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: " "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost(); + nodeId.getHost();

View File

@ -168,4 +168,8 @@ public interface RMNode {
NodeHeartbeatResponse response); NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers(); public List<Container> pullNewlyIncreasedContainers();
/**
 * @return the time stamp recorded for this node being untracked; callers
 * treat {@code 0} as "no time stamp recorded"
 */
long getUntrackedTimeStamp();
/**
 * Records the untracked time stamp for this node.
 *
 * @param timer the time stamp value to store; {@code 0} clears it
 */
void setUntrackedTimeStamp(long timer);
} }

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
@ -120,6 +121,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private long lastHealthReportTime; private long lastHealthReportTime;
private String nodeManagerVersion; private String nodeManagerVersion;
/* Untracked time stamp exposed through get/setUntrackedTimeStamp. */
private long timeStamp;
/* Aggregated resource utilization for the containers. */ /* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization; private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */ /* Resource utilization for the node. */
@ -259,6 +261,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished. // TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@ -346,6 +351,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.healthReport = "Healthy"; this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis(); this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion; this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0;
this.latestNodeHeartBeatResponse.setResponseId(0); this.latestNodeHeartBeatResponse.setResponseId(0);
@ -1011,7 +1017,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
/** /**
* Put a node in deactivated (decommissioned) status. * Put a node in deactivated (decommissioned or shutdown) status.
* @param rmNode * @param rmNode
* @param finalState * @param finalState
*/ */
@ -1028,6 +1034,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState); + finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode); rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
if (finalState == NodeState.SHUTDOWN &&
rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
rmNode.setUntrackedTimeStamp(Time.monotonicNow());
}
} }
/** /**
@ -1383,4 +1393,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public Resource getOriginalTotalCapability() { public Resource getOriginalTotalCapability() {
return this.originalTotalCapability; return this.originalTotalCapability;
} }
/**
 * @return the stored untracked time stamp
 */
// NOTE(review): the field is read without any locking here; confirm whether
// it can be accessed from more than one thread.
@Override
public long getUntrackedTimeStamp() {
return this.timeStamp;
}
/**
 * Stores the untracked time stamp.
 *
 * @param ts the value to store
 */
@Override
public void setUntrackedTimeStamp(long ts) {
this.timeStamp = ts;
}
} }

View File

@ -260,6 +260,15 @@ public class MockNodes {
public ResourceUtilization getNodeUtilization() { public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization; return this.nodeUtilization;
} }
/**
 * Mock stub.
 *
 * @return always {@code 0}
 */
@Override
public long getUntrackedTimeStamp() {
return 0;
}
/**
 * Mock stub: the supplied time stamp is discarded.
 *
 * @param timeStamp ignored
 */
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
}; };
private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode,

View File

@ -31,6 +31,8 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -48,8 +50,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
@ -141,12 +141,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++metricCount); checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert Assert
.assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true); nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@ -155,7 +155,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
nodeHeartbeat = nm3.nodeHeartbeat(true); nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics() Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
.getNumDecommisionedNMs()); .getNumShutdownNMs());
rm.stop();
} }
/** /**
@ -228,7 +229,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
MockNM nm2 = rm.registerNode("host2:5678", 10240); MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null); assert(metrics != null);
int initialMetricCount = metrics.getNumDecommisionedNMs(); int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals( Assert.assertEquals(
NodeAction.NORMAL, NodeAction.NORMAL,
@ -241,16 +242,16 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath()); .getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++initialMetricCount); checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals( Assert.assertEquals(
"Node should not have been decomissioned.", "Node should not have been shutdown.",
NodeAction.NORMAL, NodeAction.NORMAL,
nodeHeartbeat.getNodeAction()); nodeHeartbeat.getNodeAction());
nodeHeartbeat = nm2.nodeHeartbeat(true); NodeState nodeState =
Assert.assertEquals("Node should have been decomissioned but is in state" + rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
nodeHeartbeat.getNodeAction(), Assert.assertEquals("Node should have been shutdown but is in state" +
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); nodeState, NodeState.SHUTDOWN, nodeState);
} }
/** /**
@ -1123,8 +1124,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.start(); rm.start();
ResourceTrackerService resourceTrackerService = rm ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService(); .getResourceTrackerService();
int shutdownNMsCount = ClusterMetrics.getMetrics()
.getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics() int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs(); .getNumDecommisionedNMs();
@ -1149,10 +1148,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true); NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction()); Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount); checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount); checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId()); request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request); resourceTrackerService.unRegisterNodeManager(request);
shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount); checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount); checkDecommissionedNMCount(rm, decommisionedNMsCount);
@ -1168,8 +1169,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId()); request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request); resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, shutdownNMsCount); checkShutdownNMCount(rm, ++shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount); checkDecommissionedNMCount(rm, decommisionedNMsCount);
rm.stop();
} }
@Test(timeout = 30000) @Test(timeout = 30000)
@ -1304,6 +1306,186 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.stop(); rm.stop();
} }
/**
 * Remove a node from all lists and check if it is forgotten, using the
 * non-graceful refresh path.
 */
@Test
public void testNodeRemovalNormally() throws Exception {
testNodeRemovalUtil(false);
}
/**
 * Runs the node removal scenario using the graceful refresh path.
 */
@Test
public void testNodeRemovalGracefully() throws Exception {
testNodeRemovalUtil(true);
}
/**
 * Refreshes the node lists on the RM under test, choosing between the
 * graceful and the regular refresh.
 *
 * @param doGraceful {@code true} to call refreshNodesGracefully,
 *                   {@code false} to call refreshNodes
 * @param conf configuration handed to the refresh call
 * @throws Exception if the refresh fails
 */
public void refreshNodesOption(boolean doGraceful, Configuration conf)
    throws Exception {
  if (!doGraceful) {
    rm.getNodesListManager().refreshNodes(conf);
    return;
  }
  rm.getNodesListManager().refreshNodesGracefully(conf);
}
/**
 * Shared scenario for the node removal tests: registers three nodes, then
 * repeatedly rewrites the include/exclude host files, refreshes the node
 * lists and checks the node states and cluster metrics after waiting for the
 * untracked-node removal timer.
 *
 * NOTE(review): the assertEquals calls in this method pass the observed
 * value before the expected one, the reverse of JUnit's
 * (message, expected, actual) order; this only affects failure messages.
 *
 * @param doGraceful whether refreshes use the graceful variant
 * @throws Exception on any failure while driving the RM
 */
public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
// Removal timeout in milliseconds, kept short so the test runs quickly.
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
// This latch is never counted down in this method; await(...) on it is used
// purely as a timed wait.
CountDownLatch latch = new CountDownLatch(1);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert (metrics != null);
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Remove nm2 from include list, should now be shutdown with timer test
String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host1", ip);
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue("Node should not be in active node list",
!rmContext.getRMNodes().containsKey(nm2.getNodeId()));
RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be in inactive node list",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
// Wait for one check interval plus the removal timeout before expecting
// the node to have been dropped from the inactive list.
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
//Check node removal and re-addition before timer expires
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
rm.drainEvents();
writeToHostsFile("host1", ip);
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be shutdown",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
//add back the node before timer expires
// NOTE(review): with the 500 ms timeout set above, maxThreadSleeptime is
// presumably well under 2000, making this argument negative; await() does
// not wait at all for a non-positive timeout. Confirm this is intended.
latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
nodeHeartbeat = nm2.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Decommission this node, check timer doesn't remove it
writeToHostsFile("host1", "host2", ip);
writeToHostsFile(excludeHostFile, "host2");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
// The node is looked up in the active map for the graceful variant and in
// the inactive map otherwise.
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
//Test decommed/ing node that transitions to untracked,timer should remove
writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
//nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!",
rmNode, null);
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
// Drop host2 from both the include and the exclude file, then wait for the
// removal timer.
writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
// NOTE(review): the message below says "Shutdown" but the value checked is
// the decommissioned count.
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumDecommisionedNMs(), 0);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
rm.stop();
}
private void writeToHostsFile(String... hosts) throws IOException { private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts); writeToHostsFile(hostFile, hosts);
} }

View File

@ -272,8 +272,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId); RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress")); info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState() if (rmNode != null) {
.toString(), info.getString("state")); WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
} }
} }
@ -304,8 +306,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId); RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress")); info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", if (rmNode != null) {
rmNode.getState().toString(), info.getString("state")); WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
} }
@Test @Test