From 78ff0b720e0418785d53802a1b4e72085c1a3556 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Fri, 13 Jan 2012 21:15:22 +0000 Subject: [PATCH] MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs may subsequently report as running. (Contributed by Vinod Kumar Vavilapalli) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231297 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../apache/hadoop/yarn/util/BuilderUtils.java | 11 ++ .../container/ContainerImpl.java | 10 +- .../resourcemanager/ResourceManager.java | 10 +- .../ResourceTrackerService.java | 4 +- .../server/resourcemanager/rmnode/RMNode.java | 4 +- .../resourcemanager/rmnode/RMNodeImpl.java | 57 ++++--- .../scheduler/SchedulerApp.java | 19 ++- .../scheduler/capacity/CapacityScheduler.java | 6 +- .../scheduler/fifo/FifoScheduler.java | 9 +- .../yarn/server/resourcemanager/MockNM.java | 16 +- .../server/resourcemanager/MockNodes.java | 4 +- .../TestApplicationCleanup.java | 150 +++++++++++++++++- .../TestResourceTrackerService.java | 3 +- 14 files changed, 234 insertions(+), 72 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ea08bb16821..8e4ec55b584 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -481,6 +481,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3625. CapacityScheduler web-ui display of queue's used capacity is broken. (Jason Lowe via mahadev) + MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs + may subsequently report as running. 
(Vinod Kumar Vavilapalli via sseth) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 0650e206fe4..8edade260d6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -205,6 +205,17 @@ public static NodeId newNodeId(String host, int port) { return nodeId; } + public static ContainerStatus newContainerStatus(ContainerId containerId, + ContainerState containerState, String diagnostics, int exitStatus) { + ContainerStatus containerStatus = recordFactory + .newRecordInstance(ContainerStatus.class); + containerStatus.setState(containerState); + containerStatus.setContainerId(containerId); + containerStatus.setDiagnostics(diagnostics); + containerStatus.setExitStatus(exitStatus); + return containerStatus; + } + public static Container newContainer(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, ContainerToken containerToken) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c6ba07aa7da..2c2d2baaa43 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; public class ContainerImpl implements Container { @@ -370,13 +371,8 @@ public ContainerLaunchContext getLaunchContext() { public ContainerStatus cloneAndGetContainerStatus() { this.readLock.lock(); try { - ContainerStatus containerStatus = - recordFactory.newRecordInstance(ContainerStatus.class); - containerStatus.setState(getCurrentState()); - containerStatus.setContainerId(this.launchContext.getContainerId()); - containerStatus.setDiagnostics(diagnostics.toString()); - containerStatus.setExitStatus(exitCode); - return containerStatus; + return BuilderUtils.newContainerStatus(this.getContainerID(), + getCurrentState(), diagnostics.toString(), exitCode); } finally { this.readLock.unlock(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 1a69bbfc8b6..18bdf8dbeae 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -67,16 +67,16 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; +import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; +import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; +import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; -import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; -import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; -import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; -import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; /** * The ResourceManager is the main class that is a set of components. 
@@ -256,7 +256,7 @@ protected RMAppManager createRMAppManager() { } @Private - public static final class SchedulerEventDispatcher extends AbstractService + public static class SchedulerEventDispatcher extends AbstractService implements EventHandler { private final ResourceScheduler scheduler; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 58697486d55..ccebe3a8908 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -265,8 +265,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) HeartbeatResponse latestResponse = recordFactory .newRecordInstance(HeartbeatResponse.class); latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1); - latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp()); - latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup()); + latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp()); + latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup()); latestResponse.setNodeAction(NodeAction.NORMAL); // 4. Send status to RMNode, saving the latest response. 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 494dffcf250..8dda6eda1de 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -101,9 +101,9 @@ public interface RMNode { public RMNodeState getState(); - public List pullContainersToCleanUp(); + public List getContainersToCleanUp(); - public List pullAppsToCleanup(); + public List getAppsToCleanup(); public HeartbeatResponse getLastHeartBeatResponse(); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 2cadd890712..dd3e25fe167 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -89,7 +89,6 @@ public class RMNodeImpl implements RMNode, EventHandler { /* set of containers that have just launched */ private final Map justLaunchedContainers = new HashMap(); - 
/* set of containers that need to be cleaned */ private final Set containersToClean = new TreeSet( @@ -248,54 +247,38 @@ public RMNodeState getState() { } @Override - public List pullAppsToCleanup() { - this.writeLock.lock(); - - try { - List lastfinishedApplications = new ArrayList(); - lastfinishedApplications.addAll(this.finishedApplications); - this.finishedApplications.clear(); - return lastfinishedApplications; - } finally { - this.writeLock.unlock(); - } - - } - - @Private - public List getContainersToCleanUp() { + public List getAppsToCleanup() { this.readLock.lock(); + try { - return new ArrayList(containersToClean); + return new ArrayList(this.finishedApplications); } finally { this.readLock.unlock(); } + } @Override - public List pullContainersToCleanUp() { + public List getContainersToCleanUp() { - this.writeLock.lock(); + this.readLock.lock(); try { - List containersToCleanUp = new ArrayList(); - containersToCleanUp.addAll(this.containersToClean); - this.containersToClean.clear(); - return containersToCleanUp; + return new ArrayList(this.containersToClean); } finally { - this.writeLock.unlock(); + this.readLock.unlock(); } }; @Override public HeartbeatResponse getLastHeartBeatResponse() { - this.writeLock.lock(); + this.readLock.lock(); try { return this.latestHeartBeatResponse; } finally { - this.writeLock.unlock(); + this.readLock.unlock(); } } @@ -407,14 +390,22 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { for (ContainerStatus remoteContainer : statusEvent.getContainers()) { ContainerId containerId = remoteContainer.getContainerId(); - // Don't bother with containers already scheduled for cleanup, - // the scheduler doens't need to know any more about this container + // Don't bother with containers already scheduled for cleanup, or for + // applications already killed. 
The scheduler doesn't need to know any + // more about this container if (rmNode.containersToClean.contains(containerId)) { LOG.info("Container " + containerId + " already scheduled for " + "cleanup, no further processing"); continue; } - + if (rmNode.finishedApplications.contains(containerId + .getApplicationAttemptId().getApplicationId())) { + LOG.info("Container " + containerId + + " belongs to an application that is already killed," + + " no further processing"); + continue; + } + // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { if (!rmNode.justLaunchedContainers.containsKey(containerId)) { @@ -435,6 +426,12 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications( statusEvent.getKeepAliveAppIds()); + + // HeartBeat processing from our end is done, as node pulls the following + // lists before sending status-updates. Clear data-structures + rmNode.containersToClean.clear(); + rmNode.finishedApplications.clear(); + return RMNodeState.RUNNING; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java index e3798c0defc..94ddb2af8a7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java @@ -39,9 +39,9 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import 
org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -61,6 +62,7 @@ * Each running Application in the RM corresponds to one instance * of this class. */ +@SuppressWarnings("unchecked") public class SchedulerApp { private static final Log LOG = LogFactory.getLog(SchedulerApp.class); @@ -174,13 +176,20 @@ public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { this.appSchedulingInfo.stop(rmAppAttemptFinalState); } - synchronized public void containerLaunchedOnNode(ContainerId containerId) { + public synchronized void containerLaunchedOnNode(ContainerId containerId, + NodeId nodeId) { // Inform the container RMContainer rmContainer = getRMContainer(containerId); - rmContainer.handle( - new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. 
+ this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); } synchronized public void containerCompleted(RMContainer rmContainer, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a542d0339cc..364494b76cb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -76,6 +77,7 @@ @LimitedPrivate("yarn") @Evolving +@SuppressWarnings("unchecked") public class CapacityScheduler implements ResourceScheduler, CapacitySchedulerContext { @@ -588,10 +590,12 @@ private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node LOG.info("Unknown 
application: " + applicationAttemptId + " launched container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); return; } - application.containerLaunchedOnNode(containerId); + application.containerLaunchedOnNode(containerId, node.getNodeID()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 8e274956b57..0f6a8a84c8e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -87,6 +88,7 @@ @LimitedPrivate("yarn") @Evolving +@SuppressWarnings("unchecked") public class FifoScheduler implements ResourceScheduler { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); @@ -282,7 +284,6 @@ private SchedulerNode 
getNode(NodeId nodeId) { return nodes.get(nodeId); } - @SuppressWarnings("unchecked") private synchronized void addApplication(ApplicationAttemptId appAttemptId, String user) { // TODO: Fix store @@ -655,10 +656,14 @@ private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + " on node: " + node); + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; } - application.containerLaunchedOnNode(containerId); + application.containerLaunchedOnNode(containerId, node.getNodeID()); } @Lock(FifoScheduler.class) diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index d5e87f137c3..bd44f10b9ed 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -39,15 +39,17 @@ public class MockNM { private int responseId; private NodeId nodeId; - private final String nodeIdStr; private final int memory; private final ResourceTrackerService resourceTracker; private final int httpPort = 2; MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { - this.nodeIdStr = nodeIdStr; this.memory = memory; this.resourceTracker = resourceTracker; + String[] splits = nodeIdStr.split(":"); + nodeId = Records.newRecord(NodeId.class); + 
nodeId.setHost(splits[0]); + nodeId.setPort(Integer.parseInt(splits[1])); } public NodeId getNodeId() { @@ -63,14 +65,10 @@ public void containerStatus(Container container) throws Exception { new HashMap>(); conts.put(container.getId().getApplicationAttemptId().getApplicationId(), Arrays.asList(new ContainerStatus[] { container.getContainerStatus() })); - nodeHeartbeat(conts, true,nodeId); + nodeHeartbeat(conts, true); } public NodeId registerNode() throws Exception { - String[] splits = nodeIdStr.split(":"); - nodeId = Records.newRecord(NodeId.class); - nodeId.setHost(splits[0]); - nodeId.setPort(Integer.parseInt(splits[1])); RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); @@ -83,11 +81,11 @@ public NodeId registerNode() throws Exception { } public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception { - return nodeHeartbeat(new HashMap>(), b,nodeId); + return nodeHeartbeat(new HashMap>(), b); } public HeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, NodeId nodeId) throws Exception { + List> conts, boolean isHealthy) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setNodeId(nodeId); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a7df0229f19..90b43504c1c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -152,13 +152,13 @@ public RMNodeState getState() { } @Override - public List pullAppsToCleanup() { + public List getAppsToCleanup() { // TODO Auto-generated method stub return null; } @Override - public List pullContainersToCleanUp() { + public List getContainersToCleanUp() { // TODO Auto-generated method stub return null; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 86bf29055bb..ae2c07f0484 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -19,26 +19,39 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.event.Dispatcher; +import 
org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Test; -import org.mortbay.log.Log; public class TestApplicationCleanup { + private static final Log LOG = LogFactory + .getLog(TestApplicationCleanup.class); + @Test public void testAppCleanup() throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -67,11 +80,13 @@ public void testAppCleanup() throws Exception { List conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); int contReceived = conts.size(); - while (contReceived < request) { + int waitCount = 0; + while (contReceived < request && waitCount++ < 20) { conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); contReceived += conts.size(); - Log.info("Got " + contReceived + " containers. Waiting to get " + request); + LOG.info("Got " + contReceived + " containers. 
Waiting to get " + + request); Thread.sleep(2000); } Assert.assertEquals(request, conts.size()); @@ -86,11 +101,12 @@ public void testAppCleanup() throws Exception { //currently only containers are cleaned via this //AM container is cleaned via container launcher - while (cleanedConts < 2 || cleanedApps < 1) { + waitCount = 0; + while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) { HeartbeatResponse resp = nm1.nodeHeartbeat(true); contsToClean = resp.getContainersToCleanupList(); apps = resp.getApplicationsToCleanupList(); - Log.info("Waiting to get cleanup events.. cleanedConts: " + LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts + " cleanedApps: " + cleanedApps); cleanedConts += contsToClean.size(); cleanedApps += apps.size(); @@ -99,6 +115,130 @@ public void testAppCleanup() throws Exception { Assert.assertEquals(1, apps.size()); Assert.assertEquals(app.getApplicationId(), apps.get(0)); + Assert.assertEquals(1, cleanedApps); + Assert.assertEquals(3, cleanedConts); + + rm.stop(); + } + + @Test + public void testContainerCleanup() throws Exception { + + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = new MockRM() { + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + + MockNM nm1 = rm.registerNode("h1:1234", 5000); + + RMApp app = rm.submitApp(2000); + + //kick the scheduling + nm1.nodeHeartbeat(true); + + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + + //request for containers + int request = 2; + am.allocate("h1" , 1000, request, + new ArrayList()); + 
dispatcher.await(); + + //kick the scheduler + nm1.nodeHeartbeat(true); + List conts = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + int contReceived = conts.size(); + int waitCount = 0; + while (contReceived < request && waitCount++ < 20) { + conts = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + dispatcher.await(); + contReceived += conts.size(); + LOG.info("Got " + contReceived + " containers. Waiting to get " + + request); + Thread.sleep(2000); + } + Assert.assertEquals(request, conts.size()); + + // Release a container. + ArrayList release = new ArrayList(); + release.add(conts.get(1).getId()); + am.allocate(new ArrayList(), release); + dispatcher.await(); + + // Send one more heartbeat with a fake running container. This is to + // simulate the situation that can happen if the NM reports that container + // is running in the same heartbeat when the RM asks it to clean it up. + Map> containerStatuses = + new HashMap>(); + ArrayList containerStatusList = + new ArrayList(); + containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1) + .getId(), ContainerState.RUNNING, "nothing", 0)); + containerStatuses.put(app.getApplicationId(), containerStatusList); + + HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); + dispatcher.await(); + List contsToClean = resp.getContainersToCleanupList(); + int cleanedConts = contsToClean.size(); + waitCount = 0; + while (cleanedConts < 1 && waitCount++ < 20) { + resp = nm1.nodeHeartbeat(true); + dispatcher.await(); + contsToClean = resp.getContainersToCleanupList(); + LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); + cleanedConts += contsToClean.size(); + Thread.sleep(1000); + } + LOG.info("Got cleanup for " + contsToClean.get(0)); + Assert.assertEquals(1, cleanedConts); + + // Now to test the case when RM already gave cleanup, and NM suddenly + // realizes that the container is running. 
+ LOG.info("Testing container launch much after release and " + + "NM getting cleanup"); + containerStatuses.clear(); + containerStatusList.clear(); + containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1) + .getId(), ContainerState.RUNNING, "nothing", 0)); + containerStatuses.put(app.getApplicationId(), containerStatusList); + + resp = nm1.nodeHeartbeat(containerStatuses, true); + dispatcher.await(); + contsToClean = resp.getContainersToCleanupList(); + cleanedConts = contsToClean.size(); + // The cleanup list won't be instantaneous as it is given out by scheduler + // and not RMNodeImpl. + waitCount = 0; + while (cleanedConts < 1 && waitCount++ < 20) { + resp = nm1.nodeHeartbeat(true); + dispatcher.await(); + contsToClean = resp.getContainersToCleanupList(); + LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); + cleanedConts += contsToClean.size(); + Thread.sleep(1000); + } + LOG.info("Got cleanup for " + contsToClean.get(0)); + Assert.assertEquals(1, cleanedConts); rm.stop(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index d40b7abad8b..183396092bb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -164,8 +164,7 @@ public void testReboot() throws Exception { Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = 
nm2.nodeHeartbeat( - new HashMap>(), true, - recordFactory.newRecordInstance(NodeId.class)); + new HashMap>(), true); Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction())); checkRebootedNMCount(rm, ++initialMetricCount); }