Merge r1608334 from trunk. YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving restart is enabled. Contributed by Anubhav Dhoot

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1608336 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-07-07 04:40:36 +00:00
parent 92de5cae9b
commit 77a94b73b2
3 changed files with 91 additions and 32 deletions

View File

@ -54,6 +54,9 @@ Release 2.5.0 - UNRELEASED
YARN-1713. Added get-new-app and submit-app functionality to RM web services.
(Varun Vasudev via vinodkv)
YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving
restart is enabled. (Anubhav Dhoot via jianhe)
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

View File

@ -84,6 +84,7 @@ public class NodeManager extends CompositeService
private NMStateStoreService nmStore = null; private NMStateStoreService nmStore = null;
private AtomicBoolean isStopping = new AtomicBoolean(false); private AtomicBoolean isStopping = new AtomicBoolean(false);
private boolean rmWorkPreservingRestartEnabled;
public NodeManager() { public NodeManager() {
super(NodeManager.class.getName()); super(NodeManager.class.getName());
@ -173,6 +174,10 @@ public class NodeManager extends CompositeService
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
initAndStartRecoveryStore(conf); initAndStartRecoveryStore(conf);
NMContainerTokenSecretManager containerTokenSecretManager = NMContainerTokenSecretManager containerTokenSecretManager =
@ -276,8 +281,12 @@ public class NodeManager extends CompositeService
try { try {
LOG.info("Notifying ContainerManager to block new container-requests"); LOG.info("Notifying ContainerManager to block new container-requests");
containerManager.setBlockNewContainerRequests(true); containerManager.setBlockNewContainerRequests(true);
LOG.info("Cleaning up running containers on resync"); if (!rmWorkPreservingRestartEnabled) {
containerManager.cleanupContainersOnNMResync(); LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
} else {
LOG.info("Preserving containers on resync");
}
((NodeStatusUpdaterImpl) nodeStatusUpdater) ((NodeStatusUpdaterImpl) nodeStatusUpdater)
.rebootNodeStatusUpdaterAndRegisterWithRM(); .rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) { } catch (YarnRuntimeException e) {

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
@ -85,6 +84,9 @@ public class TestNodeManagerResync {
private CyclicBarrier syncBarrier; private CyclicBarrier syncBarrier;
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false); private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
private final NodeManagerEvent resyncEvent =
new NodeManagerEvent(NodeManagerEventType.RESYNC);
@Before @Before
public void setup() throws UnsupportedFileSystemException { public void setup() throws UnsupportedFileSystemException {
@ -102,34 +104,56 @@ public class TestNodeManagerResync {
assertionFailedInThread.set(false); assertionFailedInThread.set(false);
} }
@SuppressWarnings("unchecked")
@Test @Test
public void testKillContainersOnResync() throws IOException, public void testKillContainersOnResync() throws IOException,
InterruptedException, YarnException { InterruptedException, YarnException {
NodeManager nm = new TestNodeManager1(); TestNodeManager1 nm = new TestNodeManager1(false);
testContainerPreservationOnResyncImpl(nm, false);
}
@Test
public void testPreserveContainersOnResyncKeepingContainers() throws
IOException,
InterruptedException, YarnException {
TestNodeManager1 nm = new TestNodeManager1(true);
testContainerPreservationOnResyncImpl(nm, true);
}
@SuppressWarnings("unchecked")
protected void testContainerPreservationOnResyncImpl(TestNodeManager1 nm,
boolean isWorkPreservingRestartEnabled)
throws IOException, YarnException, InterruptedException {
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
nm.init(conf); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
nm.start(); isWorkPreservingRestartEnabled);
ContainerId cId = TestNodeManagerShutdown.createContainerId();
TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
processStartFile);
Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount());
nm.getNMDispatcher().getEventHandler().
handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
try { try {
syncBarrier.await(); nm.init(conf);
} catch (BrokenBarrierException e) { nm.start();
} ContainerId cId = TestNodeManagerShutdown.createContainerId();
Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount()); TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
// Only containers should be killed on resync, apps should lie around. That processStartFile);
// way local resources for apps can be used beyond resync without
// relocalization
Assert.assertTrue(nm.getNMContext().getApplications()
.containsKey(cId.getApplicationAttemptId().getApplicationId()));
Assert.assertFalse(assertionFailedInThread.get());
nm.stop(); nm.setExistingContainerId(cId);
Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount());
nm.getNMDispatcher().getEventHandler().handle(resyncEvent);
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount());
// Only containers should be killed on resync, apps should lie around.
// That way local resources for apps can be used beyond resync without
// relocalization
Assert.assertTrue(nm.getNMContext().getApplications()
.containsKey(cId.getApplicationAttemptId().getApplicationId()));
Assert.assertFalse(assertionFailedInThread.get());
}
finally {
nm.stop();
}
} }
// This test tests new container requests are blocked when NM starts from // This test tests new container requests are blocked when NM starts from
@ -313,6 +337,16 @@ public class TestNodeManagerResync {
class TestNodeManager1 extends NodeManager { class TestNodeManager1 extends NodeManager {
private int registrationCount = 0; private int registrationCount = 0;
private boolean containersShouldBePreserved;
private ContainerId existingCid;
public TestNodeManager1(boolean containersShouldBePreserved) {
this.containersShouldBePreserved = containersShouldBePreserved;
}
public void setExistingContainerId(ContainerId cId) {
existingCid = cId;
}
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@ -344,10 +378,23 @@ public class TestNodeManagerResync {
.containermanager.container.Container> containers = .containermanager.container.Container> containers =
getNMContext().getContainers(); getNMContext().getContainers();
try { try {
// ensure that containers are empty before restart nodeStatusUpdater try {
Assert.assertTrue(containers.isEmpty()); if (containersShouldBePreserved) {
super.rebootNodeStatusUpdaterAndRegisterWithRM(); Assert.assertFalse(containers.isEmpty());
syncBarrier.await(); Assert.assertTrue(containers.containsKey(existingCid));
} else {
// ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty());
}
super.rebootNodeStatusUpdaterAndRegisterWithRM();
}
catch (AssertionError ae) {
ae.printStackTrace();
assertionFailedInThread.set(true);
}
finally {
syncBarrier.await();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
} catch (BrokenBarrierException e) { } catch (BrokenBarrierException e) {
} catch (AssertionError ae) { } catch (AssertionError ae) {