Merge -c 1187116 from trunk to branch-0.23 to complete fix for MAPREDUCE-3226.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1187119 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-20 22:45:08 +00:00
parent ac21f20ac6
commit fe370841e6
3 changed files with 10 additions and 4 deletions

View File

@ -1656,6 +1656,8 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3188. Ensure correct shutdown in services. (todd via acmurthy) MAPREDUCE-3188. Ensure correct shutdown in services. (todd via acmurthy)
MAPREDUCE-3226. Fix shutdown of fetcher threads. (vinodkv via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -60,7 +60,7 @@ class EventFetcher<K,V> extends Thread {
LOG.info(reduce + " Thread started: " + getName()); LOG.info(reduce + " Thread started: " + getName());
try { try {
while (true) { while (true && !Thread.currentThread().isInterrupted()) {
try { try {
int numNewMaps = getMapCompletionEvents(); int numNewMaps = getMapCompletionEvents();
failures = 0; failures = 0;
@ -68,7 +68,9 @@ class EventFetcher<K,V> extends Thread {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs"); LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
} }
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME); LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
}
} catch (IOException ie) { } catch (IOException ie) {
LOG.info("Exception in getting events", ie); LOG.info("Exception in getting events", ie);
// check to see whether to abort // check to see whether to abort
@ -76,9 +78,11 @@ class EventFetcher<K,V> extends Thread {
throw new IOException("too many failures downloading events", ie); throw new IOException("too many failures downloading events", ie);
} }
// sleep for a bit // sleep for a bit
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(RETRY_PERIOD); Thread.sleep(RETRY_PERIOD);
} }
} }
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -135,7 +135,7 @@ class Fetcher<K,V> extends Thread {
public void run() { public void run() {
try { try {
while (true) { while (true && !Thread.currentThread().isInterrupted()) {
MapHost host = null; MapHost host = null;
try { try {
// If merge is on, block // If merge is on, block