YARN-688. Fixed NodeManager to properly cleanup containers when it is shut down. Contributed by Jian He.
svn merge --ignore-ancestry -c 1506814 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1506815 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b3c61cb9ee
commit
f6663a1198
|
@ -729,6 +729,14 @@ Release 2.1.0-beta - 2013-07-02
|
|||
YARN-875. Application can hang if AMRMClientAsync callback thread has
|
||||
exception (Xuan Gong via bikas)
|
||||
|
||||
YARN-461. Fair scheduler should not accept apps with empty string queue name.
|
||||
(ywskycn via tucu)
|
||||
|
||||
YARN-968. RM admin commands don't work. (vinodkv via kihwal)
|
||||
|
||||
YARN-688. Fixed NodeManager to properly cleanup containers when it is shut
|
||||
down. (Jian He via vinodkv)
|
||||
|
||||
BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
YARN-158. Yarn creating package-info.java must not depend on sh.
|
||||
|
@ -794,11 +802,6 @@ Release 2.1.0-beta - 2013-07-02
|
|||
YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu
|
||||
via cnauroth)
|
||||
|
||||
YARN-461. Fair scheduler should not accept apps with empty string queue name.
|
||||
(ywskycn via tucu)
|
||||
|
||||
YARN-968. RM admin commands don't work. (vinodkv via kihwal)
|
||||
|
||||
Release 2.0.5-alpha - 06/06/2013
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -229,6 +229,15 @@ public class NodeManager extends CompositeService
|
|||
return "NodeManager";
|
||||
}
|
||||
|
||||
protected void shutDown() {
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
NodeManager.this.stop();
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
|
||||
protected void resyncWithRM() {
|
||||
//we do not want to block dispatcher thread here
|
||||
new Thread() {
|
||||
|
@ -265,6 +274,8 @@ public class NodeManager extends CompositeService
|
|||
while (!containers.isEmpty()
|
||||
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
|
||||
try {
|
||||
//To remove done containers in NM context
|
||||
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.warn("Interrupted while sleeping on container kill on shutdown",
|
||||
|
@ -276,7 +287,6 @@ public class NodeManager extends CompositeService
|
|||
while (!containers.isEmpty()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
//to remove done containers from the map
|
||||
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.warn("Interrupted while sleeping on container kill on resync",
|
||||
|
@ -409,7 +419,7 @@ public class NodeManager extends CompositeService
|
|||
public void handle(NodeManagerEvent event) {
|
||||
switch (event.getType()) {
|
||||
case SHUTDOWN:
|
||||
stop();
|
||||
shutDown();
|
||||
break;
|
||||
case RESYNC:
|
||||
resyncWithRM();
|
||||
|
|
|
@ -385,7 +385,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
}
|
||||
if (response.getNodeAction() == NodeAction.RESYNC) {
|
||||
LOG.warn("Node is out of sync with ResourceManager,"
|
||||
+ " hence rebooting.");
|
||||
+ " hence resyncing.");
|
||||
LOG.warn("Message from ResourceManager: "
|
||||
+ response.getDiagnosticsMessage());
|
||||
// Invalidate the RMIdentifier while resync
|
||||
|
@ -418,6 +418,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
||||
throw new YarnRuntimeException(e);
|
||||
} catch (Throwable e) {
|
||||
|
||||
// TODO Better error handling. Thread can die with the rest of the
|
||||
// NM still running.
|
||||
LOG.error("Caught exception in status-updater", e);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -97,8 +99,12 @@ public class TestNodeStatusUpdater {
|
|||
}
|
||||
|
||||
static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
|
||||
static final Path basedir =
|
||||
new Path("target", TestNodeStatusUpdater.class.getName());
|
||||
static final File basedir =
|
||||
new File("target", TestNodeStatusUpdater.class.getName());
|
||||
static final File nmLocalDir = new File(basedir, "nm0");
|
||||
static final File tmpDir = new File(basedir, "tmpDir");
|
||||
static final File remoteLogsDir = new File(basedir, "remotelogs");
|
||||
static final File logsDir = new File(basedir, "logs");
|
||||
private static final RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
|
||||
|
@ -110,9 +116,14 @@ public class TestNodeStatusUpdater {
|
|||
private NodeManager nm;
|
||||
private boolean containerStatusBackupSuccessfully = true;
|
||||
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
|
||||
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
nmLocalDir.mkdirs();
|
||||
tmpDir.mkdirs();
|
||||
logsDir.mkdirs();
|
||||
remoteLogsDir.mkdirs();
|
||||
conf = createNMConfig();
|
||||
}
|
||||
|
||||
|
@ -121,6 +132,7 @@ public class TestNodeStatusUpdater {
|
|||
this.registeredNodes.clear();
|
||||
heartBeatID = 0;
|
||||
ServiceOperations.stop(nm);
|
||||
assertionFailedInThread.set(false);
|
||||
DefaultMetricsSystem.shutdown();
|
||||
}
|
||||
|
||||
|
@ -442,6 +454,13 @@ public class TestNodeStatusUpdater {
|
|||
protected void serviceStop() throws Exception {
|
||||
super.serviceStop();
|
||||
isStopped = true;
|
||||
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
|
||||
.containermanager.container.Container> containers =
|
||||
getNMContext().getContainers();
|
||||
// ensure that containers are empty
|
||||
if(!containers.isEmpty()) {
|
||||
assertionFailedInThread.set(true);
|
||||
}
|
||||
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
@ -723,7 +742,7 @@ public class TestNodeStatusUpdater {
|
|||
@After
|
||||
public void deleteBaseDir() throws IOException {
|
||||
FileContext lfs = FileContext.getLocalFSFileContext();
|
||||
lfs.delete(basedir, true);
|
||||
lfs.delete(new Path(basedir.getPath()), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1095,7 +1114,7 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
@Test(timeout = 200000)
|
||||
public void testNodeStatusUpdaterRetryAndNMShutdown()
|
||||
throws InterruptedException {
|
||||
throws Exception {
|
||||
final long connectionWaitSecs = 1;
|
||||
final long connectionRetryIntervalSecs = 1;
|
||||
YarnConfiguration conf = createNMConfig();
|
||||
|
@ -1104,14 +1123,23 @@ public class TestNodeStatusUpdater {
|
|||
conf.setLong(YarnConfiguration
|
||||
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
|
||||
connectionRetryIntervalSecs);
|
||||
conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
|
||||
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
||||
nm = new MyNodeManager2(syncBarrier, conf);
|
||||
nm.init(conf);
|
||||
nm.start();
|
||||
// start a container
|
||||
ContainerId cId = TestNodeManagerShutdown.createContainerId();
|
||||
FileContext localFS = FileContext.getLocalFSFileContext();
|
||||
TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir,
|
||||
new File("start_file.txt"));
|
||||
|
||||
try {
|
||||
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
Assert.assertFalse("Containers not cleaned up when NM stopped",
|
||||
assertionFailedInThread.get());
|
||||
Assert.assertTrue(((MyNodeManager2) nm).isStopped);
|
||||
Assert.assertTrue("calculate heartBeatCount based on" +
|
||||
" connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
|
||||
|
@ -1229,15 +1257,13 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
private YarnConfiguration createNMConfig() {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
|
||||
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
|
||||
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345");
|
||||
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346");
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
|
||||
.getPath());
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
|
||||
"remotelogs").toUri().getPath());
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0")
|
||||
.toUri().getPath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
remoteLogsDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
|
|
@ -253,7 +253,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
|
||||
if (rmNode == null) {
|
||||
/* node does not exist */
|
||||
String message = "Node not found rebooting " + remoteNodeStatus.getNodeId();
|
||||
String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
|
||||
LOG.info(message);
|
||||
resync.setDiagnosticsMessage(message);
|
||||
return resync;
|
||||
|
|
Loading…
Reference in New Issue