MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)
(cherry picked from commit cce71dceef
)
This commit is contained in:
parent
8793e45f53
commit
fc669778be
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.app;
|
package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.TaskLog;
|
||||||
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
||||||
import org.apache.hadoop.mapreduce.CryptoUtils;
|
import org.apache.hadoop.mapreduce.CryptoUtils;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.JobStatus.State;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||||
|
@ -1217,6 +1219,7 @@ public class MRAppMaster extends CompositeService {
|
||||||
amInfos = new LinkedList<AMInfo>();
|
amInfos = new LinkedList<AMInfo>();
|
||||||
completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
|
completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
|
||||||
processRecovery();
|
processRecovery();
|
||||||
|
cleanUpPreviousJobOutput();
|
||||||
|
|
||||||
// Current an AMInfo for the current AM generation.
|
// Current an AMInfo for the current AM generation.
|
||||||
AMInfo amInfo =
|
AMInfo amInfo =
|
||||||
|
@ -1395,6 +1398,26 @@ public class MRAppMaster extends CompositeService {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanUpPreviousJobOutput() {
|
||||||
|
// recovered application masters should not remove data from previous job
|
||||||
|
if (!recovered()) {
|
||||||
|
JobContext jobContext = getJobContextFromConf(getConfig());
|
||||||
|
try {
|
||||||
|
LOG.info("Starting to clean up previous job's temporary files");
|
||||||
|
this.committer.abortJob(jobContext, State.FAILED);
|
||||||
|
LOG.info("Finished cleaning up previous job temporary files");
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
LOG.info("Previous job temporary files do not exist, " +
|
||||||
|
"no clean up was necessary.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
// the clean up of a previous attempt is not critical to the success
|
||||||
|
// of this job - only logging the error
|
||||||
|
LOG.error("Error while trying to clean up previous job's temporary " +
|
||||||
|
"files", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static FSDataInputStream getPreviousJobHistoryStream(
|
private static FSDataInputStream getPreviousJobHistoryStream(
|
||||||
Configuration conf, ApplicationAttemptId appAttemptId)
|
Configuration conf, ApplicationAttemptId appAttemptId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -19,9 +19,11 @@
|
||||||
package org.apache.hadoop.mapreduce.v2.app;
|
package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.atLeast;
|
import static org.mockito.Mockito.atLeast;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -43,6 +45,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.JobCounter;
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
|
@ -452,6 +455,8 @@ public class TestRecovery {
|
||||||
public static class TestFileOutputCommitter extends
|
public static class TestFileOutputCommitter extends
|
||||||
org.apache.hadoop.mapred.FileOutputCommitter {
|
org.apache.hadoop.mapred.FileOutputCommitter {
|
||||||
|
|
||||||
|
private boolean abortJobCalled;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isRecoverySupported(
|
public boolean isRecoverySupported(
|
||||||
org.apache.hadoop.mapred.JobContext jobContext) {
|
org.apache.hadoop.mapred.JobContext jobContext) {
|
||||||
|
@ -462,6 +467,16 @@ public class TestRecovery {
|
||||||
}
|
}
|
||||||
return isRecoverySupported;
|
return isRecoverySupported;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abortJob(JobContext context, int runState) throws IOException {
|
||||||
|
super.abortJob(context, runState);
|
||||||
|
this.abortJobCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isAbortJobCalled() {
|
||||||
|
return this.abortJobCalled;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1009,6 +1024,73 @@ public class TestRecovery {
|
||||||
validateOutput();
|
validateOutput();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception {
|
||||||
|
int runCount = 0;
|
||||||
|
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
|
||||||
|
true, ++runCount);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
|
||||||
|
conf.setClass("mapred.output.committer.class",
|
||||||
|
TestFileOutputCommitter.class,
|
||||||
|
org.apache.hadoop.mapred.OutputCommitter.class);
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
Job job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
||||||
|
//stop the app before the job completes.
|
||||||
|
app.stop();
|
||||||
|
app.close();
|
||||||
|
|
||||||
|
//rerun
|
||||||
|
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
|
||||||
|
++runCount);
|
||||||
|
job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
||||||
|
TestFileOutputCommitter committer = (
|
||||||
|
TestFileOutputCommitter) app.getCommitter();
|
||||||
|
assertTrue("commiter.abortJob() has not been called",
|
||||||
|
committer.isAbortJobCalled());
|
||||||
|
app.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreviousJobIsNotCleanedWhenRecovery()
|
||||||
|
throws Exception {
|
||||||
|
int runCount = 0;
|
||||||
|
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
|
||||||
|
true, ++runCount);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||||
|
conf.setClass("mapred.output.committer.class",
|
||||||
|
TestFileOutputCommitter.class,
|
||||||
|
org.apache.hadoop.mapred.OutputCommitter.class);
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
// TestFileOutputCommitter supports recovery if want.am.recovery=true
|
||||||
|
conf.setBoolean("want.am.recovery", true);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
Job job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
||||||
|
//stop the app before the job completes.
|
||||||
|
app.stop();
|
||||||
|
app.close();
|
||||||
|
|
||||||
|
//rerun
|
||||||
|
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
|
||||||
|
++runCount);
|
||||||
|
job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
||||||
|
TestFileOutputCommitter committer = (
|
||||||
|
TestFileOutputCommitter) app.getCommitter();
|
||||||
|
assertFalse("commiter.abortJob() has been called",
|
||||||
|
committer.isAbortJobCalled());
|
||||||
|
app.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOutputRecoveryMapsOnly() throws Exception {
|
public void testOutputRecoveryMapsOnly() throws Exception {
|
||||||
int runCount = 0;
|
int runCount = 0;
|
||||||
|
|
Loading…
Reference in New Issue