YARN-5566. Client-side NM graceful decom is not triggered when jobs finish. (Robert Kanter via kasha)
This commit is contained in:
parent
162ee0f0a4
commit
f9016dfec3
@ -1133,12 +1133,21 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
NodeState initialState = rmNode.getState();
|
||||
boolean isNodeDecommissioning =
|
||||
initialState.equals(NodeState.DECOMMISSIONING);
|
||||
if (isNodeDecommissioning) {
|
||||
List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
|
||||
if (rmNode.runningApplications.isEmpty() &&
|
||||
(keepAliveApps == null || keepAliveApps.isEmpty())) {
|
||||
RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
|
||||
return NodeState.DECOMMISSIONED;
|
||||
}
|
||||
}
|
||||
|
||||
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
|
||||
LOG.info("Node " + rmNode.nodeId +
|
||||
" reported UNHEALTHY with details: " +
|
||||
remoteNodeHealthStatus.getHealthReport());
|
||||
// if a node in decommissioning receives an unhealthy report,
|
||||
// it will keep decommissioning.
|
||||
// it will stay in decommissioning.
|
||||
if (isNodeDecommissioning) {
|
||||
return NodeState.DECOMMISSIONING;
|
||||
} else {
|
||||
@ -1146,24 +1155,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
return NodeState.UNHEALTHY;
|
||||
}
|
||||
}
|
||||
if (isNodeDecommissioning) {
|
||||
List<ApplicationId> runningApps = rmNode.getRunningApps();
|
||||
|
||||
List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
|
||||
|
||||
// no running (and keeping alive) app on this node, get it
|
||||
// decommissioned.
|
||||
// TODO may need to check no container is being scheduled on this node
|
||||
// as well.
|
||||
if ((runningApps == null || runningApps.size() == 0)
|
||||
&& (keepAliveApps == null || keepAliveApps.size() == 0)) {
|
||||
RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
|
||||
return NodeState.DECOMMISSIONED;
|
||||
}
|
||||
|
||||
// TODO (in YARN-3223) if node in decommissioning, get node resource
|
||||
// updated if container get finished (keep available resource to be 0)
|
||||
}
|
||||
|
||||
rmNode.handleContainerStatus(statusEvent.getContainers());
|
||||
rmNode.handleReportedIncreasedContainers(
|
||||
@ -1337,7 +1328,7 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
|
||||
+ " is the first container get launched for application "
|
||||
+ containerAppId);
|
||||
}
|
||||
runningApplications.add(containerAppId);
|
||||
handleRunningAppOnNode(this, context, containerAppId, nodeId);
|
||||
}
|
||||
|
||||
// Process running containers
|
||||
|
@ -47,6 +47,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
||||
.AllocationExpirationInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
@ -73,6 +74,7 @@
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
@ -290,16 +292,17 @@ public void testContainerUpdate() throws InterruptedException{
|
||||
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
|
||||
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
||||
node2.handle(new RMNodeStartedEvent(null, null, null));
|
||||
|
||||
|
||||
ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
|
||||
ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
|
||||
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(0, 0), 0), 0);
|
||||
BuilderUtils.newApplicationAttemptId(app0, 0), 0);
|
||||
ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(1, 1), 1), 1);
|
||||
BuilderUtils.newApplicationAttemptId(app1, 1), 1);
|
||||
ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(1, 1), 1), 2);
|
||||
BuilderUtils.newApplicationAttemptId(app1, 1), 2);
|
||||
rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));
|
||||
rmContext.getRMApps().put(app1, Mockito.mock(RMApp.class));
|
||||
|
||||
RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
|
||||
RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
|
||||
@ -663,6 +666,7 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() {
|
||||
NodeId nodeId = node.getNodeID();
|
||||
|
||||
ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
|
||||
rmContext.getRMApps().put(runningAppId, Mockito.mock(RMApp.class));
|
||||
// Create a running container
|
||||
ContainerId runningContainerId = BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
@ -930,16 +934,22 @@ public void testResourceUpdateOnRebootedNode() {
|
||||
}
|
||||
|
||||
// Test unhealthy report on a decommissioning node will make it
|
||||
// keep decommissioning.
|
||||
// keep decommissioning as long as there's a running or keep alive app.
|
||||
// Otherwise, it will go to decommissioned
|
||||
@Test
|
||||
public void testDecommissioningUnhealthy() {
|
||||
RMNodeImpl node = getDecommissioningNode();
|
||||
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
|
||||
System.currentTimeMillis());
|
||||
List<ApplicationId> keepAliveApps = new ArrayList<>();
|
||||
keepAliveApps.add(BuilderUtils.newApplicationId(1, 1));
|
||||
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
|
||||
new ArrayList<ContainerStatus>(), null, status, null, null, null);
|
||||
null, keepAliveApps, status, null, null, null);
|
||||
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||
nodeStatus.setKeepAliveApplications(null);
|
||||
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -962,6 +972,7 @@ public void testContainerExpire() throws Exception {
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
rmContext.getRMApps().put(appId, Mockito.mock(RMApp.class));
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
|
||||
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
|
||||
AllocationExpirationInfo expirationInfo1 =
|
||||
|
@ -65,6 +65,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
@ -215,6 +216,117 @@ public void testDecommissionWithExcludeHosts() throws Exception {
|
||||
checkDecommissionedNMCount(rm, metricCount + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Graceful decommission node with no running application.
|
||||
*/
|
||||
@Test
|
||||
public void testGracefulDecommissionNoApp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
|
||||
.getAbsolutePath());
|
||||
|
||||
writeToHostsFile("");
|
||||
rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||
MockNM nm3 = rm.registerNode("host3:4433", 5120);
|
||||
|
||||
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
||||
NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
|
||||
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
|
||||
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
|
||||
rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
|
||||
|
||||
// Graceful decommission both host2 and host3.
|
||||
writeToHostsFile("host2", "host3");
|
||||
rm.getNodesListManager().refreshNodesGracefully(conf);
|
||||
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
|
||||
rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
|
||||
|
||||
nodeHeartbeat1 = nm1.nodeHeartbeat(true);
|
||||
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
|
||||
nodeHeartbeat2 = nm2.nodeHeartbeat(true);
|
||||
rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED);
|
||||
nodeHeartbeat3 = nm3.nodeHeartbeat(true);
|
||||
rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED);
|
||||
|
||||
checkDecommissionedNMCount(rm, metricCount + 2);
|
||||
|
||||
nodeHeartbeat1 = nm1.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
|
||||
nodeHeartbeat2 = nm2.nodeHeartbeat(true);
|
||||
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction());
|
||||
nodeHeartbeat3 = nm3.nodeHeartbeat(true);
|
||||
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
|
||||
}
|
||||
|
||||
/**
|
||||
* Graceful decommission node with running application.
|
||||
*/
|
||||
@Test
|
||||
public void testGracefulDecommissionWithApp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
|
||||
.getAbsolutePath());
|
||||
|
||||
writeToHostsFile("");
|
||||
rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("host1:1234", 10240);
|
||||
MockNM nm2 = rm.registerNode("host2:5678", 20480);
|
||||
MockNM nm3 = rm.registerNode("host3:4433", 10240);
|
||||
NodeId id1 = nm1.getNodeId();
|
||||
NodeId id3 = nm3.getNodeId();
|
||||
rm.waitForState(id1, NodeState.RUNNING);
|
||||
rm.waitForState(id3, NodeState.RUNNING);
|
||||
|
||||
// Create an app and launch two containers on host1.
|
||||
RMApp app = rm.submitApp(2000);
|
||||
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
|
||||
ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
|
||||
nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
|
||||
nm3.nodeHeartbeat(true);
|
||||
|
||||
// Graceful decommission host1 and host3
|
||||
writeToHostsFile("host1", "host3");
|
||||
rm.getNodesListManager().refreshNodesGracefully(conf);
|
||||
rm.waitForState(id1, NodeState.DECOMMISSIONING);
|
||||
rm.waitForState(id3, NodeState.DECOMMISSIONING);
|
||||
|
||||
// host1 should be DECOMMISSIONING due to running containers.
|
||||
// host3 should become DECOMMISSIONED.
|
||||
nm1.nodeHeartbeat(true);
|
||||
rm.waitForState(id1, NodeState.DECOMMISSIONING);
|
||||
nm3.nodeHeartbeat(true);
|
||||
rm.waitForState(id3, NodeState.DECOMMISSIONED);
|
||||
nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
|
||||
|
||||
// Complete containers on host1.
|
||||
// Since the app is still RUNNING, expect NodeAction.NORMAL.
|
||||
NodeHeartbeatResponse nodeHeartbeat1 =
|
||||
nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
|
||||
Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
|
||||
|
||||
// Finish the app and verified DECOMMISSIONED.
|
||||
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
|
||||
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
|
||||
nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
|
||||
Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
|
||||
rm.waitForState(id1, NodeState.DECOMMISSIONED);
|
||||
nodeHeartbeat1 = nm1.nodeHeartbeat(true);
|
||||
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction());
|
||||
}
|
||||
|
||||
/**
|
||||
* Decommissioning using a post-configured include hosts file
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user