MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes on the webapps. Contributed by Bhallamudi Venkata Siva Kamesh and Jason Lowe.

svn merge --ignore-ancestry -c 1236433 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1236438 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-26 22:58:12 +00:00
parent 981ba68664
commit 890abf867c
20 changed files with 390 additions and 134 deletions

View File

@ -164,6 +164,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3718. Change default AM heartbeat interval to 1 second. (Hitesh MAPREDUCE-3718. Change default AM heartbeat interval to 1 second. (Hitesh
Shah via sseth) Shah via sseth)
MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes
on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv)
BUG FIXES BUG FIXES
MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2 MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
(Jason Lowe via bobby) (Jason Lowe via bobby)

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@ -39,9 +38,9 @@ public class ClusterMetrics {
private static AtomicBoolean isInitialized = new AtomicBoolean(false); private static AtomicBoolean isInitialized = new AtomicBoolean(false);
@Metric("# of NMs") MutableGaugeInt numNMs; @Metric("# of active NMs") MutableGaugeInt numNMs;
@Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs; @Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableCounterInt numLostNMs; @Metric("# of lost NMs") MutableGaugeInt numLostNMs;
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs; @Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs; @Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs;
@ -73,8 +72,8 @@ public class ClusterMetrics {
} }
} }
//Total Nodemanagers //Active Nodemanagers
public int getNumNMs() { public int getNumActiveNMs() {
return numNMs.value(); return numNMs.value();
} }
@ -87,6 +86,10 @@ public class ClusterMetrics {
numDecommissionedNMs.incr(); numDecommissionedNMs.incr();
} }
public void decrDecommisionedNMs() {
numDecommissionedNMs.decr();
}
//Lost NMs //Lost NMs
public int getNumLostNMs() { public int getNumLostNMs() {
return numLostNMs.value(); return numLostNMs.value();
@ -96,6 +99,10 @@ public class ClusterMetrics {
numLostNMs.incr(); numLostNMs.incr();
} }
public void decrNumLostNMs() {
numLostNMs.decr();
}
//Unhealthy NMs //Unhealthy NMs
public int getUnhealthyNMs() { public int getUnhealthyNMs() {
return numUnhealthyNMs.value(); return numUnhealthyNMs.value();
@ -118,6 +125,10 @@ public class ClusterMetrics {
numRebootedNMs.incr(); numRebootedNMs.incr();
} }
public void decrNumRebootedNMs() {
numRebootedNMs.decr();
}
public void removeNode(RMNodeEventType nodeEventType) { public void removeNode(RMNodeEventType nodeEventType) {
numNMs.decr(); numNMs.decr();
switch(nodeEventType){ switch(nodeEventType){

View File

@ -43,6 +43,8 @@ public interface RMContext {
ApplicationsStore getApplicationsStore(); ApplicationsStore getApplicationsStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps(); ConcurrentMap<ApplicationId, RMApp> getRMApps();
ConcurrentMap<String, RMNode> getInactiveRMNodes();
ConcurrentMap<NodeId, RMNode> getRMNodes(); ConcurrentMap<NodeId, RMNode> getRMNodes();

View File

@ -43,6 +43,9 @@ public class RMContextImpl implements RMContext {
private final ConcurrentMap<NodeId, RMNode> nodes private final ConcurrentMap<NodeId, RMNode> nodes
= new ConcurrentHashMap<NodeId, RMNode>(); = new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amLivelinessMonitor;
private ContainerAllocationExpirer containerAllocationExpirer; private ContainerAllocationExpirer containerAllocationExpirer;
@ -83,6 +86,11 @@ public class RMContextImpl implements RMContext {
public ConcurrentMap<NodeId, RMNode> getRMNodes() { public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes; return this.nodes;
} }
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return this.inactiveNodes;
}
@Override @Override
public ContainerAllocationExpirer getContainerAllocationExpirer() { public ContainerAllocationExpirer getContainerAllocationExpirer() {

View File

@ -220,10 +220,6 @@ public class ResourceTrackerService extends AbstractService implements
if (rmNode == null) { if (rmNode == null) {
/* node does not exist */ /* node does not exist */
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId()); LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
// Updating the metrics directly as reboot event cannot be
// triggered on a null rmNode
ClusterMetrics.getMetrics().incrNumRebootedNMs();
return reboot; return reboot;
} }

View File

@ -119,7 +119,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType.DECOMMISSION, new RemoveNodeTransition()) RMNodeEventType.DECOMMISSION, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST, .addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
RMNodeEventType.EXPIRE, new RemoveNodeTransition()) RMNodeEventType.EXPIRE, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST, .addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED,
RMNodeEventType.REBOOTING, new RemoveNodeTransition()) RMNodeEventType.REBOOTING, new RemoveNodeTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING, .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
@ -307,6 +307,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public static class AddNodeTransition implements public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> { SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private void updateMetrics(RMNodeState nodeState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
switch (nodeState) {
case LOST:
metrics.decrNumLostNMs();
break;
case REBOOTED:
metrics.decrNumRebootedNMs();
break;
case DECOMMISSIONED:
metrics.decrDecommisionedNMs();
break;
}
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
@ -315,6 +330,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode)); new NodeAddedSchedulerEvent(rmNode));
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
RMNode node = rmNode.context.getInactiveRMNodes().get(host);
rmNode.context.getInactiveRMNodes().remove(host);
updateMetrics(node.getState());
}
ClusterMetrics.getMetrics().addNode(); ClusterMetrics.getMetrics().addNode();
} }
@ -353,7 +375,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Remove the node from the system. // Remove the node from the system.
rmNode.context.getRMNodes().remove(rmNode.nodeId); rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Removed Node " + rmNode.nodeId); LOG.info("Removed Node " + rmNode.nodeId);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics //Update the metrics
ClusterMetrics.getMetrics().removeNode(event.getType()); ClusterMetrics.getMetrics().removeNode(event.getType());
} }

