YARN-688. Fixed NodeManager to properly cleanup containers when it is shut down. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1506814 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2013-07-25 04:14:25 +00:00
parent f138ae68f9
commit db1e0c1eef
5 changed files with 60 additions and 20 deletions
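The change below makes the NodeManager stop itself on a separate thread when a SHUTDOWN event arrives (so the dispatcher thread is never blocked) and wait a bounded amount of time for running containers to be cleaned up before the services go down. Here is a minimal, self-contained sketch of that pattern using only the JDK; the field names (containers, waitForContainersOnShutdownMillis) mirror the diff, but everything else is illustrative and not the actual NodeManager code.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ShutdownSketch {
  // Stand-in for the NM context's container map (ContainerId -> Container in the real code).
  private final ConcurrentMap<String, Object> containers = new ConcurrentHashMap<>();
  // Mirrors waitForContainersOnShutdownMillis in the diff.
  private final long waitForContainersOnShutdownMillis = 5_000;

  // Called when a SHUTDOWN event arrives; the actual stop() runs on a new
  // thread so the calling (dispatcher) thread is never blocked.
  void shutDown() {
    new Thread(this::stop, "nm-shutdown").start();
  }

  void stop() {
    cleanupContainers();
    // ... stop the composite services here ...
  }

  // Bounded wait for containers to finish, polling once a second,
  // in the spirit of the shutdown path in the patch.
  private void cleanupContainers() {
    long waitStartTime = System.currentTimeMillis();
    while (!containers.isEmpty()
        && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
      try {
        // The real code also asks the status updater to drop completed
        // containers from the NM context on each iteration.
        Thread.sleep(1000);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        break;
      }
    }
  }
}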


@ -746,6 +746,14 @@ Release 2.1.0-beta - 2013-07-02
YARN-875. Application can hang if AMRMClientAsync callback thread has
exception (Xuan Gong via bikas)
YARN-461. Fair scheduler should not accept apps with empty string queue name.
(ywskycn via tucu)
YARN-968. RM admin commands don't work. (vinodkv via kihwal)
YARN-688. Fixed NodeManager to properly cleanup containers when it is shut
down. (Jian He via vinodkv)
BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.
@ -811,11 +819,6 @@ Release 2.1.0-beta - 2013-07-02
YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu
via cnauroth)
YARN-461. Fair scheduler should not accept apps with empty string queue name.
(ywskycn via tucu)
YARN-968. RM admin commands don't work. (vinodkv via kihwal)
Release 2.0.5-alpha - 06/06/2013
INCOMPATIBLE CHANGES


@ -229,6 +229,15 @@ public class NodeManager extends CompositeService
return "NodeManager";
}
protected void shutDown() {
new Thread() {
@Override
public void run() {
NodeManager.this.stop();
}
}.start();
}
protected void resyncWithRM() {
//we do not want to block dispatcher thread here
new Thread() {
@ -265,6 +274,8 @@ public class NodeManager extends CompositeService
while (!containers.isEmpty()
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
try {
//To remove done containers in NM context
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on shutdown",
@ -276,7 +287,6 @@ public class NodeManager extends CompositeService
while (!containers.isEmpty()) {
try {
Thread.sleep(1000);
//to remove done containers from the map
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on resync",
@ -409,7 +419,7 @@ public class NodeManager extends CompositeService
public void handle(NodeManagerEvent event) {
switch (event.getType()) {
case SHUTDOWN:
stop();
shutDown();
break;
case RESYNC:
resyncWithRM();
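The hunks above change the SHUTDOWN case to call shutDown() (stopping the NM on a background thread) instead of calling stop() directly on the dispatcher thread, while RESYNC keeps its own non-blocking path. A hedged sketch of that event-handling shape; the enum is a simplified stand-in that only shares its name with the real NodeManagerEventType:

enum NodeManagerEventType { SHUTDOWN, RESYNC }

// Illustrative handler: neither branch does blocking work on the calling
// (dispatcher) thread; both hand off to a dedicated thread.
class NodeManagerEventHandlerSketch {
  void handle(NodeManagerEventType type) {
    switch (type) {
      case SHUTDOWN:
        new Thread(this::stopEverything, "nm-shutdown").start();
        break;
      case RESYNC:
        new Thread(this::resyncWithRM, "nm-resync").start();
        break;
      default:
        break;
    }
  }

  void stopEverything() { /* bounded wait for containers, then stop all services */ }

  void resyncWithRM() { /* wait for containers to drain, then re-register with the RM */ }
}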


@ -385,7 +385,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
if (response.getNodeAction() == NodeAction.RESYNC) {
LOG.warn("Node is out of sync with ResourceManager,"
+ " hence rebooting.");
+ " hence resyncing.");
LOG.warn("Message from ResourceManager: "
+ response.getDiagnosticsMessage());
// Invalidate the RMIdentifier while resync
@ -418,6 +418,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
throw new YarnRuntimeException(e);
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
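The status updater reacts to the RM's NodeAction by dispatching an event to the NodeManager rather than acting inline, which is why the log message now says "resyncing" instead of "rebooting". A minimal sketch of that reaction, assuming a hypothetical dispatch() helper and a simplified NodeAction enum (the real code posts a NodeManagerEvent on the NM dispatcher):

enum NodeAction { NORMAL, RESYNC, SHUTDOWN }

class StatusUpdaterSketch {
  void onHeartbeatResponse(NodeAction action, String diagnostics) {
    if (action == NodeAction.RESYNC) {
      System.err.println("Node is out of sync with ResourceManager, hence resyncing.");
      System.err.println("Message from ResourceManager: " + diagnostics);
      dispatch("RESYNC");   // the real code sends NodeManagerEventType.RESYNC
    } else if (action == NodeAction.SHUTDOWN) {
      dispatch("SHUTDOWN"); // likewise NodeManagerEventType.SHUTDOWN
    }
  }

  // Hypothetical stand-in for the dispatcher plumbing in NodeStatusUpdaterImpl.
  void dispatch(String eventType) {
    System.out.println("dispatching " + eventType + " to the NodeManager");
  }
}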


@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -97,8 +99,12 @@ public class TestNodeStatusUpdater {
}
static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
static final Path basedir =
new Path("target", TestNodeStatusUpdater.class.getName());
static final File basedir =
new File("target", TestNodeStatusUpdater.class.getName());
static final File nmLocalDir = new File(basedir, "nm0");
static final File tmpDir = new File(basedir, "tmpDir");
static final File remoteLogsDir = new File(basedir, "remotelogs");
static final File logsDir = new File(basedir, "logs");
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
@ -110,9 +116,14 @@ public class TestNodeStatusUpdater {
private NodeManager nm;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
@Before
public void setUp() {
nmLocalDir.mkdirs();
tmpDir.mkdirs();
logsDir.mkdirs();
remoteLogsDir.mkdirs();
conf = createNMConfig();
}
@ -121,6 +132,7 @@ public class TestNodeStatusUpdater {
this.registeredNodes.clear();
heartBeatID = 0;
ServiceOperations.stop(nm);
assertionFailedInThread.set(false);
DefaultMetricsSystem.shutdown();
}
@ -442,6 +454,13 @@ public class TestNodeStatusUpdater {
protected void serviceStop() throws Exception {
super.serviceStop();
isStopped = true;
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers =
getNMContext().getContainers();
// ensure that containers are empty
if(!containers.isEmpty()) {
assertionFailedInThread.set(true);
}
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
}
}
@ -723,7 +742,7 @@ public class TestNodeStatusUpdater {
@After
public void deleteBaseDir() throws IOException {
FileContext lfs = FileContext.getLocalFSFileContext();
lfs.delete(basedir, true);
lfs.delete(new Path(basedir.getPath()), true);
}
@Test
@ -1095,7 +1114,7 @@ public class TestNodeStatusUpdater {
@Test(timeout = 200000)
public void testNodeStatusUpdaterRetryAndNMShutdown()
throws InterruptedException {
throws Exception {
final long connectionWaitSecs = 1;
final long connectionRetryIntervalSecs = 1;
YarnConfiguration conf = createNMConfig();
@ -1104,14 +1123,23 @@ public class TestNodeStatusUpdater {
conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
connectionRetryIntervalSecs);
conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
CyclicBarrier syncBarrier = new CyclicBarrier(2);
nm = new MyNodeManager2(syncBarrier, conf);
nm.init(conf);
nm.start();
// start a container
ContainerId cId = TestNodeManagerShutdown.createContainerId();
FileContext localFS = FileContext.getLocalFSFileContext();
TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir,
new File("start_file.txt"));
try {
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
}
Assert.assertFalse("Containers not cleaned up when NM stopped",
assertionFailedInThread.get());
Assert.assertTrue(((MyNodeManager2) nm).isStopped);
Assert.assertTrue("calculate heartBeatCount based on" +
" connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
@ -1229,15 +1257,13 @@ public class TestNodeStatusUpdater {
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
.getPath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
"remotelogs").toUri().getPath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0")
.toUri().getPath());
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
return conf;
}
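The test changes rely on a common trick: a JUnit assertion that fails inside a service or background thread will not fail the test on its own, so the thread only sets an AtomicBoolean and the main test thread asserts on it after synchronizing on a barrier. A self-contained sketch of that pattern (JUnit 4 style, matching the test above; the names are illustrative):

import static org.junit.Assert.assertFalse;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

public class BackgroundAssertionSketch {
  private final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
  private final CyclicBarrier syncBarrier = new CyclicBarrier(2);

  @Test
  public void testFailureFromBackgroundThreadIsVisible() throws Exception {
    Thread worker = new Thread(() -> {
      try {
        // Stand-in for "containers must be empty when the service stops".
        boolean containersEmpty = true;
        if (!containersEmpty) {
          assertionFailedInThread.set(true);   // record instead of throwing
        }
        syncBarrier.await(10000, TimeUnit.MILLISECONDS);
      } catch (Exception e) {
        assertionFailedInThread.set(true);
      }
    });
    worker.start();

    // Main thread waits for the worker, then turns the flag into a real failure.
    syncBarrier.await(10000, TimeUnit.MILLISECONDS);
    worker.join();
    assertFalse("Containers not cleaned up when NM stopped",
        assertionFailedInThread.get());
  }
}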


@ -253,7 +253,7 @@ public class ResourceTrackerService extends AbstractService implements
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) {
/* node does not exist */
String message = "Node not found rebooting " + remoteNodeStatus.getNodeId();
String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
LOG.info(message);
resync.setDiagnosticsMessage(message);
return resync;
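On the RM side, a heartbeat from a node that is no longer in the RM's node map gets a response telling it to resync (previously phrased as "rebooting"). A rough sketch of that check, with a simplified string return value standing in for the real NodeHeartbeatResponse:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ResourceTrackerSketch {
  // Stand-in for rmContext.getRMNodes(): NodeId -> RMNode in the real code.
  private final Map<String, Object> rmNodes = new ConcurrentHashMap<>();

  String nodeHeartbeat(String nodeId) {
    if (!rmNodes.containsKey(nodeId)) {
      // Node does not exist on the RM any more: ask it to resync.
      String message = "Node not found resyncing " + nodeId;
      System.out.println(message);
      return "RESYNC: " + message;
    }
    return "NORMAL";
  }
}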