YARN-5566. Client-side NM graceful decom is not triggered when jobs finish. (Robert Kanter via kasha)
(cherry picked from commit 74f4bae455
)
This commit is contained in:
parent
b1466b8124
commit
736dd1646d
|
@ -1170,12 +1170,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
NodeState initialState = rmNode.getState();
|
NodeState initialState = rmNode.getState();
|
||||||
boolean isNodeDecommissioning =
|
boolean isNodeDecommissioning =
|
||||||
initialState.equals(NodeState.DECOMMISSIONING);
|
initialState.equals(NodeState.DECOMMISSIONING);
|
||||||
|
if (isNodeDecommissioning) {
|
||||||
|
List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
|
||||||
|
if (rmNode.runningApplications.isEmpty() &&
|
||||||
|
(keepAliveApps == null || keepAliveApps.isEmpty())) {
|
||||||
|
RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
|
||||||
|
return NodeState.DECOMMISSIONED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
|
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
|
||||||
LOG.info("Node " + rmNode.nodeId +
|
LOG.info("Node " + rmNode.nodeId +
|
||||||
" reported UNHEALTHY with details: " +
|
" reported UNHEALTHY with details: " +
|
||||||
remoteNodeHealthStatus.getHealthReport());
|
remoteNodeHealthStatus.getHealthReport());
|
||||||
// if a node in decommissioning receives an unhealthy report,
|
// if a node in decommissioning receives an unhealthy report,
|
||||||
// it will keep decommissioning.
|
// it will stay in decommissioning.
|
||||||
if (isNodeDecommissioning) {
|
if (isNodeDecommissioning) {
|
||||||
return NodeState.DECOMMISSIONING;
|
return NodeState.DECOMMISSIONING;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1349,7 +1358,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
+ " is the first container get launched for application "
|
+ " is the first container get launched for application "
|
||||||
+ containerAppId);
|
+ containerAppId);
|
||||||
}
|
}
|
||||||
runningApplications.add(containerAppId);
|
handleRunningAppOnNode(this, context, containerAppId, nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process running containers
|
// Process running containers
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
||||||
.AllocationExpirationInfo;
|
.AllocationExpirationInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
|
@ -73,6 +74,7 @@ import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -279,16 +281,17 @@ public class TestRMNodeTransitions {
|
||||||
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
|
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
|
||||||
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
|
||||||
node2.handle(new RMNodeStartedEvent(null, null, null));
|
node2.handle(new RMNodeStartedEvent(null, null, null));
|
||||||
|
|
||||||
|
ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
|
||||||
|
ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
|
||||||
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
|
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
|
||||||
BuilderUtils.newApplicationAttemptId(
|
BuilderUtils.newApplicationAttemptId(app0, 0), 0);
|
||||||
BuilderUtils.newApplicationId(0, 0), 0), 0);
|
|
||||||
ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
|
ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
|
||||||
BuilderUtils.newApplicationAttemptId(
|
BuilderUtils.newApplicationAttemptId(app1, 1), 1);
|
||||||
BuilderUtils.newApplicationId(1, 1), 1), 1);
|
|
||||||
ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
|
ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
|
||||||
BuilderUtils.newApplicationAttemptId(
|
BuilderUtils.newApplicationAttemptId(app1, 1), 2);
|
||||||
BuilderUtils.newApplicationId(1, 1), 1), 2);
|
rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));
|
||||||
|
rmContext.getRMApps().put(app1, Mockito.mock(RMApp.class));
|
||||||
|
|
||||||
RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
|
RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
|
||||||
RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
|
RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
|
||||||
|
@ -652,6 +655,7 @@ public class TestRMNodeTransitions {
|
||||||
NodeId nodeId = node.getNodeID();
|
NodeId nodeId = node.getNodeID();
|
||||||
|
|
||||||
ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
|
ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
|
||||||
|
rmContext.getRMApps().put(runningAppId, Mockito.mock(RMApp.class));
|
||||||
// Create a running container
|
// Create a running container
|
||||||
ContainerId runningContainerId = BuilderUtils.newContainerId(
|
ContainerId runningContainerId = BuilderUtils.newContainerId(
|
||||||
BuilderUtils.newApplicationAttemptId(
|
BuilderUtils.newApplicationAttemptId(
|
||||||
|
@ -919,16 +923,22 @@ public class TestRMNodeTransitions {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test unhealthy report on a decommissioning node will make it
|
// Test unhealthy report on a decommissioning node will make it
|
||||||
// keep decommissioning.
|
// keep decommissioning as long as there's a running or keep alive app.
|
||||||
|
// Otherwise, it will go to decommissioned
|
||||||
@Test
|
@Test
|
||||||
public void testDecommissioningUnhealthy() {
|
public void testDecommissioningUnhealthy() {
|
||||||
RMNodeImpl node = getDecommissioningNode();
|
RMNodeImpl node = getDecommissioningNode();
|
||||||
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
|
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
|
||||||
System.currentTimeMillis());
|
System.currentTimeMillis());
|
||||||
|
List<ApplicationId> keepAliveApps = new ArrayList<>();
|
||||||
|
keepAliveApps.add(BuilderUtils.newApplicationId(1, 1));
|
||||||
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
|
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
|
||||||
new ArrayList<ContainerStatus>(), null, status, null, null, null);
|
null, keepAliveApps, status, null, null, null);
|
||||||
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
|
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
|
||||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||||
|
nodeStatus.setKeepAliveApplications(null);
|
||||||
|
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
|
||||||
|
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -951,6 +961,7 @@ public class TestRMNodeTransitions {
|
||||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
ApplicationAttemptId.newInstance(appId, 1);
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
rmContext.getRMApps().put(appId, Mockito.mock(RMApp.class));
|
||||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
|
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
|
||||||
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
|
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
|
||||||
AllocationExpirationInfo expirationInfo1 =
|
AllocationExpirationInfo expirationInfo1 =
|
||||||
|
|
|
@ -294,6 +294,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
RMApp app = rm.submitApp(2000);
|
RMApp app = rm.submitApp(2000);
|
||||||
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
|
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
|
||||||
ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
|
ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
|
||||||
|
nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
|
||||||
|
nm3.nodeHeartbeat(true);
|
||||||
|
|
||||||
// Graceful decommission host1 and host3
|
// Graceful decommission host1 and host3
|
||||||
writeToHostsFile("host1", "host3");
|
writeToHostsFile("host1", "host3");
|
||||||
|
@ -303,7 +305,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
|
|
||||||
// host1 should be DECOMMISSIONING due to running containers.
|
// host1 should be DECOMMISSIONING due to running containers.
|
||||||
// host3 should become DECOMMISSIONED.
|
// host3 should become DECOMMISSIONED.
|
||||||
nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
|
nm1.nodeHeartbeat(true);
|
||||||
nm3.nodeHeartbeat(true);
|
nm3.nodeHeartbeat(true);
|
||||||
rm.waitForState(id1, NodeState.DECOMMISSIONING);
|
rm.waitForState(id1, NodeState.DECOMMISSIONING);
|
||||||
rm.waitForState(id3, NodeState.DECOMMISSIONED);
|
rm.waitForState(id3, NodeState.DECOMMISSIONED);
|
||||||
|
|
Loading…
Reference in New Issue