View File

@ -68,7 +68,7 @@ public class MetricsOverviewTable extends HtmlBlock {
th().$class("ui-state-default")._("Memory Used")._(). th().$class("ui-state-default")._("Memory Used")._().
th().$class("ui-state-default")._("Memory Total")._(). th().$class("ui-state-default")._("Memory Total")._().
th().$class("ui-state-default")._("Memory Reserved")._(). th().$class("ui-state-default")._("Memory Reserved")._().
th().$class("ui-state-default")._("Total Nodes")._(). th().$class("ui-state-default")._("Active Nodes")._().
th().$class("ui-state-default")._("Decommissioned Nodes")._(). th().$class("ui-state-default")._("Decommissioned Nodes")._().
th().$class("ui-state-default")._("Lost Nodes")._(). th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._(). th().$class("ui-state-default")._("Unhealthy Nodes")._().
@ -82,7 +82,7 @@ public class MetricsOverviewTable extends HtmlBlock {
td(StringUtils.byteDesc(clusterMetrics.getAllocatedMB() * BYTES_IN_MB)). td(StringUtils.byteDesc(clusterMetrics.getAllocatedMB() * BYTES_IN_MB)).
td(StringUtils.byteDesc(clusterMetrics.getTotalMB() * BYTES_IN_MB)). td(StringUtils.byteDesc(clusterMetrics.getTotalMB() * BYTES_IN_MB)).
td(StringUtils.byteDesc(clusterMetrics.getReservedMB() * BYTES_IN_MB)). td(StringUtils.byteDesc(clusterMetrics.getReservedMB() * BYTES_IN_MB)).
td().a(url("nodes"),String.valueOf(clusterMetrics.getTotalNodes()))._(). td().a(url("nodes"),String.valueOf(clusterMetrics.getActiveNodes()))._().
td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._(). td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._().
td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._(). td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._().
td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._(). td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._().

View File

@ -24,6 +24,8 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -79,7 +82,19 @@ class NodesPage extends RmView {
if(type != null && !type.isEmpty()) { if(type != null && !type.isEmpty()) {
stateFilter = RMNodeState.valueOf(type.toUpperCase()); stateFilter = RMNodeState.valueOf(type.toUpperCase());
} }
for (RMNode ni : this.rmContext.getRMNodes().values()) { Collection<RMNode> rmNodes = this.rmContext.getRMNodes().values();
boolean isInactive = false;
if (stateFilter != null) {
switch (stateFilter) {
case DECOMMISSIONED:
case LOST:
case REBOOTED:
rmNodes = this.rmContext.getInactiveRMNodes().values();
isInactive = true;
break;
}
}
for (RMNode ni : rmNodes) {
if(stateFilter != null) { if(stateFilter != null) {
RMNodeState state = ni.getState(); RMNodeState state = ni.getState();
if(!stateFilter.equals(state)) { if(!stateFilter.equals(state)) {
@ -89,12 +104,17 @@ class NodesPage extends RmView {
NodeInfo info = new NodeInfo(ni, sched); NodeInfo info = new NodeInfo(ni, sched);
int usedMemory = (int)info.getUsedMemory(); int usedMemory = (int)info.getUsedMemory();
int availableMemory = (int)info.getAvailableMemory(); int availableMemory = (int)info.getAvailableMemory();
tbody.tr(). TR<TBODY<TABLE<Hamlet>>> row = tbody.tr().
td(info.getRack()). td(info.getRack()).
td(info.getState()). td(info.getState()).
td(info.getNodeId()). td(info.getNodeId());
td().a("http://" + info.getNodeHTTPAddress(), info.getNodeHTTPAddress())._(). if (isInactive) {
td(info.getHealthStatus()). row.td()._("N/A")._();
} else {
String httpAddress = info.getNodeHTTPAddress();
row.td().a("http://" + httpAddress, httpAddress)._();
}
row.td(info.getHealthStatus()).
td(Times.format(info.getLastHealthUpdate())). td(Times.format(info.getLastHealthUpdate())).
td(info.getHealthReport()). td(info.getHealthReport()).
td(String.valueOf(info.getNumContainers())). td(String.valueOf(info.getNumContainers())).

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp; package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -68,6 +69,7 @@ import com.google.inject.Singleton;
@Singleton @Singleton
@Path("/ws/v1/cluster") @Path("/ws/v1/cluster")
public class RMWebServices { public class RMWebServices {
private static final String EMPTY = "";
private static final Log LOG = LogFactory.getLog(RMWebServices.class); private static final Log LOG = LogFactory.getLog(RMWebServices.class);
private final ResourceManager rm; private final ResourceManager rm;
private static RecordFactory recordFactory = RecordFactoryProvider private static RecordFactory recordFactory = RecordFactoryProvider
@ -144,12 +146,23 @@ public class RMWebServices {
if (sched == null) { if (sched == null) {
throw new NotFoundException("Null ResourceScheduler instance"); throw new NotFoundException("Null ResourceScheduler instance");
} }
Collection<RMNode> rmNodes = this.rm.getRMContext().getRMNodes().values();
boolean isInactive = false;
if (filterState != null && !filterState.isEmpty()) {
RMNodeState nodeState = RMNodeState.valueOf(filterState.toUpperCase());
switch (nodeState) {
case DECOMMISSIONED:
case LOST:
case REBOOTED:
rmNodes = this.rm.getRMContext().getInactiveRMNodes().values();
isInactive = true;
break;
}
}
NodesInfo allNodes = new NodesInfo(); NodesInfo allNodes = new NodesInfo();
for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) { for (RMNode ni : rmNodes) {
NodeInfo nodeInfo = new NodeInfo(ni, sched); NodeInfo nodeInfo = new NodeInfo(ni, sched);
if (filterState != null) { if (filterState != null) {
RMNodeState.valueOf(filterState);
if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) { if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) {
continue; continue;
} }
@ -165,6 +178,9 @@ public class RMWebServices {
continue; continue;
} }
} }
if (isInactive) {
nodeInfo.setNodeHTTPAddress(EMPTY);
}
allNodes.add(nodeInfo); allNodes.add(nodeInfo);
} }
return allNodes; return allNodes;
@ -183,10 +199,19 @@ public class RMWebServices {
} }
NodeId nid = ConverterUtils.toNodeId(nodeId); NodeId nid = ConverterUtils.toNodeId(nodeId);
RMNode ni = this.rm.getRMContext().getRMNodes().get(nid); RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
boolean isInactive = false;
if (ni == null) { if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found"); ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost());
if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
}
isInactive = true;
} }
return new NodeInfo(ni, sched); NodeInfo nodeInfo = new NodeInfo(ni, sched);
if (isInactive) {
nodeInfo.setNodeHTTPAddress(EMPTY);
}
return nodeInfo;
} }
@GET @GET

View File

@ -44,6 +44,7 @@ public class ClusterMetricsInfo {
protected int unhealthyNodes; protected int unhealthyNodes;
protected int decommissionedNodes; protected int decommissionedNodes;
protected int rebootedNodes; protected int rebootedNodes;
protected int activeNodes;
public ClusterMetricsInfo() { public ClusterMetricsInfo() {
} // JAXB needs this } // JAXB needs this
@ -59,12 +60,13 @@ public class ClusterMetricsInfo {
this.allocatedMB = metrics.getAllocatedGB() * MB_IN_GB; this.allocatedMB = metrics.getAllocatedGB() * MB_IN_GB;
this.containersAllocated = metrics.getAllocatedContainers(); this.containersAllocated = metrics.getAllocatedContainers();
this.totalMB = availableMB + reservedMB + allocatedMB; this.totalMB = availableMB + reservedMB + allocatedMB;
this.totalNodes = clusterMetrics.getNumNMs(); this.activeNodes = clusterMetrics.getNumActiveNMs();
this.lostNodes = clusterMetrics.getNumLostNMs(); this.lostNodes = clusterMetrics.getNumLostNMs();
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs(); this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs(); this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
+ rebootedNodes;
} }
public int getAppsSubmitted() { public int getAppsSubmitted() {
@ -94,6 +96,10 @@ public class ClusterMetricsInfo {
public int getTotalNodes() { public int getTotalNodes() {
return this.totalNodes; return this.totalNodes;
} }
public int getActiveNodes() {
return this.activeNodes;
}
public int getLostNodes() { public int getLostNodes() {
return this.lostNodes; return this.lostNodes;

View File

@ -94,6 +94,10 @@ public class NodeInfo {
public String getNodeHTTPAddress() { public String getNodeHTTPAddress() {
return this.nodeHTTPAddress; return this.nodeHTTPAddress;
} }
public void setNodeHTTPAddress(String nodeHTTPAddress) {
this.nodeHTTPAddress = nodeHTTPAddress;
}
public String getHealthStatus() { public String getHealthStatus() {
return this.healthStatus; return this.healthStatus;

View File

@ -81,13 +81,20 @@ public class MockNM {
} }
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception { public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b); return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
b, ++responseId);
} }
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy) throws Exception { List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
return nodeHeartbeat(conts, isHealthy, ++responseId);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class); NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(resId);
status.setNodeId(nodeId); status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) { for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
status.setContainersStatuses(entry.getValue()); status.setContainersStatuses(entry.getValue());
@ -97,7 +104,6 @@ public class MockNM {
healthStatus.setIsNodeHealthy(isHealthy); healthStatus.setIsNodeHealthy(isHealthy);
healthStatus.setLastHealthReportTime(1); healthStatus.setLastHealthReportTime(1);
status.setNodeHealthStatus(healthStatus); status.setNodeHealthStatus(healthStatus);
status.setResponseId(++responseId);
req.setNodeStatus(status); req.setNodeStatus(status);
return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse(); return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
} }

View File

@ -56,6 +56,17 @@ public class MockNodes {
} }
return list; return list;
} }
public static List<RMNode> lostNodes(int racks, int nodesPerRack,
Resource perNode) {
List<RMNode> list = Lists.newArrayList();
for (int i = 0; i < racks; ++i) {
for (int j = 0; j < nodesPerRack; ++j) {
list.add(lostNodeInfo(i, perNode, RMNodeState.LOST));
}
}
return list;
}
public static NodeId newNodeID(String host, int port) { public static NodeId newNodeID(String host, int port) {
NodeId nid = recordFactory.newRecordInstance(NodeId.class); NodeId nid = recordFactory.newRecordInstance(NodeId.class);
@ -82,92 +93,120 @@ public class MockNodes {
return rs; return rs;
} }
public static RMNode newNodeInfo(int rack, final Resource perNode) { private static class MockRMNodeImpl implements RMNode {
private NodeId nodeId;
private String hostName;
private String nodeAddr;
private String httpAddress;
private int cmdPort;
private Resource perNode;
private String rackName;
private NodeHealthStatus nodeHealthStatus;
private RMNodeState state;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, NodeHealthStatus nodeHealthStatus,
int cmdPort, String hostName, RMNodeState state) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress;
this.perNode = perNode;
this.rackName = rackName;
this.nodeHealthStatus = nodeHealthStatus;
this.cmdPort = cmdPort;
this.hostName = hostName;
this.state = state;
}
@Override
public NodeId getNodeID() {
return this.nodeId;
}
@Override
public String getHostName() {
return this.hostName;
}
@Override
public int getCommandPort() {
return this.cmdPort;
}
@Override
public int getHttpPort() {
return 0;
}
@Override
public String getNodeAddress() {
return this.nodeAddr;
}
@Override
public String getHttpAddress() {
return this.httpAddress;
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
return this.nodeHealthStatus;
}
@Override
public Resource getTotalCapability() {
return this.perNode;
}
@Override
public String getRackName() {
return this.rackName;
}
@Override
public Node getNode() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public RMNodeState getState() {
return this.state;
}
@Override
public List<ContainerId> getContainersToCleanUp() {
return null;
}
@Override
public List<ApplicationId> getAppsToCleanup() {
return null;
}
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
return null;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
final String rackName = "rack"+ rack; final String rackName = "rack"+ rack;
final int nid = NODE_ID++; final int nid = NODE_ID++;
final String hostName = "host"+ nid; final String hostName = "host"+ nid;
final int port = 123; final int port = 123;
final NodeId nodeID = newNodeID(hostName, port); final NodeId nodeID = newNodeID(hostName, port);
final String httpAddress = "localhost:0"; final String httpAddress = httpAddr;
final NodeHealthStatus nodeHealthStatus = final NodeHealthStatus nodeHealthStatus =
recordFactory.newRecordInstance(NodeHealthStatus.class); recordFactory.newRecordInstance(NodeHealthStatus.class);
final Resource used = newUsedResource(perNode); return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName,
final Resource avail = newAvailResource(perNode, used); nodeHealthStatus, nid, hostName, state);
return new RMNode() { }
@Override
public NodeId getNodeID() {
return nodeID;
}
@Override public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) {
public String getNodeAddress() { return buildRMNode(rack, perNode, state, "N/A");
return hostName; }
}
@Override public static RMNode newNodeInfo(int rack, final Resource perNode) {
public String getHttpAddress() { return buildRMNode(rack, perNode, null, "localhost:0");
return httpAddress;
}
@Override
public Resource getTotalCapability() {
return perNode;
}
@Override
public String getRackName() {
return rackName;
}
@Override
public Node getNode() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
return nodeHealthStatus;
}
@Override
public int getCommandPort() {
return nid;
}
@Override
public int getHttpPort() {
// TODO Auto-generated method stub
return 0;
}
@Override
public String getHostName() {
return hostName;
}
@Override
public RMNodeState getState() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<ApplicationId> getAppsToCleanup() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<ContainerId> getContainersToCleanUp() {
// TODO Auto-generated method stub
return null;
}
@Override
public HeartbeatResponse getLastHeartBeatResponse() {
// TODO Auto-generated method stub
return null;
}
};
} }
} }

