diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 15e9499e56f..afd3dd01d7d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1712,6 +1712,8 @@ Release 0.23.0 - Unreleased MAPREDUCE-3188. Ensure correct shutdown in services. (todd via acmurthy) + MAPREDUCE-3226. Fix shutdown of fetcher threads. (vinodkv via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java index 8f0dad75d55..6facb47aa21 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java @@ -60,7 +60,7 @@ class EventFetcher extends Thread { LOG.info(reduce + " Thread started: " + getName()); try { - while (true) { + while (true && !Thread.currentThread().isInterrupted()) { try { int numNewMaps = getMapCompletionEvents(); failures = 0; @@ -68,7 +68,9 @@ class EventFetcher extends Thread { LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs"); } LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME); - Thread.sleep(SLEEP_TIME); + if (!Thread.currentThread().isInterrupted()) { + Thread.sleep(SLEEP_TIME); + } } catch (IOException ie) { LOG.info("Exception in getting events", ie); // check to see whether to abort @@ -76,7 +78,9 @@ class EventFetcher extends Thread { throw new IOException("too many failures downloading events", ie); } // sleep for a bit - Thread.sleep(RETRY_PERIOD); + if (!Thread.currentThread().isInterrupted()) { + Thread.sleep(RETRY_PERIOD); + } } } } catch (InterruptedException e) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 994251addb4..5a213f05c1a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -135,7 +135,7 @@ class Fetcher extends Thread { public void run() { try { - while (true) { + while (true && !Thread.currentThread().isInterrupted()) { MapHost host = null; try { // If merge is on, block