MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)

(cherry picked from commit cce71dceef9e82d31fe8ec59648b2a4a50c8869a)
(cherry picked from commit fc669778bec543427ea94abe12aff34ee1533b5c)
commit a286acd7be
parent 895a0391fe
Author: Haibo Chen
Date:   2018-01-19 12:56:17 -08:00

2 changed files with 105 additions and 0 deletions
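
In short: when a restarted MR ApplicationMaster does not recover the previous attempt, it now asks the OutputCommitter to abort the previous job, removing that attempt's temporary files before new tasks run. A minimal sketch of the pattern, paraphrasing the MRAppMaster change below (committer, jobContext, and LOG stand in for the AM's own fields):

    // Only an AM that did NOT recover may discard the previous attempt's output.
    if (!recovered()) {
      try {
        // Aborting as FAILED tells the committer to delete temporary/staging files.
        committer.abortJob(jobContext, JobStatus.State.FAILED);
      } catch (FileNotFoundException e) {
        // The previous attempt left nothing behind; no cleanup needed.
      } catch (Exception e) {
        // Cleanup is best-effort: failing here must not fail the new attempt.
        LOG.error("Error cleaning up previous job's temporary files", e);
      }
    }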

.../org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -56,6 +57,7 @@
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -1199,6 +1201,7 @@ protected void serviceStart() throws Exception {
amInfos = new LinkedList<AMInfo>();
completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
processRecovery();
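// must run after processRecovery() so recovered() reflects whether recovery happened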
cleanUpPreviousJobOutput();
// Create an AMInfo for the current AM generation.
AMInfo amInfo =
@@ -1377,6 +1380,26 @@ private boolean shouldAttemptRecovery() throws IOException {
return true;
}
private void cleanUpPreviousJobOutput() {
// recovered application masters should not remove data from previous job
if (!recovered()) {
JobContext jobContext = getJobContextFromConf(getConfig());
try {
LOG.info("Starting to clean up previous job's temporary files");
this.committer.abortJob(jobContext, State.FAILED);
LOG.info("Finished cleaning up previous job temporary files");
} catch (FileNotFoundException e) {
LOG.info("Previous job temporary files do not exist, " +
"no clean up was necessary.");
} catch (Exception e) {
// the cleanup of a previous attempt is not critical to the success
// of this job, so only log the error
LOG.error("Error while trying to clean up previous job's temporary " +
"files", e);
}
}
}
private static FSDataInputStream getPreviousJobHistoryStream(
Configuration conf, ApplicationAttemptId appAttemptId)
throws IOException {

.../org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -19,9 +19,11 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -45,6 +47,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
@@ -452,6 +455,8 @@ public void testCrashOfMapsOnlyJob() throws Exception {
public static class TestFileOutputCommitter extends
org.apache.hadoop.mapred.FileOutputCommitter {
private boolean abortJobCalled;
@Override
public boolean isRecoverySupported(
org.apache.hadoop.mapred.JobContext jobContext) {
@@ -462,6 +467,16 @@ public boolean isRecoverySupported(
}
return isRecoverySupported;
}
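// delegate to the real committer, then record the call for test assertions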
@Override
public void abortJob(JobContext context, int runState) throws IOException {
super.abortJob(context, runState);
this.abortJobCalled = true;
}
private boolean isAbortJobCalled() {
return this.abortJobCalled;
}
}
/**
@@ -1009,6 +1024,73 @@ public void testOutputRecovery() throws Exception {
validateOutput();
}
@Test
public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
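// disable recovery: the rerun AM should abort the previous job's output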
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
//stop the app before the job completes.
app.stop();
app.close();
//rerun
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
TestFileOutputCommitter committer =
(TestFileOutputCommitter) app.getCommitter();
assertTrue("commiter.abortJob() has not been called",
committer.isAbortJobCalled());
app.close();
}
@Test
public void testPreviousJobIsNotCleanedWhenRecovery()
throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
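// enable recovery: the rerun AM should keep the previous job's output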
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
// TestFileOutputCommitter supports recovery if want.am.recovery=true
conf.setBoolean("want.am.recovery", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
//stop the app before the job completes.
app.stop();
app.close();
//rerun
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
TestFileOutputCommitter committer =
(TestFileOutputCommitter) app.getCommitter();
assertFalse("commiter.abortJob() has been called",
committer.isAbortJobCalled());
app.close();
}
@Test
public void testOutputRecoveryMapsOnly() throws Exception {
int runCount = 0;