MAPREDUCE-3711. Fixed MR AM recovery so that only single selected task output is recovered and thus reduce the unnecessarily bloated recovery time. Contributed by Robert Joseph Evans.
svn merge --ignore-ancestry -c 1240413 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1240414 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2c20d0a547
commit
4affe6d979
|
@ -628,6 +628,10 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3727. jobtoken location property in jobconf refers to wrong
|
MAPREDUCE-3727. jobtoken location property in jobconf refers to wrong
|
||||||
jobtoken file (tucu)
|
jobtoken file (tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-3711. Fixed MR AM recovery so that only single selected task
|
||||||
|
output is recovered and thus reduce the unnecessarily bloated recovery
|
||||||
|
time. (Robert Joseph Evans via vinodkv)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -559,6 +559,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void internalError(TaskEventType type) {
|
private void internalError(TaskEventType type) {
|
||||||
|
LOG.error("Invalid event " + type + " on Task " + this.taskId);
|
||||||
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
||||||
this.taskId.getJobId(), "Invalid event " + type +
|
this.taskId.getJobId(), "Invalid event " + type +
|
||||||
" on Task " + this.taskId));
|
" on Task " + this.taskId));
|
||||||
|
|
|
@ -103,6 +103,7 @@ public class LocalContainerAllocator extends RMCommunicator
|
||||||
// This can happen when the connection to the RM has gone down. Keep
|
// This can happen when the connection to the RM has gone down. Keep
|
||||||
// re-trying until the retryInterval has expired.
|
// re-trying until the retryInterval has expired.
|
||||||
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
|
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
|
||||||
|
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
|
||||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||||
JobEventType.INTERNAL_ERROR));
|
JobEventType.INTERNAL_ERROR));
|
||||||
throw new YarnException("Could not contact RM after " +
|
throw new YarnException("Could not contact RM after " +
|
||||||
|
|
|
@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
|
@ -358,16 +360,24 @@ public class RecoveryService extends CompositeService implements Recovery {
|
||||||
//recover the task output
|
//recover the task output
|
||||||
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
|
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
|
||||||
attInfo.getAttemptId());
|
attInfo.getAttemptId());
|
||||||
try {
|
try {
|
||||||
committer.recoverTask(taskContext);
|
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
|
||||||
|
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
|
||||||
|
if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
|
||||||
|
committer.recoverTask(taskContext);
|
||||||
|
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
|
||||||
|
} else {
|
||||||
|
LOG.info("Will not try to recover output for "
|
||||||
|
+ taskContext.getTaskAttemptID());
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
LOG.error("Caught an exception while trying to recover task "+aId, e);
|
||||||
actualHandler.handle(new JobDiagnosticsUpdateEvent(
|
actualHandler.handle(new JobDiagnosticsUpdateEvent(
|
||||||
aId.getTaskId().getJobId(), "Error in recovering task output " +
|
aId.getTaskId().getJobId(), "Error in recovering task output " +
|
||||||
e.getMessage()));
|
e.getMessage()));
|
||||||
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
|
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
|
||||||
JobEventType.INTERNAL_ERROR));
|
JobEventType.INTERNAL_ERROR));
|
||||||
}
|
}
|
||||||
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
|
|
||||||
|
|
||||||
// send the done event
|
// send the done event
|
||||||
LOG.info("Sending done event to " + aId);
|
LOG.info("Sending done event to " + aId);
|
||||||
|
|
|
@ -543,6 +543,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
||||||
// This can happen when the connection to the RM has gone down. Keep
|
// This can happen when the connection to the RM has gone down. Keep
|
||||||
// re-trying until the retryInterval has expired.
|
// re-trying until the retryInterval has expired.
|
||||||
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
|
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
|
||||||
|
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
|
||||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||||
JobEventType.INTERNAL_ERROR));
|
JobEventType.INTERNAL_ERROR));
|
||||||
throw new YarnException("Could not contact RM after " +
|
throw new YarnException("Could not contact RM after " +
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public class TestRecovery {
|
public class TestRecovery {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
|
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
|
||||||
|
@ -112,7 +113,7 @@ public class TestRecovery {
|
||||||
Assert.assertEquals("Reduce Task state not correct",
|
Assert.assertEquals("Reduce Task state not correct",
|
||||||
TaskState.RUNNING, reduceTask.getReport().getTaskState());
|
TaskState.RUNNING, reduceTask.getReport().getTaskState());
|
||||||
|
|
||||||
//send the fail signal to the 1st map task attempt
|
//send the fail signal to the 1st map task attempt
|
||||||
app.getContext().getEventHandler().handle(
|
app.getContext().getEventHandler().handle(
|
||||||
new TaskAttemptEvent(
|
new TaskAttemptEvent(
|
||||||
task1Attempt1.getID(),
|
task1Attempt1.getID(),
|
||||||
|
@ -193,7 +194,7 @@ public class TestRecovery {
|
||||||
//RUNNING state
|
//RUNNING state
|
||||||
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
||||||
|
|
||||||
//send the done signal to the 2nd map task
|
//send the done signal to the 2nd map task
|
||||||
app.getContext().getEventHandler().handle(
|
app.getContext().getEventHandler().handle(
|
||||||
new TaskAttemptEvent(
|
new TaskAttemptEvent(
|
||||||
mapTask2.getAttempts().values().iterator().next().getID(),
|
mapTask2.getAttempts().values().iterator().next().getID(),
|
||||||
|
@ -349,6 +350,151 @@ public class TestRecovery {
|
||||||
validateOutput();
|
validateOutput();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOutputRecoveryMapsOnly() throws Exception {
|
||||||
|
int runCount = 0;
|
||||||
|
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
|
||||||
|
true, ++runCount);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean("mapred.mapper.new-api", true);
|
||||||
|
conf.setBoolean("mapred.reducer.new-api", true);
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
Job job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("No of tasks not correct",
|
||||||
|
3, job.getTasks().size());
|
||||||
|
Iterator<Task> it = job.getTasks().values().iterator();
|
||||||
|
Task mapTask1 = it.next();
|
||||||
|
Task mapTask2 = it.next();
|
||||||
|
Task reduceTask1 = it.next();
|
||||||
|
|
||||||
|
// all maps must be running
|
||||||
|
app.waitForState(mapTask1, TaskState.RUNNING);
|
||||||
|
|
||||||
|
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
|
||||||
|
.next();
|
||||||
|
|
||||||
|
//before sending the TA_DONE event, make sure attempt has come to
|
||||||
|
//RUNNING state
|
||||||
|
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
|
||||||
|
|
||||||
|
// write output corresponding to map1 (This is just to validate that it is
|
||||||
|
//not included in the output)
|
||||||
|
writeBadOutput(task1Attempt1, conf);
|
||||||
|
|
||||||
|
//send the done signal to the map
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(
|
||||||
|
task1Attempt1.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
//wait for map task to complete
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// Verify the shuffle-port
|
||||||
|
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
|
||||||
|
|
||||||
|
//stop the app before the job completes.
|
||||||
|
app.stop();
|
||||||
|
|
||||||
|
//rerun
|
||||||
|
//in rerun the map will be recovered from previous run
|
||||||
|
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
||||||
|
++runCount);
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||||
|
conf.setBoolean("mapred.mapper.new-api", true);
|
||||||
|
conf.setBoolean("mapred.reducer.new-api", true);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("No of tasks not correct",
|
||||||
|
3, job.getTasks().size());
|
||||||
|
it = job.getTasks().values().iterator();
|
||||||
|
mapTask1 = it.next();
|
||||||
|
mapTask2 = it.next();
|
||||||
|
reduceTask1 = it.next();
|
||||||
|
|
||||||
|
// map will be recovered, no need to send done
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// Verify the shuffle-port after recovery
|
||||||
|
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
|
||||||
|
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
|
||||||
|
|
||||||
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
|
TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
|
||||||
|
.next();
|
||||||
|
|
||||||
|
//before sending the TA_DONE event, make sure attempt has come to
|
||||||
|
//RUNNING state
|
||||||
|
app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
|
||||||
|
|
||||||
|
//send the done signal to the map
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(
|
||||||
|
task2Attempt1.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
//wait for map task to complete
|
||||||
|
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// Verify the shuffle-port
|
||||||
|
Assert.assertEquals(5467, task2Attempt1.getShufflePort());
|
||||||
|
|
||||||
|
app.waitForState(reduceTask1, TaskState.RUNNING);
|
||||||
|
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
|
||||||
|
|
||||||
|
// write output corresponding to reduce1
|
||||||
|
writeOutput(reduce1Attempt1, conf);
|
||||||
|
|
||||||
|
//send the done signal to the 1st reduce
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(
|
||||||
|
reduce1Attempt1.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
//wait for first reduce task to complete
|
||||||
|
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
app.verifyCompleted();
|
||||||
|
validateOutput();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
|
||||||
|
throws Exception {
|
||||||
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
|
||||||
|
TypeConverter.fromYarn(attempt.getID()));
|
||||||
|
|
||||||
|
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
|
||||||
|
RecordWriter theRecordWriter = theOutputFormat
|
||||||
|
.getRecordWriter(tContext);
|
||||||
|
|
||||||
|
NullWritable nullWritable = NullWritable.get();
|
||||||
|
try {
|
||||||
|
theRecordWriter.write(key2, val2);
|
||||||
|
theRecordWriter.write(null, nullWritable);
|
||||||
|
theRecordWriter.write(null, val2);
|
||||||
|
theRecordWriter.write(nullWritable, val1);
|
||||||
|
theRecordWriter.write(key1, nullWritable);
|
||||||
|
theRecordWriter.write(key2, null);
|
||||||
|
theRecordWriter.write(null, null);
|
||||||
|
theRecordWriter.write(key1, val1);
|
||||||
|
} finally {
|
||||||
|
theRecordWriter.close(tContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
OutputFormat outputFormat = ReflectionUtils.newInstance(
|
||||||
|
tContext.getOutputFormatClass(), conf);
|
||||||
|
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
|
||||||
|
committer.commitTask(tContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void writeOutput(TaskAttempt attempt, Configuration conf)
|
private void writeOutput(TaskAttempt attempt, Configuration conf)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
|
||||||
|
|
|
@ -19,14 +19,12 @@
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
/** An {@link OutputCommitter} that commits files specified
|
/** An {@link OutputCommitter} that commits files specified
|
||||||
|
@ -42,280 +40,140 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
/**
|
/**
|
||||||
* Temporary directory name
|
* Temporary directory name
|
||||||
*/
|
*/
|
||||||
public static final String TEMP_DIR_NAME = "_temporary";
|
public static final String TEMP_DIR_NAME =
|
||||||
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
|
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
|
||||||
static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
|
public static final String SUCCEEDED_FILE_NAME =
|
||||||
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
|
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCEEDED_FILE_NAME;
|
||||||
|
static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
|
||||||
|
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
|
||||||
|
|
||||||
|
private static Path getOutputPath(JobContext context) {
|
||||||
|
JobConf conf = context.getJobConf();
|
||||||
|
return FileOutputFormat.getOutputPath(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Path getOutputPath(TaskAttemptContext context) {
|
||||||
|
JobConf conf = context.getJobConf();
|
||||||
|
return FileOutputFormat.getOutputPath(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = null;
|
||||||
|
|
||||||
|
private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
|
||||||
|
getWrapped(JobContext context) throws IOException {
|
||||||
|
if(wrapped == null) {
|
||||||
|
wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
|
||||||
|
getOutputPath(context), context);
|
||||||
|
}
|
||||||
|
return wrapped;
|
||||||
|
}
|
||||||
|
|
||||||
|
private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
|
||||||
|
getWrapped(TaskAttemptContext context) throws IOException {
|
||||||
|
if(wrapped == null) {
|
||||||
|
wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
|
||||||
|
getOutputPath(context), context);
|
||||||
|
}
|
||||||
|
return wrapped;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a given job attempt will be placed.
|
||||||
|
* @param context the context of the job. This is used to get the
|
||||||
|
* application attempt id.
|
||||||
|
* @return the path to store job attempt data.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
Path getJobAttemptPath(JobContext context) {
|
||||||
|
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
|
||||||
|
.getJobAttemptPath(context, getOutputPath(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
|
||||||
|
return getTaskAttemptPath(context, getOutputPath(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
|
||||||
|
Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
|
||||||
|
if(workPath == null) {
|
||||||
|
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
|
||||||
|
.getTaskAttemptPath(context, out);
|
||||||
|
}
|
||||||
|
return workPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a committed task is stored until
|
||||||
|
* the entire job is committed.
|
||||||
|
* @param context the context of the task attempt
|
||||||
|
* @return the path where the output of a committed task is stored until
|
||||||
|
* the entire job is committed.
|
||||||
|
*/
|
||||||
|
Path getCommittedTaskPath(TaskAttemptContext context) {
|
||||||
|
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
|
||||||
|
.getCommittedTaskPath(context, getOutputPath(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path getWorkPath(TaskAttemptContext context, Path outputPath)
|
||||||
|
throws IOException {
|
||||||
|
return getTaskAttemptPath(context, outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setupJob(JobContext context) throws IOException {
|
public void setupJob(JobContext context) throws IOException {
|
||||||
JobConf conf = context.getJobConf();
|
getWrapped(context).setupJob(context);
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
|
||||||
if (outputPath != null) {
|
|
||||||
Path tmpDir =
|
|
||||||
new Path(outputPath, getJobAttemptBaseDirName(context) +
|
|
||||||
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
|
||||||
FileSystem fileSys = tmpDir.getFileSystem(conf);
|
|
||||||
if (!fileSys.mkdirs(tmpDir)) {
|
|
||||||
LOG.error("Mkdirs failed to create " + tmpDir.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// True if the job requires output.dir marked on successful job.
|
|
||||||
// Note that by default it is set to true.
|
|
||||||
private boolean shouldMarkOutputDir(JobConf conf) {
|
|
||||||
return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void commitJob(JobContext context) throws IOException {
|
public void commitJob(JobContext context) throws IOException {
|
||||||
//delete the task temp directory from the current jobtempdir
|
getWrapped(context).commitJob(context);
|
||||||
JobConf conf = context.getJobConf();
|
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
|
||||||
if (outputPath != null) {
|
|
||||||
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
|
|
||||||
Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
|
|
||||||
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
|
||||||
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
|
||||||
if (fileSys.exists(tmpDir)) {
|
|
||||||
fileSys.delete(tmpDir, true);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Task temp dir could not be deleted " + tmpDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
//move the job output to final place
|
|
||||||
Path jobOutputPath =
|
|
||||||
new Path(outputPath, getJobAttemptBaseDirName(context));
|
|
||||||
moveJobOutputs(outputFileSystem,
|
|
||||||
jobOutputPath, outputPath, jobOutputPath);
|
|
||||||
|
|
||||||
// delete the _temporary folder in the output folder
|
|
||||||
cleanupJob(context);
|
|
||||||
// check if the output-dir marking is required
|
|
||||||
if (shouldMarkOutputDir(context.getJobConf())) {
|
|
||||||
// create a _success file in the output folder
|
|
||||||
markOutputDirSuccessful(context);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a _success file in the job's output folder
|
|
||||||
private void markOutputDirSuccessful(JobContext context) throws IOException {
|
|
||||||
JobConf conf = context.getJobConf();
|
|
||||||
// get the o/p path
|
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
|
||||||
if (outputPath != null) {
|
|
||||||
// get the filesys
|
|
||||||
FileSystem fileSys = outputPath.getFileSystem(conf);
|
|
||||||
// create a file in the output folder to mark the job completion
|
|
||||||
Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
|
|
||||||
fileSys.create(filePath).close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
|
|
||||||
Path finalOutputDir, Path jobOutput) throws IOException {
|
|
||||||
LOG.debug("Told to move job output from " + jobOutput
|
|
||||||
+ " to " + finalOutputDir +
|
|
||||||
" and orig job output path is " + origJobOutputPath);
|
|
||||||
if (fs.isFile(jobOutput)) {
|
|
||||||
Path finalOutputPath =
|
|
||||||
getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
|
|
||||||
if (!fs.rename(jobOutput, finalOutputPath)) {
|
|
||||||
if (!fs.delete(finalOutputPath, true)) {
|
|
||||||
throw new IOException("Failed to delete earlier output of job");
|
|
||||||
}
|
|
||||||
if (!fs.rename(jobOutput, finalOutputPath)) {
|
|
||||||
throw new IOException("Failed to save output of job");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.debug("Moved job output file from " + jobOutput + " to " +
|
|
||||||
finalOutputPath);
|
|
||||||
} else if (fs.getFileStatus(jobOutput).isDirectory()) {
|
|
||||||
LOG.debug("Job output file " + jobOutput + " is a dir");
|
|
||||||
FileStatus[] paths = fs.listStatus(jobOutput);
|
|
||||||
Path finalOutputPath =
|
|
||||||
getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
|
|
||||||
fs.mkdirs(finalOutputPath);
|
|
||||||
LOG.debug("Creating dirs along job output path " + finalOutputPath);
|
|
||||||
if (paths != null) {
|
|
||||||
for (FileStatus path : paths) {
|
|
||||||
moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void cleanupJob(JobContext context) throws IOException {
|
public void cleanupJob(JobContext context) throws IOException {
|
||||||
JobConf conf = context.getJobConf();
|
getWrapped(context).cleanupJob(context);
|
||||||
// do the clean up of temporary directory
|
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
|
||||||
if (outputPath != null) {
|
|
||||||
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
|
|
||||||
FileSystem fileSys = tmpDir.getFileSystem(conf);
|
|
||||||
context.getProgressible().progress();
|
|
||||||
if (fileSys.exists(tmpDir)) {
|
|
||||||
fileSys.delete(tmpDir, true);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Output Path is Null in cleanup");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void abortJob(JobContext context, int runState)
|
public void abortJob(JobContext context, int runState)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// simply delete the _temporary dir from the o/p folder of the job
|
JobStatus.State state;
|
||||||
cleanupJob(context);
|
if(runState == JobStatus.State.RUNNING.getValue()) {
|
||||||
|
state = JobStatus.State.RUNNING;
|
||||||
|
} else if(runState == JobStatus.State.SUCCEEDED.getValue()) {
|
||||||
|
state = JobStatus.State.SUCCEEDED;
|
||||||
|
} else if(runState == JobStatus.State.FAILED.getValue()) {
|
||||||
|
state = JobStatus.State.FAILED;
|
||||||
|
} else if(runState == JobStatus.State.PREP.getValue()) {
|
||||||
|
state = JobStatus.State.PREP;
|
||||||
|
} else if(runState == JobStatus.State.KILLED.getValue()) {
|
||||||
|
state = JobStatus.State.KILLED;
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException(runState+" is not a valid runState.");
|
||||||
|
}
|
||||||
|
getWrapped(context).abortJob(context, state);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setupTask(TaskAttemptContext context) throws IOException {
|
public void setupTask(TaskAttemptContext context) throws IOException {
|
||||||
// FileOutputCommitter's setupTask doesn't do anything. Because the
|
getWrapped(context).setupTask(context);
|
||||||
// temporary task directory is created on demand when the
|
|
||||||
// task is writing.
|
|
||||||
}
|
|
||||||
|
|
||||||
public void commitTask(TaskAttemptContext context)
|
|
||||||
throws IOException {
|
|
||||||
Path taskOutputPath = getTempTaskOutputPath(context);
|
|
||||||
TaskAttemptID attemptId = context.getTaskAttemptID();
|
|
||||||
JobConf job = context.getJobConf();
|
|
||||||
if (taskOutputPath != null) {
|
|
||||||
FileSystem fs = taskOutputPath.getFileSystem(job);
|
|
||||||
context.getProgressible().progress();
|
|
||||||
if (fs.exists(taskOutputPath)) {
|
|
||||||
// Move the task outputs to the current job attempt output dir
|
|
||||||
JobConf conf = context.getJobConf();
|
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
|
||||||
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
|
|
||||||
Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
|
|
||||||
moveTaskOutputs(context, outputFileSystem, jobOutputPath,
|
|
||||||
taskOutputPath);
|
|
||||||
|
|
||||||
// Delete the temporary task-specific output directory
|
|
||||||
if (!fs.delete(taskOutputPath, true)) {
|
|
||||||
LOG.info("Failed to delete the temporary output" +
|
|
||||||
" directory of task: " + attemptId + " - " + taskOutputPath);
|
|
||||||
}
|
|
||||||
LOG.info("Saved output of task '" + attemptId + "' to " +
|
|
||||||
jobOutputPath);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void moveTaskOutputs(TaskAttemptContext context,
|
|
||||||
FileSystem fs,
|
|
||||||
Path jobOutputDir,
|
|
||||||
Path taskOutput)
|
|
||||||
throws IOException {
|
|
||||||
TaskAttemptID attemptId = context.getTaskAttemptID();
|
|
||||||
context.getProgressible().progress();
|
|
||||||
LOG.debug("Told to move taskoutput from " + taskOutput
|
|
||||||
+ " to " + jobOutputDir);
|
|
||||||
if (fs.isFile(taskOutput)) {
|
|
||||||
Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
|
|
||||||
getTempTaskOutputPath(context));
|
|
||||||
if (!fs.rename(taskOutput, finalOutputPath)) {
|
|
||||||
if (!fs.delete(finalOutputPath, true)) {
|
|
||||||
throw new IOException("Failed to delete earlier output of task: " +
|
|
||||||
attemptId);
|
|
||||||
}
|
|
||||||
if (!fs.rename(taskOutput, finalOutputPath)) {
|
|
||||||
throw new IOException("Failed to save output of task: " +
|
|
||||||
attemptId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
|
|
||||||
} else if(fs.getFileStatus(taskOutput).isDirectory()) {
|
|
||||||
LOG.debug("Taskoutput " + taskOutput + " is a dir");
|
|
||||||
FileStatus[] paths = fs.listStatus(taskOutput);
|
|
||||||
Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
|
|
||||||
getTempTaskOutputPath(context));
|
|
||||||
fs.mkdirs(finalOutputPath);
|
|
||||||
LOG.debug("Creating dirs along path " + finalOutputPath);
|
|
||||||
if (paths != null) {
|
|
||||||
for (FileStatus path : paths) {
|
|
||||||
moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void abortTask(TaskAttemptContext context) throws IOException {
|
|
||||||
Path taskOutputPath = getTempTaskOutputPath(context);
|
|
||||||
if (taskOutputPath != null) {
|
|
||||||
FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
|
|
||||||
context.getProgressible().progress();
|
|
||||||
fs.delete(taskOutputPath, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput,
|
|
||||||
Path taskOutputPath) throws IOException {
|
|
||||||
URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
|
|
||||||
URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
|
|
||||||
URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
|
|
||||||
if (taskOutputUri == relativePath) {
|
|
||||||
//taskOutputPath is not a parent of taskOutput
|
|
||||||
throw new IOException("Can not get the relative path: base = " +
|
|
||||||
taskOutputPathUri + " child = " + taskOutputUri);
|
|
||||||
}
|
|
||||||
if (relativePath.getPath().length() > 0) {
|
|
||||||
return new Path(jobOutputDir, relativePath.getPath());
|
|
||||||
} else {
|
|
||||||
return jobOutputDir;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean needsTaskCommit(TaskAttemptContext context)
|
|
||||||
throws IOException {
|
|
||||||
Path taskOutputPath = getTempTaskOutputPath(context);
|
|
||||||
if (taskOutputPath != null) {
|
|
||||||
context.getProgressible().progress();
|
|
||||||
// Get the file-system for the task output directory
|
|
||||||
FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
|
|
||||||
// since task output path is created on demand,
|
|
||||||
// if it exists, task needs a commit
|
|
||||||
if (fs.exists(taskOutputPath)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Path getTempTaskOutputPath(TaskAttemptContext taskContext)
|
|
||||||
throws IOException {
|
|
||||||
JobConf conf = taskContext.getJobConf();
|
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
|
||||||
if (outputPath != null) {
|
|
||||||
Path p = new Path(outputPath,
|
|
||||||
(FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
|
||||||
"_" + taskContext.getTaskAttemptID().toString()));
|
|
||||||
FileSystem fs = p.getFileSystem(conf);
|
|
||||||
return p.makeQualified(fs);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Path getWorkPath(TaskAttemptContext taskContext, Path basePath)
|
@Override
|
||||||
|
public void commitTask(TaskAttemptContext context) throws IOException {
|
||||||
|
getWrapped(context).commitTask(context, getTaskAttemptPath(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abortTask(TaskAttemptContext context) throws IOException {
|
||||||
|
getWrapped(context).abortTask(context, getTaskAttemptPath(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean needsTaskCommit(TaskAttemptContext context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// ${mapred.out.dir}/_temporary
|
return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
|
||||||
Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
|
|
||||||
FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
|
|
||||||
if (!fs.exists(jobTmpDir)) {
|
|
||||||
throw new IOException("The temporary job-output directory " +
|
|
||||||
jobTmpDir.toString() + " doesn't exist!");
|
|
||||||
}
|
|
||||||
// ${mapred.out.dir}/_temporary/_${taskid}
|
|
||||||
String taskid = taskContext.getTaskAttemptID().toString();
|
|
||||||
Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
|
|
||||||
if (!fs.mkdirs(taskTmpDir)) {
|
|
||||||
throw new IOException("Mkdirs failed to create "
|
|
||||||
+ taskTmpDir.toString());
|
|
||||||
}
|
|
||||||
return taskTmpDir;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -326,54 +184,6 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
@Override
|
@Override
|
||||||
public void recoverTask(TaskAttemptContext context)
|
public void recoverTask(TaskAttemptContext context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
|
getWrapped(context).recoverTask(context);
|
||||||
context.progress();
|
|
||||||
Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
|
|
||||||
int previousAttempt =
|
|
||||||
context.getConfiguration().getInt(
|
|
||||||
MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
|
|
||||||
if (previousAttempt < 0) {
|
|
||||||
LOG.warn("Cannot recover task output for first attempt...");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
FileSystem outputFileSystem =
|
|
||||||
outputPath.getFileSystem(context.getJobConf());
|
|
||||||
Path pathToRecover =
|
|
||||||
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
|
|
||||||
if (outputFileSystem.exists(pathToRecover)) {
|
|
||||||
// Move the task outputs to their final place
|
|
||||||
LOG.debug("Trying to recover task from " + pathToRecover
|
|
||||||
+ " into " + jobOutputPath);
|
|
||||||
moveJobOutputs(outputFileSystem,
|
|
||||||
pathToRecover, jobOutputPath, pathToRecover);
|
|
||||||
LOG.info("Saved output of job to " + jobOutputPath);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static String getJobAttemptBaseDirName(JobContext context) {
|
|
||||||
int appAttemptId =
|
|
||||||
context.getJobConf().getInt(
|
|
||||||
MRConstants.APPLICATION_ATTEMPT_ID, 0);
|
|
||||||
return getJobAttemptBaseDirName(appAttemptId);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static String getJobTempDirName(TaskAttemptContext context) {
|
|
||||||
int appAttemptId =
|
|
||||||
context.getJobConf().getInt(
|
|
||||||
MRConstants.APPLICATION_ATTEMPT_ID, 0);
|
|
||||||
return getJobAttemptBaseDirName(appAttemptId);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static String getJobAttemptBaseDirName(int appAttemptId) {
|
|
||||||
return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
|
||||||
+ appAttemptId;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static String getTaskAttemptBaseDirName(
|
|
||||||
TaskAttemptContext context) {
|
|
||||||
return getJobTempDirName(context) + Path.SEPARATOR +
|
|
||||||
FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
|
||||||
"_" + context.getTaskAttemptID().toString();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -525,7 +525,7 @@ abstract public class Task implements Writable, Configurable {
|
||||||
if (outputPath != null) {
|
if (outputPath != null) {
|
||||||
if ((committer instanceof FileOutputCommitter)) {
|
if ((committer instanceof FileOutputCommitter)) {
|
||||||
FileOutputFormat.setWorkOutputPath(conf,
|
FileOutputFormat.setWorkOutputPath(conf,
|
||||||
((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext));
|
((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
|
||||||
} else {
|
} else {
|
||||||
FileOutputFormat.setWorkOutputPath(conf, outputPath);
|
FileOutputFormat.setWorkOutputPath(conf, outputPath);
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,17 +51,21 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
* Discard the task commit.
|
* Discard the task commit.
|
||||||
* </li>
|
* </li>
|
||||||
* </ol>
|
* </ol>
|
||||||
|
* The methods in this class can be called from several different processes and
|
||||||
|
* from several different contexts. It is important to know which process and
|
||||||
|
* which context each is called from. Each method should be marked accordingly
|
||||||
|
* in its documentation.
|
||||||
*
|
*
|
||||||
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
|
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
|
||||||
* @see JobContext
|
* @see JobContext
|
||||||
* @see TaskAttemptContext
|
* @see TaskAttemptContext
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public abstract class OutputCommitter {
|
public abstract class OutputCommitter {
|
||||||
/**
|
/**
|
||||||
* For the framework to setup the job output during initialization
|
* For the framework to setup the job output during initialization. This is
|
||||||
|
* called from the application master process for the entire job.
|
||||||
*
|
*
|
||||||
* @param jobContext Context of the job whose output is being written.
|
* @param jobContext Context of the job whose output is being written.
|
||||||
* @throws IOException if temporary output could not be created
|
* @throws IOException if temporary output could not be created
|
||||||
|
@ -69,11 +73,12 @@ public abstract class OutputCommitter {
|
||||||
public abstract void setupJob(JobContext jobContext) throws IOException;
|
public abstract void setupJob(JobContext jobContext) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For cleaning up the job's output after job completion
|
* For cleaning up the job's output after job completion. This is called
|
||||||
|
* from the application master process for the entire job.
|
||||||
*
|
*
|
||||||
* @param jobContext Context of the job whose output is being written.
|
* @param jobContext Context of the job whose output is being written.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @deprecated Use {@link #commitJob(JobContext)} or
|
* @deprecated Use {@link #commitJob(JobContext)} and
|
||||||
* {@link #abortJob(JobContext, JobStatus.State)} instead.
|
* {@link #abortJob(JobContext, JobStatus.State)} instead.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
@ -81,7 +86,8 @@ public abstract class OutputCommitter {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For committing job's output after successful job completion. Note that this
|
* For committing job's output after successful job completion. Note that this
|
||||||
* is invoked for jobs with final runstate as SUCCESSFUL.
|
* is invoked for jobs with final runstate as SUCCESSFUL. This is called
|
||||||
|
* from the application master process for the entire job.
|
||||||
*
|
*
|
||||||
* @param jobContext Context of the job whose output is being written.
|
* @param jobContext Context of the job whose output is being written.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -94,7 +100,8 @@ public abstract class OutputCommitter {
|
||||||
/**
|
/**
|
||||||
* For aborting an unsuccessful job's output. Note that this is invoked for
|
* For aborting an unsuccessful job's output. Note that this is invoked for
|
||||||
* jobs with final runstate as {@link JobStatus.State#FAILED} or
|
* jobs with final runstate as {@link JobStatus.State#FAILED} or
|
||||||
* {@link JobStatus.State#KILLED}.
|
* {@link JobStatus.State#KILLED}. This is called from the application
|
||||||
|
* master process for the entire job.
|
||||||
*
|
*
|
||||||
* @param jobContext Context of the job whose output is being written.
|
* @param jobContext Context of the job whose output is being written.
|
||||||
* @param state final runstate of the job
|
* @param state final runstate of the job
|
||||||
|
@ -106,7 +113,8 @@ public abstract class OutputCommitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up output for the task.
|
* Sets up output for the task. This is called from each individual task's
|
||||||
|
* process that will output to HDFS, and it is called just for that task.
|
||||||
*
|
*
|
||||||
* @param taskContext Context of the task whose output is being written.
|
* @param taskContext Context of the task whose output is being written.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -115,7 +123,9 @@ public abstract class OutputCommitter {
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether task needs a commit
|
* Check whether task needs a commit. This is called from each individual
|
||||||
|
* task's process that will output to HDFS, and it is called just for that
|
||||||
|
* task.
|
||||||
*
|
*
|
||||||
* @param taskContext
|
* @param taskContext
|
||||||
* @return true/false
|
* @return true/false
|
||||||
|
@ -125,18 +135,23 @@ public abstract class OutputCommitter {
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* To promote the task's temporary output to final output location
|
* To promote the task's temporary output to final output location.
|
||||||
*
|
* If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
|
||||||
* The task's output is moved to the job's output directory.
|
* task is the task that the AM determines finished first, this method
|
||||||
|
* is called to commit an individual task's output. This is to mark
|
||||||
|
* that task's output as complete, as {@link #commitJob(JobContext)} will
|
||||||
|
* also be called later on if the entire job finished successfully. This
|
||||||
|
* is called from a task's process.
|
||||||
*
|
*
|
||||||
* @param taskContext Context of the task whose output is being written.
|
* @param taskContext Context of the task whose output is being written.
|
||||||
* @throws IOException if commit is not
|
* @throws IOException if commit is not successful.
|
||||||
*/
|
*/
|
||||||
public abstract void commitTask(TaskAttemptContext taskContext)
|
public abstract void commitTask(TaskAttemptContext taskContext)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Discard the task output
|
* Discard the task output. This is called from a task's process to clean
|
||||||
|
* up a single task's output that has not yet been committed.
|
||||||
*
|
*
|
||||||
* @param taskContext
|
* @param taskContext
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -164,7 +179,8 @@ public abstract class OutputCommitter {
|
||||||
* The retry-count for the job will be passed via the
|
* The retry-count for the job will be passed via the
|
||||||
* {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in
|
* {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in
|
||||||
* {@link TaskAttemptContext#getConfiguration()} for the
|
* {@link TaskAttemptContext#getConfiguration()} for the
|
||||||
* <code>OutputCommitter</code>.
|
* <code>OutputCommitter</code>. This is called from the application master
|
||||||
|
* process, but it is called individually for each task.
|
||||||
*
|
*
|
||||||
* If an exception is thrown the task will be attempted again.
|
* If an exception is thrown the task will be attempted again.
|
||||||
*
|
*
|
||||||
|
|
|
@ -19,16 +19,16 @@
|
||||||
package org.apache.hadoop.mapreduce.lib.output;
|
package org.apache.hadoop.mapreduce.lib.output;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
@ -37,41 +37,239 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
|
||||||
/** An {@link OutputCommitter} that commits files specified
|
/** An {@link OutputCommitter} that commits files specified
|
||||||
* in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
|
* in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
|
||||||
**/
|
**/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class FileOutputCommitter extends OutputCommitter {
|
public class FileOutputCommitter extends OutputCommitter {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
|
private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Temporary directory name
|
* Name of directory where pending data is placed. Data that has not been
|
||||||
|
* committed yet.
|
||||||
*/
|
*/
|
||||||
protected static final String TEMP_DIR_NAME = "_temporary";
|
public static final String PENDING_DIR_NAME = "_temporary";
|
||||||
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
|
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
|
||||||
static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
|
public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
|
||||||
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
|
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
|
||||||
private FileSystem outputFileSystem = null;
|
|
||||||
private Path outputPath = null;
|
private Path outputPath = null;
|
||||||
private Path workPath = null;
|
private Path workPath = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a file output committer
|
* Create a file output committer
|
||||||
* @param outputPath the job's output path
|
* @param outputPath the job's output path, or null if you want the output
|
||||||
|
* committer to act as a noop.
|
||||||
* @param context the task's context
|
* @param context the task's context
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public FileOutputCommitter(Path outputPath,
|
public FileOutputCommitter(Path outputPath,
|
||||||
TaskAttemptContext context) throws IOException {
|
TaskAttemptContext context) throws IOException {
|
||||||
|
this(outputPath, (JobContext)context);
|
||||||
if (outputPath != null) {
|
if (outputPath != null) {
|
||||||
this.outputPath = outputPath;
|
workPath = getTaskAttemptPath(context, outputPath);
|
||||||
outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
|
|
||||||
workPath = new Path(outputPath,
|
|
||||||
getTaskAttemptBaseDirName(context))
|
|
||||||
.makeQualified(outputFileSystem);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a file output committer
|
||||||
|
* @param outputPath the job's output path, or null if you want the output
|
||||||
|
* committer to act as a noop.
|
||||||
|
* @param context the task's context
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public FileOutputCommitter(Path outputPath,
|
||||||
|
JobContext context) throws IOException {
|
||||||
|
if (outputPath != null) {
|
||||||
|
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
|
||||||
|
this.outputPath = fs.makeQualified(outputPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the path where final output of the job should be placed. This
|
||||||
|
* could also be considered the committed application attempt path.
|
||||||
|
*/
|
||||||
|
private Path getOutputPath() {
|
||||||
|
return this.outputPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if we have an output path set, else false.
|
||||||
|
*/
|
||||||
|
private boolean hasOutputPath() {
|
||||||
|
return this.outputPath != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the path where the output of pending job attempts is
|
||||||
|
* stored.
|
||||||
|
*/
|
||||||
|
private Path getPendingJobAttemptsPath() {
|
||||||
|
return getPendingJobAttemptsPath(getOutputPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the location of pending job attempts.
|
||||||
|
* @param out the base output directory.
|
||||||
|
* @return the location of pending job attempts.
|
||||||
|
*/
|
||||||
|
private static Path getPendingJobAttemptsPath(Path out) {
|
||||||
|
return new Path(out, PENDING_DIR_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the Application Attempt Id for this job
|
||||||
|
* @param context the context to look in
|
||||||
|
* @return the Application Attempt Id for a given job.
|
||||||
|
*/
|
||||||
|
private static int getAppAttemptId(JobContext context) {
|
||||||
|
return context.getConfiguration().getInt(
|
||||||
|
MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a given job attempt will be placed.
|
||||||
|
* @param context the context of the job. This is used to get the
|
||||||
|
* application attempt id.
|
||||||
|
* @return the path to store job attempt data.
|
||||||
|
*/
|
||||||
|
public Path getJobAttemptPath(JobContext context) {
|
||||||
|
return getJobAttemptPath(context, getOutputPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a given job attempt will be placed.
|
||||||
|
* @param context the context of the job. This is used to get the
|
||||||
|
* application attempt id.
|
||||||
|
* @param out the output path to place these in.
|
||||||
|
* @return the path to store job attempt data.
|
||||||
|
*/
|
||||||
|
public static Path getJobAttemptPath(JobContext context, Path out) {
|
||||||
|
return getJobAttemptPath(getAppAttemptId(context), out);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a given job attempt will be placed.
|
||||||
|
* @param appAttemptId the ID of the application attempt for this job.
|
||||||
|
* @return the path to store job attempt data.
|
||||||
|
*/
|
||||||
|
private Path getJobAttemptPath(int appAttemptId) {
|
||||||
|
return getJobAttemptPath(appAttemptId, getOutputPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a given job attempt will be placed.
|
||||||
|
* @param appAttemptId the ID of the application attempt for this job.
|
||||||
|
* @return the path to store job attempt data.
|
||||||
|
*/
|
||||||
|
private static Path getJobAttemptPath(int appAttemptId, Path out) {
|
||||||
|
return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of pending task attempts are stored.
|
||||||
|
* @param context the context of the job with pending tasks.
|
||||||
|
* @return the path where the output of pending task attempts are stored.
|
||||||
|
*/
|
||||||
|
private Path getPendingTaskAttemptsPath(JobContext context) {
|
||||||
|
return getPendingTaskAttemptsPath(context, getOutputPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of pending task attempts are stored.
|
||||||
|
* @param context the context of the job with pending tasks.
|
||||||
|
* @return the path where the output of pending task attempts are stored.
|
||||||
|
*/
|
||||||
|
private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
|
||||||
|
return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a task attempt is stored until
|
||||||
|
* that task is committed.
|
||||||
|
*
|
||||||
|
* @param context the context of the task attempt.
|
||||||
|
* @return the path where a task attempt should be stored.
|
||||||
|
*/
|
||||||
|
public Path getTaskAttemptPath(TaskAttemptContext context) {
|
||||||
|
return new Path(getPendingTaskAttemptsPath(context),
|
||||||
|
String.valueOf(context.getTaskAttemptID()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a task attempt is stored until
|
||||||
|
* that task is committed.
|
||||||
|
*
|
||||||
|
* @param context the context of the task attempt.
|
||||||
|
* @param out The output path to put things in.
|
||||||
|
* @return the path where a task attempt should be stored.
|
||||||
|
*/
|
||||||
|
public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
|
||||||
|
return new Path(getPendingTaskAttemptsPath(context, out),
|
||||||
|
String.valueOf(context.getTaskAttemptID()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a committed task is stored until
|
||||||
|
* the entire job is committed.
|
||||||
|
* @param context the context of the task attempt
|
||||||
|
* @return the path where the output of a committed task is stored until
|
||||||
|
* the entire job is committed.
|
||||||
|
*/
|
||||||
|
public Path getCommittedTaskPath(TaskAttemptContext context) {
|
||||||
|
return getCommittedTaskPath(getAppAttemptId(context), context);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
|
||||||
|
return getCommittedTaskPath(getAppAttemptId(context), context, out);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the path where the output of a committed task is stored until the
|
||||||
|
* entire job is committed for a specific application attempt.
|
||||||
|
* @param appAttemptId the id of the application attempt to use
|
||||||
|
* @param context the context of any task.
|
||||||
|
* @return the path where the output of a committed task is stored.
|
||||||
|
*/
|
||||||
|
private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
|
||||||
|
return new Path(getJobAttemptPath(appAttemptId),
|
||||||
|
String.valueOf(context.getTaskAttemptID().getTaskID()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
|
||||||
|
return new Path(getJobAttemptPath(appAttemptId, out),
|
||||||
|
String.valueOf(context.getTaskAttemptID().getTaskID()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class CommittedTaskFilter implements PathFilter {
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path path) {
|
||||||
|
return !PENDING_DIR_NAME.equals(path.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of all paths where output from committed tasks are stored.
|
||||||
|
* @param context the context of the current job
|
||||||
|
* @return the list of these Paths/FileStatuses.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private FileStatus[] getAllCommittedTaskPaths(JobContext context)
|
||||||
|
throws IOException {
|
||||||
|
Path jobAttemptPath = getJobAttemptPath(context);
|
||||||
|
FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
|
||||||
|
return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the directory that the task should write results into.
|
||||||
|
* @return the work directory
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Path getWorkPath() throws IOException {
|
||||||
|
return workPath;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the temporary directory that is the root of all of the task
|
* Create the temporary directory that is the root of all of the task
|
||||||
|
@ -79,116 +277,103 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
* @param context the job's context
|
* @param context the job's context
|
||||||
*/
|
*/
|
||||||
public void setupJob(JobContext context) throws IOException {
|
public void setupJob(JobContext context) throws IOException {
|
||||||
if (outputPath != null) {
|
if (hasOutputPath()) {
|
||||||
Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
|
Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
|
||||||
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
FileSystem fs = pendingJobAttemptsPath.getFileSystem(
|
||||||
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
context.getConfiguration());
|
||||||
if (!fileSys.mkdirs(tmpDir)) {
|
if (!fs.mkdirs(pendingJobAttemptsPath)) {
|
||||||
LOG.error("Mkdirs failed to create " + tmpDir.toString());
|
LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath);
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
}
|
LOG.warn("Output Path is null in setupJob()");
|
||||||
|
|
||||||
// True if the job requires output.dir marked on successful job.
|
|
||||||
// Note that by default it is set to true.
|
|
||||||
private boolean shouldMarkOutputDir(Configuration conf) {
|
|
||||||
return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a _success file in the job's output dir
|
|
||||||
private void markOutputDirSuccessful(MRJobConfig context) throws IOException {
|
|
||||||
if (outputPath != null) {
|
|
||||||
// create a file in the output folder to mark the job completion
|
|
||||||
Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
|
|
||||||
outputFileSystem.create(filePath).close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move all job output to the final place.
|
* The job has completed so move all committed tasks to the final output dir.
|
||||||
* Delete the temporary directory, including all of the work directories.
|
* Delete the temporary directory, including all of the work directories.
|
||||||
* Create a _SUCCESS file to mark it as successful.
|
* Create a _SUCCESS file to mark it as successful.
|
||||||
* @param context the job's context
|
* @param context the job's context
|
||||||
*/
|
*/
|
||||||
public void commitJob(JobContext context) throws IOException {
|
public void commitJob(JobContext context) throws IOException {
|
||||||
if (outputPath != null) {
|
if (hasOutputPath()) {
|
||||||
//delete the task temp directory from the current jobtempdir
|
Path finalOutput = getOutputPath();
|
||||||
Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
|
FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
|
||||||
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
for(FileStatus stat: getAllCommittedTaskPaths(context)) {
|
||||||
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
mergePaths(fs, stat, finalOutput);
|
||||||
if (fileSys.exists(tmpDir)) {
|
|
||||||
fileSys.delete(tmpDir, true);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Task temp dir could not be deleted " + tmpDir);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//move the job output to final place
|
|
||||||
Path jobOutputPath =
|
|
||||||
new Path(outputPath, getJobAttemptBaseDirName(context));
|
|
||||||
moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
|
|
||||||
|
|
||||||
// delete the _temporary folder and create a _done file in the o/p folder
|
// delete the _temporary folder and create a _done file in the o/p folder
|
||||||
cleanupJob(context);
|
cleanupJob(context);
|
||||||
if (shouldMarkOutputDir(context.getConfiguration())) {
|
// True if the job requires output.dir marked on successful job.
|
||||||
markOutputDirSuccessful(context);
|
// Note that by default it is set to true.
|
||||||
|
if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
|
||||||
|
Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
|
||||||
|
fs.create(markerPath).close();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("Output Path is null in commitJob()");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move job output to final location
|
* Merge two paths together. Anything in from will be moved into to, if there
|
||||||
* @param fs Filesystem handle
|
* are any name conflicts while merging, the files or directories in from win.
|
||||||
* @param origJobOutputPath The original location of the job output
|
* @param fs the File System to use
|
||||||
* Required to generate the relative path for correct moving of data.
|
* @param from the path data is coming from.
|
||||||
* @param finalOutputDir The final output directory to which the job output
|
* @param to the path data is going to.
|
||||||
* needs to be moved
|
* @throws IOException on any error
|
||||||
* @param jobOutput The current job output directory being moved
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
|
private static void mergePaths(FileSystem fs, final FileStatus from,
|
||||||
Path finalOutputDir, Path jobOutput) throws IOException {
|
final Path to)
|
||||||
LOG.debug("Told to move job output from " + jobOutput
|
throws IOException {
|
||||||
+ " to " + finalOutputDir +
|
LOG.debug("Merging data from "+from+" to "+to);
|
||||||
" and orig job output path is " + origJobOutputPath);
|
if(from.isFile()) {
|
||||||
if (fs.isFile(jobOutput)) {
|
if(fs.exists(to)) {
|
||||||
Path finalOutputPath =
|
if(!fs.delete(to, true)) {
|
||||||
getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
|
throw new IOException("Failed to delete "+to);
|
||||||
if (!fs.rename(jobOutput, finalOutputPath)) {
|
}
|
||||||
if (!fs.delete(finalOutputPath, true)) {
|
}
|
||||||
throw new IOException("Failed to delete earlier output of job");
|
|
||||||
}
|
if(!fs.rename(from.getPath(), to)) {
|
||||||
if (!fs.rename(jobOutput, finalOutputPath)) {
|
throw new IOException("Failed to rename "+from+" to "+to);
|
||||||
throw new IOException("Failed to save output of job");
|
}
|
||||||
}
|
} else if(from.isDirectory()) {
|
||||||
}
|
if(fs.exists(to)) {
|
||||||
LOG.debug("Moved job output file from " + jobOutput + " to " +
|
FileStatus toStat = fs.getFileStatus(to);
|
||||||
finalOutputPath);
|
if(!toStat.isDirectory()) {
|
||||||
} else if (fs.getFileStatus(jobOutput).isDirectory()) {
|
if(!fs.delete(to, true)) {
|
||||||
LOG.debug("Job output file " + jobOutput + " is a dir");
|
throw new IOException("Failed to delete "+to);
|
||||||
FileStatus[] paths = fs.listStatus(jobOutput);
|
}
|
||||||
Path finalOutputPath =
|
if(!fs.rename(from.getPath(), to)) {
|
||||||
getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
|
throw new IOException("Failed to rename "+from+" to "+to);
|
||||||
fs.mkdirs(finalOutputPath);
|
}
|
||||||
LOG.debug("Creating dirs along job output path " + finalOutputPath);
|
} else {
|
||||||
if (paths != null) {
|
//It is a directory so merge everything in the directories
|
||||||
for (FileStatus path : paths) {
|
for(FileStatus subFrom: fs.listStatus(from.getPath())) {
|
||||||
moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
|
Path subTo = new Path(to, subFrom.getPath().getName());
|
||||||
}
|
mergePaths(fs, subFrom, subTo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
//it does not exist just rename
|
||||||
|
if(!fs.rename(from.getPath(), to)) {
|
||||||
|
throw new IOException("Failed to rename "+from+" to "+to);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void cleanupJob(JobContext context) throws IOException {
|
public void cleanupJob(JobContext context) throws IOException {
|
||||||
if (outputPath != null) {
|
if (hasOutputPath()) {
|
||||||
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
|
Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
|
||||||
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
FileSystem fs = pendingJobAttemptsPath
|
||||||
if (fileSys.exists(tmpDir)) {
|
.getFileSystem(context.getConfiguration());
|
||||||
fileSys.delete(tmpDir, true);
|
fs.delete(pendingJobAttemptsPath, true);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Output Path is null in cleanup");
|
LOG.warn("Output Path is null in cleanupJob()");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,69 +402,40 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
* Move the files from the work directory to the job output directory
|
* Move the files from the work directory to the job output directory
|
||||||
* @param context the task context
|
* @param context the task context
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void commitTask(TaskAttemptContext context)
|
public void commitTask(TaskAttemptContext context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
TaskAttemptID attemptId = context.getTaskAttemptID();
|
commitTask(context, null);
|
||||||
if (workPath != null) {
|
|
||||||
context.progress();
|
|
||||||
if (outputFileSystem.exists(workPath)) {
|
|
||||||
// Move the task outputs to the current job attempt output dir
|
|
||||||
Path jobOutputPath =
|
|
||||||
new Path(outputPath, getJobAttemptBaseDirName(context));
|
|
||||||
moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
|
|
||||||
// Delete the temporary task-specific output directory
|
|
||||||
if (!outputFileSystem.delete(workPath, true)) {
|
|
||||||
LOG.warn("Failed to delete the temporary output" +
|
|
||||||
" directory of task: " + attemptId + " - " + workPath);
|
|
||||||
}
|
|
||||||
LOG.info("Saved output of task '" + attemptId + "' to " +
|
|
||||||
jobOutputPath);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Private
|
||||||
* Move all of the files from the work directory to the final output
|
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
|
||||||
* @param context the task context
|
|
||||||
* @param fs the output file system
|
|
||||||
* @param jobOutputDir the final output direcotry
|
|
||||||
* @param taskOutput the work path
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void moveTaskOutputs(TaskAttemptContext context,
|
|
||||||
FileSystem fs,
|
|
||||||
Path jobOutputDir,
|
|
||||||
Path taskOutput)
|
|
||||||
throws IOException {
|
throws IOException {
|
||||||
TaskAttemptID attemptId = context.getTaskAttemptID();
|
TaskAttemptID attemptId = context.getTaskAttemptID();
|
||||||
context.progress();
|
if (hasOutputPath()) {
|
||||||
LOG.debug("Told to move taskoutput from " + taskOutput
|
context.progress();
|
||||||
+ " to " + jobOutputDir);
|
if(taskAttemptPath == null) {
|
||||||
if (fs.isFile(taskOutput)) {
|
taskAttemptPath = getTaskAttemptPath(context);
|
||||||
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
|
|
||||||
workPath);
|
|
||||||
if (!fs.rename(taskOutput, finalOutputPath)) {
|
|
||||||
if (!fs.delete(finalOutputPath, true)) {
|
|
||||||
throw new IOException("Failed to delete earlier output of task: " +
|
|
||||||
attemptId);
|
|
||||||
}
|
|
||||||
if (!fs.rename(taskOutput, finalOutputPath)) {
|
|
||||||
throw new IOException("Failed to save output of task: " +
|
|
||||||
attemptId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
|
Path committedTaskPath = getCommittedTaskPath(context);
|
||||||
} else if(fs.getFileStatus(taskOutput).isDirectory()) {
|
FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
|
||||||
LOG.debug("Taskoutput " + taskOutput + " is a dir");
|
if (fs.exists(taskAttemptPath)) {
|
||||||
FileStatus[] paths = fs.listStatus(taskOutput);
|
if(fs.exists(committedTaskPath)) {
|
||||||
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
|
if(!fs.delete(committedTaskPath, true)) {
|
||||||
fs.mkdirs(finalOutputPath);
|
throw new IOException("Could not delete " + committedTaskPath);
|
||||||
LOG.debug("Creating dirs along path " + finalOutputPath);
|
}
|
||||||
if (paths != null) {
|
|
||||||
for (FileStatus path : paths) {
|
|
||||||
moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
|
|
||||||
}
|
}
|
||||||
|
if(!fs.rename(taskAttemptPath, committedTaskPath)) {
|
||||||
|
throw new IOException("Could not rename " + taskAttemptPath + " to "
|
||||||
|
+ committedTaskPath);
|
||||||
|
}
|
||||||
|
LOG.info("Saved output of task '" + attemptId + "' to " +
|
||||||
|
committedTaskPath);
|
||||||
|
} else {
|
||||||
|
LOG.warn("No Output found for " + attemptId);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("Output Path is null in commitTask()");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,38 +445,22 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void abortTask(TaskAttemptContext context) throws IOException {
|
public void abortTask(TaskAttemptContext context) throws IOException {
|
||||||
if (workPath != null) {
|
abortTask(context, null);
|
||||||
context.progress();
|
|
||||||
outputFileSystem.delete(workPath, true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Private
|
||||||
* Find the final name of a given output file, given the job output directory
|
public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
|
||||||
* and the work directory.
|
if (hasOutputPath()) {
|
||||||
* @param jobOutputDir the job's output directory
|
context.progress();
|
||||||
* @param taskOutput the specific task output file
|
if(taskAttemptPath == null) {
|
||||||
* @param taskOutputPath the job's work directory
|
taskAttemptPath = getTaskAttemptPath(context);
|
||||||
* @return the final path for the specific output file
|
}
|
||||||
* @throws IOException
|
FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
|
||||||
*/
|
if(!fs.delete(taskAttemptPath, true)) {
|
||||||
private Path getFinalPath(Path jobOutputDir, Path taskOutput,
|
LOG.warn("Could not delete "+taskAttemptPath);
|
||||||
Path taskOutputPath) throws IOException {
|
}
|
||||||
URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(),
|
|
||||||
outputFileSystem.getWorkingDirectory()).toUri();
|
|
||||||
URI taskOutputPathUri =
|
|
||||||
taskOutputPath.makeQualified(
|
|
||||||
outputFileSystem.getUri(),
|
|
||||||
outputFileSystem.getWorkingDirectory()).toUri();
|
|
||||||
URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
|
|
||||||
if (taskOutputUri == relativePath) {
|
|
||||||
throw new IOException("Can not get the relative path: base = " +
|
|
||||||
taskOutputPathUri + " child = " + taskOutputUri);
|
|
||||||
}
|
|
||||||
if (relativePath.getPath().length() > 0) {
|
|
||||||
return new Path(jobOutputDir, relativePath.getPath());
|
|
||||||
} else {
|
} else {
|
||||||
return jobOutputDir;
|
LOG.warn("Output Path is null in abortTask()");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -331,16 +471,20 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
@Override
|
@Override
|
||||||
public boolean needsTaskCommit(TaskAttemptContext context
|
public boolean needsTaskCommit(TaskAttemptContext context
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
return workPath != null && outputFileSystem.exists(workPath);
|
return needsTaskCommit(context, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Private
|
||||||
* Get the directory that the task should write results into
|
public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
|
||||||
* @return the work directory
|
) throws IOException {
|
||||||
* @throws IOException
|
if(hasOutputPath()) {
|
||||||
*/
|
if(taskAttemptPath == null) {
|
||||||
public Path getWorkPath() throws IOException {
|
taskAttemptPath = getTaskAttemptPath(context);
|
||||||
return workPath;
|
}
|
||||||
|
FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
|
||||||
|
return fs.exists(taskAttemptPath);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -352,43 +496,35 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
public void recoverTask(TaskAttemptContext context)
|
public void recoverTask(TaskAttemptContext context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
context.progress();
|
context.progress();
|
||||||
Path jobOutputPath =
|
TaskAttemptID attemptId = context.getTaskAttemptID();
|
||||||
new Path(outputPath, getJobAttemptBaseDirName(context));
|
int previousAttempt = getAppAttemptId(context) - 1;
|
||||||
int previousAttempt =
|
|
||||||
context.getConfiguration().getInt(
|
|
||||||
MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
|
|
||||||
if (previousAttempt < 0) {
|
if (previousAttempt < 0) {
|
||||||
throw new IOException ("Cannot recover task output for first attempt...");
|
throw new IOException ("Cannot recover task output for first attempt...");
|
||||||
}
|
}
|
||||||
|
|
||||||
Path pathToRecover =
|
Path committedTaskPath = getCommittedTaskPath(context);
|
||||||
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
|
Path previousCommittedTaskPath = getCommittedTaskPath(
|
||||||
LOG.debug("Trying to recover task from " + pathToRecover
|
previousAttempt, context);
|
||||||
+ " into " + jobOutputPath);
|
FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
|
||||||
if (outputFileSystem.exists(pathToRecover)) {
|
|
||||||
// Move the task outputs to their final place
|
LOG.debug("Trying to recover task from " + previousCommittedTaskPath
|
||||||
moveJobOutputs(outputFileSystem,
|
+ " into " + committedTaskPath);
|
||||||
pathToRecover, jobOutputPath, pathToRecover);
|
if (fs.exists(previousCommittedTaskPath)) {
|
||||||
LOG.info("Saved output of job to " + jobOutputPath);
|
if(fs.exists(committedTaskPath)) {
|
||||||
|
if(!fs.delete(committedTaskPath, true)) {
|
||||||
|
throw new IOException("Could not delete "+committedTaskPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//Rename can fail if the parent directory does not yet exist.
|
||||||
|
Path committedParent = committedTaskPath.getParent();
|
||||||
|
fs.mkdirs(committedParent);
|
||||||
|
if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
|
||||||
|
throw new IOException("Could not rename " + previousCommittedTaskPath +
|
||||||
|
" to " + committedTaskPath);
|
||||||
|
}
|
||||||
|
LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
|
||||||
|
} else {
|
||||||
|
LOG.warn(attemptId+" had no output to recover.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static String getJobAttemptBaseDirName(JobContext context) {
|
|
||||||
int appAttemptId =
|
|
||||||
context.getConfiguration().getInt(
|
|
||||||
MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
|
|
||||||
return getJobAttemptBaseDirName(appAttemptId);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static String getJobAttemptBaseDirName(int appAttemptId) {
|
|
||||||
return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
|
||||||
+ appAttemptId;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static String getTaskAttemptBaseDirName(
|
|
||||||
TaskAttemptContext context) {
|
|
||||||
return getJobAttemptBaseDirName(context) + Path.SEPARATOR +
|
|
||||||
FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
|
||||||
"_" + context.getTaskAttemptID().toString();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,10 +105,9 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
|
|
||||||
// do commit
|
// do commit
|
||||||
committer.commitTask(tContext);
|
committer.commitTask(tContext);
|
||||||
Path jobTempDir1 = new Path(outDir,
|
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
|
||||||
FileOutputCommitter.getJobAttemptBaseDirName(
|
File jtd1 = new File(jobTempDir1.toUri().getPath());
|
||||||
conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
|
assertTrue(jtd1.exists());
|
||||||
assertTrue((new File(jobTempDir1.toString()).exists()));
|
|
||||||
validateContent(jobTempDir1);
|
validateContent(jobTempDir1);
|
||||||
|
|
||||||
//now while running the second app attempt,
|
//now while running the second app attempt,
|
||||||
|
@ -119,14 +118,12 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
|
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
|
||||||
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
|
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
|
||||||
FileOutputCommitter committer2 = new FileOutputCommitter();
|
FileOutputCommitter committer2 = new FileOutputCommitter();
|
||||||
committer.setupJob(jContext2);
|
committer2.setupJob(jContext2);
|
||||||
Path jobTempDir2 = new Path(outDir,
|
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
|
||||||
FileOutputCommitter.getJobAttemptBaseDirName(
|
|
||||||
conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
|
|
||||||
assertTrue((new File(jobTempDir2.toString()).exists()));
|
|
||||||
|
|
||||||
tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
|
|
||||||
committer2.recoverTask(tContext2);
|
committer2.recoverTask(tContext2);
|
||||||
|
File jtd2 = new File(jobTempDir2.toUri().getPath());
|
||||||
|
assertTrue(jtd2.exists());
|
||||||
validateContent(jobTempDir2);
|
validateContent(jobTempDir2);
|
||||||
|
|
||||||
committer2.commitJob(jContext2);
|
committer2.commitJob(jContext2);
|
||||||
|
@ -135,7 +132,8 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateContent(Path dir) throws IOException {
|
private void validateContent(Path dir) throws IOException {
|
||||||
File expectedFile = new File(new Path(dir, partFile).toString());
|
File fdir = new File(dir.toUri().getPath());
|
||||||
|
File expectedFile = new File(fdir, partFile);
|
||||||
StringBuffer expectedOutput = new StringBuffer();
|
StringBuffer expectedOutput = new StringBuffer();
|
||||||
expectedOutput.append(key1).append('\t').append(val1).append("\n");
|
expectedOutput.append(key1).append('\t').append(val1).append("\n");
|
||||||
expectedOutput.append(val1).append("\n");
|
expectedOutput.append(val1).append("\n");
|
||||||
|
@ -244,21 +242,17 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
|
|
||||||
// do abort
|
// do abort
|
||||||
committer.abortTask(tContext);
|
committer.abortTask(tContext);
|
||||||
FileSystem outputFileSystem = outDir.getFileSystem(conf);
|
File out = new File(outDir.toUri().getPath());
|
||||||
Path workPath = new Path(outDir,
|
Path workPath = committer.getWorkPath(tContext, outDir);
|
||||||
committer.getTaskAttemptBaseDirName(tContext))
|
File wp = new File(workPath.toUri().getPath());
|
||||||
.makeQualified(outputFileSystem);
|
File expectedFile = new File(wp, partFile);
|
||||||
File expectedFile = new File(new Path(workPath, partFile)
|
|
||||||
.toString());
|
|
||||||
assertFalse("task temp dir still exists", expectedFile.exists());
|
assertFalse("task temp dir still exists", expectedFile.exists());
|
||||||
|
|
||||||
committer.abortJob(jContext, JobStatus.State.FAILED);
|
committer.abortJob(jContext, JobStatus.State.FAILED);
|
||||||
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
|
expectedFile = new File(out, FileOutputCommitter.TEMP_DIR_NAME);
|
||||||
.toString());
|
|
||||||
assertFalse("job temp dir still exists", expectedFile.exists());
|
assertFalse("job temp dir still exists", expectedFile.exists());
|
||||||
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
assertEquals("Output directory not empty", 0, out.listFiles().length);
|
||||||
.listFiles().length);
|
FileUtil.fullyDelete(out);
|
||||||
FileUtil.fullyDelete(new File(outDir.toString()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class FakeFileSystem extends RawLocalFileSystem {
|
public static class FakeFileSystem extends RawLocalFileSystem {
|
||||||
|
|
|
@ -60,6 +60,22 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
private Text val2 = new Text("val2");
|
private Text val2 = new Text("val2");
|
||||||
|
|
||||||
|
|
||||||
|
private static void cleanup() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = outDir.getFileSystem(conf);
|
||||||
|
fs.delete(outDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
private void writeOutput(RecordWriter theRecordWriter,
|
private void writeOutput(RecordWriter theRecordWriter,
|
||||||
TaskAttemptContext context) throws IOException, InterruptedException {
|
TaskAttemptContext context) throws IOException, InterruptedException {
|
||||||
NullWritable nullWritable = NullWritable.get();
|
NullWritable nullWritable = NullWritable.get();
|
||||||
|
@ -114,11 +130,10 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
|
|
||||||
// do commit
|
// do commit
|
||||||
committer.commitTask(tContext);
|
committer.commitTask(tContext);
|
||||||
Path jobTempDir1 = new Path(outDir,
|
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
|
||||||
FileOutputCommitter.getJobAttemptBaseDirName(
|
File jtd = new File(jobTempDir1.toUri().getPath());
|
||||||
conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
|
assertTrue(jtd.exists());
|
||||||
assertTrue((new File(jobTempDir1.toString()).exists()));
|
validateContent(jtd);
|
||||||
validateContent(jobTempDir1);
|
|
||||||
|
|
||||||
//now while running the second app attempt,
|
//now while running the second app attempt,
|
||||||
//recover the task output from first attempt
|
//recover the task output from first attempt
|
||||||
|
@ -128,15 +143,13 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
|
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
|
||||||
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
|
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
|
||||||
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
|
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
|
||||||
committer.setupJob(tContext2);
|
committer2.setupJob(tContext2);
|
||||||
Path jobTempDir2 = new Path(outDir,
|
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
|
||||||
FileOutputCommitter.getJobAttemptBaseDirName(
|
File jtd2 = new File(jobTempDir2.toUri().getPath());
|
||||||
conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
|
|
||||||
assertTrue((new File(jobTempDir2.toString()).exists()));
|
|
||||||
|
|
||||||
tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
|
|
||||||
committer2.recoverTask(tContext2);
|
committer2.recoverTask(tContext2);
|
||||||
validateContent(jobTempDir2);
|
assertTrue(jtd2.exists());
|
||||||
|
validateContent(jtd2);
|
||||||
|
|
||||||
committer2.commitJob(jContext2);
|
committer2.commitJob(jContext2);
|
||||||
validateContent(outDir);
|
validateContent(outDir);
|
||||||
|
@ -144,7 +157,12 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateContent(Path dir) throws IOException {
|
private void validateContent(Path dir) throws IOException {
|
||||||
File expectedFile = new File(new Path(dir, partFile).toString());
|
validateContent(new File(dir.toUri().getPath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateContent(File dir) throws IOException {
|
||||||
|
File expectedFile = new File(dir, partFile);
|
||||||
|
assertTrue("Could not find "+expectedFile, expectedFile.exists());
|
||||||
StringBuffer expectedOutput = new StringBuffer();
|
StringBuffer expectedOutput = new StringBuffer();
|
||||||
expectedOutput.append(key1).append('\t').append(val1).append("\n");
|
expectedOutput.append(key1).append('\t').append(val1).append("\n");
|
||||||
expectedOutput.append(val1).append("\n");
|
expectedOutput.append(val1).append("\n");
|
||||||
|
@ -259,7 +277,7 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
assertFalse("task temp dir still exists", expectedFile.exists());
|
assertFalse("task temp dir still exists", expectedFile.exists());
|
||||||
|
|
||||||
committer.abortJob(jContext, JobStatus.State.FAILED);
|
committer.abortJob(jContext, JobStatus.State.FAILED);
|
||||||
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
|
expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
|
||||||
.toString());
|
.toString());
|
||||||
assertFalse("job temp dir still exists", expectedFile.exists());
|
assertFalse("job temp dir still exists", expectedFile.exists());
|
||||||
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
||||||
|
@ -315,12 +333,10 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
assertNotNull(th);
|
assertNotNull(th);
|
||||||
assertTrue(th instanceof IOException);
|
assertTrue(th instanceof IOException);
|
||||||
assertTrue(th.getMessage().contains("fake delete failed"));
|
assertTrue(th.getMessage().contains("fake delete failed"));
|
||||||
File jobTmpDir = new File(new Path(outDir,
|
Path jtd = committer.getJobAttemptPath(jContext);
|
||||||
FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
File jobTmpDir = new File(jtd.toUri().getPath());
|
||||||
conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
|
Path ttd = committer.getTaskAttemptPath(tContext);
|
||||||
Path.SEPARATOR +
|
File taskTmpDir = new File(ttd.toUri().getPath());
|
||||||
FileOutputCommitter.TEMP_DIR_NAME).toString());
|
|
||||||
File taskTmpDir = new File(jobTmpDir, "_" + taskID);
|
|
||||||
File expectedFile = new File(taskTmpDir, partFile);
|
File expectedFile = new File(taskTmpDir, partFile);
|
||||||
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
|
||||||
FileOutputCommitter committer = new FileOutputCommitter();
|
FileOutputCommitter committer = new FileOutputCommitter();
|
||||||
FileOutputFormat.setWorkOutputPath(job,
|
FileOutputFormat.setWorkOutputPath(job,
|
||||||
committer.getTempTaskOutputPath(tContext));
|
committer.getTaskAttemptPath(tContext));
|
||||||
|
|
||||||
committer.setupJob(jContext);
|
committer.setupJob(jContext);
|
||||||
committer.setupTask(tContext);
|
committer.setupTask(tContext);
|
||||||
|
@ -115,7 +115,7 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
|
||||||
FileOutputCommitter committer = new FileOutputCommitter();
|
FileOutputCommitter committer = new FileOutputCommitter();
|
||||||
FileOutputFormat.setWorkOutputPath(job, committer
|
FileOutputFormat.setWorkOutputPath(job, committer
|
||||||
.getTempTaskOutputPath(tContext));
|
.getTaskAttemptPath(tContext));
|
||||||
|
|
||||||
// do setup
|
// do setup
|
||||||
committer.setupJob(jContext);
|
committer.setupJob(jContext);
|
||||||
|
@ -134,13 +134,13 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
// do abort
|
// do abort
|
||||||
committer.abortTask(tContext);
|
committer.abortTask(tContext);
|
||||||
File expectedFile = new File(new Path(committer
|
File expectedFile = new File(new Path(committer
|
||||||
.getTempTaskOutputPath(tContext), file).toString());
|
.getTaskAttemptPath(tContext), file).toString());
|
||||||
assertFalse("task temp dir still exists", expectedFile.exists());
|
assertFalse("task temp dir still exists", expectedFile.exists());
|
||||||
|
|
||||||
committer.abortJob(jContext, JobStatus.State.FAILED);
|
committer.abortJob(jContext, JobStatus.State.FAILED);
|
||||||
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
|
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
|
||||||
.toString());
|
.toString());
|
||||||
assertFalse("job temp dir still exists", expectedFile.exists());
|
assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
|
||||||
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
||||||
.listFiles().length);
|
.listFiles().length);
|
||||||
FileUtil.fullyDelete(new File(outDir.toString()));
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
|
@ -170,16 +170,15 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
|
||||||
FileOutputCommitter committer = new FileOutputCommitter();
|
FileOutputCommitter committer = new FileOutputCommitter();
|
||||||
FileOutputFormat.setWorkOutputPath(job, committer
|
FileOutputFormat.setWorkOutputPath(job, committer
|
||||||
.getTempTaskOutputPath(tContext));
|
.getTaskAttemptPath(tContext));
|
||||||
|
|
||||||
// do setup
|
// do setup
|
||||||
committer.setupJob(jContext);
|
committer.setupJob(jContext);
|
||||||
committer.setupTask(tContext);
|
committer.setupTask(tContext);
|
||||||
|
|
||||||
String file = "test.txt";
|
String file = "test.txt";
|
||||||
String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
|
File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath());
|
||||||
File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
|
File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath());
|
||||||
File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
|
|
||||||
File expectedFile = new File(taskTmpDir, file);
|
File expectedFile = new File(taskTmpDir, file);
|
||||||
|
|
||||||
// A reporter that does nothing
|
// A reporter that does nothing
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class TestTaskCommit extends HadoopTestCase {
|
||||||
|
|
||||||
static class CommitterWithCommitFail extends FileOutputCommitter {
|
static class CommitterWithCommitFail extends FileOutputCommitter {
|
||||||
public void commitTask(TaskAttemptContext context) throws IOException {
|
public void commitTask(TaskAttemptContext context) throws IOException {
|
||||||
Path taskOutputPath = getTempTaskOutputPath(context);
|
Path taskOutputPath = getTaskAttemptPath(context);
|
||||||
TaskAttemptID attemptId = context.getTaskAttemptID();
|
TaskAttemptID attemptId = context.getTaskAttemptID();
|
||||||
JobConf job = context.getJobConf();
|
JobConf job = context.getJobConf();
|
||||||
if (taskOutputPath != null) {
|
if (taskOutputPath != null) {
|
||||||
|
|
|
@ -70,6 +70,22 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void cleanup() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = outDir.getFileSystem(conf);
|
||||||
|
fs.delete(outDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testCommitter() throws Exception {
|
public void testCommitter() throws Exception {
|
||||||
Job job = Job.getInstance();
|
Job job = Job.getInstance();
|
||||||
|
@ -133,7 +149,7 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
assertFalse("task temp dir still exists", expectedFile.exists());
|
assertFalse("task temp dir still exists", expectedFile.exists());
|
||||||
|
|
||||||
committer.abortJob(jContext, JobStatus.State.FAILED);
|
committer.abortJob(jContext, JobStatus.State.FAILED);
|
||||||
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
|
expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
|
||||||
.toString());
|
.toString());
|
||||||
assertFalse("job temp dir still exists", expectedFile.exists());
|
assertFalse("job temp dir still exists", expectedFile.exists());
|
||||||
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
||||||
|
@ -188,9 +204,9 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
assertNotNull(th);
|
assertNotNull(th);
|
||||||
assertTrue(th instanceof IOException);
|
assertTrue(th instanceof IOException);
|
||||||
assertTrue(th.getMessage().contains("fake delete failed"));
|
assertTrue(th.getMessage().contains("fake delete failed"));
|
||||||
String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
|
//Path taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
|
||||||
File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
|
File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath());
|
||||||
File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
|
File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath());
|
||||||
File expectedFile = new File(taskTmpDir, partFile);
|
File expectedFile = new File(taskTmpDir, partFile);
|
||||||
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue