diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c73ab8b8db8..d0345594acc 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -231,6 +231,9 @@ Release 2.1.1-beta - UNRELEASED
     pick up the right history file for the last successful AM. (Jian He via
     vinodkv)
 
+    MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
+    acmurthy)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index d8ddb2c29f1..ab1c4feadf0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1042,11 +1042,11 @@ public class MRAppMaster extends CompositeService {
       // attempt will generate one. However that disables recovery if there
       // are reducers as the shuffle secret would be app attempt specific.
       int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
-      boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-          TokenCache.getShuffleSecretKey(jobCredentials) != null);
+      boolean shuffleKeyValidForRecovery =
+          TokenCache.getShuffleSecretKey(jobCredentials) != null;
 
       if (recoveryEnabled && recoverySupportedByCommitter
-            && shuffleKeyValidForRecovery) {
+          && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
         LOG.info("Recovery is enabled. "
             + "Will try to recover from previous life on best effort basis.");
         try {
@@ -1059,7 +1059,8 @@ public class MRAppMaster extends CompositeService {
       } else {
         LOG.info("Will not try to recover. recoveryEnabled: "
             + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+            + recoverySupportedByCommitter + " numReduceTasks: "
+            + numReduceTasks + " shuffleKeyValidForRecovery: "
             + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
             + appAttemptID.getAttemptId());
         // Get the amInfos anyways whether recovery is enabled or not
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 911fd0d022d..1c17b224da1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -114,7 +114,6 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
-
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -316,6 +315,118 @@ public class TestRecovery {
     // available in the failed attempt should be available here
   }
 
+  /**
+   * AM with 3 maps and 0 reduce. AM crashes after the first two tasks finish
+   * and recovers completely and succeeds in the second generation.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCrashOfMapsOnlyJob() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 1st two maps
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // in rerun the first two maps will be recovered from the previous run
+    // NOTE(review): the first run uses (3, 0) but this passes (2, 1), which
+    // presumably means 2 maps and 1 reduce -- confirm (3, 0) was not intended.
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
+                .getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait to get it completed
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testMultipleCrashes() throws Exception {
 