diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0b0218c4d6e..836419476e1 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -175,6 +175,9 @@ Release 2.0.5-alpha - UNRELEASED MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu) + MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot + command from the RM. (Jian He via jlowe) + Release 2.0.4-beta - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index fe7aa4cedbc..b320e4110da 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -549,8 +549,14 @@ public void shutDownJob() { } try { - //We are finishing cleanly so this is the last retry - isLastAMRetry = true; + //if isLastAMRetry comes as true, should never set it to false + if ( !isLastAMRetry){ + if (((JobImpl)job).getInternalState() != JobStateInternal.REBOOT) { + LOG.info("We are finishing cleanly so this is the last retry"); + isLastAMRetry = true; + } + } + notifyIsLastAMRetry(isLastAMRetry); // Stop all services // This will also send the final report to the ResourceManager LOG.info("Calling stop for all the services"); @@ -1272,19 +1278,25 @@ public void run() { // that they don't take too long in shutting down if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) { ((ContainerAllocatorRouter) appMaster.containerAllocator) - .setSignalled(true); - ((ContainerAllocatorRouter) appMaster.containerAllocator) - .setShouldUnregister(appMaster.isLastAMRetry); - } - - if(appMaster.jobHistoryEventHandler != null) { - appMaster.jobHistoryEventHandler - .setForcejobCompletion(appMaster.isLastAMRetry); + .setSignalled(true); } + appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry); appMaster.stop(); } } + public void notifyIsLastAMRetry(boolean isLastAMRetry){ + if(containerAllocator instanceof ContainerAllocatorRouter) { + LOG.info("Notify RMCommunicator isAMLastRetry: " + isLastAMRetry); + ((ContainerAllocatorRouter) containerAllocator) + .setShouldUnregister(isLastAMRetry); + } + if(jobHistoryEventHandler != null) { + LOG.info("Notify JHEH isAMLastRetry: " + isLastAMRetry); + jobHistoryEventHandler.setForcejobCompletion(isLastAMRetry); + } + } + protected static void initAndStartAppMaster(final MRAppMaster appMaster, final YarnConfiguration conf, String jobUserName) throws IOException, InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java index 7517bc8c73c..bdb627b2354 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java @@ -30,5 +30,6 @@ public enum JobStateInternal { KILL_WAIT, KILL_ABORT, KILLED, - ERROR + ERROR, + REBOOT } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java index c1608259e48..f6c38d30bc3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java @@ -54,6 +54,6 @@ public enum JobEventType { JOB_TASK_ATTEMPT_FETCH_FAILURE, //Producer:RMContainerAllocator - JOB_UPDATED_NODES - + JOB_UPDATED_NODES, + JOB_AM_REBOOT } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4b080b4ab74..be423a1c4d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -215,6 +215,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); + private static final InternalRebootTransition + INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition(); private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new TaskAttemptCompletedEventTransition(); @@ -246,6 +248,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Ignore-able events .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, JobEventType.JOB_UPDATED_NODES) @@ -265,6 +270,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Ignore-able events .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, JobEventType.JOB_UPDATED_NODES) @@ -287,6 +295,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Ignore-able events .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP, JobEventType.JOB_UPDATED_NODES) @@ -327,6 +338,9 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobStateInternal.RUNNING, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + .addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) // Transitions from KILL_WAIT state. .addTransition @@ -352,7 +366,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_MAP_TASK_RESCHEDULED, - JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_AM_REBOOT)) // Transitions from COMMITTING state .addTransition(JobStateInternal.COMMITTING, @@ -377,7 +392,10 @@ JobEventType.JOB_KILL, new KillTasksTransition()) .addTransition(JobStateInternal.COMMITTING, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Ignore-able events + .addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT, + JobEventType.JOB_AM_REBOOT, + INTERNAL_REBOOT_TRANSITION) + // Ignore-able events .addTransition(JobStateInternal.COMMITTING, JobStateInternal.COMMITTING, EnumSet.of(JobEventType.JOB_UPDATED_NODES, @@ -397,7 +415,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, - JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_AM_REBOOT)) // Transitions from FAIL_ABORT state .addTransition(JobStateInternal.FAIL_ABORT, @@ -425,7 +444,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_MAP_TASK_RESCHEDULED, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_COMMIT_COMPLETED, - JobEventType.JOB_COMMIT_FAILED)) + JobEventType.JOB_COMMIT_FAILED, + JobEventType.JOB_AM_REBOOT)) // Transitions from KILL_ABORT state .addTransition(JobStateInternal.KILL_ABORT, @@ -452,7 +472,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_SETUP_COMPLETED, JobEventType.JOB_SETUP_FAILED, JobEventType.JOB_COMMIT_COMPLETED, - JobEventType.JOB_COMMIT_FAILED)) + JobEventType.JOB_COMMIT_FAILED, + JobEventType.JOB_AM_REBOOT)) // Transitions from FAILED state .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, @@ -476,7 +497,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_SETUP_FAILED, JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, - JobEventType.JOB_ABORT_COMPLETED)) + JobEventType.JOB_ABORT_COMPLETED, + JobEventType.JOB_AM_REBOOT)) // Transitions from KILLED state .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, @@ -498,7 +520,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_SETUP_FAILED, JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, - JobEventType.JOB_ABORT_COMPLETED)) + JobEventType.JOB_ABORT_COMPLETED, + JobEventType.JOB_AM_REBOOT)) // No transitions from INTERNAL_ERROR state. Ignore all. .addTransition( @@ -517,9 +540,33 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_COMMIT_COMPLETED, JobEventType.JOB_COMMIT_FAILED, JobEventType.JOB_ABORT_COMPLETED, - JobEventType.INTERNAL_ERROR)) + JobEventType.INTERNAL_ERROR, + JobEventType.JOB_AM_REBOOT)) .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + + // No transitions from AM_REBOOT state. Ignore all. + .addTransition( + JobStateInternal.REBOOT, + JobStateInternal.REBOOT, + EnumSet.of(JobEventType.JOB_INIT, + JobEventType.JOB_KILL, + JobEventType.JOB_TASK_COMPLETED, + JobEventType.JOB_TASK_ATTEMPT_COMPLETED, + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_DIAGNOSTIC_UPDATE, + JobEventType.JOB_UPDATED_NODES, + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, + JobEventType.JOB_SETUP_COMPLETED, + JobEventType.JOB_SETUP_FAILED, + JobEventType.JOB_COMMIT_COMPLETED, + JobEventType.JOB_COMMIT_FAILED, + JobEventType.JOB_ABORT_COMPLETED, + JobEventType.INTERNAL_ERROR, + JobEventType.JOB_AM_REBOOT)) + .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT, + JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + // create the topology tables .installTopology(); @@ -904,6 +951,8 @@ private static JobState getExternalState(JobStateInternal smState) { return JobState.RUNNING; case FAIL_ABORT: return JobState.FAILED; + case REBOOT: + return JobState.ERROR; default: return JobState.valueOf(smState.name()); } @@ -972,6 +1021,7 @@ JobStateInternal finished(JobStateInternal finalState) { case KILLED: metrics.killedJob(this); break; + case REBOOT: case ERROR: case FAILED: metrics.failedJob(this); @@ -1898,8 +1948,17 @@ public void transition(JobImpl job, JobEvent event) { } } - private static class InternalErrorTransition implements + private static class InternalTerminationTransition implements SingleArcTransition { + JobStateInternal terminationState = null; + String jobHistoryString = null; + public InternalTerminationTransition(JobStateInternal stateInternal, + String jobHistoryString) { + this.terminationState = stateInternal; + //mostly a hack for jbhistoryserver + this.jobHistoryString = jobHistoryString; + } + @Override public void transition(JobImpl job, JobEvent event) { //TODO Is this JH event required. @@ -1907,9 +1966,21 @@ public void transition(JobImpl job, JobEvent event) { JobUnsuccessfulCompletionEvent failedEvent = new JobUnsuccessfulCompletionEvent(job.oldJobId, job.finishTime, 0, 0, - JobStateInternal.ERROR.toString()); + jobHistoryString); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); - job.finished(JobStateInternal.ERROR); + job.finished(terminationState); + } + } + + private static class InternalErrorTransition extends InternalTerminationTransition { + public InternalErrorTransition(){ + super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString()); + } + } + + private static class InternalRebootTransition extends InternalTerminationTransition { + public InternalRebootTransition(){ + super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString()); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index abb2397e293..74ae16f0ff2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -123,7 +123,7 @@ protected synchronized void heartbeat() throws Exception { // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.INTERNAL_ERROR)); + JobEventType.JOB_AM_REBOOT)); throw new YarnException("Resource Manager doesn't recognize AttemptId: " + this.getContext().getApplicationID()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index d29d11890cd..5c453ad83dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -574,7 +574,7 @@ private List getResources() throws Exception { // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.INTERNAL_ERROR)); + JobEventType.JOB_AM_REBOOT)); throw new YarnException("Resource Manager doesn't recognize AttemptId: " + this.getContext().getApplicationID()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index b278186766e..3636273a9c4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -86,9 +89,68 @@ public void testDeletionofStaging() throws IOException { attemptId.setApplicationId(appId); JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); - MRAppMaster appMaster = new TestMRApp(attemptId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); appMaster.init(conf); + appMaster.start(); appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry()); + verify(fs).delete(stagingJobPath, true); + } + + @Test (timeout = 30000) + public void testNoDeletionofStagingOnReboot() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationAttemptId attemptId = recordFactory.newRecordInstance( + ApplicationAttemptId.class); + attemptId.setAttemptId(0); + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(0); + attemptId.setApplicationId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.REBOOT, 4); + appMaster.init(conf); + appMaster.start(); + //shutdown the job, not the lastRetry + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); + } + + @Test (timeout = 30000) + public void testDeletionofStagingOnReboot() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationAttemptId attemptId = recordFactory.newRecordInstance( + ApplicationAttemptId.class); + attemptId.setAttemptId(1); + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(0); + attemptId.setApplicationId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + //shutdown the job, is lastRetry + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry()); verify(fs).delete(stagingJobPath, true); } @@ -151,6 +213,8 @@ public void testDeletionofStagingOnKillLastTry() throws IOException { private class TestMRApp extends MRAppMaster { ContainerAllocator allocator; + boolean testIsLastAMRetry = false; + JobStateInternal jobStateInternal; public TestMRApp(ApplicationAttemptId applicationAttemptId, ContainerAllocator allocator, int maxAppAttempts) { @@ -160,9 +224,11 @@ public TestMRApp(ApplicationAttemptId applicationAttemptId, this.allocator = allocator; } - public TestMRApp(ApplicationAttemptId applicationAttemptId) { - this(applicationAttemptId, null, - MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + public TestMRApp(ApplicationAttemptId applicationAttemptId, + ContainerAllocator allocator, JobStateInternal jobStateInternal, + int maxAppAttempts) { + this(applicationAttemptId, allocator, maxAppAttempts); + this.jobStateInternal = jobStateInternal; } @Override @@ -179,6 +245,31 @@ protected ContainerAllocator createContainerAllocator( return allocator; } + @Override + protected Job createJob(Configuration conf, JobStateInternal forcedState, + String diagnostic) { + JobImpl jobImpl = mock(JobImpl.class); + when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal); + JobID jobID = JobID.forName("job_1234567890000_0001"); + JobId jobId = TypeConverter.toYarn(jobID); + when(jobImpl.getID()).thenReturn(jobId); + ((AppContext) getContext()) + .getAllJobs().put(jobImpl.getID(), jobImpl); + return jobImpl; + } + + @Override + public void start() { + super.start(); + DefaultMetricsSystem.shutdown(); + } + + @Override + public void notifyIsLastAMRetry(boolean isLastAMRetry){ + testIsLastAMRetry = isLastAMRetry; + super.notifyIsLastAMRetry(isLastAMRetry); + } + @Override public RMHeartbeatHandler getRMHeartbeatHandler() { return getStubbedHeartbeatHandler(getContext()); @@ -197,6 +288,9 @@ public Configuration getConfig() { protected void downloadTokensAndSetupUGI(Configuration conf) { } + public boolean getTestIsLastAMRetry(){ + return testIsLastAMRetry; + } } private final class MRAppTestCleanup extends MRApp { @@ -288,7 +382,7 @@ public void runOnNextHeartbeat(Runnable callback) { }; } - @Test + @Test(timeout=20000) public void testStagingCleanupOrder() throws Exception { MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, this.getClass().getName(), true); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 8cfbe03c09a..7aa759217b8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -192,6 +192,68 @@ public void testCheckJobCompleteSuccess() throws Exception { commitHandler.stop(); } + @Test(timeout=20000) + public void testRebootedDuringSetup() throws Exception{ + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + OutputCommitter committer = new StubbedOutputCommitter() { + @Override + public synchronized void setupJob(JobContext jobContext) + throws IOException { + while(!Thread.interrupted()){ + try{ + wait(); + }catch (InterruptedException e) { + } + } + } + }; + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobId jobId = job.getID(); + job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); + assertJobState(job, JobStateInternal.INITED); + job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + assertJobState(job, JobStateInternal.SETUP); + + job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); + assertJobState(job, JobStateInternal.REBOOT); + dispatcher.stop(); + commitHandler.stop(); + } + + @Test(timeout=20000) + public void testRebootedDuringCommit() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + completeJobTasks(job); + assertJobState(job, JobStateInternal.COMMITTING); + + syncBarrier.await(); + job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); + assertJobState(job, JobStateInternal.REBOOT); + dispatcher.stop(); + commitHandler.stop(); + } + @Test(timeout=20000) public void testKilledDuringSetup() throws Exception { Configuration conf = new Configuration();