MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly so that reducers don't hang in corner cases. (vinodkv)

svn merge --ignore-ancestry -c 1235545 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1235546 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-24 23:18:46 +00:00
parent 0bf9779489
commit c516cc863a
4 changed files with 39 additions and 18 deletions

View File

@ -501,6 +501,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable. MAPREDUCE-3505. yarn APPLICATION_CLASSPATH needs to be overridable.
(ahmed via tucu) (ahmed via tucu)
MAPREDUCE-3714. Fixed EventFetcher and Fetcher threads to shut-down properly
so that reducers don't hang in corner cases. (vinodkv)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
@SuppressWarnings("deprecation")
class EventFetcher<K,V> extends Thread { class EventFetcher<K,V> extends Thread {
private static final long SLEEP_TIME = 1000; private static final long SLEEP_TIME = 1000;
private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MAX_EVENTS_TO_FETCH = 10000;
@ -42,6 +43,8 @@ class EventFetcher<K,V> extends Thread {
private int maxMapRuntime = 0; private int maxMapRuntime = 0;
private volatile boolean stopped = false;
public EventFetcher(TaskAttemptID reduce, public EventFetcher(TaskAttemptID reduce,
TaskUmbilicalProtocol umbilical, TaskUmbilicalProtocol umbilical,
ShuffleScheduler<K,V> scheduler, ShuffleScheduler<K,V> scheduler,
@ -60,7 +63,7 @@ public void run() {
LOG.info(reduce + " Thread started: " + getName()); LOG.info(reduce + " Thread started: " + getName());
try { try {
while (true && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
try { try {
int numNewMaps = getMapCompletionEvents(); int numNewMaps = getMapCompletionEvents();
failures = 0; failures = 0;
@ -71,6 +74,9 @@ public void run() {
if (!Thread.currentThread().isInterrupted()) { if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
} }
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) { } catch (IOException ie) {
LOG.info("Exception in getting events", ie); LOG.info("Exception in getting events", ie);
// check to see whether to abort // check to see whether to abort
@ -91,6 +97,16 @@ public void run() {
} }
} }
/**
 * Signals this event-fetcher thread to stop and waits briefly for it to exit.
 *
 * Sets the volatile {@code stopped} flag (polled by the run() loop), then
 * interrupts the thread to break out of any sleep or blocking call, and
 * finally joins with a timeout so the caller is never blocked indefinitely
 * by a wedged fetcher thread.
 */
public void shutDown() {
  this.stopped = true;
  interrupt();
  try {
    // Bounded wait: don't hang the reducer if this thread is stuck.
    join(5000);
  } catch(InterruptedException ie) {
    LOG.warn("Got interrupted while joining " + getName(), ie);
    // Restore the interrupt status so the caller can observe/propagate it.
    Thread.currentThread().interrupt();
  }
}
/** /**
* Queries the {@link TaskTracker} for a set of map-completion events * Queries the {@link TaskTracker} for a set of map-completion events
* from a given event ID. * from a given event ID.

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings({"deprecation"})
class Fetcher<K,V> extends Thread { class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class); private static final Log LOG = LogFactory.getLog(Fetcher.class);
@ -88,6 +89,8 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
private final Decompressor decompressor; private final Decompressor decompressor;
private final SecretKey jobTokenSecret; private final SecretKey jobTokenSecret;
private volatile boolean stopped = false;
public Fetcher(JobConf job, TaskAttemptID reduceId, public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics, Reporter reporter, ShuffleClientMetrics metrics,
@ -135,7 +138,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
public void run() { public void run() {
try { try {
while (true && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null; MapHost host = null;
try { try {
// If merge is on, block // If merge is on, block
@ -161,6 +164,16 @@ public void run() {
} }
} }
/**
 * Signals this fetcher thread to stop and waits briefly for it to exit.
 *
 * Sets the volatile {@code stopped} flag (polled by the run() loop),
 * interrupts the thread to break out of blocking shuffle I/O, then joins
 * with a timeout so shutdown never hangs on a stuck fetcher.
 *
 * @throws InterruptedException declared for caller compatibility; in
 *         practice an interrupt during the join is caught and logged here,
 *         and the thread's interrupt status is restored instead.
 */
public void shutDown() throws InterruptedException {
  this.stopped = true;
  interrupt();
  try {
    // Bounded wait: don't block reducer shutdown indefinitely.
    join(5000);
  } catch (InterruptedException ie) {
    LOG.warn("Got interrupt while joining " + getName(), ie);
    // Restore the interrupt status so the caller can observe/propagate it.
    Thread.currentThread().interrupt();
  }
}
/** /**
* The crux of the matter... * The crux of the matter...
* *

View File

@ -19,8 +19,6 @@
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -33,17 +31,17 @@
import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progress;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter { public class Shuffle<K, V> implements ExceptionReporter {
private static final Log LOG = LogFactory.getLog(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000; private static final int PROGRESS_FREQUENCY = 2000;
private final TaskAttemptID reduceId; private final TaskAttemptID reduceId;
@ -100,7 +98,6 @@ public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
this, mergePhase, mapOutputFile); this, mergePhase, mapOutputFile);
} }
@SuppressWarnings("unchecked")
public RawKeyValueIterator run() throws IOException, InterruptedException { public RawKeyValueIterator run() throws IOException, InterruptedException {
// Start the map-completion events fetcher thread // Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher = final EventFetcher<K,V> eventFetcher =
@ -130,19 +127,11 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
} }
// Stop the event-fetcher thread // Stop the event-fetcher thread
eventFetcher.interrupt(); eventFetcher.shutDown();
try {
eventFetcher.join();
} catch(Throwable t) {
LOG.info("Failed to stop " + eventFetcher.getName(), t);
}
// Stop the map-output fetcher threads // Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) { for (Fetcher<K,V> fetcher : fetchers) {
fetcher.interrupt(); fetcher.shutDown();
}
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.join();
} }
fetchers = null; fetchers = null;