YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers when nodes resync during work-preserving RM restart. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1610557 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-07-14 23:32:03 +00:00
parent 97b5fbc281
commit c6cc6a6a8e
3 changed files with 30 additions and 7 deletions

View File

@ -53,6 +53,9 @@ Release 2.6.0 - UNRELEASED
YARN-2088. Fixed a bug in GetApplicationsRequestPBImpl#mergeLocalToBuilder.
(Binglin Chang via jianhe)
YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers
when nodes resync during work-preserving RM restart. (Jian He via vinodkv)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -20,9 +20,8 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -105,8 +104,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private String nodeManagerVersion;
/* set of containers that have just launched */
private final Map<ContainerId, ContainerStatus> justLaunchedContainers =
new HashMap<ContainerId, ContainerStatus>();
private final Set<ContainerId> launchedContainers =
new HashSet<ContainerId>();
/* set of containers that need to be cleaned */
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@ -476,6 +475,13 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
}
if (null != startEvent.getRunningApplications()) {
@ -664,14 +670,14 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Process running containers
if (remoteContainer.getState() == ContainerState.RUNNING) {
if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
if (!rmNode.launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time.
rmNode.justLaunchedContainers.put(containerId, remoteContainer);
rmNode.launchedContainers.add(containerId);
newlyLaunchedContainers.add(remoteContainer);
}
} else {
// A finished container
rmNode.justLaunchedContainers.remove(containerId);
rmNode.launchedContainers.remove(containerId);
completedContainers.add(remoteContainer);
}
}
@ -748,4 +754,10 @@ public void setNextHeartBeat(boolean nextHeartBeat) {
public int getQueueSize() {
return nodeUpdateQueue.size();
}
// For test only.
@VisibleForTesting
public Set<ContainerId> getLaunchedContainers() {
return this.launchedContainers;
}
}

View File

@ -29,11 +29,13 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -44,6 +46,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -164,6 +167,11 @@ public void testSchedulerRecovery() throws Exception {
// Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
Set<ContainerId> launchedContainers =
((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
.getLaunchedContainers();
assertTrue(launchedContainers.contains(amContainer.getContainerId()));
assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
// check RMContainers are re-recreated and the container state is correct.
rm2.waitForState(nm1, amContainer.getContainerId(),