YARN-3226. UI changes for decommissioning node. Contributed by Sunil G.

(cherry picked from commit 1de56b0448)
This commit is contained in:
Junping Du 2015-12-17 15:19:48 -08:00
parent 294d0f6f6a
commit 6d2914a697
9 changed files with 161 additions and 42 deletions

View File

@ -225,6 +225,9 @@ Release 2.8.0 - UNRELEASED
YARN-3623. Add a new config to indicate the Timeline Service version.
(Xuan Gong via junping_du)
YARN-3226. UI changes for decommissioning node. (Sunil G via
junping_du)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -40,6 +40,7 @@ public class ClusterMetrics {
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
@Metric("# of active NMs") MutableGaugeInt numActiveNMs;
@Metric("# of decommissioning NMs") MutableGaugeInt numDecommissioningNMs;
@Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableGaugeInt numLostNMs;
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@ -86,7 +87,24 @@ synchronized static void destroy() {
/**
 * Reads the gauge annotated "# of active NMs".
 *
 * @return the value currently held by {@code numActiveNMs}
 */
public int getNumActiveNMs() {
  final int activeCount = this.numActiveNMs.value();
  return activeCount;
}
// Decommissioning NMs
/**
 * Reads the gauge annotated "# of decommissioning NMs".
 *
 * @return the value currently held by {@code numDecommissioningNMs}
 */
public int getNumDecommissioningNMs() {
  final int decommissioningCount = this.numDecommissioningNMs.value();
  return decommissioningCount;
}
/**
 * Increments the "# of decommissioning NMs" gauge via its {@code incr()}
 * method.
 */
public void incrDecommissioningNMs() {
  this.numDecommissioningNMs.incr();
}
/**
 * Overwrites the "# of decommissioning NMs" gauge with an explicit value.
 *
 * @param num the value passed to the gauge's {@code set} method
 */
public void setDecommissioningNMs(int num) {
  this.numDecommissioningNMs.set(num);
}
/**
 * Decrements the "# of decommissioning NMs" gauge via its {@code decr()}
 * method.
 */
public void decrDecommissioningNMs() {
  this.numDecommissioningNMs.decr();
}
// Decommissioned NMs
public int getNumDecommisionedNMs() {
return numDecommissionedNMs.value();

View File

@ -647,13 +647,34 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
}
}
// Treats nodes in decommissioning as active nodes
// TODO we may want to differentiate active nodes and decommissioning node in
// metrics later.
private void updateMetricsForGracefulDecommissionOnUnhealthyNode() {
// Update metrics when moving to Decommissioning state
private void updateMetricsForGracefulDecommission(NodeState initialState,
NodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
metrics.incrNumActiveNodes();
metrics.decrNumUnhealthyNMs();
// Release the gauge that was counting the node under its previous state.
switch (initialState) {
  case UNHEALTHY:
    metrics.decrNumUnhealthyNMs();
    break;
  case RUNNING:
    metrics.decrNumActiveNodes();
    break;
  case DECOMMISSIONING:
    metrics.decrDecommissioningNMs();
    break;
  default:
    // No gauge is adjusted for any other initial state; only a warning is
    // logged. (Message spelling corrected from "Unexpcted".)
    LOG.warn("Unexpected initial state");
}
switch (finalState) {
case DECOMMISSIONING :
metrics.incrDecommissioningNMs();
break;
case RUNNING :
metrics.incrNumActiveNodes();
break;
default :
LOG.warn("Unexpected final state");
}
}
private void updateMetricsForDeactivatedNode(NodeState initialState,
@ -665,18 +686,18 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
metrics.decrNumActiveNodes();
break;
case DECOMMISSIONING:
metrics.decrNumActiveNodes();
metrics.decrDecommissioningNMs();
break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
default:
LOG.debug("Unexpected inital state");
LOG.warn("Unexpected initial state");
}
switch (finalState) {
case DECOMMISSIONED:
metrics.incrDecommisionedNMs();
metrics.incrDecommisionedNMs();
break;
case LOST:
metrics.incrNumLostNMs();
@ -691,7 +712,7 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
metrics.incrNumShutdownNMs();
break;
default:
LOG.debug("Unexpected final state");
LOG.warn("Unexpected final state");
}
}
@ -1014,9 +1035,8 @@ public DecommissioningNodeTransition(NodeState initState,
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
if (initState.equals(NodeState.UNHEALTHY)) {
rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode();
}
// Update NM metrics during graceful decommissioning.
rmNode.updateMetricsForGracefulDecommission(initState, finalState);
// TODO (in YARN-3223) Keep NM's available resource to be 0
}
}
@ -1033,6 +1053,8 @@ public RecommissionNodeTransition(NodeState finalState) {
/**
 * Logs the recommission and moves the node's metrics from its current state
 * to {@code finalState}.
 *
 * @param rmNode the node being recommissioned; its state is read before the
 *               metrics update
 * @param event  the triggering event (not inspected here)
 */
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
  final String recommissionMessage = "Node " + rmNode.nodeId
      + " in DECOMMISSIONING is recommissioned back to RUNNING.";
  LOG.info(recommissionMessage);

  // NOTE(review): finalState is presumably the value given to the
  // RecommissionNodeTransition constructor -- confirm against the class body.
  final NodeState stateBeforeRecommission = rmNode.getState();
  rmNode.updateMetricsForGracefulDecommission(stateBeforeRecommission,
      finalState);
  // TODO handle NM resource resume in YARN-3223.
}
}

