MAPREDUCE-3355. Fixed MR AM's ContainerLauncher to handle node-command timeouts correctly. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1202744 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-11-16 15:37:27 +00:00
parent 649080131c
commit 00b50a5c94
3 changed files with 86 additions and 30 deletions

View File

@ -94,6 +94,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3102. Changed NodeManager to fail fast when LinuxContainerExecutor MAPREDUCE-3102. Changed NodeManager to fail fast when LinuxContainerExecutor
has wrong configuration or permissions. (Hitesh Shah via vinodkv) has wrong configuration or permissions. (Hitesh Shah via vinodkv)
MAPREDUCE-3355. Fixed MR AM's ContainerLauncher to handle node-command
timeouts correctly. (vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -82,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements
private Thread eventHandlingThread; private Thread eventHandlingThread;
private BlockingQueue<ContainerLauncherEvent> eventQueue = private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>(); new LinkedBlockingQueue<ContainerLauncherEvent>();
final Timer commandTimer = new Timer(true);
YarnRPC rpc; YarnRPC rpc;
// To track numNodes. // To track numNodes.
@ -201,14 +202,14 @@ public class ContainerLauncherImpl extends AbstractService implements
return proxy; return proxy;
} }
private static class CommandTimer extends TimerTask { private static class CommandTimerTask extends TimerTask {
private final Thread commandThread; private final Thread commandThread;
protected final ContainerLauncherEvent event;
protected final String message; protected final String message;
private boolean cancelled = false;
public CommandTimer(Thread thread, ContainerLauncherEvent event) { public CommandTimerTask(Thread thread, ContainerLauncherEvent event) {
super();
this.commandThread = thread; this.commandThread = thread;
this.event = event;
this.message = "Couldn't complete " + event.getType() + " on " this.message = "Couldn't complete " + event.getType() + " on "
+ event.getContainerID() + "/" + event.getTaskAttemptID() + event.getContainerID() + "/" + event.getTaskAttemptID()
+ ". Interrupting and returning"; + ". Interrupting and returning";
@ -216,8 +217,27 @@ public class ContainerLauncherImpl extends AbstractService implements
@Override @Override
public void run() { public void run() {
LOG.warn(this.message); synchronized (this) {
this.commandThread.interrupt(); if (this.cancelled) {
return;
}
LOG.warn(this.message);
StackTraceElement[] trace = this.commandThread.getStackTrace();
StringBuilder logMsg = new StringBuilder();
for (int i = 0; i < trace.length; i++) {
logMsg.append("\n\tat " + trace[i]);
}
LOG.info("Stack trace of the command-thread: \n" + logMsg.toString());
this.commandThread.interrupt();
}
}
@Override
public boolean cancel() {
synchronized (this) {
this.cancelled = true;
return super.cancel();
}
} }
} }
@ -243,10 +263,11 @@ public class ContainerLauncherImpl extends AbstractService implements
ContainerToken containerToken = event.getContainerToken(); ContainerToken containerToken = event.getContainerToken();
TaskAttemptId taskAttemptID = event.getTaskAttemptID(); TaskAttemptId taskAttemptID = event.getTaskAttemptID();
Timer timer = new Timer(true);
ContainerManager proxy = null; ContainerManager proxy = null;
CommandTimerTask timerTask = new CommandTimerTask(Thread
.currentThread(), event);
switch(event.getType()) { switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH: case CONTAINER_REMOTE_LAUNCH:
@ -254,16 +275,16 @@ public class ContainerLauncherImpl extends AbstractService implements
= (ContainerRemoteLaunchEvent) event; = (ContainerRemoteLaunchEvent) event;
try { try {
timer.schedule(new CommandTimer(Thread.currentThread(), event), commandTimer.schedule(timerTask, nmTimeOut);
nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr, proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken); containerToken);
// Interruped during getProxy, but that didn't throw exception // Interruped during getProxy, but that didn't throw exception
if (Thread.currentThread().isInterrupted()) { if (Thread.interrupted()) {
// The timer cancelled the command in the mean while. // The timer cancelled the command in the mean while.
String message = "Start-container for " + event.getContainerID() String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning."; + " got interrupted. Returning.";
sendContainerLaunchFailedMsg(taskAttemptID, message); sendContainerLaunchFailedMsg(taskAttemptID, message);
return; return;
@ -280,11 +301,12 @@ public class ContainerLauncherImpl extends AbstractService implements
StartContainerResponse response = proxy.startContainer(startRequest); StartContainerResponse response = proxy.startContainer(startRequest);
// container started properly. Stop the timer // container started properly. Stop the timer
timer.cancel(); timerTask.cancel();
if (Thread.currentThread().isInterrupted()) { if (Thread.interrupted()) {
// The timer cancelled the command in the mean while, but // The timer cancelled the command in the mean while, but
// startContainer didn't throw exception // startContainer didn't throw exception
String message = "Start-container for " + event.getContainerID() String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning."; + " got interrupted. Returning.";
sendContainerLaunchFailedMsg(taskAttemptID, message); sendContainerLaunchFailedMsg(taskAttemptID, message);
return; return;
@ -309,12 +331,19 @@ public class ContainerLauncherImpl extends AbstractService implements
context.getEventHandler().handle( context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
} catch (Throwable t) { } catch (Throwable t) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while.
LOG.info("Start-container for " + event.getContainerID()
+ " got interrupted.");
}
String message = "Container launch failed for " + containerID String message = "Container launch failed for " + containerID
+ " : " + StringUtils.stringifyException(t); + " : " + StringUtils.stringifyException(t);
sendContainerLaunchFailedMsg(taskAttemptID, message); sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally { } finally {
timer.cancel(); timerTask.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
} }
break; break;
@ -331,13 +360,12 @@ public class ContainerLauncherImpl extends AbstractService implements
} else { } else {
try { try {
timer.schedule(new CommandTimer(Thread.currentThread(), event), commandTimer.schedule(timerTask, nmTimeOut);
nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr, proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken); containerToken);
if (Thread.currentThread().isInterrupted()) { if (Thread.interrupted()) {
// The timer cancelled the command in the mean while. No need to // The timer cancelled the command in the mean while. No need to
// return, send cleanedup event anyways. // return, send cleanedup event anyways.
LOG.info("Stop-container for " + event.getContainerID() LOG.info("Stop-container for " + event.getContainerID()
@ -353,6 +381,14 @@ public class ContainerLauncherImpl extends AbstractService implements
proxy.stopContainer(stopRequest); proxy.stopContainer(stopRequest);
} }
} catch (Throwable t) { } catch (Throwable t) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while, clear the
// interrupt flag
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
}
// ignore the cleanup failure // ignore the cleanup failure
String message = "cleanup failed for container " String message = "cleanup failed for container "
+ event.getContainerID() + " : " + event.getContainerID() + " : "
@ -363,8 +399,18 @@ public class ContainerLauncherImpl extends AbstractService implements
message)); message));
LOG.warn(message); LOG.warn(message);
} finally { } finally {
timer.cancel(); timerTask.cancel();
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); if (Thread.interrupted()) {
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
// ignore the cleanup failure
context.getEventHandler()
.handle(new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
"cleanup failed for container " + event.getContainerID()));
}
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
} }
// after killing, send killed event to taskattempt // after killing, send killed event to taskattempt

