MAPREDUCE-3355. Fixed MR AM's ContainerLauncher to handle node-command timeouts correctly. (vinodkv)

svn merge -c r1202744 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1202745 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-11-16 15:40:05 +00:00
parent b57eef4714
commit 6a1621e32c
3 changed files with 86 additions and 30 deletions

View File

@ -41,6 +41,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3102. Changed NodeManager to fail fast when LinuxContainerExecutor
has wrong configuration or permissions. (Hitesh Shah via vinodkv)
MAPREDUCE-3355. Fixed MR AM's ContainerLauncher to handle node-command
timeouts correctly. (vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -82,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements
private Thread eventHandlingThread;
private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
final Timer commandTimer = new Timer(true);
YarnRPC rpc;
// To track numNodes.
@ -201,14 +202,14 @@ public class ContainerLauncherImpl extends AbstractService implements
return proxy;
}
private static class CommandTimer extends TimerTask {
private static class CommandTimerTask extends TimerTask {
private final Thread commandThread;
protected final ContainerLauncherEvent event;
protected final String message;
private boolean cancelled = false;
public CommandTimer(Thread thread, ContainerLauncherEvent event) {
public CommandTimerTask(Thread thread, ContainerLauncherEvent event) {
super();
this.commandThread = thread;
this.event = event;
this.message = "Couldn't complete " + event.getType() + " on "
+ event.getContainerID() + "/" + event.getTaskAttemptID()
+ ". Interrupting and returning";
@ -216,8 +217,27 @@ public class ContainerLauncherImpl extends AbstractService implements
@Override
public void run() {
LOG.warn(this.message);
this.commandThread.interrupt();
synchronized (this) {
if (this.cancelled) {
return;
}
LOG.warn(this.message);
StackTraceElement[] trace = this.commandThread.getStackTrace();
StringBuilder logMsg = new StringBuilder();
for (int i = 0; i < trace.length; i++) {
logMsg.append("\n\tat " + trace[i]);
}
LOG.info("Stack trace of the command-thread: \n" + logMsg.toString());
this.commandThread.interrupt();
}
}
@Override
public boolean cancel() {
synchronized (this) {
this.cancelled = true;
return super.cancel();
}
}
}
@ -243,10 +263,11 @@ public class ContainerLauncherImpl extends AbstractService implements
ContainerToken containerToken = event.getContainerToken();
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
Timer timer = new Timer(true);
ContainerManager proxy = null;
CommandTimerTask timerTask = new CommandTimerTask(Thread
.currentThread(), event);
switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
@ -254,16 +275,16 @@ public class ContainerLauncherImpl extends AbstractService implements
= (ContainerRemoteLaunchEvent) event;
try {
timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut);
commandTimer.schedule(timerTask, nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
// Interruped during getProxy, but that didn't throw exception
if (Thread.currentThread().isInterrupted()) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while.
String message = "Start-container for " + event.getContainerID()
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
@ -280,11 +301,12 @@ public class ContainerLauncherImpl extends AbstractService implements
StartContainerResponse response = proxy.startContainer(startRequest);
// container started properly. Stop the timer
timer.cancel();
if (Thread.currentThread().isInterrupted()) {
timerTask.cancel();
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while, but
// startContainer didn't throw exception
String message = "Start-container for " + event.getContainerID()
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
@ -309,12 +331,19 @@ public class ContainerLauncherImpl extends AbstractService implements
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while.
LOG.info("Start-container for " + event.getContainerID()
+ " got interrupted.");
}
String message = "Container launch failed for " + containerID
+ " : " + StringUtils.stringifyException(t);
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
timer.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
timerTask.cancel();
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
}
break;
@ -331,13 +360,12 @@ public class ContainerLauncherImpl extends AbstractService implements
} else {
try {
timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut);
commandTimer.schedule(timerTask, nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
if (Thread.currentThread().isInterrupted()) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while. No need to
// return, send cleanedup event anyways.
LOG.info("Stop-container for " + event.getContainerID()
@ -353,6 +381,14 @@ public class ContainerLauncherImpl extends AbstractService implements
proxy.stopContainer(stopRequest);
}
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while, clear the
// interrupt flag
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
}
// ignore the cleanup failure
String message = "cleanup failed for container "
+ event.getContainerID() + " : "
@ -363,8 +399,18 @@ public class ContainerLauncherImpl extends AbstractService implements
message));
LOG.warn(message);
} finally {
timer.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
timerTask.cancel();
if (Thread.interrupted()) {
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
// ignore the cleanup failure
context.getEventHandler()
.handle(new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
"cleanup failed for container " + event.getContainerID()));
}
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
}
// after killing, send killed event to taskattempt

View File

@ -88,11 +88,19 @@ public class TestContainerLauncher {
app.waitForState(job, JobState.FAILED);
LOG.info("attempt.getDiagnostics: " + attempt.getDiagnostics());
Assert.assertTrue(attempt.getDiagnostics().toString().contains(
"Container launch failed for container_0_0000_01_000000 : "));
Assert.assertTrue(attempt.getDiagnostics().toString().contains(
": java.lang.InterruptedException"));
String diagnostics = attempt.getDiagnostics().toString();
LOG.info("attempt.getDiagnostics: " + diagnostics);
if (swallowInterrupts) {
Assert.assertEquals("[Container launch failed for "
+ "container_0_0000_01_000000 : Start-container for "
+ "container_0_0000_01_000000 got interrupted. Returning.]",
diagnostics);
} else {
Assert.assertTrue(diagnostics.contains("Container launch failed for "
+ "container_0_0000_01_000000 : "));
Assert.assertTrue(diagnostics
.contains(": java.lang.InterruptedException"));
}
app.stop();
}
@ -119,11 +127,10 @@ public class TestContainerLauncher {
}
} catch (InterruptedException e) {
LOG.info(e);
if (!swallowInterrupts) {
if (!MRAppWithSlowNM.this.swallowInterrupts) {
throw new IOException(e);
} else {
Thread.currentThread().interrupt();
}
Thread.currentThread().interrupt();
}
return null;
}