YARN-2641. Decommission nodes on -refreshNodes instead of next NM-RM heartbeat. (Zhihai Xu via kasha)

(cherry picked from commit da709a2eac)
This commit is contained in:
Karthik Kambatla 2014-10-13 16:23:04 -07:00
parent 31a4bf7321
commit a1116b56a4
4 changed files with 35 additions and 22 deletions

View File

@ -20,6 +20,9 @@ Release 2.7.0 - UNRELEASED
YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run
with both CS and FS. (Wei Yan and kasha via kasha) with both CS and FS. (Wei Yan and kasha via kasha)
YARN-2641. Decommission nodes on -refreshNodes instead of next
NM-RM heartbeat. (Zhihai Xu via kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -38,6 +39,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -123,6 +126,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
.getConfigurationInputStream(this.conf, excludesFile)); .getConfigurationInputStream(this.conf, excludesFile));
printConfiguredHosts(); printConfiguredHosts();
} }
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
}
}
} }
private void setDecomissionedNMsMetrics() { private void setDecomissionedNMsMetrics() {

View File

@ -349,15 +349,25 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeStatus remoteNodeStatus = request.getNodeStatus(); NodeStatus remoteNodeStatus = request.getNodeStatus();
/** /**
* Here is the node heartbeat sequence... * Here is the node heartbeat sequence...
* 1. Check if it's a registered node * 1. Check if it's a valid (i.e. not excluded) node
* 2. Check if it's a valid (i.e. not excluded) node * 2. Check if it's a registered node
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
* 4. Send healthStatus to RMNode * 4. Send healthStatus to RMNode
*/ */
NodeId nodeId = remoteNodeStatus.getNodeId(); NodeId nodeId = remoteNodeStatus.getNodeId();
// 1. Check if it's a registered node // 1. Check if it's a valid (i.e. not excluded) node
if (!this.nodesListManager.isValidNode(nodeId.getHost())) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();
LOG.info(message);
shutDown.setDiagnosticsMessage(message);
return shutDown;
}
// 2. Check if it's a registered node
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) { if (rmNode == null) {
/* node does not exist */ /* node does not exist */
@ -370,18 +380,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// Send ping // Send ping
this.nmLivelinessMonitor.receivedPing(nodeId); this.nmLivelinessMonitor.receivedPing(nodeId);
// 2. Check if it's a valid (i.e. not excluded) node
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ rmNode.getNodeAddress();
LOG.info(message);
shutDown.setDiagnosticsMessage(message);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
return shutDown;
}
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse

View File

@ -130,17 +130,17 @@ public void testDecommissionWithIncludeHosts() throws Exception {
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert Assert
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true); nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
.equals(nodeHeartbeat.getNodeAction())); .equals(nodeHeartbeat.getNodeAction()));
checkDecommissionedNMCount(rm, ++metricCount);
nodeHeartbeat = nm3.nodeHeartbeat(true); nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics() Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
@ -185,6 +185,8 @@ protected Dispatcher createDispatcher() {
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, metricCount + 2);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true); nodeHeartbeat = nm2.nodeHeartbeat(true);
@ -195,7 +197,7 @@ protected Dispatcher createDispatcher() {
Assert.assertTrue("The decommisioned metrics are not updated", Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
dispatcher.await(); dispatcher.await();
checkDecommissionedNMCount(rm, metricCount + 2);
writeToHostsFile(""); writeToHostsFile("");
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
@ -234,6 +236,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath()); .getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals( Assert.assertEquals(
"Node should not have been decomissioned.", "Node should not have been decomissioned.",
@ -243,7 +246,6 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
Assert.assertEquals("Node should have been decomissioned but is in state" + Assert.assertEquals("Node should have been decomissioned but is in state" +
nodeHeartbeat.getNodeAction(), nodeHeartbeat.getNodeAction(),
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
checkDecommissionedNMCount(rm, ++initialMetricCount);
} }
/** /**
@ -271,6 +273,7 @@ public void testAddNewExcludePathToConfiguration() throws Exception {
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
.getAbsolutePath()); .getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf); rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true); nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals( Assert.assertEquals(
"Node should not have been decomissioned.", "Node should not have been decomissioned.",
@ -280,7 +283,6 @@ public void testAddNewExcludePathToConfiguration() throws Exception {
Assert.assertEquals("Node should have been decomissioned but is in state" + Assert.assertEquals("Node should have been decomissioned but is in state" +
nodeHeartbeat.getNodeAction(), nodeHeartbeat.getNodeAction(),
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
checkDecommissionedNMCount(rm, ++initialMetricCount);
} }
@Test @Test