From c6cc6a6a8e9e086512da58ef80c28e5ad2c96c48 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 14 Jul 2014 23:32:03 +0000 Subject: [PATCH] YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers when nodes resync during work-preserving RM restart. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1610557 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../resourcemanager/rmnode/RMNodeImpl.java | 26 ++++++++++++++----- .../TestWorkPreservingRMRestart.java | 8 ++++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bfe257558f0..6d26720fab6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -53,6 +53,9 @@ Release 2.6.0 - UNRELEASED YARN-2088. Fixed a bug in GetApplicationsRequestPBImpl#mergeLocalToBuilder. (Binglin Chang via jianhe) + YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers + when nodes resync during work-preserving RM restart. (Jian He via vinodkv) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index acee7d77b3b..e20adc5c4d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.ArrayList; import java.util.EnumSet; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -105,8 +104,8 @@ public class RMNodeImpl implements RMNode, EventHandler { private String nodeManagerVersion; /* set of containers that have just launched */ - private final Map justLaunchedContainers = - new HashMap(); + private final Set launchedContainers = + new HashSet(); /* set of containers that need to be cleaned */ private final Set containersToClean = new TreeSet( @@ -476,6 +475,13 @@ public class RMNodeImpl implements RMNode, EventHandler { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); containers = startEvent.getNMContainerStatuses(); + if (containers != null && !containers.isEmpty()) { + for (NMContainerStatus container : containers) { + if (container.getContainerState() == ContainerState.RUNNING) { + rmNode.launchedContainers.add(container.getContainerId()); + } + } + } } if (null != startEvent.getRunningApplications()) { @@ -664,14 +670,14 @@ public class RMNodeImpl implements RMNode, EventHandler { // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { - if (!rmNode.justLaunchedContainers.containsKey(containerId)) { + if (!rmNode.launchedContainers.contains(containerId)) { // Just launched container. RM knows about it the first time. - rmNode.justLaunchedContainers.put(containerId, remoteContainer); + rmNode.launchedContainers.add(containerId); newlyLaunchedContainers.add(remoteContainer); } } else { // A finished container - rmNode.justLaunchedContainers.remove(containerId); + rmNode.launchedContainers.remove(containerId); completedContainers.add(remoteContainer); } } @@ -748,4 +754,10 @@ public class RMNodeImpl implements RMNode, EventHandler { public int getQueueSize() { return nodeUpdateQueue.size(); } + + // For test only. + @VisibleForTesting + public Set getLaunchedContainers() { + return this.launchedContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index fb5c3a35ae3..59b11ef8228 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -29,11 +29,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -164,6 +167,11 @@ public class TestWorkPreservingRMRestart { // Wait for RM to settle down on recovering containers; waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); + Set launchedContainers = + ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId())) + .getLaunchedContainers(); + assertTrue(launchedContainers.contains(amContainer.getContainerId())); + assertTrue(launchedContainers.contains(runningContainer.getContainerId())); // check RMContainers are re-recreated and the container state is correct. rm2.waitForState(nm1, amContainer.getContainerId(),