YARN-3266. RMContext#inactiveNodes should have NodeId as map key. Contributed by Chengbing Liu

(cherry picked from commit b46ee1e7a3)
This commit is contained in:
Jian He 2015-04-14 10:54:22 -07:00
parent c7ebecfff5
commit 81bbee6852
9 changed files with 72 additions and 21 deletions

View File

@ -144,6 +144,9 @@ Release 2.8.0 - UNRELEASED
YARN-3472. Fixed possible leak in DelegationTokenRenewer#allTokens. YARN-3472. Fixed possible leak in DelegationTokenRenewer#allTokens.
(Rohith Sharmaks via jianhe) (Rohith Sharmaks via jianhe)
YARN-3266. RMContext#inactiveNodes should have NodeId as map key.
(Chengbing Liu via jianhe)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -68,8 +68,8 @@ public class RMActiveServiceContext {
private final ConcurrentMap<NodeId, RMNode> nodes = private final ConcurrentMap<NodeId, RMNode> nodes =
new ConcurrentHashMap<NodeId, RMNode>(); new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes = private final ConcurrentMap<NodeId, RMNode> inactiveNodes =
new ConcurrentHashMap<String, RMNode>(); new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials = private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
new ConcurrentHashMap<ApplicationId, ByteBuffer>(); new ConcurrentHashMap<ApplicationId, ByteBuffer>();
@ -185,7 +185,7 @@ public class RMActiveServiceContext {
@Private @Private
@Unstable @Unstable
public ConcurrentMap<String, RMNode> getInactiveRMNodes() { public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
return this.inactiveNodes; return this.inactiveNodes;
} }

View File

@ -61,7 +61,7 @@ public interface RMContext {
ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps(); ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
ConcurrentMap<String, RMNode> getInactiveRMNodes(); ConcurrentMap<NodeId, RMNode> getInactiveRMNodes();
ConcurrentMap<NodeId, RMNode> getRMNodes(); ConcurrentMap<NodeId, RMNode> getRMNodes();

View File

@ -149,7 +149,7 @@ public class RMContextImpl implements RMContext {
} }
@Override @Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() { public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
return activeServiceContext.getInactiveRMNodes(); return activeServiceContext.getInactiveRMNodes();
} }

View File

@ -524,11 +524,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List<NMContainerStatus> containers = null; List<NMContainerStatus> containers = null;
String host = rmNode.nodeId.getHost(); NodeId nodeId = rmNode.nodeId;
if (rmNode.context.getInactiveRMNodes().containsKey(host)) { if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
// Old node rejoining // Old node rejoining
RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host); RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
rmNode.context.getInactiveRMNodes().remove(host); rmNode.context.getInactiveRMNodes().remove(nodeId);
rmNode.updateMetricsForRejoinedNode(previouRMNode.getState()); rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
} else { } else {
// Increment activeNodes explicitly because this is a new node. // Increment activeNodes explicitly because this is a new node.
@ -737,7 +737,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.context.getRMNodes().remove(rmNode.nodeId); rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState); + finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode); rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
//Update the metrics //Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState); rmNode.updateMetricsForDeactivatedNode(initialState, finalState);

View File

@ -320,7 +320,7 @@ public class RMWebServices {
RMNode ni = this.rm.getRMContext().getRMNodes().get(nid); RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
boolean isInactive = false; boolean isInactive = false;
if (ni == null) { if (ni == null) {
ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost()); ni = this.rm.getRMContext().getInactiveRMNodes().get(nid);
if (ni == null) { if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found"); throw new NotFoundException("nodeId, " + nodeId + ", is not found");
} }

View File

@ -306,6 +306,47 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.LOST, node.getState()); Assert.assertEquals(NodeState.LOST, node.getState());
} }
/**
 * Expires two running nodes that share a host ("localhost") but listen on
 * different ports, and checks after each expiry that the cluster metrics
 * move by exactly one node and that the inactive-node map tracks each node
 * separately under its own NodeId.
 */
