MAPREDUCE-3226. Fix shutdown of fetcher threads. Contributed by Vinod K V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1187116 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6cd5a1b0f7
commit
02d5fa3e62
|
@ -1712,6 +1712,8 @@ Release 0.23.0 - Unreleased
|
||||||
|
|
||||||
MAPREDUCE-3188. Ensure correct shutdown in services. (todd via acmurthy)
|
MAPREDUCE-3188. Ensure correct shutdown in services. (todd via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-3226. Fix shutdown of fetcher threads. (vinodkv via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -60,7 +60,7 @@ class EventFetcher<K,V> extends Thread {
|
||||||
LOG.info(reduce + " Thread started: " + getName());
|
LOG.info(reduce + " Thread started: " + getName());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true && !Thread.currentThread().isInterrupted()) {
|
||||||
try {
|
try {
|
||||||
int numNewMaps = getMapCompletionEvents();
|
int numNewMaps = getMapCompletionEvents();
|
||||||
failures = 0;
|
failures = 0;
|
||||||
|
@ -68,7 +68,9 @@ class EventFetcher<K,V> extends Thread {
|
||||||
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
|
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
|
||||||
}
|
}
|
||||||
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
|
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
|
||||||
|
if (!Thread.currentThread().isInterrupted()) {
|
||||||
Thread.sleep(SLEEP_TIME);
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
}
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.info("Exception in getting events", ie);
|
LOG.info("Exception in getting events", ie);
|
||||||
// check to see whether to abort
|
// check to see whether to abort
|
||||||
|
@ -76,9 +78,11 @@ class EventFetcher<K,V> extends Thread {
|
||||||
throw new IOException("too many failures downloading events", ie);
|
throw new IOException("too many failures downloading events", ie);
|
||||||
}
|
}
|
||||||
// sleep for a bit
|
// sleep for a bit
|
||||||
|
if (!Thread.currentThread().isInterrupted()) {
|
||||||
Thread.sleep(RETRY_PERIOD);
|
Thread.sleep(RETRY_PERIOD);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
return;
|
return;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
|
@ -135,7 +135,7 @@ class Fetcher<K,V> extends Thread {
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true && !Thread.currentThread().isInterrupted()) {
|
||||||
MapHost host = null;
|
MapHost host = null;
|
||||||
try {
|
try {
|
||||||
// If merge is on, block
|
// If merge is on, block
|
||||||
|
|
Loading…
Reference in New Issue