diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7972c082a9f..10c938993cf 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -463,6 +463,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6277. Job can post multiple history files if attempt loses connection to the RM (Chang Li via jlowe) + MAPREDUCE-6275. Race condition in FileOutputCommitter v2 for + user-specified task output subdirs (Gera Shegalov and Siqi Li via jlowe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 28a85485c1a..6e5d0a1e627 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -348,44 +348,61 @@ public class FileOutputCommitter extends OutputCommitter { * @param to the path data is going to. * @throws IOException on any error */ - private static void mergePaths(FileSystem fs, final FileStatus from, - final Path to) - throws IOException { - LOG.debug("Merging data from "+from+" to "+to); - if(from.isFile()) { - if(fs.exists(to)) { - if(!fs.delete(to, true)) { - throw new IOException("Failed to delete "+to); - } - } + private void mergePaths(FileSystem fs, final FileStatus from, + final Path to) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Merging data from " + from + " to " + to); + } + FileStatus toStat; + try { + toStat = fs.getFileStatus(to); + } catch (FileNotFoundException fnfe) { + toStat = null; + } - if(!fs.rename(from.getPath(), to)) { - throw new IOException("Failed to rename "+from+" to "+to); - } - } else if(from.isDirectory()) { - if(fs.exists(to)) { - FileStatus toStat = fs.getFileStatus(to); - if(!toStat.isDirectory()) { - if(!fs.delete(to, true)) { - throw new IOException("Failed to delete "+to); - } - if(!fs.rename(from.getPath(), to)) { - throw new IOException("Failed to rename "+from+" to "+to); - } - } else { - //It is a directory so merge everything in the directories - for(FileStatus subFrom: fs.listStatus(from.getPath())) { - Path subTo = new Path(to, subFrom.getPath().getName()); - mergePaths(fs, subFrom, subTo); - } - } - } else { - //it does not exist just rename - if(!fs.rename(from.getPath(), to)) { - throw new IOException("Failed to rename "+from+" to "+to); - } - } - } + if (from.isFile()) { + if (toStat != null) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); + } + } + + if (!fs.rename(from.getPath(), to)) { + throw new IOException("Failed to rename " + from + " to " + to); + } + } else if (from.isDirectory()) { + if (toStat != null) { + if (!toStat.isDirectory()) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); + } + renameOrMerge(fs, from, to); + } else { + //It is a directory so merge everything in the directories + for (FileStatus subFrom : fs.listStatus(from.getPath())) { + Path subTo = new Path(to, subFrom.getPath().getName()); + mergePaths(fs, subFrom, subTo); + } + } + } else { + renameOrMerge(fs, from, to); + } + } + } + + private void renameOrMerge(FileSystem fs, FileStatus from, Path to) + throws IOException { + if (algorithmVersion == 1) { + if (!fs.rename(from.getPath(), to)) { + throw new IOException("Failed to rename " + from + " to " + to); + } + } else { + fs.mkdirs(to); + for (FileStatus subFrom : fs.listStatus(from.getPath())) { + Path subTo = new Path(to, subFrom.getPath().getName()); + mergePaths(fs, subFrom, subTo); + } + } } @Override @@ -546,8 +563,9 @@ public class FileOutputCommitter extends OutputCommitter { Path previousCommittedTaskPath = getCommittedTaskPath( previousAttempt, context); FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration()); - - LOG.debug("Trying to recover task from " + previousCommittedTaskPath); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to recover task from " + previousCommittedTaskPath); + } if (algorithmVersion == 1) { if (fs.exists(previousCommittedTaskPath)) { Path committedTaskPath = getCommittedTaskPath(context); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index 8f6030065fd..0d4ab98693d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -22,9 +22,15 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -39,6 +45,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -47,13 +54,25 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; @SuppressWarnings("unchecked") public class TestFileOutputCommitter extends TestCase { - private static Path outDir = new Path(System.getProperty("test.build.data", - "/tmp"), "output"); + private static final Path outDir = new Path( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestFileOutputCommitter.class.getName()); + + private final static String SUB_DIR = "SUB_DIR"; + private final static Path OUT_SUB_DIR = new Path(outDir, SUB_DIR); + + private static final Log LOG = + LogFactory.getLog(TestFileOutputCommitter.class); // A random task attempt id for testing. - private static String attempt = "attempt_200707121733_0001_m_000000_0"; - private static String partFile = "part-m-00000"; - private static TaskAttemptID taskID = TaskAttemptID.forName(attempt); + private static final String attempt = "attempt_200707121733_0001_m_000000_0"; + private static final String partFile = "part-m-00000"; + private static final TaskAttemptID taskID = TaskAttemptID.forName(attempt); + + private static final String attempt1 = "attempt_200707121733_0001_m_000001_0"; + private static final TaskAttemptID taskID1 = TaskAttemptID.forName(attempt1); + private Text key1 = new Text("key1"); private Text key2 = new Text("key2"); private Text val1 = new Text("val1"); @@ -229,7 +248,7 @@ public class TestFileOutputCommitter extends TestCase { } private void testCommitterInternal(int version) throws Exception { - Job job = Job.getInstance(); + Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); @@ -441,6 +460,107 @@ public class TestFileOutputCommitter extends TestCase { testFailAbortInternal(2); } + static class RLFS extends RawLocalFileSystem { + private final ThreadLocal needNull = new ThreadLocal() { + @Override + protected Boolean initialValue() { + return true; + } + }; + + public RLFS() { + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + if (needNull.get() && + OUT_SUB_DIR.toUri().getPath().equals(f.toUri().getPath())) { + needNull.set(false); // lie once per thread + return null; + } + return super.getFileStatus(f); + } + } + + private void testConcurrentCommitTaskWithSubDir(int version) + throws Exception { + final Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + final Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, + version); + + conf.setClass("fs.file.impl", RLFS.class, FileSystem.class); + FileSystem.closeAll(); + + final JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + final FileOutputCommitter amCommitter = + new FileOutputCommitter(outDir, jContext); + amCommitter.setupJob(jContext); + + final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2]; + taCtx[0] = new TaskAttemptContextImpl(conf, taskID); + taCtx[1] = new TaskAttemptContextImpl(conf, taskID1); + + final TextOutputFormat[] tof = new TextOutputFormat[2]; + for (int i = 0; i < tof.length; i++) { + tof[i] = new TextOutputFormat() { + @Override + public Path getDefaultWorkFile(TaskAttemptContext context, + String extension) throws IOException { + final FileOutputCommitter foc = (FileOutputCommitter) + getOutputCommitter(context); + return new Path(new Path(foc.getWorkPath(), SUB_DIR), + getUniqueFile(context, getOutputName(context), extension)); + } + }; + } + + final ExecutorService executor = Executors.newFixedThreadPool(2); + try { + for (int i = 0; i < taCtx.length; i++) { + final int taskIdx = i; + executor.submit(new Callable() { + @Override + public Void call() throws IOException, InterruptedException { + final OutputCommitter outputCommitter = + tof[taskIdx].getOutputCommitter(taCtx[taskIdx]); + outputCommitter.setupTask(taCtx[taskIdx]); + final RecordWriter rw = + tof[taskIdx].getRecordWriter(taCtx[taskIdx]); + writeOutput(rw, taCtx[taskIdx]); + outputCommitter.commitTask(taCtx[taskIdx]); + return null; + } + }); + } + } finally { + executor.shutdown(); + while (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + LOG.info("Awaiting thread termination!"); + } + } + + amCommitter.commitJob(jContext); + final RawLocalFileSystem lfs = new RawLocalFileSystem(); + lfs.setConf(conf); + assertFalse("Must not end up with sub_dir/sub_dir", + lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR))); + + // validate output + validateContent(OUT_SUB_DIR); + FileUtil.fullyDelete(new File(outDir.toString())); + } + + public void testConcurrentCommitTaskWithSubDirV1() throws Exception { + testConcurrentCommitTaskWithSubDir(1); + } + + public void testConcurrentCommitTaskWithSubDirV2() throws Exception { + testConcurrentCommitTaskWithSubDir(2); + } + public static String slurp(File f) throws IOException { int len = (int) f.length(); byte[] buf = new byte[len];