diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index db7231e280d..3ac103aa855 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -449,6 +449,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe via mahadev) + MAPREDUCE-2450. Fixed a corner case with interrupted communication threads + leading to a long timeout in Task. (Rajesh Balamohan via acmurthy) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 17bf8cb4aa3..7e339c868c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -552,6 +552,8 @@ abstract public class Task implements Writable, Configurable { private InputSplit split = null; private Progress taskProgress; private Thread pingThread = null; + private boolean done = true; + private Object lock = new Object(); /** * flag that indicates whether progress update needs to be sent to parent. @@ -648,6 +650,9 @@ abstract public class Task implements Writable, Configurable { // get current flag value and reset it as well boolean sendProgress = resetProgressFlag(); while (!taskDone.get()) { + synchronized (lock) { + done = false; + } try { boolean taskFound = true; // whether TT knows about this task // sleep for a bit @@ -680,6 +685,7 @@ abstract public class Task implements Writable, Configurable { // came back up), kill ourselves if (!taskFound) { LOG.warn("Parent died. Exiting "+taskId); + resetDoneFlag(); System.exit(66); } @@ -692,10 +698,19 @@ abstract public class Task implements Writable, Configurable { if (remainingRetries == 0) { ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); LOG.warn("Last retry, killing "+taskId); + resetDoneFlag(); System.exit(65); } } } + //Notify that we are done with the work + resetDoneFlag(); + } + void resetDoneFlag() { + synchronized (lock) { + done = true; + lock.notify(); + } } public void startCommunicationThread() { if (pingThread == null) { @@ -706,6 +721,11 @@ abstract public class Task implements Writable, Configurable { } public void stopCommunicationThread() throws InterruptedException { if (pingThread != null) { + synchronized (lock) { + while (!done) { + lock.wait(); + } + } pingThread.interrupt(); pingThread.join(); }