svn merge -c 1503744 to revert MAPREDUCE-5317. Stale files left behind for failed jobs
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503747 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6f1452044a
commit
2386e127b8
|
@ -21,9 +21,6 @@ Release 2.3.0 - UNRELEASED
|
||||||
MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl
|
MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl
|
||||||
(Devaraj K via jlowe)
|
(Devaraj K via jlowe)
|
||||||
|
|
||||||
MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
|
|
||||||
jlowe)
|
|
||||||
|
|
||||||
Release 2.2.0 - UNRELEASED
|
Release 2.2.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1098,9 +1095,6 @@ Release 0.23.10 - UNRELEASED
|
||||||
MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
|
MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
|
||||||
input path dir (Devaraj K via jlowe)
|
input path dir (Devaraj K via jlowe)
|
||||||
|
|
||||||
MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
|
|
||||||
jlowe)
|
|
||||||
|
|
||||||
Release 0.23.9 - 2013-07-08
|
Release 0.23.9 - 2013-07-08
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -25,7 +25,6 @@ public enum JobStateInternal {
|
||||||
RUNNING,
|
RUNNING,
|
||||||
COMMITTING,
|
COMMITTING,
|
||||||
SUCCEEDED,
|
SUCCEEDED,
|
||||||
FAIL_WAIT,
|
|
||||||
FAIL_ABORT,
|
FAIL_ABORT,
|
||||||
FAILED,
|
FAILED,
|
||||||
KILL_WAIT,
|
KILL_WAIT,
|
||||||
|
|
|
@ -44,7 +44,6 @@ public enum JobEventType {
|
||||||
|
|
||||||
//Producer:Job
|
//Producer:Job
|
||||||
JOB_COMPLETED,
|
JOB_COMPLETED,
|
||||||
JOB_FAIL_WAIT_TIMEDOUT,
|
|
||||||
|
|
||||||
//Producer:Any component
|
//Producer:Any component
|
||||||
JOB_DIAGNOSTIC_UPDATE,
|
JOB_DIAGNOSTIC_UPDATE,
|
||||||
|
|
|
@ -30,9 +30,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -316,8 +313,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
.addTransition
|
.addTransition
|
||||||
(JobStateInternal.RUNNING,
|
(JobStateInternal.RUNNING,
|
||||||
EnumSet.of(JobStateInternal.RUNNING,
|
EnumSet.of(JobStateInternal.RUNNING,
|
||||||
JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
|
JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
|
||||||
JobStateInternal.FAIL_ABORT),
|
|
||||||
JobEventType.JOB_TASK_COMPLETED,
|
JobEventType.JOB_TASK_COMPLETED,
|
||||||
new TaskCompletedTransition())
|
new TaskCompletedTransition())
|
||||||
.addTransition
|
.addTransition
|
||||||
|
@ -428,37 +424,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
|
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
|
||||||
JobEventType.JOB_MAP_TASK_RESCHEDULED))
|
JobEventType.JOB_MAP_TASK_RESCHEDULED))
|
||||||
|
|
||||||
// Transitions from FAIL_WAIT state
|
// Transitions from FAIL_ABORT state
|
||||||
.addTransition(JobStateInternal.FAIL_WAIT,
|
|
||||||
JobStateInternal.FAIL_WAIT,
|
|
||||||
JobEventType.JOB_DIAGNOSTIC_UPDATE,
|
|
||||||
DIAGNOSTIC_UPDATE_TRANSITION)
|
|
||||||
.addTransition(JobStateInternal.FAIL_WAIT,
|
|
||||||
JobStateInternal.FAIL_WAIT,
|
|
||||||
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
|
|
||||||
.addTransition(JobStateInternal.FAIL_WAIT,
|
|
||||||
EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
|
|
||||||
JobEventType.JOB_TASK_COMPLETED,
|
|
||||||
new JobFailWaitTransition())
|
|
||||||
.addTransition(JobStateInternal.FAIL_WAIT,
|
|
||||||
JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT,
|
|
||||||
new JobFailWaitTimedOutTransition())
|
|
||||||
.addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
|
|
||||||
JobEventType.JOB_KILL,
|
|
||||||
new KilledDuringAbortTransition())
|
|
||||||
.addTransition(JobStateInternal.FAIL_WAIT,
|
|
||||||
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
|
|
||||||
INTERNAL_ERROR_TRANSITION)
|
|
||||||
// Ignore-able events
|
|
||||||
.addTransition(JobStateInternal.FAIL_WAIT,
|
|
||||||
JobStateInternal.FAIL_WAIT,
|
|
||||||
EnumSet.of(JobEventType.JOB_UPDATED_NODES,
|
|
||||||
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
|
|
||||||
JobEventType.JOB_MAP_TASK_RESCHEDULED,
|
|
||||||
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
|
|
||||||
JobEventType.JOB_AM_REBOOT))
|
|
||||||
|
|
||||||
//Transitions from FAIL_ABORT state
|
|
||||||
.addTransition(JobStateInternal.FAIL_ABORT,
|
.addTransition(JobStateInternal.FAIL_ABORT,
|
||||||
JobStateInternal.FAIL_ABORT,
|
JobStateInternal.FAIL_ABORT,
|
||||||
JobEventType.JOB_DIAGNOSTIC_UPDATE,
|
JobEventType.JOB_DIAGNOSTIC_UPDATE,
|
||||||
|
@ -485,8 +451,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
|
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
|
||||||
JobEventType.JOB_COMMIT_COMPLETED,
|
JobEventType.JOB_COMMIT_COMPLETED,
|
||||||
JobEventType.JOB_COMMIT_FAILED,
|
JobEventType.JOB_COMMIT_FAILED,
|
||||||
JobEventType.JOB_AM_REBOOT,
|
JobEventType.JOB_AM_REBOOT))
|
||||||
JobEventType.JOB_FAIL_WAIT_TIMEDOUT))
|
|
||||||
|
|
||||||
// Transitions from KILL_ABORT state
|
// Transitions from KILL_ABORT state
|
||||||
.addTransition(JobStateInternal.KILL_ABORT,
|
.addTransition(JobStateInternal.KILL_ABORT,
|
||||||
|
@ -637,10 +602,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
|
|
||||||
private JobStateInternal forcedState = null;
|
private JobStateInternal forcedState = null;
|
||||||
|
|
||||||
//Executor used for running future tasks. Setting thread pool size to 1
|
|
||||||
private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
|
|
||||||
private ScheduledFuture failWaitTriggerScheduledFuture;
|
|
||||||
|
|
||||||
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
||||||
Configuration conf, EventHandler eventHandler,
|
Configuration conf, EventHandler eventHandler,
|
||||||
TaskAttemptListener taskAttemptListener,
|
TaskAttemptListener taskAttemptListener,
|
||||||
|
@ -1001,7 +962,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
case SETUP:
|
case SETUP:
|
||||||
case COMMITTING:
|
case COMMITTING:
|
||||||
return JobState.RUNNING;
|
return JobState.RUNNING;
|
||||||
case FAIL_WAIT:
|
|
||||||
case FAIL_ABORT:
|
case FAIL_ABORT:
|
||||||
return JobState.FAILED;
|
return JobState.FAILED;
|
||||||
case REBOOT:
|
case REBOOT:
|
||||||
|
@ -1605,43 +1565,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
job.unsuccessfulFinish(finalState);
|
job.unsuccessfulFinish(finalState);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//This transition happens when a job is to be failed. It waits for all the
|
|
||||||
//tasks to finish / be killed.
|
|
||||||
private static class JobFailWaitTransition
|
|
||||||
implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
|
|
||||||
@Override
|
|
||||||
public JobStateInternal transition(JobImpl job, JobEvent event) {
|
|
||||||
if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
|
|
||||||
for(Task task: job.tasks.values()) {
|
|
||||||
if(!task.isFinished()) {
|
|
||||||
return JobStateInternal.FAIL_WAIT;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//Finished waiting. All tasks finished / were killed
|
|
||||||
job.failWaitTriggerScheduledFuture.cancel(false);
|
|
||||||
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
|
|
||||||
job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
|
|
||||||
return JobStateInternal.FAIL_ABORT;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//This transition happens when a job to be failed times out while waiting on
|
|
||||||
//tasks that had been sent the KILL signal. It is triggered by a
|
|
||||||
//ScheduledFuture task queued in the executor.
|
|
||||||
private static class JobFailWaitTimedOutTransition
|
|
||||||
implements SingleArcTransition<JobImpl, JobEvent> {
|
|
||||||
@Override
|
|
||||||
public void transition(JobImpl job, JobEvent event) {
|
|
||||||
LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
|
|
||||||
+ " Going to fail job anyway");
|
|
||||||
job.failWaitTriggerScheduledFuture.cancel(false);
|
|
||||||
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
|
|
||||||
job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// JobFinishedEvent triggers the move of the history file out of the staging
|
// JobFinishedEvent triggers the move of the history file out of the staging
|
||||||
// area. May need to create a new event type for this if JobFinished should
|
// area. May need to create a new event type for this if JobFinished should
|
||||||
// not be generated for KilledJobs, etc.
|
// not be generated for KilledJobs, etc.
|
||||||
|
@ -1874,23 +1798,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
return checkJobAfterTaskCompletion(job);
|
return checkJobAfterTaskCompletion(job);
|
||||||
}
|
}
|
||||||
|
|
||||||
//This class is used to queue a ScheduledFuture to send an event to a job
|
|
||||||
//after some delay. This can be used to wait for maximum amount of time
|
|
||||||
//before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for
|
|
||||||
//all tasks to be killed.
|
|
||||||
static class TriggerScheduledFuture implements Runnable {
|
|
||||||
JobEvent toSend;
|
|
||||||
JobImpl job;
|
|
||||||
TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
|
|
||||||
this.toSend = toSend;
|
|
||||||
this.job = job;
|
|
||||||
}
|
|
||||||
public void run() {
|
|
||||||
LOG.info("Sending event " + toSend + " to " + job.getID());
|
|
||||||
job.getEventHandler().handle(toSend);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
|
protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
|
||||||
//check for Job failure
|
//check for Job failure
|
||||||
if (job.failedMapTaskCount*100 >
|
if (job.failedMapTaskCount*100 >
|
||||||
|
@ -1904,30 +1811,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
" failedReduces:" + job.failedReduceTaskCount;
|
" failedReduces:" + job.failedReduceTaskCount;
|
||||||
LOG.info(diagnosticMsg);
|
LOG.info(diagnosticMsg);
|
||||||
job.addDiagnostic(diagnosticMsg);
|
job.addDiagnostic(diagnosticMsg);
|
||||||
|
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
|
||||||
//Send kill signal to all unfinished tasks here.
|
job.jobContext,
|
||||||
boolean allDone = true;
|
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
|
||||||
for (Task task : job.tasks.values()) {
|
return JobStateInternal.FAIL_ABORT;
|
||||||
if(!task.isFinished()) {
|
|
||||||
allDone = false;
|
|
||||||
job.eventHandler.handle(
|
|
||||||
new TaskEvent(task.getID(), TaskEventType.T_KILL));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//If all tasks are already done, we should go directly to FAIL_ABORT
|
|
||||||
if(allDone) {
|
|
||||||
return JobStateInternal.FAIL_ABORT;
|
|
||||||
}
|
|
||||||
|
|
||||||
//Set max timeout to wait for the tasks to get killed
|
|
||||||
job.failWaitTriggerScheduledFuture = job.executor.schedule(
|
|
||||||
new TriggerScheduledFuture(job, new JobEvent(job.getID(),
|
|
||||||
JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
|
|
||||||
MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
|
|
||||||
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
|
|
||||||
TimeUnit.MILLISECONDS);
|
|
||||||
return JobStateInternal.FAIL_WAIT;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return job.checkReadyForCommit();
|
return job.checkReadyForCommit();
|
||||||
|
|
|
@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
|
||||||
import org.apache.hadoop.yarn.state.StateMachine;
|
import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
@ -85,7 +84,6 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -334,39 +332,6 @@ public class TestJobImpl {
|
||||||
commitHandler.stop();
|
commitHandler.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAbortJobCalledAfterKillingTasks() throws IOException,
|
|
||||||
InterruptedException {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
||||||
conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
|
|
||||||
InlineDispatcher dispatcher = new InlineDispatcher();
|
|
||||||
dispatcher.init(conf);
|
|
||||||
dispatcher.start();
|
|
||||||
OutputCommitter committer = Mockito.mock(OutputCommitter.class);
|
|
||||||
CommitterEventHandler commitHandler =
|
|
||||||
createCommitterEventHandler(dispatcher, committer);
|
|
||||||
commitHandler.init(conf);
|
|
||||||
commitHandler.start();
|
|
||||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
|
||||||
|
|
||||||
//Fail one task. This should land the JobImpl in the FAIL_WAIT state
|
|
||||||
job.handle(new JobTaskEvent(
|
|
||||||
MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
|
|
||||||
TaskState.FAILED));
|
|
||||||
//Verify abort job hasn't been called
|
|
||||||
Mockito.verify(committer, Mockito.never())
|
|
||||||
.abortJob((JobContext) Mockito.any(), (State) Mockito.any());
|
|
||||||
assertJobState(job, JobStateInternal.FAIL_WAIT);
|
|
||||||
|
|
||||||
//Verify abortJob is called once and the job failed
|
|
||||||
Mockito.verify(committer, Mockito.timeout(2000).times(1))
|
|
||||||
.abortJob((JobContext) Mockito.any(), (State) Mockito.any());
|
|
||||||
assertJobState(job, JobStateInternal.FAILED);
|
|
||||||
|
|
||||||
dispatcher.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=20000)
|
@Test(timeout=20000)
|
||||||
public void testKilledDuringFailAbort() throws Exception {
|
public void testKilledDuringFailAbort() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
Loading…
Reference in New Issue