View File

@ -130,6 +130,12 @@ public class MockRM extends ResourceManager {
nm.getNodeId()); nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED)); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
} }
public void sendNodeLost(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
}
public void NMwaitForState(NodeId nodeid, RMNodeState finalState) public void NMwaitForState(NodeId nodeid, RMNodeState finalState)
throws Exception { throws Exception {

View File

@ -31,6 +31,7 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
@ -100,8 +101,8 @@ public class TestRMNodeTransitions {
rmDispatcher.register(SchedulerEventType.class, rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher()); new TestSchedulerEventDispatcher());
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null); node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
} }

View File

@ -157,14 +157,14 @@ public class TestResourceTrackerService {
rm.start(); rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120); MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService()); MockNM nm2 = rm.registerNode("host2:1234", 2048);
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs(); int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat( nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true); new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
checkRebootedNMCount(rm, ++initialMetricCount); checkRebootedNMCount(rm, ++initialMetricCount);
} }

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock;
import org.apache.hadoop.yarn.webapp.test.WebAppTests; import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -36,39 +37,65 @@ import com.google.inject.Module;
* data for all the columns in the table as specified in the header. * data for all the columns in the table as specified in the header.
*/ */
public class TestNodesPage { public class TestNodesPage {
final int numberOfRacks = 2;
final int numberOfNodesPerRack = 2;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 10;
final int numberOfActualTableHeaders = 10;
@Test private Injector injector;
public void testNodesBlockRender() throws Exception {
final int numberOfRacks = 2; @Before
final int numberOfNodesPerRack = 2; public void setUp() throws Exception {
// Number of Actual Table Headers for NodesPage.NodesBlock might change in injector = WebAppTests.createMockInjector(RMContext.class, TestRMWebApp
// future. In that case this value should be adjusted to the new value. .mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
final int numberOfThInMetricsTable = 10; 8 * TestRMWebApp.GiB), new Module() {
final int numberOfActualTableHeaders = 10;
Injector injector = WebAppTests.createMockInjector(RMContext.class,
TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB),
new Module() {
@Override @Override
public void configure(Binder binder) { public void configure(Binder binder) {
try { try {
binder.bind(ResourceManager.class).toInstance(TestRMWebApp.mockRm(3, binder.bind(ResourceManager.class).toInstance(
numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB)); TestRMWebApp.mockRm(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
}); });
}
@Test
public void testNodesBlockRender() throws Exception {
injector.getInstance(NodesBlock.class).render(); injector.getInstance(NodesBlock.class).render();
PrintWriter writer = injector.getInstance(PrintWriter.class); PrintWriter writer = injector.getInstance(PrintWriter.class);
WebAppTests.flushOutput(injector); WebAppTests.flushOutput(injector);
Mockito.verify(writer, Mockito.times(numberOfActualTableHeaders + Mockito.verify(writer,
numberOfThInMetricsTable)).print( Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
"<th"); .print("<th");
Mockito.verify( Mockito.verify(
writer, writer,
Mockito.times(numberOfRacks * numberOfNodesPerRack Mockito.times(numberOfRacks * numberOfNodesPerRack
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print("<td"); * numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
"<td");
}
@Test
public void testNodesBlockRenderForLostNodes() {
NodesBlock nodesBlock = injector.getInstance(NodesBlock.class);
nodesBlock.set("node.state", "lost");
nodesBlock.render();
PrintWriter writer = injector.getInstance(PrintWriter.class);
WebAppTests.flushOutput(injector);
Mockito.verify(writer,
Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
.print("<th");
Mockito.verify(
writer,
Mockito.times(numberOfRacks * numberOfNodesPerRack
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
"<td");
} }
} }

View File

@ -120,12 +120,23 @@ public class TestRMWebApp {
for (RMNode node : nodes) { for (RMNode node : nodes) {
nodesMap.put(node.getNodeID(), node); nodesMap.put(node.getNodeID(), node);
} }
final List<RMNode> lostNodes = MockNodes.lostNodes(racks, numNodes,
newResource(mbsPerNode));
final ConcurrentMap<String, RMNode> lostNodesMap = Maps.newConcurrentMap();
for (RMNode node : lostNodes) {
lostNodesMap.put(node.getHostName(), node);
}
return new RMContextImpl(new MemStore(), null, null, null, null) { return new RMContextImpl(new MemStore(), null, null, null, null) {
@Override @Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() { public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return applicationsMaps; return applicationsMaps;
} }
@Override @Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return lostNodesMap;
}
@Override
public ConcurrentMap<NodeId, RMNode> getRMNodes() { public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return nodesMap; return nodesMap;
} }

View File

@ -370,7 +370,8 @@ public class TestRMWebServices extends JerseyTest {
WebServicesTestUtils.getXmlInt(element, "lostNodes"), WebServicesTestUtils.getXmlInt(element, "lostNodes"),
WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"), WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"),
WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"), WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"),
WebServicesTestUtils.getXmlInt(element, "rebootedNodes")); WebServicesTestUtils.getXmlInt(element, "rebootedNodes"),
WebServicesTestUtils.getXmlInt(element, "activeNodes"));
} }
} }
@ -378,7 +379,7 @@ public class TestRMWebServices extends JerseyTest {
Exception { Exception {
assertEquals("incorrect number of elements", 1, json.length()); assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics"); JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 11, clusterinfo.length()); assertEquals("incorrect number of elements", 12, clusterinfo.length());
verifyClusterMetrics(clusterinfo.getInt("appsSubmitted"), verifyClusterMetrics(clusterinfo.getInt("appsSubmitted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"), clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
clusterinfo.getInt("allocatedMB"), clusterinfo.getInt("allocatedMB"),
@ -386,13 +387,13 @@ public class TestRMWebServices extends JerseyTest {
clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"), clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"),
clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"), clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"),
clusterinfo.getInt("decommissionedNodes"), clusterinfo.getInt("decommissionedNodes"),
clusterinfo.getInt("rebootedNodes")); clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes"));
} }
public void verifyClusterMetrics(int sub, int reservedMB, int availableMB, public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
int allocMB, int containersAlloc, int totalMB, int totalNodes, int allocMB, int containersAlloc, int totalMB, int totalNodes,
int lostNodes, int unhealthyNodes, int decommissionedNodes, int lostNodes, int unhealthyNodes, int decommissionedNodes,
int rebootedNodes) throws JSONException, Exception { int rebootedNodes, int activeNodes) throws JSONException, Exception {
ResourceScheduler rs = rm.getResourceScheduler(); ResourceScheduler rs = rm.getResourceScheduler();
QueueMetrics metrics = rs.getRootQueueMetrics(); QueueMetrics metrics = rs.getRootQueueMetrics();
@ -412,8 +413,11 @@ public class TestRMWebServices extends JerseyTest {
* MB_IN_GB, allocMB); * MB_IN_GB, allocMB);
assertEquals("containersAllocated doesn't match", 0, containersAlloc); assertEquals("containersAllocated doesn't match", 0, containersAlloc);
assertEquals("totalMB doesn't match", totalMBExpect, totalMB); assertEquals("totalMB doesn't match", totalMBExpect, totalMB);
assertEquals("totalNodes doesn't match", clusterMetrics.getNumNMs(), assertEquals(
totalNodes); "totalNodes doesn't match",
clusterMetrics.getNumActiveNMs() + clusterMetrics.getNumLostNMs()
+ clusterMetrics.getNumDecommisionedNMs()
+ clusterMetrics.getNumRebootedNMs(), totalNodes);
assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(), assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(),
lostNodes); lostNodes);
assertEquals("unhealthyNodes doesn't match", assertEquals("unhealthyNodes doesn't match",
@ -422,6 +426,8 @@ public class TestRMWebServices extends JerseyTest {
clusterMetrics.getNumDecommisionedNMs(), decommissionedNodes); clusterMetrics.getNumDecommisionedNMs(), decommissionedNodes);
assertEquals("rebootedNodes doesn't match", assertEquals("rebootedNodes doesn't match",
clusterMetrics.getNumRebootedNMs(), rebootedNodes); clusterMetrics.getNumRebootedNMs(), rebootedNodes);
assertEquals("activeNodes doesn't match", clusterMetrics.getNumActiveNMs(),
activeNodes);
} }
@Test @Test

View File

@ -202,6 +202,69 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.stop(); rm.stop();
} }
} }
@Test
public void testNodesQueryStateLost() throws JSONException, Exception {
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1234", 5120);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
rm.sendNodeLost(nm1);
rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", RMNodeState.LOST.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
JSONObject nodes = json.getJSONObject("nodes");
assertEquals("incorrect number of elements", 1, nodes.length());
JSONArray nodeArray = nodes.getJSONArray("node");
assertEquals("incorrect number of elements", 2, nodeArray.length());
for (int i = 0; i < nodeArray.length(); ++i) {
JSONObject info = nodeArray.getJSONObject(i);
String host = info.get("id").toString().split(":")[0];
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(host);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
.toString(), info.getString("state"));
}
}
@Test
public void testSingleNodeQueryStateLost() throws JSONException, Exception {
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1234", 5120);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
rm.sendNodeLost(nm1);
rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").path("h2:1234").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
JSONObject info = json.getJSONObject("node");
String id = info.get("id").toString();
assertEquals("Incorrect Node Information.", "h2:1234", id);
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get("h2");
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
@Test @Test
public void testNodesQueryHealthy() throws JSONException, Exception { public void testNodesQueryHealthy() throws JSONException, Exception {