From 3e8dfd1299f2ec4e3f0cddca709162ef71237851 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Fri, 20 Feb 2015 15:08:48 +0000
Subject: [PATCH] YARN-3194. RM should handle NMContainerStatuses sent by NM
 while registering if NM is Reconnected node. Contributed by Rohith

(cherry picked from commit a64dd3d24bfcb9af21eb63869924f6482b147fd3)
---
 hadoop-yarn-project/CHANGES.txt                  |   3 +
 .../ResourceTrackerService.java                  |   9 +-
 .../resourcemanager/rmnode/RMNodeImpl.java       | 111 ++++++++++------
 .../rmnode/RMNodeReconnectEvent.java             |   9 +-
 .../TestApplicationCleanup.java                  | 121 ++++++++++++++++++
 .../TestRMNodeTransitions.java                   |   4 +-
 6 files changed, 209 insertions(+), 48 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 46fa7730b78..36deb68e114 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -590,6 +590,9 @@ Release 2.7.0 - UNRELEASED
     YARN-933. Fixed InvalidStateTransitonException at FINAL_SAVING state in
     RMApp. (Rohith Sharmaks via jianhe)
 
+    YARN-3194. RM should handle NMContainerStatuses sent by NM while
+    registering if NM is Reconnected node (Rohith via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 61a03499ccd..0de556bc1b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -312,9 +312,12 @@ public RegisterNodeManagerResponse registerNodeManager(
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeReconnectEvent(nodeId, rmNode,
-              request.getRunningApplications()));
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeReconnectEvent(nodeId, rmNode, request
+                  .getRunningApplications(), request.getNMContainerStatuses()));
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.
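
[Editor's note] The heart of the change: container statuses reported in the NM's registration request now travel with the reconnect event and are converted into ordinary ContainerStatus records. A minimal, self-contained sketch of that conversion follows (the class name ReconnectStatusSketch is hypothetical; the YARN record types and getters are the ones the patch itself uses in RMNodeImpl below):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;

    // Hypothetical helper, for illustration only; mirrors
    // RMNodeImpl.handleNMContainerStatus()/createContainerStatus() below.
    public final class ReconnectStatusSketch {
      // Map each NMContainerStatus reported at registration to a plain
      // ContainerStatus so it can flow through the same handling path as a
      // heartbeat-reported status.
      public static List<ContainerStatus> toContainerStatuses(
          List<NMContainerStatus> reports) {
        List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
        for (NMContainerStatus report : reports) {
          statuses.add(ContainerStatus.newInstance(report.getContainerId(),
              report.getContainerState(), report.getDiagnostics(),
              report.getContainerExitStatus()));
        }
        return statuses;
      }
    }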
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1bc98b28202..97017759942 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -601,6 +601,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       rmNode.httpAddress = newNode.getHttpAddress();
       rmNode.totalCapability = newNode.getTotalCapability();
 
+      handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
+
       // Reset heartbeat ID since node just restarted.
       rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
     }
@@ -622,6 +624,26 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       }
     }
+
+    private void handleNMContainerStatus(
+        List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
+      List<ContainerStatus> containerStatuses =
+          new ArrayList<ContainerStatus>();
+      for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
+        containerStatuses.add(createContainerStatus(nmContainerStatus));
+      }
+      rmnode.handleContainerStatus(containerStatuses);
+    }
+
+    private ContainerStatus createContainerStatus(
+        NMContainerStatus remoteContainer) {
+      ContainerStatus cStatus =
+          ContainerStatus.newInstance(remoteContainer.getContainerId(),
+            remoteContainer.getContainerState(),
+            remoteContainer.getDiagnostics(),
+            remoteContainer.getContainerExitStatus());
+      return cStatus;
+    }
   }
 
   public static class UpdateNodeResourceWhenRunningTransition
@@ -747,49 +769,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
         return NodeState.UNHEALTHY;
       }
 
-      // Filter the map to only obtain just launched containers and finished
-      // containers.
-      List<ContainerStatus> newlyLaunchedContainers =
-          new ArrayList<ContainerStatus>();
-      List<ContainerStatus> completedContainers =
-          new ArrayList<ContainerStatus>();
-      for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
-        ContainerId containerId = remoteContainer.getContainerId();
+      rmNode.handleContainerStatus(statusEvent.getContainers());
 
-        // Don't bother with containers already scheduled for cleanup, or for
-        // applications already killed. The scheduler doens't need to know any
-        // more about this container
-        if (rmNode.containersToClean.contains(containerId)) {
-          LOG.info("Container " + containerId + " already scheduled for " +
-              "cleanup, no further processing");
-          continue;
-        }
-        if (rmNode.finishedApplications.contains(containerId
-            .getApplicationAttemptId().getApplicationId())) {
-          LOG.info("Container " + containerId
-              + " belongs to an application that is already killed,"
-              + " no further processing");
-          continue;
-        }
-
-        // Process running containers
-        if (remoteContainer.getState() == ContainerState.RUNNING) {
-          if (!rmNode.launchedContainers.contains(containerId)) {
-            // Just launched container. RM knows about it the first time.
-            rmNode.launchedContainers.add(containerId);
-            newlyLaunchedContainers.add(remoteContainer);
-          }
-        } else {
-          // A finished container
-          rmNode.launchedContainers.remove(containerId);
-          completedContainers.add(remoteContainer);
-        }
-      }
-      if(newlyLaunchedContainers.size() != 0
-          || completedContainers.size() != 0) {
-        rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
-            (newlyLaunchedContainers, completedContainers));
-      }
       if(rmNode.nextHeartBeat) {
         rmNode.nextHeartBeat = false;
         rmNode.context.getDispatcher().getEventHandler().handle(
@@ -874,4 +855,50 @@ public Set<String> getNodeLabels() {
     }
     return nlm.getLabelsOnNode(nodeId);
   }
+
+  private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
+    // Filter the map to only obtain just launched containers and finished
+    // containers.
+    List<ContainerStatus> newlyLaunchedContainers =
+        new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers =
+        new ArrayList<ContainerStatus>();
+    for (ContainerStatus remoteContainer : containerStatuses) {
+      ContainerId containerId = remoteContainer.getContainerId();
+
+      // Don't bother with containers already scheduled for cleanup, or for
+      // applications already killed. The scheduler doesn't need to know any
+      // more about this container.
+      if (containersToClean.contains(containerId)) {
+        LOG.info("Container " + containerId + " already scheduled for "
+            + "cleanup, no further processing");
+        continue;
+      }
+      if (finishedApplications.contains(containerId.getApplicationAttemptId()
+          .getApplicationId())) {
+        LOG.info("Container " + containerId
+            + " belongs to an application that is already killed,"
+            + " no further processing");
+        continue;
+      }
+
+      // Process running containers
+      if (remoteContainer.getState() == ContainerState.RUNNING) {
+        if (!launchedContainers.contains(containerId)) {
+          // Just launched container. RM knows about it the first time.
+          launchedContainers.add(containerId);
+          newlyLaunchedContainers.add(remoteContainer);
+        }
+      } else {
+        // A finished container
+        launchedContainers.remove(containerId);
+        completedContainers.add(remoteContainer);
+      }
+    }
+    if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
+      nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
+        completedContainers));
+    }
+  }
 }

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
index ebbac9ab156..0bea44b2313 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
@@ -22,16 +22,19 @@
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 
 public class RMNodeReconnectEvent extends RMNodeEvent {
   private RMNode reconnectedNode;
   private List<ApplicationId> runningApplications;
+  private List<NMContainerStatus> containerStatuses;
 
   public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
-      List<ApplicationId> runningApps) {
+      List<ApplicationId> runningApps, List<NMContainerStatus> containerReports) {
     super(nodeId, RMNodeEventType.RECONNECTED);
     reconnectedNode = newNode;
     runningApplications = runningApps;
+    containerStatuses = containerReports;
   }
 
   public RMNode getReconnectedNode() {
@@ -41,4 +44,8 @@ public RMNode getReconnectedNode() {
   public List<ApplicationId> getRunningApplications() {
     return runningApplications;
   }
+
+  public List<NMContainerStatus> getNMContainerStatuses() {
+    return containerStatuses;
+  }
 }

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 891130f0f76..6e08aeb3745 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -28,7 +28,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -48,6 +50,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.log4j.Level;
@@ -478,6 +481,124 @@ public void testAppCleanupWhenNMReconnects() throws Exception {
     rm1.stop();
   }
 
+  // The test verifies processing of NMContainerStatuses which are sent during
+  // NM registration.
+  // 1. Start the cluster - RM, NM; submit an app with 1024MB; launch and
+  //    register the AM.
+  // 2. AM sends a ResourceRequest for 1 container with 2048MB of memory.
+  // 3. Verify the number of containers allocated by the RM.
+  // 4. Verify the memory used by the cluster; it should be 3072 (AM memory
+  //    + requested memory = 1024 + 2048).
+  // 5. Re-register the NM, sending a completed container status.
+  // 6. Verify the memory used; it should be 1024.
+  // 7. Send an AM heartbeat to the RM. The allocate response should contain
+  //    the completed container.
+  @Test(timeout = 60000)
+  public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 1. Start the cluster - RM, NM; submit an app with 1024MB; launch and
+    // register the AM.
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    int nmMemory = 8192;
+    int amMemory = 1024;
+    int containerMemory = 2048;
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", nmMemory, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    RMApp app0 = rm1.submitApp(amMemory);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    // 2. AM sends a ResourceRequest for 1 container with 2048MB of memory.
+    int noOfContainers = 1;
+    List<Container> allocateContainers =
+        am0.allocateAndWaitForContainers(noOfContainers, containerMemory, nm1);
+
+    // 3. Verify the number of containers allocated by the RM.
+    Assert.assertEquals(noOfContainers, allocateContainers.size());
+    Container container = allocateContainers.get(0);
+
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), container.getId()
+        .getContainerId(), ContainerState.RUNNING);
+
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // 4. Verify the memory used by the cluster; it should be 3072 (AM memory
+    // + requested memory = 1024 + 2048).
+    ResourceScheduler rs = rm1.getRMContext().getScheduler();
+    int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
+    Assert.assertEquals(amMemory + containerMemory, allocatedMB);
+
+    // 5. Re-register the NM, sending a completed container status.
+    List<NMContainerStatus> nMContainerStatusForApp =
+        createNMContainerStatusForApp(am0);
+    nm1.registerNode(nMContainerStatusForApp,
+        Arrays.asList(app0.getApplicationId()));
+
+    waitForClusterMemory(nm1, rs, amMemory);
+
+    // 6. Verify the memory used; it should be 1024.
+    Assert.assertEquals(amMemory, rs.getRootQueueMetrics().getAllocatedMB());
+
+    // 7. Send an AM heartbeat to the RM. The allocate response should contain
+    // the completed container.
+    AllocateRequest req =
+        AllocateRequest.newInstance(0, 0F, new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>(), null);
+    AllocateResponse allocate = am0.allocate(req);
+    List<ContainerStatus> completedContainersStatuses =
+        allocate.getCompletedContainersStatuses();
+    Assert.assertEquals(noOfContainers, completedContainersStatuses.size());
+
+    // Application cleanup should happen once the cluster memory used is 0.
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    waitForClusterMemory(nm1, rs, 0);
+
+    rm1.stop();
+  }
+
+  private void waitForClusterMemory(MockNM nm1, ResourceScheduler rs,
+      int clusterMemory) throws Exception {
+    int counter = 0;
+    while (rs.getRootQueueMetrics().getAllocatedMB() != clusterMemory) {
+      nm1.nodeHeartbeat(true);
+
+      Thread.sleep(100);
+      if (counter++ == 50) {
+        Assert.fail("Waiting for cluster memory timed out. Expected="
+            + clusterMemory + " Actual="
+            + rs.getRootQueueMetrics().getAllocatedMB());
+      }
+    }
+  }
+
+  public static List<NMContainerStatus> createNMContainerStatusForApp(MockAM am) {
+    List<NMContainerStatus> list = new ArrayList<NMContainerStatus>();
+    NMContainerStatus amContainer =
+        createNMContainerStatus(am.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING, 1024);
+    NMContainerStatus completedContainer =
+        createNMContainerStatus(am.getApplicationAttemptId(), 2,
+          ContainerState.COMPLETE, 2048);
+    list.add(amContainer);
+    list.add(completedContainer);
+    return list;
+  }
+
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
+      int memory) {
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+          Resource.newInstance(memory, 1), "recover container", 0,
+          Priority.newInstance(0), 0);
+    return containerReport;
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index d877e25c2d6..c6da3fd1fe8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -540,7 +540,7 @@ public void testReconnect() {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
     Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",
@@ -614,7 +614,7 @@ public void testReconnnectUpdate() {
     Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
     RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
     node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
-        null));
+        null, null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
 }
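
[Editor's note] For anyone adapting this path in another test, a usage sketch built only from helpers the patch itself adds (assumes a started MockRM/MockNM pair rm1/nm1, a registered MockAM am0, and its RMApp app0, exactly as in testProcessingNMContainerStatusesOnNMRestart above):

    // Build one RUNNING (AM) report and one COMPLETE container report using
    // the static helper added to TestApplicationCleanup.
    List<NMContainerStatus> reports =
        TestApplicationCleanup.createNMContainerStatusForApp(am0);

    // Re-registering delivers the reports in RegisterNodeManagerRequest;
    // ResourceTrackerService forwards them in the RMNodeReconnectEvent, and
    // the RECONNECTED transition routes them through
    // RMNodeImpl.handleContainerStatus(), so the scheduler releases the
    // completed container's resources instead of leaking them.
    nm1.registerNode(reports, Arrays.asList(app0.getApplicationId()));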