From b16dfc125dfd172900e34de1b46d3a06fe9aceb6 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Wed, 9 Jan 2013 22:56:09 +0000 Subject: [PATCH] MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery. Contributed by Jerry Chen git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1431131 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 2 +- .../v2/app/recover/RecoveryService.java | 18 ++- .../hadoop/mapreduce/v2/app/TestRecovery.java | 109 ++++++++++++++++++ 4 files changed, 128 insertions(+), 4 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index eb7fbb04c72..65fbf1d6d63 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -685,6 +685,9 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally exits (Jason Lowe via tgraves) + MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry + Chen via jlowe) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 405c4748032..b3b307a56fa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -579,7 +579,7 @@ public class MRAppMaster extends CompositeService { */ protected Recovery createRecoveryService(AppContext appContext) { return new RecoveryService(appContext.getApplicationAttemptId(), - appContext.getClock(), getCommitter()); + appContext.getClock(), getCommitter(), isNewApiCommitter()); } /** Create and initialize (but don't start) a single job. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 2760b58f3f3..4ab61c52351 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -100,6 +101,7 @@ public class RecoveryService extends CompositeService implements Recovery { private final ApplicationAttemptId applicationAttemptId; private final OutputCommitter committer; + private final boolean newApiCommitter; private final Dispatcher dispatcher; private final ControlledClock clock; @@ -113,10 +115,11 @@ public class RecoveryService extends CompositeService implements Recovery { private volatile boolean recoveryMode = false; public RecoveryService(ApplicationAttemptId applicationAttemptId, - Clock clock, OutputCommitter committer) { + Clock clock, OutputCommitter committer, boolean newApiCommitter) { super("RecoveringDispatcher"); this.applicationAttemptId = applicationAttemptId; this.committer = committer; + this.newApiCommitter = newApiCommitter; this.dispatcher = createRecoveryDispatcher(); this.clock = new ControlledClock(clock); addService((Service) dispatcher); @@ -360,8 +363,17 @@ public class RecoveryService extends CompositeService implements Recovery { switch (state) { case SUCCEEDED: //recover the task output - TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(), - attInfo.getAttemptId()); + + // check the committer type and construct corresponding context + TaskAttemptContext taskContext = null; + if(newApiCommitter) { + taskContext = new TaskAttemptContextImpl(getConfig(), + attInfo.getAttemptId()); + } else { + taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()), + TypeConverter.fromYarn(aId)); + } + try { TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType(); int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 87fce7ece6b..8d6ca2b9b24 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -626,6 +626,115 @@ public class TestRecovery { validateOutput(); } + @Test + public void testRecoveryWithOldCommiter() throws Exception { + int runCount = 0; + MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), + true, ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean("mapred.mapper.new-api", false); + conf.setBoolean("mapred.reducer.new-api", false); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask1 = it.next(); + Task reduceTask1 = it.next(); + + // all maps must be running + app.waitForState(mapTask1, TaskState.RUNNING); + + TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator() + .next(); + + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(task1Attempt1, TaskAttemptState.RUNNING); + + //send the done signal to the map + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task1Attempt1.getID(), + TaskAttemptEventType.TA_DONE)); + + //wait for map task to complete + app.waitForState(mapTask1, TaskState.SUCCEEDED); + + // Verify the shuffle-port + Assert.assertEquals(5467, task1Attempt1.getShufflePort()); + + app.waitForState(reduceTask1, TaskState.RUNNING); + TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next(); + + // write output corresponding to reduce1 + writeOutput(reduce1Attempt1, conf); + + //send the done signal to the 1st reduce + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + reduce1Attempt1.getID(), + TaskAttemptEventType.TA_DONE)); + + //wait for first reduce task to complete + app.waitForState(reduceTask1, TaskState.SUCCEEDED); + + //stop the app before the job completes. + app.stop(); + + //rerun + //in rerun the map will be recovered from previous run + app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, + ++runCount); + conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setBoolean("mapred.mapper.new-api", false); + conf.setBoolean("mapred.reducer.new-api", false); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask1 = it.next(); + reduceTask1 = it.next(); + Task reduceTask2 = it.next(); + + // map will be recovered, no need to send done + app.waitForState(mapTask1, TaskState.SUCCEEDED); + + // Verify the shuffle-port after recovery + task1Attempt1 = mapTask1.getAttempts().values().iterator().next(); + Assert.assertEquals(5467, task1Attempt1.getShufflePort()); + + // first reduce will be recovered, no need to send done + app.waitForState(reduceTask1, TaskState.SUCCEEDED); + + app.waitForState(reduceTask2, TaskState.RUNNING); + + TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values() + .iterator().next(); + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING); + + //send the done signal to the 2nd reduce task + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + reduce2Attempt.getID(), + TaskAttemptEventType.TA_DONE)); + + //wait to get it completed + app.waitForState(reduceTask2, TaskState.SUCCEEDED); + + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + validateOutput(); + } + private void writeBadOutput(TaskAttempt attempt, Configuration conf) throws Exception { TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,