From 5b7889f9a7dea81bbb3c8be6944419b59ca4bed1 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 2 Aug 2013 06:54:22 +0000 Subject: [PATCH] YARN-903. Changed ContainerManager to suppress unnecessary warnings when stopping already stopped containers. Contributed by Omkar Vinit Joshi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509560 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/server/nodemanager/NodeManager.java | 7 ++ .../server/nodemanager/NodeStatusUpdater.java | 5 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 78 ++++++++++++++++++- .../ContainerManagerImpl.java | 43 ++++++---- .../nodemanager/TestNodeStatusUpdater.java | 36 ++++++++- .../server/TestContainerManagerSecurity.java | 65 +++++++++++++++- 7 files changed, 214 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8cab948aac7..819846a2bdf 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -54,6 +54,9 @@ Release 2.1.1-beta - UNRELEASED YARN-573. Shared data structures in Public Localizer and Private Localizer are not Thread safe. (Omkar Vinit Joshi via jlowe) + YARN-903. Changed ContainerManager to suppress unnecessary warnings when + stopping already stopped containers. 
(Omkar Vinit Joshi via vinodkv) + Release 2.1.0-beta - 2013-08-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index d4776bce758..5b178df2f75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.SecurityUtil; @@ -456,4 +457,10 @@ public class NodeManager extends CompositeService Configuration conf = new YarnConfiguration(); nodeManager.initAndStartNodeManager(conf, false); } + + @VisibleForTesting + @Private + public NodeStatusUpdater getNodeStatusUpdater() { + return nodeStatusUpdater; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index a65c2d51bab..6ac71b4bd52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.records.NodeStatus; public interface NodeStatusUpdater extends Service { @@ -28,4 +29,8 @@ public interface NodeStatusUpdater extends Service { NodeStatus getNodeStatusAndUpdateContainersInContext(); long getRMIdentifier(); + + public boolean isContainerRecentlyStopped(ContainerId containerId); + + public void clearFinishedContainersFromCache(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8169677bd42..05d9feaefaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -68,6 +69,9 @@ import com.google.common.annotations.VisibleForTesting; public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { + public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS = + YarnConfiguration.NM_PREFIX + 
"duration-to-track-stopped-containers"; + private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); @@ -88,6 +92,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private Map appTokenKeepAliveMap = new HashMap(); private Random keepAliveDelayRandom = new Random(); + // It will be used to track recently stopped containers on node manager. + private final Map recentlyStoppedContainers; + // Duration for which to track recently stopped container. + private long durationToTrackStoppedContainers; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -103,6 +111,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; + this.recentlyStoppedContainers = + new LinkedHashMap(); } @Override @@ -129,11 +139,27 @@ public class NodeStatusUpdaterImpl extends AbstractService implements conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + // Default duration to track stopped containers on nodemanager is 10Min. + // This should not be assigned very large value as it will remember all the + // containers stopped during that time. 
+ durationToTrackStoppedContainers = + conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + 600000); + if (durationToTrackStoppedContainers < 0) { + String message = "Invalid configuration for " + + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default " + + "value is 10Min(600000)."; + LOG.error(message); + throw new YarnException(message); + } + if (LOG.isDebugEnabled()) { + LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :" + + durationToTrackStoppedContainers); + } + super.serviceInit(conf); LOG.info("Initialized nodemanager for " + nodeId + ":" + " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb + " virtual-cores=" + virtualCores); - - super.serviceInit(conf); } @Override @@ -290,7 +316,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (containerStatus.getState() == ContainerState.COMPLETE) { // Remove i.remove(); - + // Adding to finished containers cache. Cache will keep it around at + // least for #durationToTrackStoppedContainers duration. In the + // subsequent call to stop container it will get removed from cache. 
+ addStoppedContainersToCache(containerId); + LOG.info("Removed completed container " + containerId); } } @@ -340,6 +370,46 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } + public boolean isContainerRecentlyStopped(ContainerId containerId) { + synchronized (recentlyStoppedContainers) { + return recentlyStoppedContainers.containsKey(containerId); + } + } + + @Private + @VisibleForTesting + public void addStoppedContainersToCache(ContainerId containerId) { + synchronized (recentlyStoppedContainers) { + removeVeryOldStoppedContainersFromCache(); + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } + } + + @Override + public void clearFinishedContainersFromCache() { + synchronized (recentlyStoppedContainers) { + recentlyStoppedContainers.clear(); + } + } + + @Private + @VisibleForTesting + public void removeVeryOldStoppedContainersFromCache() { + synchronized (recentlyStoppedContainers) { + long currentTime = System.currentTimeMillis(); + Iterator i = + recentlyStoppedContainers.keySet().iterator(); + while (i.hasNext()) { + if (recentlyStoppedContainers.get(i.next()) < currentTime) { + i.remove(); + } else { + break; + } + } + } + } + @Override public long getRMIdentifier() { return this.rmIdentifier; @@ -455,4 +525,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new Thread(statusUpdaterRunnable, "Node Status Updater"); statusUpdater.start(); } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 26998c938ca..f8a5ea28f24 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -581,17 +582,24 @@ public class ContainerManagerImpl extends CompositeService implements authorizeGetAndStopContainerRequest(containerID, container, true, nmTokenIdentifier); - dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - "Container killed by the ApplicationMaster.")); + if (container == null) { + if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } + } else { + dispatcher.getEventHandler().handle( + new ContainerKillEvent(containerID, + "Container killed by the ApplicationMaster.")); - NMAuditLogger.logSuccess(container.getUser(), - AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID - .getApplicationAttemptId().getApplicationId(), containerID); + NMAuditLogger.logSuccess(container.getUser(), + AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID + 
.getApplicationAttemptId().getApplicationId(), containerID); - // TODO: Move this code to appropriate place once kill_container is - // implemented. - nodeStatusUpdater.sendOutofBandHeartBeat(); + // TODO: Move this code to appropriate place once kill_container is + // implemented. + nodeStatusUpdater.sendOutofBandHeartBeat(); + } } /** @@ -627,6 +635,15 @@ public class ContainerManagerImpl extends CompositeService implements authorizeGetAndStopContainerRequest(containerID, container, false, nmTokenIdentifier); + if (container == null) { + if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " was recently stopped on node manager."); + } else { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } + } ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); LOG.info("Returning " + containerStatus); return containerStatus; @@ -658,17 +675,11 @@ public class ContainerManagerImpl extends CompositeService implements container.getContainerId()); } else { LOG.warn(identifier.getApplicationAttemptId() - + " attempted to get get status for non-application container : " + + " attempted to get status for non-application container : " + container.getContainerId().toString()); } - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " is not started by this application attempt."); } - if (container == null) { - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " is not handled by this NodeManager"); - } } class ContainerEventDispatcher implements EventHandler { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 1e7386a24eb..78ab13ea835 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -426,7 +426,7 @@ public class TestNodeStatusUpdater { return this.nodeStatusUpdater; } - protected MyNodeStatusUpdater3 getNodeStatusUpdater() { + public MyNodeStatusUpdater3 getNodeStatusUpdater() { return this.nodeStatusUpdater; } } @@ -745,6 +745,40 @@ public class TestNodeStatusUpdater { lfs.delete(new Path(basedir.getPath()), true); } + @Test(timeout = 90000) + public void testRecentlyFinishedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( + NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "10000"); + nm.init(conf); + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + + + nodeStatusUpdater.addStoppedContainersToCache(cId); + Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + + long time1 = System.currentTimeMillis(); + int waitInterval = 15; + while (waitInterval-- > 0 + && nodeStatusUpdater.isContainerRecentlyStopped(cId)) { + nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); + Thread.sleep(1000); + } + long time2 = System.currentTimeMillis(); + // By this time the container will be removed from cache. need to verify. 
+ Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); + } + + + @Test public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 1657f1655d5..0a62f36ef7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -260,6 +262,11 @@ public class TestContainerManagerSecurity { Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken, validNMToken, true).contains(sb.toString())); + // Container is removed from node manager's memory by this time. + // trying to stop the container. It should not throw any exception. 
+ testStopContainer(rpc, validAppAttemptId, validNode, validContainerId, + validNMToken, false); + // Rolling over master key twice so that we can check whether older keys // are used for authentication. rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); @@ -267,13 +274,25 @@ public class TestContainerManagerSecurity { rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); // trying get container status. Now saved nmToken should be used for - // authentication. + // authentication... It should complain saying container was recently + // stopped. + sb = new StringBuilder("Container "); + sb.append(validContainerId); + sb.append(" was recently stopped on node manager"); + Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, + validContainerId, validNMToken, true).contains(sb.toString())); + + // Now let's remove the container from nm-memory + nm.getNodeStatusUpdater().clearFinishedContainersFromCache(); + + // This should fail as container is removed from recently tracked finished + // containers. 
sb = new StringBuilder("Container "); sb.append(validContainerId.toString()); sb.append(" is not handled by this NodeManager"); Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, validContainerId, validNMToken, false).contains(sb.toString())); - + } private void waitForContainerToFinishOnNM(ContainerId containerId) { @@ -315,6 +334,23 @@ public class TestContainerManagerSecurity { Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId() == nmTokenSecretManagerRM.getCurrentKey().getKeyId())); } + + private String testStopContainer(YarnRPC rpc, + ApplicationAttemptId appAttemptId, NodeId nodeId, + ContainerId containerId, Token nmToken, boolean isExceptionExpected) { + try { + stopContainer(rpc, nmToken, + Arrays.asList(new ContainerId[] { containerId }), appAttemptId, + nodeId); + if (isExceptionExpected) { + fail("Exception was expected!!"); + } + return ""; + } catch (Exception e) { + e.printStackTrace(); + return e.getMessage(); + } + } private String testGetContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, @@ -334,7 +370,7 @@ public class TestContainerManagerSecurity { } } - protected String testStartContainer(YarnRPC rpc, + private String testStartContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, org.apache.hadoop.yarn.api.records.Token containerToken, org.apache.hadoop.yarn.api.records.Token nmToken, @@ -352,6 +388,29 @@ public class TestContainerManagerSecurity { } } + private void stopContainer(YarnRPC rpc, Token nmToken, + List containerId, ApplicationAttemptId appAttemptId, + NodeId nodeId) throws Exception { + StopContainersRequest request = + StopContainersRequest.newInstance(containerId); + ContainerManagementProtocol proxy = null; + try { + proxy = + getContainerManagementProtocolProxy(rpc, nmToken, nodeId, + appAttemptId.toString()); + StopContainersResponse response = proxy.stopContainers(request); + if (response.getFailedRequests() != null && + 
response.getFailedRequests().containsKey(containerId)) { + parseAndThrowException(response.getFailedRequests().get(containerId) + .deSerialize()); + } + } catch (Exception e) { + if (proxy != null) { + rpc.stopProxy(proxy, conf); + } + } + } + private void getContainerStatus(YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken,