diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9e1b92f03bc..6f06303e791 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -762,6 +762,9 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then the recovery. (vinodkv) + MAPREDUCE-3802. Added test to validate that AM can crash multiple times and + still can recover successfully after MAPREDUCE-3846. (vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 3f4fa4070cd..509c49e9cc0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -277,6 +277,136 @@ public class TestRecovery { // available in the failed attempt should be available here } + @Test + public void testMultipleCrashes() throws Exception { + + int runCount = 0; + MRApp app = + new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, + ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setBoolean("mapred.reducer.new-api", true); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + //all maps would be running + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask1 = it.next(); + Task mapTask2 = it.next(); + Task reduceTask = it.next(); + + // all maps must be running + app.waitForState(mapTask1, TaskState.RUNNING); + app.waitForState(mapTask2, TaskState.RUNNING); + + TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next(); + TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next(); + + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(task1Attempt1, TaskAttemptState.RUNNING); + app.waitForState(task2Attempt, TaskAttemptState.RUNNING); + + // reduces must be in NEW state + Assert.assertEquals("Reduce Task state not correct", + TaskState.RUNNING, reduceTask.getReport().getTaskState()); + + //send the done signal to the 1st map + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task1Attempt1.getID(), + TaskAttemptEventType.TA_DONE)); + + //wait for first map task to complete + app.waitForState(mapTask1, TaskState.SUCCEEDED); + + // Crash the app + app.stop(); + + //rerun + //in rerun the 1st map will be recovered from previous run + app = + new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, + ++runCount); + conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setBoolean("mapred.reducer.new-api", true); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + //all maps would be running + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask1 = it.next(); + mapTask2 = it.next(); + reduceTask = it.next(); + + // first map will be recovered, no need to send done + app.waitForState(mapTask1, TaskState.SUCCEEDED); + + app.waitForState(mapTask2, TaskState.RUNNING); + + task2Attempt = mapTask2.getAttempts().values().iterator().next(); + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(task2Attempt, TaskAttemptState.RUNNING); + + //send the done signal to the 2nd map task + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapTask2.getAttempts().values().iterator().next().getID(), + TaskAttemptEventType.TA_DONE)); + + //wait to get it completed + app.waitForState(mapTask2, TaskState.SUCCEEDED); + + // Crash the app again. + app.stop(); + + //rerun + //in rerun the 1st and 2nd map will be recovered from previous run + app = + new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, + ++runCount); + conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setBoolean("mapred.reducer.new-api", true); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + //all maps would be running + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask1 = it.next(); + mapTask2 = it.next(); + reduceTask = it.next(); + + // The maps will be recovered, no need to send done + app.waitForState(mapTask1, TaskState.SUCCEEDED); + app.waitForState(mapTask2, TaskState.SUCCEEDED); + + //wait for reduce to be running before sending done + app.waitForState(reduceTask, TaskState.RUNNING); + //send the done signal to the reduce + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + reduceTask.getAttempts().values().iterator().next().getID(), + TaskAttemptEventType.TA_DONE)); + + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + } + @Test public void testOutputRecovery() throws Exception { int runCount = 0;