diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 74f25008f67..53e2efaba8a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -94,6 +94,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3102. Changed NodeManager to fail fast when LinuxContainerExecutor has wrong configuration or permissions. (Hitesh Shah via vinodkv) + MAPREDUCE-3355. Fixed MR AM's ContainerLauncher to handle node-command + timeouts correctly. (vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 62ceae99f97..f1670039e67 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -82,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements private Thread eventHandlingThread; private BlockingQueue eventQueue = new LinkedBlockingQueue(); + final Timer commandTimer = new Timer(true); YarnRPC rpc; // To track numNodes. @@ -201,14 +202,14 @@ public class ContainerLauncherImpl extends AbstractService implements return proxy; } - private static class CommandTimer extends TimerTask { + private static class CommandTimerTask extends TimerTask { private final Thread commandThread; - protected final ContainerLauncherEvent event; protected final String message; + private boolean cancelled = false; - public CommandTimer(Thread thread, ContainerLauncherEvent event) { + public CommandTimerTask(Thread thread, ContainerLauncherEvent event) { + super(); this.commandThread = thread; - this.event = event; this.message = "Couldn't complete " + event.getType() + " on " + event.getContainerID() + "/" + event.getTaskAttemptID() + ". Interrupting and returning"; @@ -216,8 +217,27 @@ public class ContainerLauncherImpl extends AbstractService implements @Override public void run() { - LOG.warn(this.message); - this.commandThread.interrupt(); + synchronized (this) { + if (this.cancelled) { + return; + } + LOG.warn(this.message); + StackTraceElement[] trace = this.commandThread.getStackTrace(); + StringBuilder logMsg = new StringBuilder(); + for (int i = 0; i < trace.length; i++) { + logMsg.append("\n\tat " + trace[i]); + } + LOG.info("Stack trace of the command-thread: \n" + logMsg.toString()); + this.commandThread.interrupt(); + } + } + + @Override + public boolean cancel() { + synchronized (this) { + this.cancelled = true; + return super.cancel(); + } } } @@ -243,10 +263,11 @@ public class ContainerLauncherImpl extends AbstractService implements ContainerToken containerToken = event.getContainerToken(); TaskAttemptId taskAttemptID = event.getTaskAttemptID(); - Timer timer = new Timer(true); - ContainerManager proxy = null; + CommandTimerTask timerTask = new CommandTimerTask(Thread + .currentThread(), event); + switch(event.getType()) { case CONTAINER_REMOTE_LAUNCH: @@ -254,16 +275,16 @@ public class ContainerLauncherImpl extends AbstractService implements = (ContainerRemoteLaunchEvent) event; try { - timer.schedule(new CommandTimer(Thread.currentThread(), event), - nmTimeOut); + commandTimer.schedule(timerTask, nmTimeOut); proxy = getCMProxy(containerID, containerManagerBindAddr, containerToken); // Interruped during getProxy, but that didn't throw exception - if (Thread.currentThread().isInterrupted()) { + if (Thread.interrupted()) { // The timer cancelled the command in the mean while. - String message = "Start-container for " + event.getContainerID() + String message = "Container launch failed for " + containerID + + " : Start-container for " + event.getContainerID() + " got interrupted. Returning."; sendContainerLaunchFailedMsg(taskAttemptID, message); return; @@ -280,11 +301,12 @@ public class ContainerLauncherImpl extends AbstractService implements StartContainerResponse response = proxy.startContainer(startRequest); // container started properly. Stop the timer - timer.cancel(); - if (Thread.currentThread().isInterrupted()) { + timerTask.cancel(); + if (Thread.interrupted()) { // The timer cancelled the command in the mean while, but // startContainer didn't throw exception - String message = "Start-container for " + event.getContainerID() + String message = "Container launch failed for " + containerID + + " : Start-container for " + event.getContainerID() + " got interrupted. Returning."; sendContainerLaunchFailedMsg(taskAttemptID, message); return; @@ -309,12 +331,19 @@ public class ContainerLauncherImpl extends AbstractService implements context.getEventHandler().handle( new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); } catch (Throwable t) { + if (Thread.interrupted()) { + // The timer cancelled the command in the mean while. + LOG.info("Start-container for " + event.getContainerID() + + " got interrupted."); + } String message = "Container launch failed for " + containerID + " : " + StringUtils.stringifyException(t); sendContainerLaunchFailedMsg(taskAttemptID, message); } finally { - timer.cancel(); - ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); + timerTask.cancel(); + if (proxy != null) { + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); + } } break; @@ -331,13 +360,12 @@ public class ContainerLauncherImpl extends AbstractService implements } else { try { - timer.schedule(new CommandTimer(Thread.currentThread(), event), - nmTimeOut); + commandTimer.schedule(timerTask, nmTimeOut); proxy = getCMProxy(containerID, containerManagerBindAddr, containerToken); - if (Thread.currentThread().isInterrupted()) { + if (Thread.interrupted()) { // The timer cancelled the command in the mean while. No need to // return, send cleanedup event anyways. LOG.info("Stop-container for " + event.getContainerID() @@ -353,6 +381,14 @@ public class ContainerLauncherImpl extends AbstractService implements proxy.stopContainer(stopRequest); } } catch (Throwable t) { + + if (Thread.interrupted()) { + // The timer cancelled the command in the mean while, clear the + // interrupt flag + LOG.info("Stop-container for " + event.getContainerID() + + " got interrupted."); + } + // ignore the cleanup failure String message = "cleanup failed for container " + event.getContainerID() + " : " @@ -363,8 +399,18 @@ public class ContainerLauncherImpl extends AbstractService implements message)); LOG.warn(message); } finally { - timer.cancel(); - ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); + timerTask.cancel(); + if (Thread.interrupted()) { + LOG.info("Stop-container for " + event.getContainerID() + + " got interrupted."); + // ignore the cleanup failure + context.getEventHandler() + .handle(new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, + "cleanup failed for container " + event.getContainerID())); + } + if (proxy != null) { + ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig()); + } } // after killing, send killed event to taskattempt diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java index b2686e2314b..860133fada0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java @@ -88,11 +88,19 @@ public class TestContainerLauncher { app.waitForState(job, JobState.FAILED); - LOG.info("attempt.getDiagnostics: " + attempt.getDiagnostics()); - Assert.assertTrue(attempt.getDiagnostics().toString().contains( - "Container launch failed for container_0_0000_01_000000 : ")); - Assert.assertTrue(attempt.getDiagnostics().toString().contains( - ": java.lang.InterruptedException")); + String diagnostics = attempt.getDiagnostics().toString(); + LOG.info("attempt.getDiagnostics: " + diagnostics); + if (swallowInterrupts) { + Assert.assertEquals("[Container launch failed for " + + "container_0_0000_01_000000 : Start-container for " + + "container_0_0000_01_000000 got interrupted. Returning.]", + diagnostics); + } else { + Assert.assertTrue(diagnostics.contains("Container launch failed for " + + "container_0_0000_01_000000 : ")); + Assert.assertTrue(diagnostics + .contains(": java.lang.InterruptedException")); + } app.stop(); } @@ -119,11 +127,10 @@ public class TestContainerLauncher { } } catch (InterruptedException e) { LOG.info(e); - if (!swallowInterrupts) { + if (!MRAppWithSlowNM.this.swallowInterrupts) { throw new IOException(e); - } else { - Thread.currentThread().interrupt(); } + Thread.currentThread().interrupt(); } return null; }