YARN-9608. DecommissioningNodesWatcher should get lists of running applications on node from RMNode. Contributed by Abhishek Modi.

This commit is contained in:
Zhankun Tang 2019-06-17 17:08:23 +08:00
parent ba681bb80e
commit 304a47e22c
2 changed files with 92 additions and 56 deletions

View File

@ -17,9 +17,11 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Timer; import java.util.Timer;
@ -36,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@ -58,13 +59,8 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
* a DECOMMISSIONING node will be DECOMMISSIONED no later than * a DECOMMISSIONING node will be DECOMMISSIONED no later than
* DECOMMISSIONING_TIMEOUT regardless of running containers or applications. * DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
* *
* To be efficient, DecommissioningNodesWatcher skip tracking application
* containers on a particular node before the node is in DECOMMISSIONING state.
* It only tracks containers once the node is in DECOMMISSIONING state.
* DecommissioningNodesWatcher basically is no cost when no node is * DecommissioningNodesWatcher basically is no cost when no node is
* DECOMMISSIONING. This sacrifices the possibility that the node once * DECOMMISSIONING.
* host containers of an application that is still running
* (the affected map tasks will be rescheduled).
*/ */
public class DecommissioningNodesWatcher { public class DecommissioningNodesWatcher {
private static final Logger LOG = private static final Logger LOG =
@ -88,8 +84,8 @@ public class DecommissioningNodesWatcher {
// number of running containers at the moment. // number of running containers at the moment.
private int numActiveContainers; private int numActiveContainers;
// All applications run on the node at or after decommissioningStartTime. // All applications run on the node.
private Set<ApplicationId> appIds; private List<ApplicationId> appIds;
// First moment the node is observed in DECOMMISSIONED state. // First moment the node is observed in DECOMMISSIONED state.
private long decommissionedTime; private long decommissionedTime;
@ -102,7 +98,7 @@ public class DecommissioningNodesWatcher {
public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) { public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
this.nodeId = nodeId; this.nodeId = nodeId;
this.appIds = new HashSet<ApplicationId>(); this.appIds = new ArrayList<>();
this.decommissioningStartTime = mclock.getTime(); this.decommissioningStartTime = mclock.getTime();
this.timeoutMs = 1000L * timeoutSec; this.timeoutMs = 1000L * timeoutSec;
} }
@ -164,9 +160,7 @@ public class DecommissioningNodesWatcher {
context.updateTimeout(rmNode.getDecommissioningTimeout()); context.updateTimeout(rmNode.getDecommissioningTimeout());
context.lastUpdateTime = now; context.lastUpdateTime = now;
if (remoteNodeStatus.getKeepAliveApplications() != null) { context.appIds = rmNode.getRunningApps();
context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
}
// Count number of active containers. // Count number of active containers.
int numActiveContainers = 0; int numActiveContainers = 0;
@ -176,14 +170,7 @@ public class DecommissioningNodesWatcher {
newState == ContainerState.NEW) { newState == ContainerState.NEW) {
numActiveContainers++; numActiveContainers++;
} }
context.numActiveContainers = numActiveContainers;
ApplicationId aid = cs.getContainerId()
.getApplicationAttemptId().getApplicationId();
if (!context.appIds.contains(aid)) {
context.appIds.add(aid);
} }
}
context.numActiveContainers = numActiveContainers; context.numActiveContainers = numActiveContainers;
// maintain lastContainerFinishTime. // maintain lastContainerFinishTime.
@ -254,7 +241,6 @@ public class DecommissioningNodesWatcher {
DecommissioningNodeStatus.TIMEOUT; DecommissioningNodeStatus.TIMEOUT;
} }
removeCompletedApps(context);
if (context.appIds.size() == 0) { if (context.appIds.size() == 0) {
return DecommissioningNodeStatus.READY; return DecommissioningNodeStatus.READY;
} else { } else {
@ -336,25 +322,6 @@ public class DecommissioningNodesWatcher {
return rmNode; return rmNode;
} }
private void removeCompletedApps(DecommissioningNodeContext context) {
Iterator<ApplicationId> it = context.appIds.iterator();
while (it.hasNext()) {
ApplicationId appId = it.next();
RMApp rmApp = rmContext.getRMApps().get(appId);
if (rmApp == null) {
LOG.debug("Consider non-existing app {} as completed", appId);
it.remove();
continue;
}
if (rmApp.getState() == RMAppState.FINISHED ||
rmApp.getState() == RMAppState.FAILED ||
rmApp.getState() == RMAppState.KILLED) {
LOG.debug("Remove {} app {}", rmApp.getState(), appId);
it.remove();
}
}
}
// Time in second to be decommissioned. // Time in second to be decommissioned.
private int getTimeoutInSec(DecommissioningNodeContext context) { private int getTimeoutInSec(DecommissioningNodeContext context) {
if (context.nodeState == NodeState.DECOMMISSIONED) { if (context.nodeState == NodeState.DECOMMISSIONED) {

View File

@ -19,11 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -35,7 +35,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -58,36 +59,104 @@ public class TestDecommissioningNodesWatcher {
new DecommissioningNodesWatcher(rm.getRMContext()); new DecommissioningNodesWatcher(rm.getRMContext());
MockNM nm1 = rm.registerNode("host1:1234", 10240); MockNM nm1 = rm.registerNode("host1:1234", 10240);
RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNodeImpl node1 =
(RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
NodeId id1 = nm1.getNodeId(); NodeId id1 = nm1.getNodeId();
rm.waitForState(id1, NodeState.RUNNING); rm.waitForState(id1, NodeState.RUNNING);
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
RMApp app = rm.submitApp(2000); RMApp app = rm.submitApp(2000);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
NodeStatus nodeStatus = createNodeStatus(id1, app, 3);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
rm.sendNodeGracefulDecommission(nm1, rm.sendNodeGracefulDecommission(nm1,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
rm.waitForState(id1, NodeState.DECOMMISSIONING); rm.waitForState(id1, NodeState.DECOMMISSIONING);
// Update status with decreasing number of running containers until 0. // Update status with decreasing number of running containers until 0.
watcher.update(node1, createNodeStatus(id1, app, 12)); nodeStatus = createNodeStatus(id1, app, 3);
watcher.update(node1, createNodeStatus(id1, app, 11)); node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
watcher.update(node1, nodeStatus);
nodeStatus = createNodeStatus(id1, app, 2);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
watcher.update(node1, nodeStatus);
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1)); Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
watcher.update(node1, createNodeStatus(id1, app, 1)); nodeStatus = createNodeStatus(id1, app, 1);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
watcher.update(node1, nodeStatus);
Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER, Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
watcher.checkDecommissioningStatus(id1)); watcher.checkDecommissioningStatus(id1));
watcher.update(node1, createNodeStatus(id1, app, 0)); nodeStatus = createNodeStatus(id1, app, 0);
watcher.update(node1, nodeStatus);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP, Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
watcher.checkDecommissioningStatus(id1)); watcher.checkDecommissioningStatus(id1));
// Set app to be FINISHED and verified DecommissioningNodeStatus is READY. // Set app to be FINISHED and verified DecommissioningNodeStatus is READY.
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
watcher.update(node1, nodeStatus);
Assert.assertEquals(DecommissioningNodeStatus.READY,
watcher.checkDecommissioningStatus(id1));
}
@Test
public void testDecommissioningNodesWatcherWithPreviousRunningApps()
throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40");
rm = new MockRM(conf);
rm.start();
DecommissioningNodesWatcher watcher =
new DecommissioningNodesWatcher(rm.getRMContext());
MockNM nm1 = rm.registerNode("host1:1234", 10240);
RMNodeImpl node1 =
(RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
NodeId id1 = nm1.getNodeId();
rm.waitForState(id1, NodeState.RUNNING);
RMApp app = rm.submitApp(2000);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
NodeStatus nodeStatus = createNodeStatus(id1, app, 3);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
Assert.assertEquals(1, node1.getRunningApps().size());
// update node with 0 running containers
nodeStatus = createNodeStatus(id1, app, 0);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
Assert.assertEquals(1, node1.getRunningApps().size());
// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. Right now
// there is no container running on the node.
rm.sendNodeGracefulDecommission(nm1,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
// we should still get WAIT_APP as container for a running app previously
// ran on this node.
watcher.update(node1, nodeStatus);
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
watcher.checkDecommissioningStatus(id1));
// Set app to be FINISHED and verified DecommissioningNodeStatus is READY.
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
Assert.assertEquals(0, node1.getRunningApps().size());
watcher.update(node1, nodeStatus);
Assert.assertEquals(DecommissioningNodeStatus.READY, Assert.assertEquals(DecommissioningNodeStatus.READY,
watcher.checkDecommissioningStatus(id1)); watcher.checkDecommissioningStatus(id1));
} }
@ -103,7 +172,7 @@ public class TestDecommissioningNodesWatcher {
NodeId nodeId, RMApp app, int numRunningContainers) { NodeId nodeId, RMApp app, int numRunningContainers) {
return NodeStatus.newInstance( return NodeStatus.newInstance(
nodeId, 0, getContainerStatuses(app, numRunningContainers), nodeId, 0, getContainerStatuses(app, numRunningContainers),
new ArrayList<ApplicationId>(), Collections.emptyList(),
NodeHealthStatus.newInstance( NodeHealthStatus.newInstance(
true, "", System.currentTimeMillis() - 1000), true, "", System.currentTimeMillis() - 1000),
null, null, null); null, null, null);
@ -113,8 +182,8 @@ public class TestDecommissioningNodesWatcher {
// where numRunningContainers are RUNNING. // where numRunningContainers are RUNNING.
private List<ContainerStatus> getContainerStatuses( private List<ContainerStatus> getContainerStatuses(
RMApp app, int numRunningContainers) { RMApp app, int numRunningContainers) {
// Total 12 containers // Total 3 containers
final int total = 12; final int total = 3;
numRunningContainers = Math.min(total, numRunningContainers); numRunningContainers = Math.min(total, numRunningContainers);
List<ContainerStatus> output = new ArrayList<ContainerStatus>(); List<ContainerStatus> output = new ArrayList<ContainerStatus>();
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
@ -122,8 +191,8 @@ public class TestDecommissioningNodesWatcher {
ContainerState.COMPLETE : ContainerState.RUNNING; ContainerState.COMPLETE : ContainerState.RUNNING;
output.add(ContainerStatus.newInstance( output.add(ContainerStatus.newInstance(
ContainerId.newContainerId( ContainerId.newContainerId(
ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1), ApplicationAttemptId.newInstance(app.getApplicationId(), 0), i),
cstate, "Dummy", 0)); cstate, "", 0));
} }
return output; return output;
} }