YARN-3212. RMNode State Transition Update with DECOMMISSIONING state. (Junping Du via wangda)

(cherry picked from commit 9bc913a35c)
This commit is contained in:
Wangda Tan 2015-09-18 10:04:17 -07:00
parent b4e1279217
commit 4a657e9326
7 changed files with 510 additions and 166 deletions

View File

@ -142,6 +142,9 @@ Release 2.8.0 - UNRELEASED
YARN-4034. Render cluster Max Priority in scheduler metrics in RM web
UI. (Rohith Sharma K S via jianhe)
YARN-3212. RMNode State Transition Update with DECOMMISSIONING state.
(Junping Du via wangda)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -399,7 +399,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION_WITH_TIMEOUT));
new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -399,8 +400,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeId nodeId = remoteNodeStatus.getNodeId();
// 1. Check if it's a valid (i.e. not excluded) node
if (!this.nodesListManager.isValidNode(nodeId.getHost())) {
// 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
// in decommissioning.
if (!this.nodesListManager.isValidNode(nodeId.getHost())
&& !isNodeInDecommissioning(nodeId)) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();
@ -486,6 +489,19 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
return nodeHeartBeatResponse;
}
/**
* Check if node in decommissioning state.
* @param nodeId
*/
private boolean isNodeInDecommissioning(NodeId nodeId) {
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode != null &&
rmNode.getState().equals(NodeState.DECOMMISSIONING)) {
return true;
}
return false;
}
@SuppressWarnings("unchecked")
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(

View File

@ -24,7 +24,7 @@ public enum RMNodeEventType {
// Source: AdminService
DECOMMISSION,
DECOMMISSION_WITH_TIMEOUT,
GRACEFUL_DECOMMISSION,
RECOMMISSION,
// Source: AdminService, ResourceTrackerService

View File

@ -144,101 +144,150 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType,
RMNodeEvent>(NodeState.NEW)
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from RUNNING state
.addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.RUNNING, NodeState.LOST,
RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST))
.addTransition(NodeState.RUNNING, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
//Transitions from RUNNING state
.addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE,
new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONING,
RMNodeEventType.GRACEFUL_DECOMMISSION,
new DecommissioningNodeTransition(NodeState.RUNNING,
NodeState.DECOMMISSIONING))
.addTransition(NodeState.RUNNING, NodeState.LOST,
RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST))
.addTransition(NodeState.RUNNING, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING),
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
//Transitions from REBOOTED state
.addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from DECOMMISSIONED state
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
//Transitions from REBOOTED state
.addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
//Transitions from DECOMMISSIONED state
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
//Transitions from UNHEALTHY state
.addTransition(NodeState.UNHEALTHY,
EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
RMNodeEventType.STATUS_UPDATE,
new StatusUpdateWhenUnHealthyTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.UNHEALTHY, NodeState.LOST,
RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST))
.addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
//Transitions from DECOMMISSIONING state
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.DECOMMISSIONING, NodeState.RUNNING,
RMNodeEventType.RECOMMISSION,
new RecommissionNodeTransition(NodeState.RUNNING))
.addTransition(NodeState.DECOMMISSIONING,
EnumSet.of(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
RMNodeEventType.STATUS_UPDATE,
new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.GRACEFUL_DECOMMISSION,
new DecommissioningNodeTransition(NodeState.DECOMMISSIONING,
NodeState.DECOMMISSIONING))
.addTransition(NodeState.DECOMMISSIONING, NodeState.LOST,
RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST))
.addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
//Transitions from SHUTDOWN state
.addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
// create the topology tables
.installTopology();
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.DECOMMISSIONING, EnumSet.of(
NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenRunningTransition())
//Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
//Transitions from UNHEALTHY state
.addTransition(NodeState.UNHEALTHY,
EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
RMNodeEventType.STATUS_UPDATE,
new StatusUpdateWhenUnHealthyTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONING,
RMNodeEventType.GRACEFUL_DECOMMISSION,
new DecommissioningNodeTransition(NodeState.UNHEALTHY,
NodeState.DECOMMISSIONING))
.addTransition(NodeState.UNHEALTHY, NodeState.LOST,
RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST))
.addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
.addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY),
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
//Transitions from SHUTDOWN state
.addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
// create the topology tables
.installTopology();
private final StateMachine<NodeState, RMNodeEventType,
RMNodeEvent> stateMachine;
@ -265,7 +314,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
this.containerAllocationExpirer = context.getContainerAllocationExpirer();
@ -291,6 +340,11 @@ public int getHttpPort() {
return httpPort;
}
// Test only
public void setHttpPort(int port) {
this.httpPort = port;
}
@Override
public NodeId getNodeID() {
return this.nodeId;
@ -497,23 +551,35 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
metrics.decrNumShutdownNMs();
break;
default:
LOG.debug("Unexpected previous node state");
LOG.debug("Unexpected previous node state");
}
}
// Treats nodes in decommissioning as active nodes
// TODO we may want to differentiate active nodes and decommissioning node in
// metrics later.
private void updateMetricsForGracefulDecommissionOnUnhealthyNode() {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
metrics.incrNumActiveNodes();
metrics.decrNumUnhealthyNMs();
}
private void updateMetricsForDeactivatedNode(NodeState initialState,
NodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
switch (initialState) {
case RUNNING:
metrics.decrNumActiveNodes();
break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
default:
LOG.debug("Unexpected inital state");
case RUNNING:
metrics.decrNumActiveNodes();
break;
case DECOMMISSIONING:
metrics.decrNumActiveNodes();
break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
default:
LOG.debug("Unexpected inital state");
}
switch (finalState) {
@ -608,10 +674,10 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
public static class ReconnectNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
RMNode newNode = reconnectEvent.getReconnectedNode();
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
@ -622,6 +688,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// No application running on the node, so send node-removal event with
// cleaning up old container info.
if (noRunningApps) {
if (rmNode.getState() == NodeState.DECOMMISSIONING) {
// When node in decommissioning, and no running apps on this node,
// it will return as decommissioned state.
deactivateNode(rmNode, NodeState.DECOMMISSIONED);
return NodeState.DECOMMISSIONED;
}
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
@ -652,6 +724,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
}
} else {
rmNode.httpPort = newNode.getHttpPort();
rmNode.httpAddress = newNode.getHttpAddress();
@ -678,17 +751,21 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
.newInstance(newNode.getTotalCapability(), -1)));
}
}
return rmNode.getState();
}
private void handleNMContainerStatus(
List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
List<ContainerStatus> containerStatuses =
new ArrayList<ContainerStatus>();
for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
containerStatuses.add(createContainerStatus(nmContainerStatus));
if (nmContainerStatuses != null) {
List<ContainerStatus> containerStatuses =
new ArrayList<ContainerStatus>();
for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
containerStatuses.add(createContainerStatus(nmContainerStatus));
}
rmnode.handleContainerStatus(containerStatuses);
}
rmnode.handleContainerStatus(containerStatuses);
}
private ContainerStatus createContainerStatus(
@ -770,31 +847,94 @@ public DeactivateNodeTransition(NodeState finalState) {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
RMNodeImpl.deactivateNode(rmNode, finalState);
}
}
/**
* Put a node in deactivated (decommissioned) status.
* @param rmNode
* @param finalState
*/
public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
reportNodeUnusable(rmNode, finalState);
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
}
/**
* Report node is UNUSABLE and update metrics.
* @param rmNode
* @param finalState
*/
public static void reportNodeUnusable(RMNodeImpl rmNode,
NodeState finalState) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
/**
* The transition to put node in decommissioning state.
*/
public static class DecommissioningNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private final NodeState initState;
private final NodeState finalState;
public DecommissioningNodeTransition(NodeState initState,
NodeState finalState) {
this.initState = initState;
this.finalState = finalState;
}
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
if (initState.equals(NodeState.UNHEALTHY)) {
rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode();
}
// TODO (in YARN-3223) Keep NM's available resource to be 0
}
}
public static class RecommissionNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private final NodeState finalState;
public RecommissionNodeTransition(NodeState finalState) {
this.finalState = finalState;
}
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " +
"recommissioned back to RUNNING.");
// TODO handle NM resource resume in YARN-3223.
}
}
/**
* Status update transition when node is healthy.
*/
public static class StatusUpdateWhenHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override
@ -805,25 +945,44 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus =
NodeHealthStatus remoteNodeHealthStatus =
statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
NodeState initialState = rmNode.getState();
boolean isNodeDecommissioning =
initialState.equals(NodeState.DECOMMISSIONING);
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: "
+ remoteNodeHealthStatus.getHealthReport());
rmNode.nodeUpdateQueue.clear();
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Update metrics
rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
NodeState.UNHEALTHY);
return NodeState.UNHEALTHY;
LOG.info("Node " + rmNode.nodeId +
" reported UNHEALTHY with details: " +
remoteNodeHealthStatus.getHealthReport());
// if a node in decommissioning receives an unhealthy report,
// it will keep decommissioning.
if (isNodeDecommissioning) {
return NodeState.DECOMMISSIONING;
} else {
reportNodeUnusable(rmNode, NodeState.UNHEALTHY);
return NodeState.UNHEALTHY;
}
}
if (isNodeDecommissioning) {
List<ApplicationId> runningApps = rmNode.getRunningApps();
List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
// no running (and keeping alive) app on this node, get it
// decommissioned.
// TODO may need to check no container is being scheduled on this node
// as well.
if ((runningApps == null || runningApps.size() == 0)
&& (keepAliveApps == null || keepAliveApps.size() == 0)) {
RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
return NodeState.DECOMMISSIONED;
}
// TODO (in YARN-3223) if node in decommissioning, get node resource
// updated if container get finished (keep available resource to be 0)
}
rmNode.handleContainerStatus(statusEvent.getContainers());
@ -848,7 +1007,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
statusEvent.getKeepAliveAppIds());
}
return NodeState.RUNNING;
return initialState;
}
}
@ -857,11 +1016,12 @@ public static class StatusUpdateWhenUnHealthyTransition implements
@Override
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event;
// Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
NodeHealthStatus remoteNodeHealthStatus =
statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());

