MAPREDUCE-6275. Race condition in FileOutputCommitter v2 for user-specified task output subdirs. Contributed by Gera Shegalov and Siqi Li
(cherry picked from commit 91baca145a
)
This commit is contained in:
parent
2b9173059d
commit
cabe676d67
|
@ -215,6 +215,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
MAPREDUCE-6277. Job can post multiple history files if attempt loses
|
MAPREDUCE-6277. Job can post multiple history files if attempt loses
|
||||||
connection to the RM (Chang Li via jlowe)
|
connection to the RM (Chang Li via jlowe)
|
||||||
|
|
||||||
|
MAPREDUCE-6275. Race condition in FileOutputCommitter v2 for
|
||||||
|
user-specified task output subdirs (Gera Shegalov and Siqi Li via jlowe)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -348,44 +348,61 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
* @param to the path data is going to.
|
* @param to the path data is going to.
|
||||||
* @throws IOException on any error
|
* @throws IOException on any error
|
||||||
*/
|
*/
|
||||||
private static void mergePaths(FileSystem fs, final FileStatus from,
|
private void mergePaths(FileSystem fs, final FileStatus from,
|
||||||
final Path to)
|
final Path to) throws IOException {
|
||||||
throws IOException {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Merging data from "+from+" to "+to);
|
LOG.debug("Merging data from " + from + " to " + to);
|
||||||
if(from.isFile()) {
|
}
|
||||||
if(fs.exists(to)) {
|
FileStatus toStat;
|
||||||
if(!fs.delete(to, true)) {
|
try {
|
||||||
throw new IOException("Failed to delete "+to);
|
toStat = fs.getFileStatus(to);
|
||||||
|
} catch (FileNotFoundException fnfe) {
|
||||||
|
toStat = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (from.isFile()) {
|
||||||
|
if (toStat != null) {
|
||||||
|
if (!fs.delete(to, true)) {
|
||||||
|
throw new IOException("Failed to delete " + to);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!fs.rename(from.getPath(), to)) {
|
if (!fs.rename(from.getPath(), to)) {
|
||||||
throw new IOException("Failed to rename "+from+" to "+to);
|
throw new IOException("Failed to rename " + from + " to " + to);
|
||||||
}
|
}
|
||||||
} else if(from.isDirectory()) {
|
} else if (from.isDirectory()) {
|
||||||
if(fs.exists(to)) {
|
if (toStat != null) {
|
||||||
FileStatus toStat = fs.getFileStatus(to);
|
if (!toStat.isDirectory()) {
|
||||||
if(!toStat.isDirectory()) {
|
if (!fs.delete(to, true)) {
|
||||||
if(!fs.delete(to, true)) {
|
throw new IOException("Failed to delete " + to);
|
||||||
throw new IOException("Failed to delete "+to);
|
|
||||||
}
|
|
||||||
if(!fs.rename(from.getPath(), to)) {
|
|
||||||
throw new IOException("Failed to rename "+from+" to "+to);
|
|
||||||
}
|
}
|
||||||
|
renameOrMerge(fs, from, to);
|
||||||
} else {
|
} else {
|
||||||
//It is a directory so merge everything in the directories
|
//It is a directory so merge everything in the directories
|
||||||
for(FileStatus subFrom: fs.listStatus(from.getPath())) {
|
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
|
||||||
Path subTo = new Path(to, subFrom.getPath().getName());
|
Path subTo = new Path(to, subFrom.getPath().getName());
|
||||||
mergePaths(fs, subFrom, subTo);
|
mergePaths(fs, subFrom, subTo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
//it does not exist just rename
|
renameOrMerge(fs, from, to);
|
||||||
if(!fs.rename(from.getPath(), to)) {
|
|
||||||
throw new IOException("Failed to rename "+from+" to "+to);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
|
||||||
|
throws IOException {
|
||||||
|
if (algorithmVersion == 1) {
|
||||||
|
if (!fs.rename(from.getPath(), to)) {
|
||||||
|
throw new IOException("Failed to rename " + from + " to " + to);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fs.mkdirs(to);
|
||||||
|
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
|
||||||
|
Path subTo = new Path(to, subFrom.getPath().getName());
|
||||||
|
mergePaths(fs, subFrom, subTo);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -546,8 +563,9 @@ public class FileOutputCommitter extends OutputCommitter {
|
||||||
Path previousCommittedTaskPath = getCommittedTaskPath(
|
Path previousCommittedTaskPath = getCommittedTaskPath(
|
||||||
previousAttempt, context);
|
previousAttempt, context);
|
||||||
FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
|
FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
|
LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
|
||||||
|
}
|
||||||
if (algorithmVersion == 1) {
|
if (algorithmVersion == 1) {
|
||||||
if (fs.exists(previousCommittedTaskPath)) {
|
if (fs.exists(previousCommittedTaskPath)) {
|
||||||
Path committedTaskPath = getCommittedTaskPath(context);
|
Path committedTaskPath = getCommittedTaskPath(context);
|
||||||
|
|
|
@ -22,9 +22,15 @@ import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -39,6 +45,7 @@ import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
import org.apache.hadoop.mapreduce.RecordWriter;
|
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
@ -47,13 +54,25 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class TestFileOutputCommitter extends TestCase {
|
public class TestFileOutputCommitter extends TestCase {
|
||||||
private static Path outDir = new Path(System.getProperty("test.build.data",
|
private static final Path outDir = new Path(
|
||||||
"/tmp"), "output");
|
System.getProperty("test.build.data",
|
||||||
|
System.getProperty("java.io.tmpdir")),
|
||||||
|
TestFileOutputCommitter.class.getName());
|
||||||
|
|
||||||
|
private final static String SUB_DIR = "SUB_DIR";
|
||||||
|
private final static Path OUT_SUB_DIR = new Path(outDir, SUB_DIR);
|
||||||
|
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TestFileOutputCommitter.class);
|
||||||
|
|
||||||
// A random task attempt id for testing.
|
// A random task attempt id for testing.
|
||||||
private static String attempt = "attempt_200707121733_0001_m_000000_0";
|
private static final String attempt = "attempt_200707121733_0001_m_000000_0";
|
||||||
private static String partFile = "part-m-00000";
|
private static final String partFile = "part-m-00000";
|
||||||
private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
|
private static final TaskAttemptID taskID = TaskAttemptID.forName(attempt);
|
||||||
|
|
||||||
|
private static final String attempt1 = "attempt_200707121733_0001_m_000001_0";
|
||||||
|
private static final TaskAttemptID taskID1 = TaskAttemptID.forName(attempt1);
|
||||||
|
|
||||||
private Text key1 = new Text("key1");
|
private Text key1 = new Text("key1");
|
||||||
private Text key2 = new Text("key2");
|
private Text key2 = new Text("key2");
|
||||||
private Text val1 = new Text("val1");
|
private Text val1 = new Text("val1");
|
||||||
|
@ -441,6 +460,107 @@ public class TestFileOutputCommitter extends TestCase {
|
||||||
testFailAbortInternal(2);
|
testFailAbortInternal(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class RLFS extends RawLocalFileSystem {
|
||||||
|
private final ThreadLocal<Boolean> needNull = new ThreadLocal<Boolean>() {
|
||||||
|
@Override
|
||||||
|
protected Boolean initialValue() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public RLFS() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus getFileStatus(Path f) throws IOException {
|
||||||
|
if (needNull.get() &&
|
||||||
|
OUT_SUB_DIR.toUri().getPath().equals(f.toUri().getPath())) {
|
||||||
|
needNull.set(false); // lie once per thread
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return super.getFileStatus(f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testConcurrentCommitTaskWithSubDir(int version)
|
||||||
|
throws Exception {
|
||||||
|
final Job job = Job.getInstance();
|
||||||
|
FileOutputFormat.setOutputPath(job, outDir);
|
||||||
|
final Configuration conf = job.getConfiguration();
|
||||||
|
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||||
|
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||||
|
version);
|
||||||
|
|
||||||
|
conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
|
||||||
|
FileSystem.closeAll();
|
||||||
|
|
||||||
|
final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||||
|
final FileOutputCommitter amCommitter =
|
||||||
|
new FileOutputCommitter(outDir, jContext);
|
||||||
|
amCommitter.setupJob(jContext);
|
||||||
|
|
||||||
|
final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
|
||||||
|
taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
|
||||||
|
taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
|
||||||
|
|
||||||
|
final TextOutputFormat[] tof = new TextOutputFormat[2];
|
||||||
|
for (int i = 0; i < tof.length; i++) {
|
||||||
|
tof[i] = new TextOutputFormat() {
|
||||||
|
@Override
|
||||||
|
public Path getDefaultWorkFile(TaskAttemptContext context,
|
||||||
|
String extension) throws IOException {
|
||||||
|
final FileOutputCommitter foc = (FileOutputCommitter)
|
||||||
|
getOutputCommitter(context);
|
||||||
|
return new Path(new Path(foc.getWorkPath(), SUB_DIR),
|
||||||
|
getUniqueFile(context, getOutputName(context), extension));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
final ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < taCtx.length; i++) {
|
||||||
|
final int taskIdx = i;
|
||||||
|
executor.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException, InterruptedException {
|
||||||
|
final OutputCommitter outputCommitter =
|
||||||
|
tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
|
||||||
|
outputCommitter.setupTask(taCtx[taskIdx]);
|
||||||
|
final RecordWriter rw =
|
||||||
|
tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
|
||||||
|
writeOutput(rw, taCtx[taskIdx]);
|
||||||
|
outputCommitter.commitTask(taCtx[taskIdx]);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
executor.shutdown();
|
||||||
|
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
|
||||||
|
LOG.info("Awaiting thread termination!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
amCommitter.commitJob(jContext);
|
||||||
|
final RawLocalFileSystem lfs = new RawLocalFileSystem();
|
||||||
|
lfs.setConf(conf);
|
||||||
|
assertFalse("Must not end up with sub_dir/sub_dir",
|
||||||
|
lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
|
||||||
|
|
||||||
|
// validate output
|
||||||
|
validateContent(OUT_SUB_DIR);
|
||||||
|
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testConcurrentCommitTaskWithSubDirV1() throws Exception {
|
||||||
|
testConcurrentCommitTaskWithSubDir(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testConcurrentCommitTaskWithSubDirV2() throws Exception {
|
||||||
|
testConcurrentCommitTaskWithSubDir(2);
|
||||||
|
}
|
||||||
|
|
||||||
public static String slurp(File f) throws IOException {
|
public static String slurp(File f) throws IOException {
|
||||||
int len = (int) f.length();
|
int len = (int) f.length();
|
||||||
byte[] buf = new byte[len];
|
byte[] buf = new byte[len];
|
||||||
|
|
Loading…
Reference in New Issue