From fc669778bec543427ea94abe12aff34ee1533b5c Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Fri, 19 Jan 2018 12:56:17 -0800 Subject: [PATCH] MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen) (cherry picked from commit cce71dceef9e82d31fe8ec59648b2a4a50c8869a) --- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 23 ++++++ .../hadoop/mapreduce/v2/app/TestRecovery.java | 82 +++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index df61928684a..e6a45cf32d6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -1217,6 +1219,7 @@ public class MRAppMaster extends CompositeService { amInfos = new LinkedList(); completedTasksFromPreviousRun = new HashMap(); processRecovery(); + cleanUpPreviousJobOutput(); // Current an AMInfo for the current AM generation. AMInfo amInfo = @@ -1395,6 +1398,26 @@ public class MRAppMaster extends CompositeService { return true; } + private void cleanUpPreviousJobOutput() { + // recovered application masters should not remove data from previous job + if (!recovered()) { + JobContext jobContext = getJobContextFromConf(getConfig()); + try { + LOG.info("Starting to clean up previous job's temporary files"); + this.committer.abortJob(jobContext, State.FAILED); + LOG.info("Finished cleaning up previous job temporary files"); + } catch (FileNotFoundException e) { + LOG.info("Previous job temporary files do not exist, " + + "no clean up was necessary."); + } catch (Exception e) { + // the clean up of a previous attempt is not critical to the success + // of this job - only logging the error + LOG.error("Error while trying to clean up previous job's temporary " + + "files", e); + } + } + } + private static FSDataInputStream getPreviousJobHistoryStream( Configuration conf, ApplicationAttemptId appAttemptId) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 17e07b11050..893c4a07334 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -19,9 +19,11 @@ package org.apache.hadoop.mapreduce.v2.app; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; + import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,6 +45,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobID; @@ -452,6 +455,8 @@ public class TestRecovery { public static class TestFileOutputCommitter extends org.apache.hadoop.mapred.FileOutputCommitter { + private boolean abortJobCalled; + @Override public boolean isRecoverySupported( org.apache.hadoop.mapred.JobContext jobContext) { @@ -462,6 +467,16 @@ public class TestRecovery { } return isRecoverySupported; } + + @Override + public void abortJob(JobContext context, int runState) throws IOException { + super.abortJob(context, runState); + this.abortJobCalled = true; + } + + private boolean isAbortJobCalled() { + return this.abortJobCalled; + } } /** @@ -1009,6 +1024,73 @@ public class TestRecovery { validateOutput(); } + @Test + public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception { + int runCount = 0; + MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), + true, ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false); + conf.setClass("mapred.output.committer.class", + TestFileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + //stop the app before the job completes. + app.stop(); + app.close(); + + //rerun + app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, + ++runCount); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + TestFileOutputCommitter committer = ( + TestFileOutputCommitter) app.getCommitter(); + assertTrue("commiter.abortJob() has not been called", + committer.isAbortJobCalled()); + app.close(); + } + + @Test + public void testPreviousJobIsNotCleanedWhenRecovery() + throws Exception { + int runCount = 0; + MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), + true, ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setClass("mapred.output.committer.class", + TestFileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + // TestFileOutputCommitter supports recovery if want.am.recovery=true + conf.setBoolean("want.am.recovery", true); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + //stop the app before the job completes. + app.stop(); + app.close(); + + //rerun + app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, + ++runCount); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + TestFileOutputCommitter committer = ( + TestFileOutputCommitter) app.getCommitter(); + assertFalse("commiter.abortJob() has been called", + committer.isAbortJobCalled()); + app.close(); + } + @Test public void testOutputRecoveryMapsOnly() throws Exception { int runCount = 0;