MAPREDUCE-5317. Stale files left behind for failed jobs. Contributed by Ravi Prakash

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503499 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-07-15 22:24:37 +00:00
parent 37b2a96055
commit 8a037edbfb
5 changed files with 164 additions and 8 deletions

View File

@ -158,6 +158,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl
(Devaraj K via jlowe) (Devaraj K via jlowe)
MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
jlowe)
Release 2.2.0 - UNRELEASED Release 2.2.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -1220,6 +1223,9 @@ Release 0.23.10 - UNRELEASED
MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
input path dir (Devaraj K via jlowe) input path dir (Devaraj K via jlowe)
MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
jlowe)
Release 0.23.9 - 2013-07-08 Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -25,6 +25,7 @@ public enum JobStateInternal {
RUNNING, RUNNING,
COMMITTING, COMMITTING,
SUCCEEDED, SUCCEEDED,
FAIL_WAIT,
FAIL_ABORT, FAIL_ABORT,
FAILED, FAILED,
KILL_WAIT, KILL_WAIT,

View File

@ -44,6 +44,7 @@ public enum JobEventType {
//Producer:Job //Producer:Job
JOB_COMPLETED, JOB_COMPLETED,
JOB_FAIL_WAIT_TIMEDOUT,
//Producer:Any component //Producer:Any component
JOB_DIAGNOSTIC_UPDATE, JOB_DIAGNOSTIC_UPDATE,

View File

@ -30,6 +30,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -313,7 +316,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
.addTransition .addTransition
(JobStateInternal.RUNNING, (JobStateInternal.RUNNING,
EnumSet.of(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.RUNNING,
JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT), JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
JobStateInternal.FAIL_ABORT),
JobEventType.JOB_TASK_COMPLETED, JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition()) new TaskCompletedTransition())
.addTransition .addTransition
@ -424,6 +428,36 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED, JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
JobEventType.JOB_MAP_TASK_RESCHEDULED)) JobEventType.JOB_MAP_TASK_RESCHEDULED))
// Transitions from FAIL_WAIT state
.addTransition(JobStateInternal.FAIL_WAIT,
JobStateInternal.FAIL_WAIT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.FAIL_WAIT,
JobStateInternal.FAIL_WAIT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobStateInternal.FAIL_WAIT,
EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
JobEventType.JOB_TASK_COMPLETED,
new JobFailWaitTransition())
.addTransition(JobStateInternal.FAIL_WAIT,
JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT,
new JobFailWaitTimedOutTransition())
.addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KilledDuringAbortTransition())
.addTransition(JobStateInternal.FAIL_WAIT,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.FAIL_WAIT,
JobStateInternal.FAIL_WAIT,
EnumSet.of(JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_AM_REBOOT))
//Transitions from FAIL_ABORT state //Transitions from FAIL_ABORT state
.addTransition(JobStateInternal.FAIL_ABORT, .addTransition(JobStateInternal.FAIL_ABORT,
JobStateInternal.FAIL_ABORT, JobStateInternal.FAIL_ABORT,
@ -451,7 +485,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED, JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_AM_REBOOT)) JobEventType.JOB_AM_REBOOT,
JobEventType.JOB_FAIL_WAIT_TIMEDOUT))
// Transitions from KILL_ABORT state // Transitions from KILL_ABORT state
.addTransition(JobStateInternal.KILL_ABORT, .addTransition(JobStateInternal.KILL_ABORT,
@ -602,6 +637,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private JobStateInternal forcedState = null; private JobStateInternal forcedState = null;
//Executor used for running future tasks. Setting thread pool size to 1
private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private ScheduledFuture failWaitTriggerScheduledFuture;
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler, Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, TaskAttemptListener taskAttemptListener,
@ -962,6 +1001,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
case SETUP: case SETUP:
case COMMITTING: case COMMITTING:
return JobState.RUNNING; return JobState.RUNNING;
case FAIL_WAIT:
case FAIL_ABORT: case FAIL_ABORT:
return JobState.FAILED; return JobState.FAILED;
case REBOOT: case REBOOT:
@ -1566,6 +1606,42 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
} }
} }
//This transition happens when a job is to be failed. It waits for all the
//tasks to finish / be killed.
private static class JobFailWaitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
for(Task task: job.tasks.values()) {
if(!task.isFinished()) {
return JobStateInternal.FAIL_WAIT;
}
}
}
//Finished waiting. All tasks finished / were killed
job.failWaitTriggerScheduledFuture.cancel(false);
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
return JobStateInternal.FAIL_ABORT;
}
}
//This transition happens when a job to be failed times out while waiting on
//tasks that had been sent the KILL signal. It is triggered by a
//ScheduledFuture task queued in the executor.
private static class JobFailWaitTimedOutTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
+ " Going to fail job anyway");
job.failWaitTriggerScheduledFuture.cancel(false);
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
}
// JobFinishedEvent triggers the move of the history file out of the staging // JobFinishedEvent triggers the move of the history file out of the staging
// area. May need to create a new event type for this if JobFinished should // area. May need to create a new event type for this if JobFinished should
// not be generated for KilledJobs, etc. // not be generated for KilledJobs, etc.
@ -1798,6 +1874,23 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return checkJobAfterTaskCompletion(job); return checkJobAfterTaskCompletion(job);
} }
//This class is used to queue a ScheduledFuture to send an event to a job
//after some delay. This can be used to wait for maximum amount of time
//before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for
//all tasks to be killed.
static class TriggerScheduledFuture implements Runnable {
JobEvent toSend;
JobImpl job;
TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
this.toSend = toSend;
this.job = job;
}
public void run() {
LOG.info("Sending event " + toSend + " to " + job.getID());
job.getEventHandler().handle(toSend);
}
}
protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
//check for Job failure //check for Job failure
if (job.failedMapTaskCount*100 > if (job.failedMapTaskCount*100 >
@ -1811,12 +1904,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
" failedReduces:" + job.failedReduceTaskCount; " failedReduces:" + job.failedReduceTaskCount;
LOG.info(diagnosticMsg); LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg); job.addDiagnostic(diagnosticMsg);
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext, //Send kill signal to all unfinished tasks here.
org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); boolean allDone = true;
for (Task task : job.tasks.values()) {
if(!task.isFinished()) {
allDone = false;
job.eventHandler.handle(
new TaskEvent(task.getID(), TaskEventType.T_KILL));
}
}
//If all tasks are already done, we should go directly to FAIL_ABORT
if(allDone) {
return JobStateInternal.FAIL_ABORT; return JobStateInternal.FAIL_ABORT;
} }
//Set max timeout to wait for the tasks to get killed
job.failWaitTriggerScheduledFuture = job.executor.schedule(
new TriggerScheduledFuture(job, new JobEvent(job.getID(),
JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
TimeUnit.MILLISECONDS);
return JobStateInternal.FAIL_WAIT;
}
return job.checkReadyForCommit(); return job.checkReadyForCommit();
} }

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -84,6 +85,7 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
/** /**
@ -332,6 +334,39 @@ public class TestJobImpl {
commitHandler.stop(); commitHandler.stop();
} }
@Test
public void testAbortJobCalledAfterKillingTasks() throws IOException,
InterruptedException {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
InlineDispatcher dispatcher = new InlineDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = Mockito.mock(OutputCommitter.class);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
//Fail one task. This should land the JobImpl in the FAIL_WAIT state
job.handle(new JobTaskEvent(
MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
TaskState.FAILED));
//Verify abort job hasn't been called
Mockito.verify(committer, Mockito.never())
.abortJob((JobContext) Mockito.any(), (State) Mockito.any());
assertJobState(job, JobStateInternal.FAIL_WAIT);
//Verify abortJob is called once and the job failed
Mockito.verify(committer, Mockito.timeout(2000).times(1))
.abortJob((JobContext) Mockito.any(), (State) Mockito.any());
assertJobState(job, JobStateInternal.FAILED);
dispatcher.stop();
}
@Test(timeout=20000) @Test(timeout=20000)
public void testKilledDuringFailAbort() throws Exception { public void testKilledDuringFailAbort() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();