@Test
public void testRunningExpireMultiple() {
  // Same host, different ports: the two nodes differ only in the port part
  // of their NodeId.
  RMNodeImpl node1 = getRunningNode(null, 10001);
  RMNodeImpl node2 = getRunningNode(null, 10002);

  // Snapshot the metrics so the assertions below are relative to whatever
  // state earlier tests left behind.
  ClusterMetrics cm = ClusterMetrics.getMetrics();
  int initialActive = cm.getNumActiveNMs();
  int initialLost = cm.getNumLostNMs();
  int initialUnhealthy = cm.getUnhealthyNMs();
  int initialDecommissioned = cm.getNumDecommisionedNMs();
  int initialRebooted = cm.getNumRebootedNMs();

  // Expire the first node: only node1 may show up as inactive.
  node1.handle(new RMNodeEvent(node1.getNodeID(), RMNodeEventType.EXPIRE));
  Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
  Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
  Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
      cm.getUnhealthyNMs());
  Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
      cm.getNumDecommisionedNMs());
  Assert.assertEquals("Rebooted Nodes", initialRebooted,
      cm.getNumRebootedNMs());
  Assert.assertEquals(NodeState.LOST, node1.getState());
  Assert.assertTrue("Node " + node1.toString() + " should be inactive",
      rmContext.getInactiveRMNodes().containsKey(node1.getNodeID()));
  Assert.assertFalse("Node " + node2.toString() + " should not be inactive",
      rmContext.getInactiveRMNodes().containsKey(node2.getNodeID()));

  // Expire the second node. The event carries node2's own id; the original
  // passed node1's id here, which looks like a copy-paste slip.
  node2.handle(new RMNodeEvent(node2.getNodeID(), RMNodeEventType.EXPIRE));
  Assert.assertEquals("Active Nodes", initialActive - 2, cm.getNumActiveNMs());
  Assert.assertEquals("Lost Nodes", initialLost + 2, cm.getNumLostNMs());
  Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
      cm.getUnhealthyNMs());
  Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
      cm.getNumDecommisionedNMs());
  Assert.assertEquals("Rebooted Nodes", initialRebooted,
      cm.getNumRebootedNMs());
  Assert.assertEquals(NodeState.LOST, node2.getState());

  // Both nodes must now be present: the second entry must not have
  // replaced the first.
  Assert.assertTrue("Node " + node1.toString() + " should be inactive",
      rmContext.getInactiveRMNodes().containsKey(node1.getNodeID()));
  Assert.assertTrue("Node " + node2.toString() + " should be inactive",
      rmContext.getInactiveRMNodes().containsKey(node2.getNodeID()));
}
@Test @Test
public void testUnhealthyExpire() { public void testUnhealthyExpire() {
RMNodeImpl node = getUnhealthyNode(); RMNodeImpl node = getUnhealthyNode();
@ -458,14 +499,18 @@ public class TestRMNodeTransitions {
} }
private RMNodeImpl getRunningNode() { private RMNodeImpl getRunningNode() {
return getRunningNode(null); return getRunningNode(null, 0);
} }
private RMNodeImpl getRunningNode(String nmVersion) { private RMNodeImpl getRunningNode(String nmVersion) {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); return getRunningNode(nmVersion, 0);
}
private RMNodeImpl getRunningNode(String nmVersion, int port) {
NodeId nodeId = BuilderUtils.newNodeId("localhost", port);
Resource capability = Resource.newInstance(4096, 4); Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
null, capability, nmVersion); capability, nmVersion);
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals(NodeState.RUNNING, node.getState()); Assert.assertEquals(NodeState.RUNNING, node.getState());
return node; return node;

View File

@ -173,10 +173,10 @@ public class TestRMWebApp {
final List<RMNode> deactivatedNodes = final List<RMNode> deactivatedNodes =
MockNodes.deactivatedNodes(racks, numNodes, newResource(mbsPerNode)); MockNodes.deactivatedNodes(racks, numNodes, newResource(mbsPerNode));
final ConcurrentMap<String, RMNode> deactivatedNodesMap = final ConcurrentMap<NodeId, RMNode> deactivatedNodesMap =
Maps.newConcurrentMap(); Maps.newConcurrentMap();
for (RMNode node : deactivatedNodes) { for (RMNode node : deactivatedNodes) {
deactivatedNodesMap.put(node.getHostName(), node); deactivatedNodesMap.put(node.getNodeID(), node);
} }
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, RMContextImpl rmContext = new RMContextImpl(null, null, null, null,
@ -186,7 +186,7 @@ public class TestRMWebApp {
return applicationsMaps; return applicationsMaps;
} }
@Override @Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() { public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
return deactivatedNodesMap; return deactivatedNodesMap;
} }
@Override @Override

View File

@ -32,6 +32,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -263,8 +264,9 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
assertEquals("incorrect number of elements", 2, nodeArray.length()); assertEquals("incorrect number of elements", 2, nodeArray.length());
for (int i = 0; i < nodeArray.length(); ++i) { for (int i = 0; i < nodeArray.length(); ++i) {
JSONObject info = nodeArray.getJSONObject(i); JSONObject info = nodeArray.getJSONObject(i);
String host = info.get("id").toString().split(":")[0]; String[] node = info.get("id").toString().split(":");
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(host); NodeId nodeId = NodeId.newInstance(node[0], Integer.parseInt(node[1]));
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress")); info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState() WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
@ -295,7 +297,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
assertEquals("Incorrect Node Information.", "h2:1234", id); assertEquals("Incorrect Node Information.", "h2:1234", id);
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get("h2"); NodeId nodeId = NodeId.newInstance("h2", 1234);
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress")); info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", WebServicesTestUtils.checkStringMatch("state",