YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving restart is enabled. Contributed by Anubhav Dhoot

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1608334 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-07-07 04:37:59 +00:00
parent ed642b5d8d
commit 6d7dbd4fed
3 changed files with 91 additions and 32 deletions

View File

@ -72,6 +72,9 @@ Release 2.5.0 - UNRELEASED
YARN-1713. Added get-new-app and submit-app functionality to RM web services.
(Varun Vasudev via vinodkv)
YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving
restart is enabled. (Anubhav Dhoot via jianhe)
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

View File

@ -84,7 +84,8 @@ public class NodeManager extends CompositeService
private NMStateStoreService nmStore = null;
private AtomicBoolean isStopping = new AtomicBoolean(false);
private boolean rmWorkPreservingRestartEnabled;
public NodeManager() {
super(NodeManager.class.getName());
}
@ -173,6 +174,10 @@ public class NodeManager extends CompositeService
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
initAndStartRecoveryStore(conf);
NMContainerTokenSecretManager containerTokenSecretManager =
@ -276,8 +281,12 @@ public class NodeManager extends CompositeService
try {
LOG.info("Notifying ContainerManager to block new container-requests");
containerManager.setBlockNewContainerRequests(true);
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
if (!rmWorkPreservingRestartEnabled) {
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
} else {
LOG.info("Preserving containers on resync");
}
((NodeStatusUpdaterImpl) nodeStatusUpdater)
.rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) {

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
@ -85,6 +84,9 @@ public class TestNodeManagerResync {
private CyclicBarrier syncBarrier;
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
private final NodeManagerEvent resyncEvent =
new NodeManagerEvent(NodeManagerEventType.RESYNC);
@Before
public void setup() throws UnsupportedFileSystemException {
@ -102,34 +104,56 @@ public class TestNodeManagerResync {
assertionFailedInThread.set(false);
}
@SuppressWarnings("unchecked")
@Test
public void testKillContainersOnResync() throws IOException,
InterruptedException, YarnException {
NodeManager nm = new TestNodeManager1();
TestNodeManager1 nm = new TestNodeManager1(false);
testContainerPreservationOnResyncImpl(nm, false);
}
@Test
public void testPreserveContainersOnResyncKeepingContainers() throws
IOException,
InterruptedException, YarnException {
TestNodeManager1 nm = new TestNodeManager1(true);
testContainerPreservationOnResyncImpl(nm, true);
}
@SuppressWarnings("unchecked")
protected void testContainerPreservationOnResyncImpl(TestNodeManager1 nm,
boolean isWorkPreservingRestartEnabled)
throws IOException, YarnException, InterruptedException {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
nm.start();
ContainerId cId = TestNodeManagerShutdown.createContainerId();
TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
processStartFile);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
isWorkPreservingRestartEnabled);
Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount());
nm.getNMDispatcher().getEventHandler().
handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount());
// Only containers should be killed on resync, apps should lie around. That
// way local resources for apps can be used beyond resync without
// relocalization
Assert.assertTrue(nm.getNMContext().getApplications()
.containsKey(cId.getApplicationAttemptId().getApplicationId()));
Assert.assertFalse(assertionFailedInThread.get());
nm.init(conf);
nm.start();
ContainerId cId = TestNodeManagerShutdown.createContainerId();
TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
processStartFile);
nm.stop();
nm.setExistingContainerId(cId);
Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount());
nm.getNMDispatcher().getEventHandler().handle(resyncEvent);
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount());
// Only containers should be killed on resync, apps should lie around.
// That way local resources for apps can be used beyond resync without
// relocalization
Assert.assertTrue(nm.getNMContext().getApplications()
.containsKey(cId.getApplicationAttemptId().getApplicationId()));
Assert.assertFalse(assertionFailedInThread.get());
}
finally {
nm.stop();
}
}
// This test tests new container requests are blocked when NM starts from
@ -157,7 +181,7 @@ public class TestNodeManagerResync {
Assert.assertFalse(assertionFailedInThread.get());
nm.stop();
}
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testNMshutdownWhenResyncThrowException() throws IOException,
@ -169,7 +193,7 @@ public class TestNodeManagerResync {
Assert.assertEquals(1, ((TestNodeManager3) nm).getNMRegistrationCount());
nm.getNMDispatcher().getEventHandler()
.handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
synchronized (isNMShutdownCalled) {
while (isNMShutdownCalled.get() == false) {
try {
@ -178,7 +202,7 @@ public class TestNodeManagerResync {
}
}
}
Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get());
nm.stop();
}
@ -313,6 +337,16 @@ public class TestNodeManagerResync {
class TestNodeManager1 extends NodeManager {
private int registrationCount = 0;
private boolean containersShouldBePreserved;
private ContainerId existingCid;
public TestNodeManager1(boolean containersShouldBePreserved) {
this.containersShouldBePreserved = containersShouldBePreserved;
}
public void setExistingContainerId(ContainerId cId) {
existingCid = cId;
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@ -344,10 +378,23 @@ public class TestNodeManagerResync {
.containermanager.container.Container> containers =
getNMContext().getContainers();
try {
// ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty());
super.rebootNodeStatusUpdaterAndRegisterWithRM();
syncBarrier.await();
try {
if (containersShouldBePreserved) {
Assert.assertFalse(containers.isEmpty());
Assert.assertTrue(containers.containsKey(existingCid));
} else {
// ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty());
}
super.rebootNodeStatusUpdaterAndRegisterWithRM();
}
catch (AssertionError ae) {
ae.printStackTrace();
assertionFailedInThread.set(true);
}
finally {
syncBarrier.await();
}
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
} catch (AssertionError ae) {