From d05e6d2671db3876756e5a55c369c189a8fcbdb7 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 17 Jan 2012 06:10:50 +0000 Subject: [PATCH] MAPREDUCE-2450. Fixed a corner case with interrupted communication threads leading to a long timeout in Task. Contributed by Rajesh Balamohan. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1232314 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../java/org/apache/hadoop/mapred/Task.java | 20 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8a1a5dc7959..925081a2127 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -503,6 +503,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe via mahadev) + MAPREDUCE-2450. Fixed a corner case with interrupted communication threads + leading to a long timeout in Task. (Rajesh Balamohan via acmurthy) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 17bf8cb4aa3..7e339c868c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -552,6 +552,8 @@ protected class TaskReporter private InputSplit split = null; private Progress taskProgress; private Thread pingThread = null; + private boolean done = true; + private Object lock = new Object(); /** * flag that indicates whether progress update needs to be sent to parent. @@ -648,6 +650,9 @@ public void run() { // get current flag value and reset it as well boolean sendProgress = resetProgressFlag(); while (!taskDone.get()) { + synchronized (lock) { + done = false; + } try { boolean taskFound = true; // whether TT knows about this task // sleep for a bit @@ -680,6 +685,7 @@ public void run() { // came back up), kill ourselves if (!taskFound) { LOG.warn("Parent died. Exiting "+taskId); + resetDoneFlag(); System.exit(66); } @@ -692,10 +698,19 @@ public void run() { if (remainingRetries == 0) { ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); LOG.warn("Last retry, killing "+taskId); + resetDoneFlag(); System.exit(65); } } } + //Notify that we are done with the work + resetDoneFlag(); + } + void resetDoneFlag() { + synchronized (lock) { + done = true; + lock.notify(); + } } public void startCommunicationThread() { if (pingThread == null) { @@ -706,6 +721,11 @@ public void startCommunicationThread() { } public void stopCommunicationThread() throws InterruptedException { if (pingThread != null) { + synchronized (lock) { + while (!done) { + lock.wait(); + } + } pingThread.interrupt(); pingThread.join(); }