Merge -c 1182008 from trunk to branch-0.23 to fix MAPREDUCE-3126.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1182009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
591b080e98
commit
76fb42f94d
|
@ -328,6 +328,9 @@ Release 0.23.0 - Unreleased
|
||||||
MAPREDUCE-3161. Improved some javadocs and fixed some typos in
|
MAPREDUCE-3161. Improved some javadocs and fixed some typos in
|
||||||
YARN. (Todd Lipcon via vinodkv)
|
YARN. (Todd Lipcon via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3148. Ported MAPREDUCE-2702 to old mapred api for aiding task
|
||||||
|
recovery. (acmurthy)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
||||||
|
|
|
@ -160,7 +160,10 @@
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
|
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
|
||||||
|
<Or>
|
||||||
<Method name="commitJob" />
|
<Method name="commitJob" />
|
||||||
|
<Method name="recoverTask" />
|
||||||
|
</Or>
|
||||||
<Bug pattern="NM_WRONG_PACKAGE" />
|
<Bug pattern="NM_WRONG_PACKAGE" />
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
|
@ -169,6 +172,7 @@
|
||||||
<Method name="abortJob" />
|
<Method name="abortJob" />
|
||||||
<Method name="commitJob" />
|
<Method name="commitJob" />
|
||||||
<Method name="cleanupJob" />
|
<Method name="cleanupJob" />
|
||||||
|
<Method name="recoverTask" />
|
||||||
</Or>
|
</Or>
|
||||||
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
|
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
|
||||||
</Match>
|
</Match>
|
||||||
|
|
|
@ -38,6 +38,7 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(
|
public static final Log LOG = LogFactory.getLog(
|
||||||
"org.apache.hadoop.mapred.FileOutputCommitter");
|
"org.apache.hadoop.mapred.FileOutputCommitter");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Temporary directory name
|
* Temporary directory name
|
||||||
*/
|
*/
|
||||||
|
@ -50,7 +51,9 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
JobConf conf = context.getJobConf();
|
JobConf conf = context.getJobConf();
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
||||||
if (outputPath != null) {
|
if (outputPath != null) {
|
||||||
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
|
Path tmpDir =
|
||||||
|
new Path(outputPath, getJobAttemptBaseDirName(context) +
|
||||||
|
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
||||||
FileSystem fileSys = tmpDir.getFileSystem(conf);
|
FileSystem fileSys = tmpDir.getFileSystem(conf);
|
||||||
if (!fileSys.mkdirs(tmpDir)) {
|
if (!fileSys.mkdirs(tmpDir)) {
|
||||||
LOG.error("Mkdirs failed to create " + tmpDir.toString());
|
LOG.error("Mkdirs failed to create " + tmpDir.toString());
|
||||||
|
@ -65,6 +68,24 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void commitJob(JobContext context) throws IOException {
|
public void commitJob(JobContext context) throws IOException {
|
||||||
|
//delete the task temp directory from the current jobtempdir
|
||||||
|
JobConf conf = context.getJobConf();
|
||||||
|
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
||||||
|
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
|
||||||
|
Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
|
||||||
|
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
||||||
|
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
||||||
|
if (fileSys.exists(tmpDir)) {
|
||||||
|
fileSys.delete(tmpDir, true);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Task temp dir could not be deleted " + tmpDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
//move the job output to final place
|
||||||
|
Path jobOutputPath =
|
||||||
|
new Path(outputPath, getJobAttemptBaseDirName(context));
|
||||||
|
moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
|
||||||
|
|
||||||
// delete the _temporary folder in the output folder
|
// delete the _temporary folder in the output folder
|
||||||
cleanupJob(context);
|
cleanupJob(context);
|
||||||
// check if the output-dir marking is required
|
// check if the output-dir marking is required
|
||||||
|
@ -88,6 +109,30 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void moveJobOutputs(FileSystem fs,
|
||||||
|
Path finalOutputDir, Path jobOutput) throws IOException {
|
||||||
|
if (fs.isFile(jobOutput)) {
|
||||||
|
Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
|
||||||
|
if (!fs.rename(jobOutput, finalOutputPath)) {
|
||||||
|
if (!fs.delete(finalOutputPath, true)) {
|
||||||
|
throw new IOException("Failed to delete earlier output of job");
|
||||||
|
}
|
||||||
|
if (!fs.rename(jobOutput, finalOutputPath)) {
|
||||||
|
throw new IOException("Failed to save output of job");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
|
||||||
|
} else if (fs.getFileStatus(jobOutput).isDirectory()) {
|
||||||
|
FileStatus[] paths = fs.listStatus(jobOutput);
|
||||||
|
Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
|
||||||
|
fs.mkdirs(finalOutputPath);
|
||||||
|
if (paths != null) {
|
||||||
|
for (FileStatus path : paths) {
|
||||||
|
moveJobOutputs(fs, finalOutputDir, path.getPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void cleanupJob(JobContext context) throws IOException {
|
public void cleanupJob(JobContext context) throws IOException {
|
||||||
|
@ -128,9 +173,14 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
FileSystem fs = taskOutputPath.getFileSystem(job);
|
FileSystem fs = taskOutputPath.getFileSystem(job);
|
||||||
context.getProgressible().progress();
|
context.getProgressible().progress();
|
||||||
if (fs.exists(taskOutputPath)) {
|
if (fs.exists(taskOutputPath)) {
|
||||||
Path jobOutputPath = taskOutputPath.getParent().getParent();
|
// Move the task outputs to the current job attempt output dir
|
||||||
// Move the task outputs to their final place
|
JobConf conf = context.getJobConf();
|
||||||
moveTaskOutputs(context, fs, jobOutputPath, taskOutputPath);
|
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
||||||
|
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
|
||||||
|
Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
|
||||||
|
moveTaskOutputs(context, outputFileSystem, jobOutputPath,
|
||||||
|
taskOutputPath);
|
||||||
|
|
||||||
// Delete the temporary task-specific output directory
|
// Delete the temporary task-specific output directory
|
||||||
if (!fs.delete(taskOutputPath, true)) {
|
if (!fs.delete(taskOutputPath, true)) {
|
||||||
LOG.info("Failed to delete the temporary output" +
|
LOG.info("Failed to delete the temporary output" +
|
||||||
|
@ -189,7 +239,8 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
Path taskOutputPath) throws IOException {
|
Path taskOutputPath) throws IOException {
|
||||||
URI taskOutputUri = taskOutput.toUri();
|
URI taskOutputUri = taskOutput.toUri();
|
||||||
URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
|
URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
|
||||||
if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
|
if (taskOutputUri == relativePath) {
|
||||||
|
//taskOutputPath is not a parent of taskOutput
|
||||||
throw new IOException("Can not get the relative path: base = " +
|
throw new IOException("Can not get the relative path: base = " +
|
||||||
taskOutputPath + " child = " + taskOutput);
|
taskOutputPath + " child = " + taskOutput);
|
||||||
}
|
}
|
||||||
|
@ -216,7 +267,8 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
Path getTempTaskOutputPath(TaskAttemptContext taskContext) throws IOException {
|
Path getTempTaskOutputPath(TaskAttemptContext taskContext)
|
||||||
|
throws IOException {
|
||||||
JobConf conf = taskContext.getJobConf();
|
JobConf conf = taskContext.getJobConf();
|
||||||
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
Path outputPath = FileOutputFormat.getOutputPath(conf);
|
||||||
if (outputPath != null) {
|
if (outputPath != null) {
|
||||||
|
@ -247,4 +299,60 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
}
|
}
|
||||||
return taskTmpDir;
|
return taskTmpDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRecoverySupported() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverTask(TaskAttemptContext context)
|
||||||
|
throws IOException {
|
||||||
|
Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
|
||||||
|
context.progress();
|
||||||
|
Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
|
||||||
|
int previousAttempt =
|
||||||
|
context.getConfiguration().getInt(
|
||||||
|
MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
|
||||||
|
if (previousAttempt < 0) {
|
||||||
|
LOG.warn("Cannot recover task output for first attempt...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
FileSystem outputFileSystem =
|
||||||
|
outputPath.getFileSystem(context.getJobConf());
|
||||||
|
Path pathToRecover =
|
||||||
|
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
|
||||||
|
if (outputFileSystem.exists(pathToRecover)) {
|
||||||
|
// Move the task outputs to their final place
|
||||||
|
moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
|
||||||
|
LOG.info("Saved output of job to " + jobOutputPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String getJobAttemptBaseDirName(JobContext context) {
|
||||||
|
int appAttemptId =
|
||||||
|
context.getJobConf().getInt(
|
||||||
|
MRConstants.APPLICATION_ATTEMPT_ID, 0);
|
||||||
|
return getJobAttemptBaseDirName(appAttemptId);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String getJobTempDirName(TaskAttemptContext context) {
|
||||||
|
int appAttemptId =
|
||||||
|
context.getJobConf().getInt(
|
||||||
|
MRConstants.APPLICATION_ATTEMPT_ID, 0);
|
||||||
|
return getJobAttemptBaseDirName(appAttemptId);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String getJobAttemptBaseDirName(int appAttemptId) {
|
||||||
|
return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
||||||
|
+ appAttemptId;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String getTaskAttemptBaseDirName(
|
||||||
|
TaskAttemptContext context) {
|
||||||
|
return getJobTempDirName(context) + Path.SEPARATOR +
|
||||||
|
FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
||||||
|
"_" + context.getTaskAttemptID().toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,4 +60,9 @@ public interface MRConstants {
|
||||||
|
|
||||||
/** Used in MRv1, mostly in TaskTracker code **/
|
/** Used in MRv1, mostly in TaskTracker code **/
|
||||||
public static final String WORKDIR = "work";
|
public static final String WORKDIR = "work";
|
||||||
|
|
||||||
|
/** Used on by MRv2 */
|
||||||
|
public static final String APPLICATION_ATTEMPT_ID =
|
||||||
|
"mapreduce.job.application.attempt.id";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,6 +146,33 @@ public abstract class OutputCommitter
|
||||||
public abstract void abortTask(TaskAttemptContext taskContext)
|
public abstract void abortTask(TaskAttemptContext taskContext)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method implements the new interface by calling the old method. Note
|
||||||
|
* that the input types are different between the new and old apis and this
|
||||||
|
* is a bridge between the two.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean isRecoverySupported() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recover the task output.
|
||||||
|
*
|
||||||
|
* The retry-count for the job will be passed via the
|
||||||
|
* {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
|
||||||
|
* {@link TaskAttemptContext#getConfiguration()} for the
|
||||||
|
* <code>OutputCommitter</code>.
|
||||||
|
*
|
||||||
|
* If an exception is thrown the task will be attempted again.
|
||||||
|
*
|
||||||
|
* @param taskContext Context of the task whose output is being recovered
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void recoverTask(TaskAttemptContext taskContext)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method implements the new interface by calling the old method. Note
|
* This method implements the new interface by calling the old method. Note
|
||||||
* that the input types are different between the new and old apis and this
|
* that the input types are different between the new and old apis and this
|
||||||
|
@ -246,4 +273,17 @@ public abstract class OutputCommitter
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
abortTask((TaskAttemptContext) taskContext);
|
abortTask((TaskAttemptContext) taskContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method implements the new interface by calling the old method. Note
|
||||||
|
* that the input types are different between the new and old apis and this
|
||||||
|
* is a bridge between the two.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public final
|
||||||
|
void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
|
||||||
|
) throws IOException {
|
||||||
|
recoverTask((TaskAttemptContext) taskContext);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,284 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public class TestFileOutputCommitter extends TestCase {
|
||||||
|
private static Path outDir = new Path(System.getProperty("test.build.data",
|
||||||
|
"/tmp"), "output");
|
||||||
|
|
||||||
|
// A random task attempt id for testing.
|
||||||
|
private static String attempt = "attempt_200707121733_0001_m_000000_0";
|
||||||
|
private static String partFile = "part-00000";
|
||||||
|
private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
|
||||||
|
private Text key1 = new Text("key1");
|
||||||
|
private Text key2 = new Text("key2");
|
||||||
|
private Text val1 = new Text("val1");
|
||||||
|
private Text val2 = new Text("val2");
|
||||||
|
|
||||||
|
|
||||||
|
private void writeOutput(RecordWriter theRecordWriter,
|
||||||
|
TaskAttemptContext context) throws IOException, InterruptedException {
|
||||||
|
NullWritable nullWritable = NullWritable.get();
|
||||||
|
|
||||||
|
try {
|
||||||
|
theRecordWriter.write(key1, val1);
|
||||||
|
theRecordWriter.write(null, nullWritable);
|
||||||
|
theRecordWriter.write(null, val1);
|
||||||
|
theRecordWriter.write(nullWritable, val2);
|
||||||
|
theRecordWriter.write(key2, nullWritable);
|
||||||
|
theRecordWriter.write(key1, null);
|
||||||
|
theRecordWriter.write(null, null);
|
||||||
|
theRecordWriter.write(key2, val2);
|
||||||
|
} finally {
|
||||||
|
theRecordWriter.close(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testRecovery() throws Exception {
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
FileOutputFormat.setOutputPath(conf, outDir);
|
||||||
|
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||||
|
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
|
||||||
|
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||||
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||||
|
FileOutputCommitter committer = new FileOutputCommitter();
|
||||||
|
|
||||||
|
// setup
|
||||||
|
committer.setupJob(jContext);
|
||||||
|
committer.setupTask(tContext);
|
||||||
|
|
||||||
|
// write output
|
||||||
|
TextOutputFormat theOutputFormat = new TextOutputFormat();
|
||||||
|
RecordWriter theRecordWriter =
|
||||||
|
theOutputFormat.getRecordWriter(null, conf, partFile, null);
|
||||||
|
writeOutput(theRecordWriter, tContext);
|
||||||
|
|
||||||
|
// do commit
|
||||||
|
committer.commitTask(tContext);
|
||||||
|
Path jobTempDir1 = new Path(outDir,
|
||||||
|
FileOutputCommitter.getJobAttemptBaseDirName(
|
||||||
|
conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
|
||||||
|
assertTrue((new File(jobTempDir1.toString()).exists()));
|
||||||
|
validateContent(jobTempDir1);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
//now while running the second app attempt,
|
||||||
|
//recover the task output from first attempt
|
||||||
|
JobConf conf2 = new JobConf(conf);
|
||||||
|
conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||||
|
conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
|
||||||
|
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
|
||||||
|
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
|
||||||
|
FileOutputCommitter committer2 = new FileOutputCommitter();
|
||||||
|
committer.setupJob(jContext2);
|
||||||
|
Path jobTempDir2 = new Path(outDir,
|
||||||
|
FileOutputCommitter.getJobAttemptBaseDirName(
|
||||||
|
conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
|
||||||
|
assertTrue((new File(jobTempDir2.toString()).exists()));
|
||||||
|
|
||||||
|
tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
|
||||||
|
committer2.recoverTask(tContext2);
|
||||||
|
validateContent(jobTempDir2);
|
||||||
|
|
||||||
|
committer2.commitJob(jContext2);
|
||||||
|
validateContent(outDir);
|
||||||
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateContent(Path dir) throws IOException {
|
||||||
|
File expectedFile = new File(new Path(dir, partFile).toString());
|
||||||
|
StringBuffer expectedOutput = new StringBuffer();
|
||||||
|
expectedOutput.append(key1).append('\t').append(val1).append("\n");
|
||||||
|
expectedOutput.append(val1).append("\n");
|
||||||
|
expectedOutput.append(val2).append("\n");
|
||||||
|
expectedOutput.append(key2).append("\n");
|
||||||
|
expectedOutput.append(key1).append("\n");
|
||||||
|
expectedOutput.append(key2).append('\t').append(val2).append("\n");
|
||||||
|
String output = slurp(expectedFile);
|
||||||
|
assertEquals(output, expectedOutput.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testCommitter() throws Exception {
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
FileOutputFormat.setOutputPath(conf, outDir);
|
||||||
|
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||||
|
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||||
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||||
|
FileOutputCommitter committer = new FileOutputCommitter();
|
||||||
|
|
||||||
|
// setup
|
||||||
|
committer.setupJob(jContext);
|
||||||
|
committer.setupTask(tContext);
|
||||||
|
|
||||||
|
// write output
|
||||||
|
TextOutputFormat theOutputFormat = new TextOutputFormat();
|
||||||
|
RecordWriter theRecordWriter =
|
||||||
|
theOutputFormat.getRecordWriter(null, conf, partFile, null);
|
||||||
|
writeOutput(theRecordWriter, tContext);
|
||||||
|
|
||||||
|
// do commit
|
||||||
|
committer.commitTask(tContext);
|
||||||
|
committer.commitJob(jContext);
|
||||||
|
|
||||||
|
// validate output
|
||||||
|
validateContent(outDir);
|
||||||
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testAbort() throws IOException, InterruptedException {
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
FileOutputFormat.setOutputPath(conf, outDir);
|
||||||
|
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||||
|
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||||
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||||
|
FileOutputCommitter committer = new FileOutputCommitter();
|
||||||
|
|
||||||
|
// do setup
|
||||||
|
committer.setupJob(jContext);
|
||||||
|
committer.setupTask(tContext);
|
||||||
|
|
||||||
|
// write output
|
||||||
|
TextOutputFormat theOutputFormat = new TextOutputFormat();
|
||||||
|
RecordWriter theRecordWriter =
|
||||||
|
theOutputFormat.getRecordWriter(null, conf, partFile, null);
|
||||||
|
writeOutput(theRecordWriter, tContext);
|
||||||
|
|
||||||
|
// do abort
|
||||||
|
committer.abortTask(tContext);
|
||||||
|
FileSystem outputFileSystem = outDir.getFileSystem(conf);
|
||||||
|
Path workPath = new Path(outDir,
|
||||||
|
committer.getTaskAttemptBaseDirName(tContext))
|
||||||
|
.makeQualified(outputFileSystem);
|
||||||
|
File expectedFile = new File(new Path(workPath, partFile)
|
||||||
|
.toString());
|
||||||
|
assertFalse("task temp dir still exists", expectedFile.exists());
|
||||||
|
|
||||||
|
committer.abortJob(jContext, JobStatus.State.FAILED);
|
||||||
|
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
|
||||||
|
.toString());
|
||||||
|
assertFalse("job temp dir still exists", expectedFile.exists());
|
||||||
|
assertEquals("Output directory not empty", 0, new File(outDir.toString())
|
||||||
|
.listFiles().length);
|
||||||
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FakeFileSystem extends RawLocalFileSystem {
|
||||||
|
public FakeFileSystem() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public URI getUri() {
|
||||||
|
return URI.create("faildel:///");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean delete(Path p, boolean recursive) throws IOException {
|
||||||
|
throw new IOException("fake delete failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testFailAbort() throws IOException, InterruptedException {
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
|
||||||
|
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
|
||||||
|
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||||
|
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
|
||||||
|
FileOutputFormat.setOutputPath(conf, outDir);
|
||||||
|
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||||
|
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||||
|
FileOutputCommitter committer = new FileOutputCommitter();
|
||||||
|
|
||||||
|
// do setup
|
||||||
|
committer.setupJob(jContext);
|
||||||
|
committer.setupTask(tContext);
|
||||||
|
|
||||||
|
// write output
|
||||||
|
File jobTmpDir = new File(new Path(outDir,
|
||||||
|
FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
||||||
|
conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0) +
|
||||||
|
Path.SEPARATOR +
|
||||||
|
FileOutputCommitter.TEMP_DIR_NAME).toString());
|
||||||
|
File taskTmpDir = new File(jobTmpDir, "_" + taskID);
|
||||||
|
File expectedFile = new File(taskTmpDir, partFile);
|
||||||
|
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
|
||||||
|
RecordWriter<?, ?> theRecordWriter =
|
||||||
|
theOutputFormat.getRecordWriter(null, conf,
|
||||||
|
expectedFile.getAbsolutePath(), null);
|
||||||
|
writeOutput(theRecordWriter, tContext);
|
||||||
|
|
||||||
|
// do abort
|
||||||
|
Throwable th = null;
|
||||||
|
try {
|
||||||
|
committer.abortTask(tContext);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
th = ie;
|
||||||
|
}
|
||||||
|
assertNotNull(th);
|
||||||
|
assertTrue(th instanceof IOException);
|
||||||
|
assertTrue(th.getMessage().contains("fake delete failed"));
|
||||||
|
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
||||||
|
|
||||||
|
th = null;
|
||||||
|
try {
|
||||||
|
committer.abortJob(jContext, JobStatus.State.FAILED);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
th = ie;
|
||||||
|
}
|
||||||
|
assertNotNull(th);
|
||||||
|
assertTrue(th instanceof IOException);
|
||||||
|
assertTrue(th.getMessage().contains("fake delete failed"));
|
||||||
|
assertTrue("job temp dir does not exists", jobTmpDir.exists());
|
||||||
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String slurp(File f) throws IOException {
|
||||||
|
int len = (int) f.length();
|
||||||
|
byte[] buf = new byte[len];
|
||||||
|
FileInputStream in = new FileInputStream(f);
|
||||||
|
String contents = null;
|
||||||
|
try {
|
||||||
|
in.read(buf, 0, len);
|
||||||
|
contents = new String(buf, "UTF-8");
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
return contents;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -175,7 +175,12 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
// do setup
|
// do setup
|
||||||
committer.setupJob(jContext);
|
committer.setupJob(jContext);
|
||||||
committer.setupTask(tContext);
|
committer.setupTask(tContext);
|
||||||
|
|
||||||
String file = "test.txt";
|
String file = "test.txt";
|
||||||
|
String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
|
||||||
|
File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
|
||||||
|
File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
|
||||||
|
File expectedFile = new File(taskTmpDir, file);
|
||||||
|
|
||||||
// A reporter that does nothing
|
// A reporter that does nothing
|
||||||
Reporter reporter = Reporter.NULL;
|
Reporter reporter = Reporter.NULL;
|
||||||
|
@ -183,7 +188,7 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
FileSystem localFs = new FakeFileSystem();
|
FileSystem localFs = new FakeFileSystem();
|
||||||
TextOutputFormat theOutputFormat = new TextOutputFormat();
|
TextOutputFormat theOutputFormat = new TextOutputFormat();
|
||||||
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
|
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
|
||||||
job, file, reporter);
|
job, expectedFile.getAbsolutePath(), reporter);
|
||||||
writeOutput(theRecordWriter, reporter);
|
writeOutput(theRecordWriter, reporter);
|
||||||
|
|
||||||
// do abort
|
// do abort
|
||||||
|
@ -196,10 +201,6 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
assertNotNull(th);
|
assertNotNull(th);
|
||||||
assertTrue(th instanceof IOException);
|
assertTrue(th instanceof IOException);
|
||||||
assertTrue(th.getMessage().contains("fake delete failed"));
|
assertTrue(th.getMessage().contains("fake delete failed"));
|
||||||
File jobTmpDir = new File(new Path(outDir,
|
|
||||||
FileOutputCommitter.TEMP_DIR_NAME).toString());
|
|
||||||
File taskTmpDir = new File(jobTmpDir, "_" + taskID);
|
|
||||||
File expectedFile = new File(taskTmpDir, file);
|
|
||||||
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
||||||
|
|
||||||
th = null;
|
th = null;
|
||||||
|
|
|
@ -188,9 +188,9 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
assertNotNull(th);
|
assertNotNull(th);
|
||||||
assertTrue(th instanceof IOException);
|
assertTrue(th instanceof IOException);
|
||||||
assertTrue(th.getMessage().contains("fake delete failed"));
|
assertTrue(th.getMessage().contains("fake delete failed"));
|
||||||
String filename = committer.getTaskAttemptBaseDirName(tContext);
|
String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
|
||||||
File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
|
File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
|
||||||
File taskTmpDir = new File(outDir.toString(), filename);
|
File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
|
||||||
File expectedFile = new File(taskTmpDir, partFile);
|
File expectedFile = new File(taskTmpDir, partFile);
|
||||||
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
assertTrue(expectedFile + " does not exists", expectedFile.exists());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue