svn merge -c 1506154 FIXES: MAPREDUCE-5317. Stale files left behind for failed jobs. Contributed by Ravi Prakash
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1506157 13f79535-47bb-0310-9956-ffa450edef68
parent f284b538fd
commit 673b762364
CHANGES.txt
@@ -30,6 +30,9 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5404. HSAdminServer does not use ephemeral ports in minicluster
     mode (Ted Yu via jlowe)
 
+    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
+    jlowe)
+
 Release 2.1.1-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -1097,6 +1100,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5380. Invalid mapred command should return non-zero exit code
     (Stephen Chu via jlowe)
 
+    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
+    jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES
JobStateInternal.java
@@ -25,6 +25,7 @@ public enum JobStateInternal {
   RUNNING,
   COMMITTING,
   SUCCEEDED,
+  FAIL_WAIT,
   FAIL_ABORT,
   FAILED,
   KILL_WAIT,
JobEventType.java
@@ -44,6 +44,7 @@ public enum JobEventType {
 
   //Producer:Job
   JOB_COMPLETED,
+  JOB_FAIL_WAIT_TIMEDOUT,
 
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,
JobImpl.java
@@ -30,6 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -313,7 +316,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition
               (JobStateInternal.RUNNING,
               EnumSet.of(JobStateInternal.RUNNING,
-                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
+                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
+                  JobStateInternal.FAIL_ABORT),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
@@ -424,7 +428,37 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED))
 
-          // Transitions from FAIL_ABORT state
+          // Transitions from FAIL_WAIT state
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_WAIT,
+              JobEventType.JOB_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_WAIT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
+              JobEventType.JOB_TASK_COMPLETED,
+              new JobFailWaitTransition())
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT,
+              new JobFailWaitTimedOutTransition())
+          .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
+              JobEventType.JOB_KILL,
+              new KilledDuringAbortTransition())
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_WAIT,
+              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_AM_REBOOT))
+
+          //Transitions from FAIL_ABORT state
           .addTransition(JobStateInternal.FAIL_ABORT,
               JobStateInternal.FAIL_ABORT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
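Taken together, the new arcs insert a draining stop into the failure path: RUNNING moves to FAIL_WAIT when the job decides to fail, FAIL_WAIT holds until the killed tasks report in (or the trigger times out), and only then does FAIL_ABORT ask the committer to abort. A minimal plain-Java simulation of that path, as a sketch only; the event names below are simplified stand-ins, not Hadoop's JobEventType values:

// Compressed, non-Hadoop sketch of the failure lifecycle added above.
enum S { RUNNING, FAIL_WAIT, FAIL_ABORT, FAILED }
enum E { TASK_FAILED_PAST_THRESHOLD, ALL_TASKS_DONE, FAIL_WAIT_TIMEDOUT, ABORT_DONE }

final class FailPathSketch {
  static S step(S s, E e) {
    switch (s) {
      case RUNNING:    // failure threshold crossed: kill stragglers, start waiting
        return e == E.TASK_FAILED_PAST_THRESHOLD ? S.FAIL_WAIT : s;
      case FAIL_WAIT:  // leave when tasks drain or the scheduled trigger fires
        return (e == E.ALL_TASKS_DONE || e == E.FAIL_WAIT_TIMEDOUT)
            ? S.FAIL_ABORT : s;
      case FAIL_ABORT: // committer abort finished: job is FAILED
        return e == E.ABORT_DONE ? S.FAILED : s;
      default:
        return s;
    }
  }

  public static void main(String[] args) {
    S s = S.RUNNING;
    for (E e : new E[] { E.TASK_FAILED_PAST_THRESHOLD, E.FAIL_WAIT_TIMEDOUT,
                         E.ABORT_DONE }) {
      s = step(s, e);
      System.out.println(e + " -> " + s);
    }
  }
}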
@@ -451,7 +485,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
-                  JobEventType.JOB_AM_REBOOT))
+                  JobEventType.JOB_AM_REBOOT,
+                  JobEventType.JOB_FAIL_WAIT_TIMEDOUT))
 
           // Transitions from KILL_ABORT state
           .addTransition(JobStateInternal.KILL_ABORT,
@@ -602,6 +637,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 
   private JobStateInternal forcedState = null;
 
+  //Executor used for running future tasks. Setting thread pool size to 1
+  private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+  private ScheduledFuture failWaitTriggerScheduledFuture;
+
   public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
       Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
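A pool of size one is deliberate here: each JobImpl holds at most one outstanding fail-wait trigger (the single ScheduledFuture field above), and the trigger's only work is re-posting an event, so nothing ever needs to run concurrently on this executor.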
@@ -962,6 +1001,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     case SETUP:
     case COMMITTING:
       return JobState.RUNNING;
+    case FAIL_WAIT:
     case FAIL_ABORT:
       return JobState.FAILED;
     case REBOOT:
@@ -1566,6 +1606,42 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
 
+  //This transition happens when a job is to be failed. It waits for all the
+  //tasks to finish / be killed.
+  private static class JobFailWaitTransition
+      implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
+    @Override
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
+      if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
+        for(Task task: job.tasks.values()) {
+          if(!task.isFinished()) {
+            return JobStateInternal.FAIL_WAIT;
+          }
+        }
+      }
+      //Finished waiting. All tasks finished / were killed
+      job.failWaitTriggerScheduledFuture.cancel(false);
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+      return JobStateInternal.FAIL_ABORT;
+    }
+  }
+
+  //This transition happens when a job to be failed times out while waiting on
+  //tasks that had been sent the KILL signal. It is triggered by a
+  //ScheduledFuture task queued in the executor.
+  private static class JobFailWaitTimedOutTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
+          + " Going to fail job anyway");
+      job.failWaitTriggerScheduledFuture.cancel(false);
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    }
+  }
+
   // JobFinishedEvent triggers the move of the history file out of the staging
   // area. May need to create a new event type for this if JobFinished should
   // not be generated for KilledJobs, etc.
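Both transitions disarm the shared trigger with cancel(false) before handing the job to the committer, and JobFailWaitTransition checks isCancelled() before bothering to re-scan the task table. A standalone sketch of that arm/disarm pattern, using nothing beyond java.util.concurrent:

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DisarmSketch {
  public static void main(String[] args) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    // Arm a timeout that will force progress if the wait drags on.
    ScheduledFuture<?> timeout = executor.schedule(
        () -> System.out.println("timeout fired: abort anyway"),
        5, TimeUnit.SECONDS);

    // The "all tasks finished" path wins the race in this run: disarm the timer.
    if (!timeout.isCancelled()) {
      timeout.cancel(false);   // false: let the trigger finish if already running
      System.out.println("tasks drained first: abort normally");
    }
    executor.shutdown();
  }
}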
@@ -1798,6 +1874,23 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       return checkJobAfterTaskCompletion(job);
     }
 
+    //This class is used to queue a ScheduledFuture to send an event to a job
+    //after some delay. This can be used to wait for maximum amount of time
+    //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for
+    //all tasks to be killed.
+    static class TriggerScheduledFuture implements Runnable {
+      JobEvent toSend;
+      JobImpl job;
+      TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
+        this.toSend = toSend;
+        this.job = job;
+      }
+      public void run() {
+        LOG.info("Sending event " + toSend + " to " + job.getID());
+        job.getEventHandler().handle(toSend);
+      }
+    }
+
     protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
       //check for Job failure
       if (job.failedMapTaskCount*100 >
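Design note: TriggerScheduledFuture does not act on the job from the executor thread. Its run() merely re-posts the supplied JobEvent through the job's event handler, so the timeout is handled on the AM's normal dispatcher path, with the same state-machine discipline as any other job event.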
@@ -1811,10 +1904,33 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             " failedReduces:" + job.failedReduceTaskCount;
         LOG.info(diagnosticMsg);
         job.addDiagnostic(diagnosticMsg);
-        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-            job.jobContext,
-            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-        return JobStateInternal.FAIL_ABORT;
+
+        //Send kill signal to all unfinished tasks here.
+        boolean allDone = true;
+        for (Task task : job.tasks.values()) {
+          if(!task.isFinished()) {
+            allDone = false;
+            job.eventHandler.handle(
+              new TaskEvent(task.getID(), TaskEventType.T_KILL));
+          }
+        }
+
+        //If all tasks are already done, we should go directly to FAIL_ABORT
+        if(allDone) {
+          job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+            job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)
+          );
+          return JobStateInternal.FAIL_ABORT;
+        }
+
+        //Set max timeout to wait for the tasks to get killed
+        job.failWaitTriggerScheduledFuture = job.executor.schedule(
+          new TriggerScheduledFuture(job, new JobEvent(job.getID(),
+            JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
+              MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
+              MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
+              TimeUnit.MILLISECONDS);
+        return JobStateInternal.FAIL_WAIT;
       }
 
       return job.checkReadyForCommit();
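Rather than introducing a new knob, the patch reuses the committer cancel timeout to bound the FAIL_WAIT window. A hedged sketch of how that value resolves; the two constants are taken from the hunk above, while the concrete key name (yarn.app.mapreduce.am.job.committer.cancel-timeout) and 60-second default are recalled from contemporaneous MRJobConfig and should be verified against your tree:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class FailWaitTimeoutSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Unset in *-site.xml: falls back to the default (60000 ms at the time).
    long timeoutMs = conf.getInt(
        MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
        MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
    System.out.println("FAIL_WAIT bounded by " + timeoutMs + " ms");
  }
}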
TestJobImpl.java
@@ -57,13 +57,17 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -74,7 +78,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -84,6 +90,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 
 /**
@@ -332,6 +339,78 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
+  @Test
+  public void testAbortJobCalledAfterKillingTasks() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
+    InlineDispatcher dispatcher = new InlineDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = Mockito.mock(OutputCommitter.class);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+
+    //Fail one task. This should land the JobImpl in the FAIL_WAIT state
+    job.handle(new JobTaskEvent(
+        MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+        TaskState.FAILED));
+    //Verify abort job hasn't been called
+    Mockito.verify(committer, Mockito.never())
+        .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+    assertJobState(job, JobStateInternal.FAIL_WAIT);
+
+    //Verify abortJob is called once and the job failed
+    Mockito.verify(committer, Mockito.timeout(2000).times(1))
+        .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+    assertJobState(job, JobStateInternal.FAILED);
+
+    dispatcher.stop();
+  }
+
+  @Test (timeout=10000)
+  public void testFailAbortDoesntHang() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = Mockito.mock(OutputCommitter.class);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+    //Job has only 1 mapper task. No reducers
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
+
+    //Fail / finish all the tasks. This should land the JobImpl directly in the
+    //FAIL_ABORT state
+    for(Task t: job.tasks.values()) {
+      TaskImpl task = (TaskImpl) t;
+      task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
+      for(TaskAttempt ta: task.getAttempts().values()) {
+        task.handle(new TaskTAttemptEvent(ta.getID(),
+          TaskEventType.T_ATTEMPT_FAILED));
+      }
+    }
+    assertJobState(job, JobStateInternal.FAIL_ABORT);
+
+    dispatcher.await();
+    //Verify abortJob is called once and the job failed
+    Mockito.verify(committer, Mockito.timeout(2000).times(1))
+        .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+    assertJobState(job, JobStateInternal.FAILED);
+
+    dispatcher.stop();
+  }
+
   @Test(timeout=20000)
   public void testKilledDuringFailAbort() throws Exception {
     Configuration conf = new Configuration();
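One idiom in the new tests is worth calling out: Mockito.timeout(ms) makes verify() poll until the interaction happens or the deadline expires, which is how testAbortJobCalledAfterKillingTasks waits out the one-second FAIL_WAIT window without sleeping. A minimal standalone example, plain Mockito with no Hadoop types:

import static org.mockito.Mockito.*;

import java.util.function.Consumer;

public class TimeoutVerifySketch {
  public static void main(String[] args) {
    @SuppressWarnings("unchecked")
    Consumer<String> committer = mock(Consumer.class);

    // The interaction arrives asynchronously, as the real abort does.
    new Thread(() -> committer.accept("abortJob")).start();

    // Polls for up to 2 seconds, then asserts exactly one invocation.
    verify(committer, timeout(2000).times(1)).accept("abortJob");
    System.out.println("verified");
  }
}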