From f6663a11980fe6d55af9965b61860efad50cd6a7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 25 Jul 2013 04:15:16 +0000 Subject: [PATCH] YARN-688. Fixed NodeManager to properly cleanup containers when it is shut down. Contributed by Jian He. svn merge --ignore-ancestry -c 1506814 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1506815 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 13 +++-- .../yarn/server/nodemanager/NodeManager.java | 14 +++++- .../nodemanager/NodeStatusUpdaterImpl.java | 3 +- .../nodemanager/TestNodeStatusUpdater.java | 48 ++++++++++++++----- .../ResourceTrackerService.java | 2 +- 5 files changed, 60 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index dc24c643235..29f83678064 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -729,6 +729,14 @@ Release 2.1.0-beta - 2013-07-02 YARN-875. Application can hang if AMRMClientAsync callback thread has exception (Xuan Gong via bikas) + YARN-461. Fair scheduler should not accept apps with empty string queue name. + (ywskycn via tucu) + + YARN-968. RM admin commands don't work. (vinodkv via kihwal) + + YARN-688. Fixed NodeManager to properly cleanup containers when it is shut + down. (Jian He via vinodkv) + BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS YARN-158. Yarn creating package-info.java must not depend on sh. @@ -794,11 +802,6 @@ Release 2.1.0-beta - 2013-07-02 YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu via cnauroth) - YARN-461. Fair scheduler should not accept apps with empty string queue name. - (ywskycn via tucu) - - YARN-968. RM admin commands don't work. (vinodkv via kihwal) - Release 2.0.5-alpha - 06/06/2013 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 8ec3a5a70cf..d4776bce758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -229,6 +229,15 @@ public class NodeManager extends CompositeService return "NodeManager"; } + protected void shutDown() { + new Thread() { + @Override + public void run() { + NodeManager.this.stop(); + } + }.start(); + } + protected void resyncWithRM() { //we do not want to block dispatcher thread here new Thread() { @@ -265,6 +274,8 @@ public class NodeManager extends CompositeService while (!containers.isEmpty() && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { try { + //To remove done containers in NM context + nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); Thread.sleep(1000); } catch (InterruptedException ex) { LOG.warn("Interrupted while sleeping on container kill on shutdown", @@ -276,7 +287,6 @@ public class NodeManager extends CompositeService while (!containers.isEmpty()) { try { Thread.sleep(1000); - //to remove done containers from the map nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); } catch (InterruptedException ex) { LOG.warn("Interrupted while sleeping on container kill on resync", @@ -409,7 +419,7 @@ public class NodeManager extends CompositeService public void handle(NodeManagerEvent event) { switch (event.getType()) { case SHUTDOWN: - stop(); + shutDown(); break; case RESYNC: resyncWithRM(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b0e71e91563..8169677bd42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -385,7 +385,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } if (response.getNodeAction() == NodeAction.RESYNC) { LOG.warn("Node is out of sync with ResourceManager," - + " hence rebooting."); + + " hence resyncing."); LOG.warn("Message from ResourceManager: " + response.getDiagnosticsMessage()); // Invalidate the RMIdentifier while resync @@ -418,6 +418,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); throw new YarnRuntimeException(e); } catch (Throwable e) { + // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 73bea039a2f..1e7386a24eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -97,8 +99,12 @@ public class TestNodeStatusUpdater { } static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class); - static final Path basedir = - new Path("target", TestNodeStatusUpdater.class.getName()); + static final File basedir = + new File("target", TestNodeStatusUpdater.class.getName()); + static final File nmLocalDir = new File(basedir, "nm0"); + static final File tmpDir = new File(basedir, "tmpDir"); + static final File remoteLogsDir = new File(basedir, "remotelogs"); + static final File logsDir = new File(basedir, "logs"); private static final RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); @@ -110,9 +116,14 @@ public class TestNodeStatusUpdater { private NodeManager nm; private boolean containerStatusBackupSuccessfully = true; private List completedContainerStatusList = new ArrayList(); + private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); @Before public void setUp() { + nmLocalDir.mkdirs(); + tmpDir.mkdirs(); + logsDir.mkdirs(); + remoteLogsDir.mkdirs(); conf = createNMConfig(); } @@ -121,6 +132,7 @@ public class TestNodeStatusUpdater { this.registeredNodes.clear(); heartBeatID = 0; ServiceOperations.stop(nm); + assertionFailedInThread.set(false); DefaultMetricsSystem.shutdown(); } @@ -442,6 +454,13 @@ public class TestNodeStatusUpdater { protected void serviceStop() throws Exception { super.serviceStop(); isStopped = true; + ConcurrentMap containers = + getNMContext().getContainers(); + // ensure that containers are empty + if(!containers.isEmpty()) { + assertionFailedInThread.set(true); + } syncBarrier.await(10000, TimeUnit.MILLISECONDS); } } @@ -723,7 +742,7 @@ public class TestNodeStatusUpdater { @After public void deleteBaseDir() throws IOException { FileContext lfs = FileContext.getLocalFSFileContext(); - lfs.delete(basedir, true); + lfs.delete(new Path(basedir.getPath()), true); } @Test @@ -1095,7 +1114,7 @@ public class TestNodeStatusUpdater { @Test(timeout = 200000) public void testNodeStatusUpdaterRetryAndNMShutdown() - throws InterruptedException { + throws Exception { final long connectionWaitSecs = 1; final long connectionRetryIntervalSecs = 1; YarnConfiguration conf = createNMConfig(); @@ -1104,14 +1123,23 @@ public class TestNodeStatusUpdater { conf.setLong(YarnConfiguration .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, connectionRetryIntervalSecs); + conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); CyclicBarrier syncBarrier = new CyclicBarrier(2); nm = new MyNodeManager2(syncBarrier, conf); nm.init(conf); nm.start(); + // start a container + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + FileContext localFS = FileContext.getLocalFSFileContext(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir, + new File("start_file.txt")); + try { syncBarrier.await(10000, TimeUnit.MILLISECONDS); } catch (Exception e) { } + Assert.assertFalse("Containers not cleaned up when NM stopped", + assertionFailedInThread.get()); Assert.assertTrue(((MyNodeManager2) nm).isStopped); Assert.assertTrue("calculate heartBeatCount based on" + " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2); @@ -1229,15 +1257,13 @@ public class TestNodeStatusUpdater { private YarnConfiguration createNMConfig() { YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB + conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345"); conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346"); - conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri() - .getPath()); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir, - "remotelogs").toUri().getPath()); - conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0") - .toUri().getPath()); + conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + remoteLogsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); return conf; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index adc5984efa4..aa8f120a9cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -253,7 +253,7 @@ public class ResourceTrackerService extends AbstractService implements RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { /* node does not exist */ - String message = "Node not found rebooting " + remoteNodeStatus.getNodeId(); + String message = "Node not found resyncing " + remoteNodeStatus.getNodeId(); LOG.info(message); resync.setDiagnosticsMessage(message); return resync;