From f29bfa0e0e88de49f1558138eeaba1396fa5a1d2 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Wed, 5 Oct 2011 12:15:44 +0000
Subject: [PATCH] MAPREDUCE-2702. Added a new API in OutputCommitter for
 recovering the outputs of tasks from a crashed job so as to support MR
 Application Master recovery. Contributed by Sharad Agarwal and Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1179188 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   4 +
 .../apache/hadoop/mapreduce/MRJobConfig.java  |   2 +
 .../hadoop/mapreduce/OutputCommitter.java     |  31 ++
 .../lib/output/FileOutputCommitter.java       | 102 +++++-
 .../lib/output/TestFileOutputCommitter.java   | 290 ++++++++++++++++++
 5 files changed, 422 insertions(+), 7 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 80b8b37d52d..679643b9a80 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -347,6 +347,10 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2907. Changed log level for various messages in ResourceManager
     from INFO to DEBUG. (Ravi Prakash via vinodkv)
 
+    MAPREDUCE-2702. Added a new API in OutputCommitter for recovering
+    the outputs of tasks from a crashed job so as to support MR Application
+    Master recovery. (Sharad Agarwal and Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index accfdddc3db..a3e5a6cf615 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -473,4 +473,6 @@ public interface MRJobConfig {
 
   public static final String MAPREDUCE_V2_CHILD_CLASS =
     "org.apache.hadoop.mapred.YarnChild";
+  public static final String APPLICATION_ATTEMPT_ID =
+    "mapreduce.job.application.attempt.id";
 }
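The new MRJobConfig constant is how a restarted MR Application Master communicates the current attempt number to output committers. A minimal sketch of reading it, assuming a Hadoop Configuration is at hand (the class and method names below are invented for illustration, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class AttemptIdExample {
  // Mirrors how this patch reads the key elsewhere: getInt with a
  // default of 0 when the key has not been set.
  public static int currentAttempt(Configuration conf) {
    return conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
    System.out.println("attempt = " + currentAttempt(conf)); // prints "attempt = 2"
  }
}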
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
index 22ff59aa113..819c32baa9f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
@@ -143,4 +143,35 @@ public abstract class OutputCommitter {
    */
   public abstract void abortTask(TaskAttemptContext taskContext)
   throws IOException;
+
+  /**
+   * Is task output recovery supported for restarting jobs?
+   *
+   * If task output recovery is supported, job restart can be done more
+   * efficiently.
+   *
+   * @return true if task output recovery is supported,
+   *         false otherwise
+   * @see #recoverTask(TaskAttemptContext)
+   */
+  public boolean isRecoverySupported() {
+    return false;
+  }
+
+  /**
+   * Recover the task output.
+   *
+   * The retry-count for the job will be passed via the
+   * {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in
+   * {@link TaskAttemptContext#getConfiguration()} for the
+   * OutputCommitter.
+   *
+   * If an exception is thrown the task will be attempted again.
+   *
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException
+   */
+  public void recoverTask(TaskAttemptContext taskContext)
+  throws IOException
+  {}
 }
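The two hooks above are deliberately no-ops by default, so existing committers keep their old behavior. A committer opts in by overriding both. A minimal, hypothetical sketch of such a committer (the class is invented for the example and elides the actual output-moving logic):

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class RecoverableCommitter extends OutputCommitter {
  @Override public void setupJob(JobContext context) throws IOException { }
  @Override public void setupTask(TaskAttemptContext context) throws IOException { }
  @Override public boolean needsTaskCommit(TaskAttemptContext context) { return true; }
  @Override public void commitTask(TaskAttemptContext context) throws IOException { }
  @Override public void abortTask(TaskAttemptContext context) throws IOException { }

  @Override
  public boolean isRecoverySupported() {
    return true; // advertise that recoverTask() does something useful
  }

  @Override
  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
    // The restarted AM sets APPLICATION_ATTEMPT_ID to the current attempt,
    // so the crashed attempt is (current - 1); FileOutputCommitter below
    // makes the same assumption.
    int current = taskContext.getConfiguration()
        .getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
    if (current - 1 < 0) {
      throw new IOException("Nothing to recover for the first attempt");
    }
    // ... move or re-link the output written by attempt (current - 1) ...
  }
}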
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 3fe4354b17d..26390c7df2a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.StringUtils;
 
 /** An {@link OutputCommitter} that commits files specified
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 **/
@@ -69,9 +68,8 @@ public class FileOutputCommitter extends OutputCommitter {
       this.outputPath = outputPath;
       outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
       workPath = new Path(outputPath,
-          (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-          "_" + context.getTaskAttemptID().toString()
-          )).makeQualified(outputFileSystem);
+          getTaskAttemptBaseDirName(context))
+          .makeQualified(outputFileSystem);
     }
   }
 
@@ -82,7 +80,8 @@
    */
   public void setupJob(JobContext context) throws IOException {
     if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
       if (!fileSys.mkdirs(tmpDir)) {
         LOG.error("Mkdirs failed to create " + tmpDir.toString());
@@ -106,11 +105,27 @@
   }
 
   /**
+   * Move all job output to the final place.
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to mark it as successful.
    * @param context the job's context
    */
   public void commitJob(JobContext context) throws IOException {
+    // delete the task temp directory from the current job temp dir
+    Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+        Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+    FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+    if (fileSys.exists(tmpDir)) {
+      fileSys.delete(tmpDir, true);
+    } else {
+      LOG.warn("Task temp dir " + tmpDir + " does not exist, nothing to delete");
+    }
+
+    // move the job output to its final place
+    Path jobOutputPath =
+        new Path(outputPath, getJobAttemptBaseDirName(context));
+    moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
+
     // delete the _temporary folder and create a _done file in the o/p folder
     cleanupJob(context);
     if (shouldMarkOutputDir(context.getConfiguration())) {
@@ -118,6 +133,31 @@
     }
   }
 
+  private void moveJobOutputs(FileSystem fs,
+      Path finalOutputDir, Path jobOutput) throws IOException {
+    if (fs.isFile(jobOutput)) {
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      if (!fs.rename(jobOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of job");
+        }
+        if (!fs.rename(jobOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of job");
+        }
+      }
+      LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
+      FileStatus[] paths = fs.listStatus(jobOutput);
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      fs.mkdirs(finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveJobOutputs(fs, finalOutputDir, path.getPath());
+        }
+      }
+    }
+  }
+
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
@@ -163,8 +203,10 @@ public class FileOutputCommitter extends OutputCommitter {
     if (workPath != null) {
       context.progress();
       if (outputFileSystem.exists(workPath)) {
-        // Move the task outputs to their final place
-        moveTaskOutputs(context, outputFileSystem, outputPath, workPath);
+        // Move the task outputs to the current job attempt output dir
+        Path jobOutputPath =
+            new Path(outputPath, getJobAttemptBaseDirName(context));
+        moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
         // Delete the temporary task-specific output directory
         if (!outputFileSystem.delete(workPath, true)) {
           LOG.warn("Failed to delete the temporary output" +
@@ -271,4 +313,50 @@ public class FileOutputCommitter extends OutputCommitter {
   public Path getWorkPath() throws IOException {
     return workPath;
   }
+
+  @Override
+  public boolean isRecoverySupported() {
+    return true;
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext context)
+      throws IOException {
+    context.progress();
+    Path jobOutputPath =
+        new Path(outputPath, getJobAttemptBaseDirName(context));
+    int previousAttempt =
+        context.getConfiguration().getInt(
+            MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
+    if (previousAttempt < 0) {
+      throw new IOException("Cannot recover task output for first attempt...");
+    }
+
+    Path pathToRecover =
+        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+    if (outputFileSystem.exists(pathToRecover)) {
+      // Move the task outputs to their final place
+      moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+      LOG.info("Saved output of job to " + jobOutputPath);
+    }
+  }
+
+  protected static String getJobAttemptBaseDirName(JobContext context) {
+    int appAttemptId =
+        context.getConfiguration().getInt(
+            MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+    return getJobAttemptBaseDirName(appAttemptId);
+  }
+
+  protected static String getJobAttemptBaseDirName(int appAttemptId) {
+    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+        + appAttemptId;
+  }
+
+  protected static String getTaskAttemptBaseDirName(
+      TaskAttemptContext context) {
+    return getJobAttemptBaseDirName(context) + Path.SEPARATOR +
+        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+        "_" + context.getTaskAttemptID().toString();
+  }
 }
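The three helpers above pin down the attempt-scoped directory layout that makes recovery possible: each application attempt writes under its own _temporary/<appAttemptId> subtree, so a later attempt can locate and move a crashed attempt's committed task output. A small sketch that prints the relative names the helpers produce (illustration only; it is placed in the same package so the protected helpers are visible, and the ids are made up):

package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class LayoutDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
    TaskAttemptID tid =
        TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0");
    TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, tid);

    // prints: _temporary/1
    System.out.println(FileOutputCommitter.getJobAttemptBaseDirName(ctx));
    // prints: _temporary/1/_temporary/_attempt_200707121733_0001_m_000000_0
    System.out.println(FileOutputCommitter.getTaskAttemptBaseDirName(ctx));
  }
}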
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
new file mode 100644
index 00000000000..6e8941bd7ec
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+@SuppressWarnings("unchecked")
+public class TestFileOutputCommitter extends TestCase {
+  private static Path outDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), "output");
+
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+  private static String partFile = "part-m-00000";
+  private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
+  private void writeOutput(RecordWriter theRecordWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(context);
+    }
+  }
+
+  public void testRecovery() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    Path jobTempDir1 = new Path(outDir,
+        FileOutputCommitter.getJobAttemptBaseDirName(
+            conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
+    assertTrue((new File(jobTempDir1.toString()).exists()));
+    validateContent(jobTempDir1);
+
+    // now while running the second app attempt,
+    // recover the task output from the first attempt
+    Configuration conf2 = job.getConfiguration();
+    conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
+    JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
+    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
+    FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
+    committer2.setupJob(jContext2);
+    Path jobTempDir2 = new Path(outDir,
+        FileOutputCommitter.getJobAttemptBaseDirName(
+            conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
+    assertTrue((new File(jobTempDir2.toString()).exists()));
+
+    tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
+    committer2.recoverTask(tContext2);
+    validateContent(jobTempDir2);
+
+    committer2.commitJob(jContext2);
+    validateContent(outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  private void validateContent(Path dir) throws IOException {
+    File expectedFile = new File(new Path(dir, partFile).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = slurp(expectedFile);
+    assertEquals(expectedOutput.toString(), output);
+  }
+
+  public void testCommitter() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testAbort() throws IOException, InterruptedException {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort
+    committer.abortTask(tContext);
+    File expectedFile = new File(new Path(committer.getWorkPath(), partFile)
+        .toString());
+    assertFalse("task temp dir still exists", expectedFile.exists());
+
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+        .toString());
+    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertEquals("Output directory not empty", 0, new File(outDir.toString())
+        .listFiles().length);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public static class FakeFileSystem extends RawLocalFileSystem {
+    public FakeFileSystem() {
+      super();
+    }
+
+    public URI getUri() {
+      return URI.create("faildel:///");
+    }
+
+    @Override
+    public boolean delete(Path p, boolean recursive) throws IOException {
+      throw new IOException("fake delete failed");
+    }
+  }
+
+  public void testFailAbort() throws IOException, InterruptedException {
+    Job job = Job.getInstance();
+    Configuration conf = job.getConfiguration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
+    conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    FileOutputFormat.setOutputPath(job, outDir);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort
+    Throwable th = null;
+    try {
+      committer.abortTask(tContext);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    File jobTmpDir = new File(new Path(outDir,
+        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+        conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
+        Path.SEPARATOR +
+        FileOutputCommitter.TEMP_DIR_NAME).toString());
+    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    File expectedFile = new File(taskTmpDir, partFile);
+    assertTrue(expectedFile + " does not exist", expectedFile.exists());
+
+    th = null;
+    try {
+      committer.abortJob(jContext, JobStatus.State.FAILED);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    assertTrue("job temp dir does not exist", jobTmpDir.exists());
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public static String slurp(File f) throws IOException {
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    String contents = null;
+    try {
+      org.apache.hadoop.io.IOUtils.readFully(in, buf, 0, len);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
+  }
+}
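For completeness, this is roughly how a restarted Application Master is expected to drive the new API. The driver class below is a hypothetical sketch, not code from this patch; the real wiring lives inside the MR AM's recovery logic:

import java.io.IOException;

import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class RecoveryDriver {
  /**
   * @param committer the job's OutputCommitter
   * @param completed contexts of the task attempts that succeeded in the
   *                  crashed application attempt; their configurations are
   *                  assumed to carry the current APPLICATION_ATTEMPT_ID
   * @return true if outputs were recovered, false if the completed tasks
   *         have to be rerun instead
   */
  public static boolean recoverOutputs(OutputCommitter committer,
      Iterable<TaskAttemptContext> completed) throws IOException {
    if (!committer.isRecoverySupported()) {
      return false; // committer cannot recover; rerun the completed tasks
    }
    for (TaskAttemptContext taskContext : completed) {
      // Per the recoverTask() contract, a thrown exception means the
      // task will simply be attempted again.
      committer.recoverTask(taskContext);
    }
    return true;
  }
}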