svn merge -c 1407679 FIXES: MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED state (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1407682 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2012-11-09 23:00:57 +00:00
parent 1d58a7b5b3
commit b8f37b3b36
3 changed files with 72 additions and 1 deletion

CHANGES.txt

@@ -500,6 +500,9 @@ Release 0.23.5 - UNRELEASED
    MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
    (Mark Fuhs via bobby)

    MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
    state (jlowe via bobby)

Release 0.23.4 - UNRELEASED

JobImpl.java

@@ -348,6 +348,9 @@ JobEventType.JOB_KILL, new KillTasksTransition())
          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
              EnumSet.of(JobEventType.JOB_KILL,
                  JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_COMPLETED,
                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))

          // Transitions from KILLED state
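
Note on the change above: the FAILED -> FAILED self-transition simply consumes the listed asynchronous task events, so a task event that races in after the job has already failed is ignored instead of triggering an invalid-transition error. Below is a minimal, self-contained sketch of that pattern using the same org.apache.hadoop.yarn.state API; the sketch class, its states, and its events are hypothetical stand-ins, not part of this patch or of JobImpl.

import java.util.EnumSet;

import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;

public class IgnoreLateEventsSketch {

  enum MyState { RUNNING, FAILED }
  enum MyEventType { TASK_COMPLETED, TASK_ATTEMPT_COMPLETED, FAIL }

  static class MyEvent {
    private final MyEventType type;
    MyEvent(MyEventType type) { this.type = type; }
    MyEventType getType() { return type; }
  }

  // Transition table: one real transition into FAILED, plus a FAILED -> FAILED
  // self-loop with no hook so the listed event types are silently swallowed.
  private static final StateMachineFactory<IgnoreLateEventsSketch, MyState, MyEventType, MyEvent>
      FACTORY =
        new StateMachineFactory<IgnoreLateEventsSketch, MyState, MyEventType, MyEvent>(
            MyState.RUNNING)
          .addTransition(MyState.RUNNING, MyState.FAILED, MyEventType.FAIL)
          .addTransition(MyState.FAILED, MyState.FAILED,
              EnumSet.of(MyEventType.TASK_COMPLETED,
                  MyEventType.TASK_ATTEMPT_COMPLETED))
          .installTopology();

  public static void main(String[] args) throws Exception {
    IgnoreLateEventsSketch operand = new IgnoreLateEventsSketch();
    StateMachine<MyState, MyEventType, MyEvent> sm = FACTORY.make(operand);

    sm.doTransition(MyEventType.FAIL, new MyEvent(MyEventType.FAIL));
    // Without the self-loop registered above, this late asynchronous event
    // would be rejected as an invalid transition; with it, the event is a no-op.
    sm.doTransition(MyEventType.TASK_COMPLETED,
        new MyEvent(MyEventType.TASK_COMPLETED));

    System.out.println(sm.getCurrentState()); // prints FAILED
  }
}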

TestJobImpl.java

@@ -27,6 +27,7 @@
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -42,6 +43,7 @@
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -51,10 +53,14 @@
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -340,7 +346,7 @@ null, mock(JobTokenSecretManager.class), null, null, null,
    return isUber;
  }

-  private InitTransition getInitTransition() {
+  private static InitTransition getInitTransition() {
    InitTransition initTransition = new InitTransition() {
      @Override
      protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
@@ -350,4 +356,63 @@ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
    };
    return initTransition;
  }

  @Test
  public void testTransitionsAtFailed() throws IOException {
    Configuration conf = new Configuration();

    JobID jobID = JobID.forName("job_1234567890000_0001");
    JobId jobId = TypeConverter.toYarn(jobID);

    OutputCommitter committer = mock(OutputCommitter.class);
    doThrow(new IOException("forcefail"))
      .when(committer).setupJob(any(JobContext.class));

    InlineDispatcher dispatcher = new InlineDispatcher();
    JobImpl job = new StubbedJob(jobId, Records
        .newRecord(ApplicationAttemptId.class), conf,
        dispatcher.getEventHandler(), committer, true, null);

    dispatcher.register(JobEventType.class, job);
    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
    Assert.assertEquals(JobState.FAILED, job.getState());

    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
    Assert.assertEquals(JobState.FAILED, job.getState());
    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
    Assert.assertEquals(JobState.FAILED, job.getState());
    job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
    Assert.assertEquals(JobState.FAILED, job.getState());
    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
    Assert.assertEquals(JobState.FAILED, job.getState());
  }

  private static class StubbedJob extends JobImpl {
    //override the init transition
    private final InitTransition initTransition = getInitTransition();
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
        = stateMachineFactory.addTransition(JobStateInternal.NEW,
            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
            JobEventType.JOB_INIT,
            // This is abusive.
            initTransition);

    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
        localStateMachine;

    @Override
    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
      return localStateMachine;
    }

    public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
        Configuration conf, EventHandler eventHandler,
        OutputCommitter committer, boolean newApiCommitter, String user) {
      super(jobId, applicationAttemptId, conf, eventHandler,
          null, new JobTokenSecretManager(), new Credentials(),
          new SystemClock(), null, MRAppMetrics.create(), committer,
          newApiCommitter, user, System.currentTimeMillis(), null, null);

      // This "this leak" is okay because the retained pointer is in an
      // instance variable.
      localStateMachine = localFactory.make(this);
    }
  }
}
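
A side note on the test wiring above: InlineDispatcher is a YARN test helper that hands each event to the registered handler on the calling thread instead of queueing it for a background dispatcher thread, so any events emitted while the job transitions are processed synchronously before job.handle(...) returns to the test. A rough, self-contained sketch of that usage pattern follows; the PingType enum, PingEvent class, and the printing handler are hypothetical examples, not code from this patch.

import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;

public class InlineDispatchSketch {

  enum PingType { PING }

  // A trivial event carrying only its enum type.
  static class PingEvent extends AbstractEvent<PingType> {
    PingEvent() {
      super(PingType.PING);
    }
  }

  public static void main(String[] args) {
    InlineDispatcher dispatcher = new InlineDispatcher();

    // Register a handler for PingType events, just as the test registers
    // the job itself for JobEventType events.
    dispatcher.register(PingType.class, new EventHandler<PingEvent>() {
      @Override
      public void handle(PingEvent event) {
        System.out.println("handled " + event.getType());
      }
    });

    // InlineDispatcher dispatches on the calling thread, so the handler
    // above has already executed by the time this call returns.
    dispatcher.getEventHandler().handle(new PingEvent());
  }
}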