View File

@ -29,7 +29,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -75,7 +77,7 @@
public class TestRMNodeTransitions {
RMNodeImpl node;
private RMContext rmContext;
private YarnScheduler scheduler;
@ -168,6 +170,42 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent(
return event;
}
private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
return event;
}
private List<ApplicationId> getAppIdList() {
List<ApplicationId> appIdList = new ArrayList<ApplicationId>();
appIdList.add(BuilderUtils.newApplicationId(0, 0));
return appIdList;
}
private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(null).when(event).getKeepAliveAppIds();
return event;
}
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
@ -195,7 +233,33 @@ public void testExpiredContainer() {
*/
verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));
}
@Test
public void testStatusUpdateOnDecommissioningNode(){
RMNodeImpl node = getDecommissioningNode();
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
// Verify node in DECOMMISSIONING won't be changed by status update
// with running apps
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEventWithRunningApps();
node.handle(statusEvent);
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
// Verify node in DECOMMISSIONING will be changed by status update
// without running apps
statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
node.handle(statusEvent);
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test
public void testRecommissionNode(){
RMNodeImpl node = getDecommissioningNode();
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.RECOMMISSION));
Assert.assertEquals(NodeState.RUNNING, node.getState());
}
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
@ -253,9 +317,9 @@ public void testContainerUpdate() throws InterruptedException{
Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
.getContainerId());
Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
.getContainerId());
.getContainerId());
}
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
@ -292,7 +356,7 @@ public void testStatusChange(){
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
Assert.assertEquals(0, node.getQueueSize());
}
@Test
public void testRunningExpire() {
RMNodeImpl node = getRunningNode();
@ -375,7 +439,7 @@ public void testUnhealthyExpire() {
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.LOST, node.getState());
}
@Test
public void testUnhealthyExpireForSchedulerRemove() {
RMNodeImpl node = getUnhealthyNode();
@ -407,6 +471,28 @@ public void testRunningDecommission() {
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test
public void testDecommissionOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned + 1, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test
public void testUnhealthyDecommission() {
RMNodeImpl node = getUnhealthyNode();
@ -429,6 +515,30 @@ public void testUnhealthyDecommission() {
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
// Test Decommissioning on a unhealthy node will make it decommissioning.
@Test
public void testUnhealthyDecommissioning() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive + 1,
cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
}
@Test
public void testRunningRebooting() {
RMNodeImpl node = getRunningNode();
@ -567,6 +677,14 @@ private RMNodeImpl getRunningNode(String nmVersion, int port) {
return node;
}
private RMNodeImpl getDecommissioningNode() {
RMNodeImpl node = getRunningNode();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
return node;
}
private RMNodeImpl getUnhealthyNode() {
RMNodeImpl node = getRunningNode();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
@ -577,20 +695,19 @@ private RMNodeImpl getUnhealthyNode() {
return node;
}
private RMNodeImpl getNewNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
return node;
}
private RMNodeImpl getNewNode(Resource capability) {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
capability, null);
return node;
}
private RMNodeImpl getRebootedNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
Resource capability = Resource.newInstance(4096, 4);
@ -650,7 +767,39 @@ public void testReconnect() {
Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
nodesListManagerEvent.getType());
}
@Test
public void testReconnectOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
// Reconnect event with running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
getAppIdList(), null));
// still decommissioning
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
// Reconnect event without any running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test
public void testReconnectWithNewPortOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
Random r= new Random();
node.setHttpPort(r.nextInt(10000));
// Reconnect event with running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
getAppIdList(), null));
// still decommissioning
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
node.setHttpPort(r.nextInt(10000));
// Reconnect event without any running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test
public void testResourceUpdateOnRunningNode() {
RMNodeImpl node = getRunningNode();
@ -658,18 +807,23 @@ public void testResourceUpdateOnRunningNode() {
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.RUNNING, node.getState());
Assert.assertNotNull(nodesListManagerEvent);
Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
nodesListManagerEvent.getType());
}
@Test
public void testDecommissioningOnRunningNode(){
getDecommissioningNode();
}
@Test
public void testResourceUpdateOnNewNode() {
RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
@ -682,10 +836,10 @@ public void testResourceUpdateOnNewNode() {
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.NEW, node.getState());
}
@Test
public void testResourceUpdateOnRebootedNode() {
RMNodeImpl node = getRebootedNode();
@ -702,6 +856,18 @@ public void testResourceUpdateOnRebootedNode() {
Assert.assertEquals(NodeState.REBOOTED, node.getState());
}
// Test unhealthy report on a decommissioning node will make it
// keep decommissioning.
@Test
public void testDecommissioningUnhealthy() {
RMNodeImpl node = getDecommissioningNode();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
System.currentTimeMillis());
node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
new ArrayList<ContainerStatus>(), null, null));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
}
@Test
public void testReconnnectUpdate() {
final String nmVersion1 = "nm version 1";

View File

@ -43,8 +43,7 @@ public class TestNodesPage {
final int numberOfNodesPerRack = 8;
// The following is because of the way TestRMWebApp.mockRMContext creates
// nodes.
final int numberOfLostNodesPerRack = numberOfNodesPerRack
/ NodeState.values().length;
final int numberOfLostNodesPerRack = 1;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.