diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1d2b3c42e69..a15db7a16e2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -501,6 +501,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable. (ahmed via tucu) + MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly + so that reducers don't hang in corner cases. (vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java index 6facb47aa21..fd80ec2b1e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.TaskAttemptID; +@SuppressWarnings("deprecation") class EventFetcher extends Thread { private static final long SLEEP_TIME = 1000; private static final int MAX_EVENTS_TO_FETCH = 10000; @@ -41,6 +42,8 @@ class EventFetcher extends Thread { private ExceptionReporter exceptionReporter = null; private int maxMapRuntime = 0; + + private volatile boolean stopped = false; public EventFetcher(TaskAttemptID reduce, TaskUmbilicalProtocol umbilical, @@ -60,7 +63,7 @@ public void run() { LOG.info(reduce + " Thread started: " + getName()); try { - while (true && !Thread.currentThread().isInterrupted()) { + while (!stopped && !Thread.currentThread().isInterrupted()) { try { int numNewMaps = getMapCompletionEvents(); failures = 0; @@ -71,6 +74,9 @@ public void run() { if (!Thread.currentThread().isInterrupted()) { Thread.sleep(SLEEP_TIME); } + } catch (InterruptedException e) { + LOG.info("EventFetcher is interrupted.. Returning"); + return; } catch (IOException ie) { LOG.info("Exception in getting events", ie); // check to see whether to abort @@ -90,6 +96,16 @@ public void run() { return; } } + + public void shutDown() { + this.stopped = true; + interrupt(); + try { + join(5000); + } catch(InterruptedException ie) { + LOG.warn("Got interrupted while joining " + getName(), ie); + } + } /** * Queries the {@link TaskTracker} for a set of map-completion events diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 5a213f05c1a..93200873d6b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -48,6 +48,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; +@SuppressWarnings({"deprecation"}) class Fetcher extends Thread { private static final Log LOG = LogFactory.getLog(Fetcher.class); @@ -88,6 +89,8 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, private final Decompressor decompressor; private final SecretKey jobTokenSecret; + private volatile boolean stopped = false; + public Fetcher(JobConf job, TaskAttemptID reduceId, ShuffleScheduler scheduler, MergeManager merger, Reporter reporter, ShuffleClientMetrics metrics, @@ -135,7 +138,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId, public void run() { try { - while (true && !Thread.currentThread().isInterrupted()) { + while (!stopped && !Thread.currentThread().isInterrupted()) { MapHost host = null; try { // If merge is on, block @@ -160,7 +163,17 @@ public void run() { exceptionReporter.reportException(t); } } - + + public void shutDown() throws InterruptedException { + this.stopped = true; + interrupt(); + try { + join(5000); + } catch (InterruptedException ie) { + LOG.warn("Got interrupt while joining " + getName(), ie); + } + } + /** * The crux of the matter... * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index 4b8b854952c..e7d7d71d079 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -19,8 +19,6 @@ import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; @@ -33,17 +31,17 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; -import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.Progress; @InterfaceAudience.Private @InterfaceStability.Unstable +@SuppressWarnings({"deprecation", "unchecked", "rawtypes"}) public class Shuffle implements ExceptionReporter { - private static final Log LOG = LogFactory.getLog(Shuffle.class); private static final int PROGRESS_FREQUENCY = 2000; private final TaskAttemptID reduceId; @@ -100,7 +98,6 @@ public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, this, mergePhase, mapOutputFile); } - @SuppressWarnings("unchecked") public RawKeyValueIterator run() throws IOException, InterruptedException { // Start the map-completion events fetcher thread final EventFetcher eventFetcher = @@ -130,19 +127,11 @@ public RawKeyValueIterator run() throws IOException, InterruptedException { } // Stop the event-fetcher thread - eventFetcher.interrupt(); - try { - eventFetcher.join(); - } catch(Throwable t) { - LOG.info("Failed to stop " + eventFetcher.getName(), t); - } + eventFetcher.shutDown(); // Stop the map-output fetcher threads for (Fetcher fetcher : fetchers) { - fetcher.interrupt(); - } - for (Fetcher fetcher : fetchers) { - fetcher.join(); + fetcher.shutDown(); } fetchers = null;