diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7b52256871c..f0317eda28b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -54,6 +54,9 @@ Release 2.5.0 - UNRELEASED YARN-1713. Added get-new-app and submit-app functionality to RM web services. (Varun Vasudev via vinodkv) + YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving + restart is enabled. (Anubhav Dhoot via jianhe) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 2292a0dc9de..1109b087c17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -84,7 +84,8 @@ public class NodeManager extends CompositeService private NMStateStoreService nmStore = null; private AtomicBoolean isStopping = new AtomicBoolean(false); - + private boolean rmWorkPreservingRestartEnabled; + public NodeManager() { super(NodeManager.class.getName()); } @@ -173,6 +174,10 @@ public class NodeManager extends CompositeService conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration + .RM_WORK_PRESERVING_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); + initAndStartRecoveryStore(conf); NMContainerTokenSecretManager containerTokenSecretManager = @@ -276,8 +281,12 @@ public class NodeManager extends CompositeService try { LOG.info("Notifying ContainerManager to block new container-requests"); containerManager.setBlockNewContainerRequests(true); - LOG.info("Cleaning up running containers on resync"); - containerManager.cleanupContainersOnNMResync(); + if (!rmWorkPreservingRestartEnabled) { + LOG.info("Cleaning up running containers on resync"); + containerManager.cleanupContainersOnNMResync(); + } else { + LOG.info("Preserving containers on resync"); + } ((NodeStatusUpdaterImpl) nodeStatusUpdater) .rebootNodeStatusUpdaterAndRegisterWithRM(); } catch (YarnRuntimeException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index ea8f7343ec0..bd531865815 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; @@ -85,6 +84,9 @@ public class TestNodeManagerResync { private CyclicBarrier syncBarrier; private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false); + private final NodeManagerEvent resyncEvent = + new NodeManagerEvent(NodeManagerEventType.RESYNC); + @Before public void setup() throws UnsupportedFileSystemException { @@ -102,34 +104,56 @@ public class TestNodeManagerResync { assertionFailedInThread.set(false); } - @SuppressWarnings("unchecked") @Test public void testKillContainersOnResync() throws IOException, InterruptedException, YarnException { - NodeManager nm = new TestNodeManager1(); + TestNodeManager1 nm = new TestNodeManager1(false); + + testContainerPreservationOnResyncImpl(nm, false); + } + + @Test + public void testPreserveContainersOnResyncKeepingContainers() throws + IOException, + InterruptedException, YarnException { + TestNodeManager1 nm = new TestNodeManager1(true); + + testContainerPreservationOnResyncImpl(nm, true); + } + + @SuppressWarnings("unchecked") + protected void testContainerPreservationOnResyncImpl(TestNodeManager1 nm, + boolean isWorkPreservingRestartEnabled) + throws IOException, YarnException, InterruptedException { YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); - ContainerId cId = TestNodeManagerShutdown.createContainerId(); - TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, - processStartFile); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + isWorkPreservingRestartEnabled); - Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount()); - nm.getNMDispatcher().getEventHandler(). - handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); try { - syncBarrier.await(); - } catch (BrokenBarrierException e) { - } - Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount()); - // Only containers should be killed on resync, apps should lie around. That - // way local resources for apps can be used beyond resync without - // relocalization - Assert.assertTrue(nm.getNMContext().getApplications() - .containsKey(cId.getApplicationAttemptId().getApplicationId())); - Assert.assertFalse(assertionFailedInThread.get()); + nm.init(conf); + nm.start(); + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); - nm.stop(); + nm.setExistingContainerId(cId); + Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount()); + nm.getNMDispatcher().getEventHandler().handle(resyncEvent); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount()); + // Only containers should be killed on resync, apps should lie around. + // That way local resources for apps can be used beyond resync without + // relocalization + Assert.assertTrue(nm.getNMContext().getApplications() + .containsKey(cId.getApplicationAttemptId().getApplicationId())); + Assert.assertFalse(assertionFailedInThread.get()); + } + finally { + nm.stop(); + } } // This test tests new container requests are blocked when NM starts from @@ -157,7 +181,7 @@ public class TestNodeManagerResync { Assert.assertFalse(assertionFailedInThread.get()); nm.stop(); } - + @SuppressWarnings("unchecked") @Test(timeout=10000) public void testNMshutdownWhenResyncThrowException() throws IOException, @@ -169,7 +193,7 @@ public class TestNodeManagerResync { Assert.assertEquals(1, ((TestNodeManager3) nm).getNMRegistrationCount()); nm.getNMDispatcher().getEventHandler() .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC)); - + synchronized (isNMShutdownCalled) { while (isNMShutdownCalled.get() == false) { try { @@ -178,7 +202,7 @@ public class TestNodeManagerResync { } } } - + Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get()); nm.stop(); } @@ -313,6 +337,16 @@ public class TestNodeManagerResync { class TestNodeManager1 extends NodeManager { private int registrationCount = 0; + private boolean containersShouldBePreserved; + private ContainerId existingCid; + + public TestNodeManager1(boolean containersShouldBePreserved) { + this.containersShouldBePreserved = containersShouldBePreserved; + } + + public void setExistingContainerId(ContainerId cId) { + existingCid = cId; + } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -344,10 +378,23 @@ public class TestNodeManagerResync { .containermanager.container.Container> containers = getNMContext().getContainers(); try { - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdaterAndRegisterWithRM(); - syncBarrier.await(); + try { + if (containersShouldBePreserved) { + Assert.assertFalse(containers.isEmpty()); + Assert.assertTrue(containers.containsKey(existingCid)); + } else { + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + } + super.rebootNodeStatusUpdaterAndRegisterWithRM(); + } + catch (AssertionError ae) { + ae.printStackTrace(); + assertionFailedInThread.set(true); + } + finally { + syncBarrier.await(); + } } catch (InterruptedException e) { } catch (BrokenBarrierException e) { } catch (AssertionError ae) {