View File

@ -53,8 +53,7 @@ protected void render(Block html) {
//CSS in the correct spot
html.style(".metrics {margin-bottom:5px}");
ClusterMetricsInfo clusterMetrics =
new ClusterMetricsInfo(this.rm);
ClusterMetricsInfo clusterMetrics = new ClusterMetricsInfo(this.rm);
DIV<Hamlet> div = html.div().$class("metrics");
@ -73,12 +72,6 @@ protected void render(Block html) {
th().$class("ui-state-default")._("VCores Used")._().
th().$class("ui-state-default")._("VCores Total")._().
th().$class("ui-state-default")._("VCores Reserved")._().
th().$class("ui-state-default")._("Active Nodes")._().
th().$class("ui-state-default")._("Decommissioned Nodes")._().
th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._().
th().$class("ui-state-default")._("Rebooted Nodes")._().
th().$class("ui-state-default")._("Shutdown Nodes")._().
_().
_().
tbody().$class("ui-widget-content").
@ -99,7 +92,26 @@ protected void render(Block html) {
td(String.valueOf(clusterMetrics.getAllocatedVirtualCores())).
td(String.valueOf(clusterMetrics.getTotalVirtualCores())).
td(String.valueOf(clusterMetrics.getReservedVirtualCores())).
_().
_()._();
div.h3("Cluster Nodes Metrics").
table("#nodemetricsoverview").
thead().$class("ui-widget-header").
tr().
th().$class("ui-state-default")._("Active Nodes")._().
th().$class("ui-state-default")._("Decommissioning Nodes")._().
th().$class("ui-state-default")._("Decommissioned Nodes")._().
th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._().
th().$class("ui-state-default")._("Rebooted Nodes")._().
th().$class("ui-state-default")._("Shutdown Nodes")._().
_().
_().
tbody().$class("ui-widget-content").
tr().
td().a(url("nodes"),String.valueOf(clusterMetrics.getActiveNodes()))._().
td().a(url("nodes/decommissioning"), String.valueOf(clusterMetrics.getDecommissioningNodes()))._().
td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._().
td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._().
td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._().
@ -107,7 +119,7 @@ protected void render(Block html) {
td().a(url("nodes/shutdown"),String.valueOf(clusterMetrics.getShutdownNodes()))._().
_().
_()._();
String user = request().getRemoteUser();
if (user != null) {
UserMetricsInfo userMetrics = new UserMetricsInfo(this.rm, user);

View File

@ -94,6 +94,9 @@ protected void render(Block html) {
rmNodes = this.rm.getRMContext().getInactiveRMNodes().values();
isInactive = true;
break;
case DECOMMISSIONING:
// Do nothing
break;
default:
LOG.debug("Unexpected state filter for inactive RM node");
}

View File

@ -54,6 +54,7 @@ public class ClusterMetricsInfo {
protected int totalNodes;
protected int lostNodes;
protected int unhealthyNodes;
protected int decommissioningNodes;
protected int decommissionedNodes;
protected int rebootedNodes;
protected int activeNodes;
@ -91,6 +92,7 @@ public ClusterMetricsInfo(final ResourceManager rm) {
this.activeNodes = clusterMetrics.getNumActiveNMs();
this.lostNodes = clusterMetrics.getNumLostNMs();
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
this.decommissioningNodes = clusterMetrics.getNumDecommissioningNMs();
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
@ -186,6 +188,10 @@ public int getUnhealthyNodes() {
return this.unhealthyNodes;
}
/**
 * Returns the decommissioning-node count stored in this info object.
 *
 * @return the value of the {@code decommissioningNodes} field
 */
public int getDecommissioningNodes() {
  final int count = decommissioningNodes;
  return count;
}
/**
 * Returns the decommissioned-node count stored in this info object.
 *
 * @return the value of the {@code decommissionedNodes} field
 */
public int getDecommissionedNodes() {
  final int count = decommissionedNodes;
  return count;
}

View File

@ -236,29 +236,49 @@ public void testExpiredContainer() {
}
@Test
public void testStatusUpdateOnDecommissioningNode(){
public void testStatusUpdateOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialDecommissioning = cm.getNumDecommissioningNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
// Verify node in DECOMMISSIONING won't be changed by status update
// with running apps
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEventWithRunningApps();
node.handle(statusEvent);
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
cm.getNumDecommissioningNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
cm.getNumDecommisionedNMs());
// Verify node in DECOMMISSIONING will be changed by status update
// without running apps
statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
node.handle(statusEvent);
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
cm.getNumDecommissioningNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
cm.getNumDecommisionedNMs());
}
@Test
public void testRecommissionNode(){
public void testRecommissionNode() {
RMNodeImpl node = getDecommissioningNode();
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.RECOMMISSION));
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialDecommissioning = cm.getNumDecommissioningNMs();
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.RECOMMISSION));
Assert.assertEquals(NodeState.RUNNING, node.getState());
Assert
.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
cm.getNumDecommissioningNMs());
}
@Test (timeout = 5000)
@ -481,16 +501,18 @@ public void testDecommissionOnDecommissioningNode() {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
int initialDecommissioning = cm.getNumDecommissioningNMs();
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned + 1, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
cm.getNumDecommissioningNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes", initialRebooted,
cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@ -525,16 +547,19 @@ public void testUnhealthyDecommissioning() {
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialDecommissioning = cm.getNumDecommissioningNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive + 1,
Assert.assertEquals("Active Nodes", initialActive,
cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
cm.getNumDecommisionedNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
cm.getNumDecommissioningNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
@ -681,9 +706,16 @@ private RMNodeImpl getRunningNode(String nmVersion, int port) {
/**
 * Builds a node in the DECOMMISSIONING state by sending a
 * GRACEFUL_DECOMMISSION event to a running node, and checks that the
 * cluster metrics moved one node from "active" to "decommissioning".
 *
 * @return the node after it has entered DECOMMISSIONING
 */
private RMNodeImpl getDecommissioningNode() {
  RMNodeImpl decommissioningNode = getRunningNode();

  // Snapshot the gauges before the transition so the deltas can be asserted.
  ClusterMetrics metrics = ClusterMetrics.getMetrics();
  int activeBefore = metrics.getNumActiveNMs();
  int decommissioningBefore = metrics.getNumDecommissioningNMs();

  RMNodeEvent gracefulDecommission = new RMNodeEvent(
      decommissioningNode.getNodeID(), RMNodeEventType.GRACEFUL_DECOMMISSION);
  decommissioningNode.handle(gracefulDecommission);

  Assert.assertEquals(NodeState.DECOMMISSIONING,
      decommissioningNode.getState());
  Assert.assertEquals("Active Nodes", activeBefore - 1,
      metrics.getNumActiveNMs());
  Assert.assertEquals("Decommissioning Nodes", decommissioningBefore + 1,
      metrics.getNumDecommissioningNMs());
  return decommissioningNode;
}
@ -774,16 +806,30 @@ public void testReconnect() {
/**
 * A reconnect carrying running apps must leave a DECOMMISSIONING node and
 * the node gauges untouched; a reconnect with no apps must move the node to
 * DECOMMISSIONED and shift one count from "decommissioning" to
 * "decommissioned".
 */
@Test
public void testReconnectOnDecommissioningNode() {
  RMNodeImpl node = getDecommissioningNode();

  // Baseline gauges, captured after the node is already DECOMMISSIONING.
  ClusterMetrics metrics = ClusterMetrics.getMetrics();
  int activeBefore = metrics.getNumActiveNMs();
  int decommissioningBefore = metrics.getNumDecommissioningNMs();
  int decommissionedBefore = metrics.getNumDecommisionedNMs();

  // Reconnect event with running app
  RMNodeReconnectEvent reconnectWithApps = new RMNodeReconnectEvent(
      node.getNodeID(), node, getAppIdList(), null);
  node.handle(reconnectWithApps);

  // still decommissioning
  Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
  Assert.assertEquals("Active Nodes", activeBefore,
      metrics.getNumActiveNMs());
  Assert.assertEquals("Decommissioning Nodes", decommissioningBefore,
      metrics.getNumDecommissioningNMs());
  Assert.assertEquals("Decommissioned Nodes", decommissionedBefore,
      metrics.getNumDecommisionedNMs());

  // Reconnect event without any running app
  RMNodeReconnectEvent reconnectWithoutApps = new RMNodeReconnectEvent(
      node.getNodeID(), node, null, null);
  node.handle(reconnectWithoutApps);

  Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
  Assert.assertEquals("Active Nodes", activeBefore,
      metrics.getNumActiveNMs());
  Assert.assertEquals("Decommissioning Nodes", decommissioningBefore - 1,
      metrics.getNumDecommissioningNMs());
  Assert.assertEquals("Decommissioned Nodes", decommissionedBefore + 1,
      metrics.getNumDecommisionedNMs());
}
@Test
@ -846,17 +892,26 @@ public void testResourceUpdateOnNewNode() {
@Test
public void testResourceUpdateOnRebootedNode() {
RMNodeImpl node = getRebootedNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialUnHealthy = cm.getUnhealthyNMs();
int initialDecommissioning = cm.getNumDecommissioningNMs();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), ResourceOption
.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.REBOOTED, node.getState());
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
// The unhealthy gauge must be unchanged by the resource update.
// (Assertion label corrected from "Unhelathy Nodes".)
Assert.assertEquals("Unhealthy Nodes", initialUnHealthy,
    cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
cm.getNumDecommissioningNMs());
}
// Test unhealthy report on a decommissioning node will make it

View File

@ -47,7 +47,7 @@ public class TestNodesPage {
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 22;
final int numberOfThInMetricsTable = 23;
final int numberOfActualTableHeaders = 13;
private Injector injector;

View File

@ -429,7 +429,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
Exception {
assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 24, clusterinfo.length());
assertEquals("incorrect number of elements", 25, clusterinfo.length());
verifyClusterMetrics(
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),