View File

@ -88,11 +88,19 @@ public class TestContainerLauncher {
app.waitForState(job, JobState.FAILED); app.waitForState(job, JobState.FAILED);
LOG.info("attempt.getDiagnostics: " + attempt.getDiagnostics()); String diagnostics = attempt.getDiagnostics().toString();
Assert.assertTrue(attempt.getDiagnostics().toString().contains( LOG.info("attempt.getDiagnostics: " + diagnostics);
"Container launch failed for container_0_0000_01_000000 : ")); if (swallowInterrupts) {
Assert.assertTrue(attempt.getDiagnostics().toString().contains( Assert.assertEquals("[Container launch failed for "
": java.lang.InterruptedException")); + "container_0_0000_01_000000 : Start-container for "
+ "container_0_0000_01_000000 got interrupted. Returning.]",
diagnostics);
} else {
Assert.assertTrue(diagnostics.contains("Container launch failed for "
+ "container_0_0000_01_000000 : "));
Assert.assertTrue(diagnostics
.contains(": java.lang.InterruptedException"));
}
app.stop(); app.stop();
} }
@ -119,11 +127,10 @@ public class TestContainerLauncher {
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info(e); LOG.info(e);
if (!swallowInterrupts) { if (!MRAppWithSlowNM.this.swallowInterrupts) {
throw new IOException(e); throw new IOException(e);
} else {
Thread.currentThread().interrupt();
} }
Thread.currentThread().interrupt();
} }
return null; return null;
} }