MAPREDUCE-5888. Failed job leaves hung AM after it unregisters (Jason Lowe via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594317 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Turner Eagles 2014-05-13 18:27:26 +00:00
parent 8e5b5165c1
commit 4bc3371824
2 changed files with 15 additions and 2 deletions

View File

@ -219,6 +219,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5884. History server uses short user name when canceling tokens
(Mohammad Kamrul Islam via jlowe)
MAPREDUCE-5888. Failed job leaves hung AM after it unregisters (Jason Lowe
via jeagles)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -129,6 +130,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
@ -644,8 +647,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private JobStateInternal forcedState = null;
//Executor used for running future tasks. Setting thread pool size to 1
private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
//Executor used for running future tasks.
private ScheduledThreadPoolExecutor executor;
private ScheduledFuture failWaitTriggerScheduledFuture;
private JobState lastNonFinalState = JobState.NEW;
@ -687,6 +690,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name");
this.jobACLs = aclsManager.constructJobACLs(conf);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Job Fail Wait Timeout Monitor #%d")
.setDaemon(true)
.build();
this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);