From 44998783922b1d7253193298aac57c181aa92d3d Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Thu, 27 Oct 2011 17:32:39 +0000 Subject: [PATCH] Merge -c 1189879 from trunk to branch-0.23 to fix MAPREDUCE-3228. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189880 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/launcher/ContainerLauncher.java | 9 ++ .../app/launcher/ContainerLauncherImpl.java | 147 ++++++++++++++---- .../v2/app/TestContainerLauncher.java | 132 ++++++++++++++++ .../hadoop/mapreduce/v2/app/TestFail.java | 3 +- 5 files changed, 259 insertions(+), 35 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ad74306415c..bbe33e9250f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1752,6 +1752,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3281. Fixed a bug in TestLinuxContainerExecutorWithMocks. (vinodkv) + MAPREDUCE-3228. Fixed MR AM to timeout RPCs to bad NodeManagers. (vinodkv + via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java index cc41db1bc41..12ac363875b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.launcher; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.yarn.event.EventHandler; public interface ContainerLauncher @@ -28,4 +29,12 @@ public interface ContainerLauncher CONTAINER_REMOTE_LAUNCH, CONTAINER_REMOTE_CLEANUP } + + // Not a documented config. Only used for tests + static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX + + "nm-command-timeout"; + /** + * Maximum of 1 minute timeout for a Node to react to the command + */ + static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 20f2a2b7349..bd2ce7357e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.launcher; import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -69,7 +71,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; public class ContainerLauncherImpl extends AbstractService implements ContainerLauncher { - private static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class); + static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class); + + int nmTimeOut; private AppContext context; private ThreadPoolExecutor launcherPool; @@ -95,14 +99,17 @@ public class ContainerLauncherImpl extends AbstractService implements this.limitOnPoolSize = conf.getInt( MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); + this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, + ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT); super.init(conf); } public void start() { + + ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( + "ContainerLauncher #%d").setDaemon(true).build(); + // Start with a default core-pool size of 10 and change it dynamically. - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("ContainerLauncher #%d") - .build(); launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, Integer.MAX_VALUE, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), @@ -156,11 +163,11 @@ public class ContainerLauncherImpl extends AbstractService implements public void stop() { eventHandlingThread.interrupt(); - launcherPool.shutdown(); + launcherPool.shutdownNow(); super.stop(); } - protected ContainerManager getCMProxy(ContainerId containerID, + protected ContainerManager getCMProxy( final String containerManagerBindAddr, ContainerToken containerToken) throws IOException { @@ -193,6 +200,27 @@ public class ContainerLauncherImpl extends AbstractService implements return proxy; } + private static class CommandTimer extends TimerTask { + private final Thread commandThread; + protected final ContainerLauncherEvent event; + protected final String message; + + public CommandTimer(Thread thread, ContainerLauncherEvent event) { + this.commandThread = thread; + this.event = event; + this.message = "Couldn't complete " + event.getType() + " on " + + event.getContainerID() + "/" + event.getTaskAttemptID() + + ". Interrupting and returning"; + } + + + @Override + public void run() { + LOG.warn(this.message); + this.commandThread.interrupt(); + } + } + /** * Setup and start the container on remote nodemanager. */ @@ -213,27 +241,53 @@ public class ContainerLauncherImpl extends AbstractService implements final String containerManagerBindAddr = event.getContainerMgrAddress(); ContainerId containerID = event.getContainerID(); ContainerToken containerToken = event.getContainerToken(); + TaskAttemptId taskAttemptID = event.getTaskAttemptID(); + + Timer timer = new Timer(true); switch(event.getType()) { case CONTAINER_REMOTE_LAUNCH: - ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent) event; + ContainerRemoteLaunchEvent launchEvent + = (ContainerRemoteLaunchEvent) event; - TaskAttemptId taskAttemptID = launchEv.getTaskAttemptID(); try { - - ContainerManager proxy = - getCMProxy(containerID, containerManagerBindAddr, containerToken); - + timer.schedule(new CommandTimer(Thread.currentThread(), event), + nmTimeOut); + + ContainerManager proxy = getCMProxy(containerManagerBindAddr, + containerToken); + + // Interruped during getProxy, but that didn't throw exception + if (Thread.currentThread().isInterrupted()) { + // The timer cancelled the command in the mean while. + String message = "Start-container for " + event.getContainerID() + + " got interrupted. Returning."; + sendContainerLaunchFailedMsg(taskAttemptID, message); + return; + } + // Construct the actual Container ContainerLaunchContext containerLaunchContext = - launchEv.getContainer(); + launchEvent.getContainer(); // Now launch the actual container StartContainerRequest startRequest = recordFactory .newRecordInstance(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); StartContainerResponse response = proxy.startContainer(startRequest); + + // container started properly. Stop the timer + timer.cancel(); + if (Thread.currentThread().isInterrupted()) { + // The timer cancelled the command in the mean while, but + // startContainer didn't throw exception + String message = "Start-container for " + event.getContainerID() + + " got interrupted. Returning."; + sendContainerLaunchFailedMsg(taskAttemptID, message); + return; + } + ByteBuffer portInfo = response .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); int port = -1; @@ -255,12 +309,9 @@ public class ContainerLauncherImpl extends AbstractService implements } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " + StringUtils.stringifyException(t); - LOG.error(message); - context.getEventHandler().handle( - new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message)); - context.getEventHandler().handle( - new TaskAttemptEvent(taskAttemptID, - TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); + sendContainerLaunchFailedMsg(taskAttemptID, message); + } finally { + timer.cancel(); } break; @@ -272,24 +323,44 @@ public class ContainerLauncherImpl extends AbstractService implements eventQueue.remove(event); // TODO: Any synchro needed? //deallocate the container context.getEventHandler().handle( - new ContainerAllocatorEvent(event.getTaskAttemptID(), - ContainerAllocator.EventType.CONTAINER_DEALLOCATE)); + new ContainerAllocatorEvent(taskAttemptID, + ContainerAllocator.EventType.CONTAINER_DEALLOCATE)); } else { + try { - ContainerManager proxy = - getCMProxy(containerID, containerManagerBindAddr, containerToken); - // TODO:check whether container is launched + timer.schedule(new CommandTimer(Thread.currentThread(), event), + nmTimeOut); - // kill the remote container if already launched - StopContainerRequest stopRequest = recordFactory - .newRecordInstance(StopContainerRequest.class); - stopRequest.setContainerId(event.getContainerID()); - proxy.stopContainer(stopRequest); + ContainerManager proxy = getCMProxy(containerManagerBindAddr, + containerToken); + if (Thread.currentThread().isInterrupted()) { + // The timer cancelled the command in the mean while. No need to + // return, send cleanedup event anyways. + LOG.info("Stop-container for " + event.getContainerID() + + " got interrupted."); + } else { + + // TODO:check whether container is launched + + // kill the remote container if already launched + StopContainerRequest stopRequest = recordFactory + .newRecordInstance(StopContainerRequest.class); + stopRequest.setContainerId(event.getContainerID()); + proxy.stopContainer(stopRequest); + } } catch (Throwable t) { - //ignore the cleanup failure - LOG.warn("cleanup failed for container " + event.getContainerID() , - t); + // ignore the cleanup failure + String message = "cleanup failed for container " + + event.getContainerID() + " : " + + StringUtils.stringifyException(t); + context.getEventHandler() + .handle( + new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, + message)); + LOG.warn(message); + } finally { + timer.cancel(); } // after killing, send killed event to taskattempt @@ -300,7 +371,17 @@ public class ContainerLauncherImpl extends AbstractService implements break; } } - + } + + @SuppressWarnings("unchecked") + void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID, + String message) { + LOG.error(message); + context.getEventHandler().handle( + new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message)); + context.getEventHandler().handle( + new TaskAttemptEvent(taskAttemptID, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java new file mode 100644 index 00000000000..2b552497d78 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java @@ -0,0 +1,132 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import java.io.IOException; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; +import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.records.ContainerToken; +import org.junit.Test; + +public class TestContainerLauncher { + + static final Log LOG = LogFactory + .getLog(TestContainerLauncher.class); + + @Test + public void testSlowNM() throws Exception { + test(false); + } + + @Test + public void testSlowNMWithInterruptsSwallowed() throws Exception { + test(true); + } + + private void test(boolean swallowInterrupts) throws Exception { + + MRApp app = new MRAppWithSlowNM(swallowInterrupts); + + Configuration conf = new Configuration(); + int maxAttempts = 1; + conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + + // Set low timeout for NM commands + conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000); + + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + + Map tasks = job.getTasks(); + Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); + + Task task = tasks.values().iterator().next(); + app.waitForState(task, TaskState.SCHEDULED); + + Map attempts = tasks.values().iterator() + .next().getAttempts(); + Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts + .size()); + + TaskAttempt attempt = attempts.values().iterator().next(); + app.waitForState(attempt, TaskAttemptState.ASSIGNED); + + app.waitForState(job, JobState.FAILED); + + LOG.info("attempt.getDiagnostics: " + attempt.getDiagnostics()); + Assert.assertTrue(attempt.getDiagnostics().toString().contains( + "Container launch failed for container_0_0000_01_000000 : ")); + Assert.assertTrue(attempt.getDiagnostics().toString().contains( + ": java.lang.InterruptedException")); + + app.stop(); + } + + private static class MRAppWithSlowNM extends MRApp { + + final boolean swallowInterrupts; + + public MRAppWithSlowNM(boolean swallowInterrupts) { + super(1, 0, false, "TestContainerLauncher", true); + this.swallowInterrupts = swallowInterrupts; + } + + @Override + protected ContainerLauncher createContainerLauncher(AppContext context) { + return new ContainerLauncherImpl(context) { + @Override + protected ContainerManager getCMProxy( + String containerManagerBindAddr, ContainerToken containerToken) + throws IOException { + try { + synchronized (this) { + wait(); // Just hang the thread simulating a very slow NM. + } + } catch (InterruptedException e) { + LOG.info(e); + if (!swallowInterrupts) { + throw new IOException(e); + } else { + Thread.currentThread().interrupt(); + } + } + return null; + } + }; + }; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 5598cecb5ab..00304e57c9f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; import org.apache.hadoop.yarn.api.ContainerManager; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerToken; import org.junit.Test; @@ -219,7 +218,7 @@ public class TestFail { } @Override - protected ContainerManager getCMProxy(ContainerId containerID, + protected ContainerManager getCMProxy( String containerManagerBindAddr, ContainerToken containerToken) throws IOException { try {