MAPREDUCE-5441. Changed MR AM to return RUNNING state if exiting when RM commands to reboot, so that client can continue to track the overall job. Contributed by xJian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1518821 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a29714f077
commit
5d4b684c02
|
@ -245,6 +245,10 @@ Release 2.1.1-beta - UNRELEASED
|
|||
|
||||
MAPREDUCE-5483. revert MAPREDUCE-5357. (rkanter via tucu)
|
||||
|
||||
MAPREDUCE-5441. Changed MR AM to return RUNNING state if exiting when RM
|
||||
commands to reboot, so that client can continue to track the overall job.
|
||||
(Jian He via vinodkv)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-22
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -993,7 +993,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
}
|
||||
}
|
||||
|
||||
private static JobState getExternalState(JobStateInternal smState) {
|
||||
private JobState getExternalState(JobStateInternal smState) {
|
||||
switch (smState) {
|
||||
case KILL_WAIT:
|
||||
case KILL_ABORT:
|
||||
|
@ -1005,7 +1005,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
case FAIL_ABORT:
|
||||
return JobState.FAILED;
|
||||
case REBOOT:
|
||||
return JobState.ERROR;
|
||||
if (appContext.isLastAMRetry()) {
|
||||
return JobState.ERROR;
|
||||
} else {
|
||||
// In case of not last retry, return the external state as RUNNING since
|
||||
// otherwise JobClient will exit when it polls the AM for job state
|
||||
return JobState.RUNNING;
|
||||
}
|
||||
default:
|
||||
return JobState.valueOf(smState.name());
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.Iterator;
|
|||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
|
@ -41,6 +42,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
|
@ -51,12 +54,15 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
|||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -368,6 +374,47 @@ public class TestMRApp {
|
|||
app.waitForState(job, JobState.ERROR);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobRebootNotLastRetry() throws Exception {
|
||||
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
|
||||
Job job = app.submit(new Configuration());
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task task = it.next();
|
||||
app.waitForState(task, TaskState.RUNNING);
|
||||
|
||||
//send an reboot event
|
||||
app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
|
||||
// return exteranl state as RUNNING since otherwise the JobClient will
|
||||
// prematurely exit.
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobRebootOnLastRetry() throws Exception {
|
||||
// make startCount as 2 since this is last retry which equals to
|
||||
// DEFAULT_MAX_AM_RETRY
|
||||
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
Job job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task task = it.next();
|
||||
app.waitForState(task, TaskState.RUNNING);
|
||||
|
||||
//send an reboot event
|
||||
app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
|
||||
// return exteranl state as ERROR if this is the last retry
|
||||
app.waitForState(job, JobState.ERROR);
|
||||
}
|
||||
|
||||
private final class MRAppWithSpiedJob extends MRApp {
|
||||
private JobImpl spiedJob;
|
||||
|
||||
|
|
|
@ -142,7 +142,7 @@ public class TestJobImpl {
|
|||
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
|
||||
"tag1,tag2");
|
||||
dispatcher.register(EventType.class, jseHandler);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 0);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 0, null);
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobStartEvent(job.getID()));
|
||||
|
@ -170,7 +170,7 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
||||
completeJobTasks(job);
|
||||
assertJobState(job, JobStateInternal.COMMITTING);
|
||||
|
||||
|
@ -195,7 +195,7 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
||||
completeJobTasks(job);
|
||||
assertJobState(job, JobStateInternal.COMMITTING);
|
||||
|
||||
|
@ -239,7 +239,9 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
when(mockContext.isLastAMRetry()).thenReturn(false);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
|
||||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
|
@ -248,6 +250,10 @@ public class TestJobImpl {
|
|||
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
|
||||
assertJobState(job, JobStateInternal.REBOOT);
|
||||
// return the external state as RUNNING since otherwise JobClient will
|
||||
// exit when it polls the AM for job state
|
||||
Assert.assertEquals(JobState.RUNNING, job.getState());
|
||||
|
||||
dispatcher.stop();
|
||||
commitHandler.stop();
|
||||
}
|
||||
|
@ -256,6 +262,7 @@ public class TestJobImpl {
|
|||
public void testRebootedDuringCommit() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
||||
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
|
||||
AsyncDispatcher dispatcher = new AsyncDispatcher();
|
||||
dispatcher.init(conf);
|
||||
dispatcher.start();
|
||||
|
@ -266,13 +273,18 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
when(mockContext.isLastAMRetry()).thenReturn(true);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
|
||||
completeJobTasks(job);
|
||||
assertJobState(job, JobStateInternal.COMMITTING);
|
||||
|
||||
syncBarrier.await();
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
|
||||
assertJobState(job, JobStateInternal.REBOOT);
|
||||
// return the external state as FAILED since this is last retry.
|
||||
Assert.assertEquals(JobState.ERROR, job.getState());
|
||||
|
||||
dispatcher.stop();
|
||||
commitHandler.stop();
|
||||
}
|
||||
|
@ -301,7 +313,7 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
||||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
|
@ -328,7 +340,7 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
||||
completeJobTasks(job);
|
||||
assertJobState(job, JobStateInternal.COMMITTING);
|
||||
|
||||
|
@ -352,7 +364,7 @@ public class TestJobImpl {
|
|||
createCommitterEventHandler(dispatcher, committer);
|
||||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
||||
|
||||
//Fail one task. This should land the JobImpl in the FAIL_WAIT state
|
||||
job.handle(new JobTaskEvent(
|
||||
|
@ -388,7 +400,7 @@ public class TestJobImpl {
|
|||
//Job has only 1 mapper task. No reducers
|
||||
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
|
||||
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
|
||||
JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);
|
||||
|
||||
//Fail / finish all the tasks. This should land the JobImpl directly in the
|
||||
//FAIL_ABORT state
|
||||
|
@ -440,7 +452,7 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
||||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
|
@ -477,7 +489,7 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
||||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
|
@ -687,7 +699,7 @@ public class TestJobImpl {
|
|||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2);
|
||||
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
|
||||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
|
@ -735,12 +747,12 @@ public class TestJobImpl {
|
|||
}
|
||||
|
||||
private static StubbedJob createStubbedJob(Configuration conf,
|
||||
Dispatcher dispatcher, int numSplits) {
|
||||
Dispatcher dispatcher, int numSplits, AppContext appContext) {
|
||||
JobID jobID = JobID.forName("job_1234567890000_0001");
|
||||
JobId jobId = TypeConverter.toYarn(jobID);
|
||||
StubbedJob job = new StubbedJob(jobId,
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
|
||||
conf,dispatcher.getEventHandler(), true, "somebody", numSplits);
|
||||
conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);
|
||||
dispatcher.register(JobEventType.class, job);
|
||||
EventHandler mockHandler = mock(EventHandler.class);
|
||||
dispatcher.register(TaskEventType.class, mockHandler);
|
||||
|
@ -751,8 +763,8 @@ public class TestJobImpl {
|
|||
}
|
||||
|
||||
private static StubbedJob createRunningStubbedJob(Configuration conf,
|
||||
Dispatcher dispatcher, int numSplits) {
|
||||
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
|
||||
Dispatcher dispatcher, int numSplits, AppContext appContext) {
|
||||
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits, appContext);
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobStartEvent(job.getID()));
|
||||
|
@ -880,13 +892,13 @@ public class TestJobImpl {
|
|||
}
|
||||
|
||||
public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
||||
Configuration conf, EventHandler eventHandler,
|
||||
boolean newApiCommitter, String user, int numSplits) {
|
||||
Configuration conf, EventHandler eventHandler, boolean newApiCommitter,
|
||||
String user, int numSplits, AppContext appContext) {
|
||||
super(jobId, applicationAttemptId, conf, eventHandler,
|
||||
null, new JobTokenSecretManager(), new Credentials(),
|
||||
new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
|
||||
MRAppMetrics.create(), null, newApiCommitter, user,
|
||||
System.currentTimeMillis(), null, null, null, null);
|
||||
System.currentTimeMillis(), null, appContext, null, null);
|
||||
|
||||
initTransition = getInitTransition(numSplits);
|
||||
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
|
||||
|
|
Loading…
Reference in New Issue