MAPREDUCE-4794. DefaultSpeculator generates error messages on normal shutdown (Jason Lowe via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1451826 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Turner Eagles 2013-03-02 03:30:58 +00:00
parent 6680ee5f5d
commit 38c4e17ec1
2 changed files with 9 additions and 3 deletions

View File

@ -754,6 +754,9 @@ Release 0.23.7 - UNRELEASED
mapred-default has mapreduce.job.split.metainfo.maxsize (Jason Lowe via mapred-default has mapreduce.job.split.metainfo.maxsize (Jason Lowe via
jeagles) jeagles)
MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
shutdown (Jason Lowe via jeagles)
Release 0.23.6 - UNRELEASED Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -91,6 +91,7 @@ public class DefaultSpeculator extends AbstractService implements
private final Configuration conf; private final Configuration conf;
private AppContext context; private AppContext context;
private Thread speculationBackgroundThread = null; private Thread speculationBackgroundThread = null;
private volatile boolean stopped = false;
private BlockingQueue<SpeculatorEvent> eventQueue private BlockingQueue<SpeculatorEvent> eventQueue
= new LinkedBlockingQueue<SpeculatorEvent>(); = new LinkedBlockingQueue<SpeculatorEvent>();
private TaskRuntimeEstimator estimator; private TaskRuntimeEstimator estimator;
@ -170,7 +171,7 @@ public class DefaultSpeculator extends AbstractService implements
= new Runnable() { = new Runnable() {
@Override @Override
public void run() { public void run() {
while (!Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
long backgroundRunStartTime = clock.getTime(); long backgroundRunStartTime = clock.getTime();
try { try {
int speculations = computeSpeculations(); int speculations = computeSpeculations();
@ -189,8 +190,9 @@ public class DefaultSpeculator extends AbstractService implements
Object pollResult Object pollResult
= scanControl.poll(wait, TimeUnit.MILLISECONDS); = scanControl.poll(wait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Background thread returning, interrupted : " + e); if (!stopped) {
e.printStackTrace(System.out); LOG.error("Background thread returning, interrupted", e);
}
return; return;
} }
} }
@ -205,6 +207,7 @@ public class DefaultSpeculator extends AbstractService implements
@Override @Override
public void stop() { public void stop() {
stopped = true;
// this could be called before background thread is established // this could be called before background thread is established
if (speculationBackgroundThread != null) { if (speculationBackgroundThread != null) {
speculationBackgroundThread.interrupt(); speculationBackgroundThread.interrupt();