From a286acd7be5b715f233958fefea8c1fc4d8d94fc Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Fri, 19 Jan 2018 12:56:17 -0800 Subject: [PATCH] MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen) (cherry picked from commit cce71dceef9e82d31fe8ec59648b2a4a50c8869a) (cherry picked from commit fc669778bec543427ea94abe12aff34ee1533b5c) --- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 23 ++++++ .../hadoop/mapreduce/v2/app/TestRecovery.java | 82 +++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 1f8be12afdb..16e6bc51869 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -56,6 +57,7 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.CryptoUtils; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -1199,6 +1201,7 @@ protected void serviceStart() throws Exception { amInfos = new LinkedList(); completedTasksFromPreviousRun = new HashMap(); processRecovery(); + cleanUpPreviousJobOutput(); // 
Current an AMInfo for the current AM generation. AMInfo amInfo = @@ -1377,6 +1380,26 @@ private boolean shouldAttemptRecovery() throws IOException { return true; } + private void cleanUpPreviousJobOutput() { + // recovered application masters should not remove data from previous job + if (!recovered()) { + JobContext jobContext = getJobContextFromConf(getConfig()); + try { + LOG.info("Starting to clean up previous job's temporary files"); + this.committer.abortJob(jobContext, State.FAILED); + LOG.info("Finished cleaning up previous job temporary files"); + } catch (FileNotFoundException e) { + LOG.info("Previous job temporary files do not exist, " + + "no clean up was necessary."); + } catch (Exception e) { + // the clean up of a previous attempt is not critical to the success + // of this job - only logging the error + LOG.error("Error while trying to clean up previous job's temporary " + + "files", e); + } + } + } + private static FSDataInputStream getPreviousJobHistoryStream( Configuration conf, ApplicationAttemptId appAttemptId) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 6332c5d825e..51959b0fbef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -19,9 +19,11 @@ package org.apache.hadoop.mapreduce.v2.app; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; + import 
static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -45,6 +47,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobID; @@ -452,6 +455,8 @@ public void testCrashOfMapsOnlyJob() throws Exception { public static class TestFileOutputCommitter extends org.apache.hadoop.mapred.FileOutputCommitter { + private boolean abortJobCalled; + @Override public boolean isRecoverySupported( org.apache.hadoop.mapred.JobContext jobContext) { @@ -462,6 +467,16 @@ public boolean isRecoverySupported( } return isRecoverySupported; } + + @Override + public void abortJob(JobContext context, int runState) throws IOException { + super.abortJob(context, runState); + this.abortJobCalled = true; + } + + private boolean isAbortJobCalled() { + return this.abortJobCalled; + } } /** @@ -1009,6 +1024,73 @@ public void testOutputRecovery() throws Exception { validateOutput(); } + @Test + public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception { + int runCount = 0; + MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), + true, ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false); + conf.setClass("mapred.output.committer.class", + TestFileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + //stop the app before the job completes. 
+ app.stop(); + app.close(); + + //rerun + app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, + ++runCount); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + TestFileOutputCommitter committer = ( + TestFileOutputCommitter) app.getCommitter(); + assertTrue("committer.abortJob() has not been called", + committer.isAbortJobCalled()); + app.close(); + } + + @Test + public void testPreviousJobIsNotCleanedWhenRecovery() + throws Exception { + int runCount = 0; + MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), + true, ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setClass("mapred.output.committer.class", + TestFileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + // TestFileOutputCommitter supports recovery if want.am.recovery=true + conf.setBoolean("want.am.recovery", true); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + //stop the app before the job completes. + app.stop(); + app.close(); + + //rerun + app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, + ++runCount); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + TestFileOutputCommitter committer = ( + TestFileOutputCommitter) app.getCommitter(); + assertFalse("committer.abortJob() has been called", + committer.isAbortJobCalled()); + app.close(); + } + @Test public void testOutputRecoveryMapsOnly() throws Exception { int runCount = 0;