MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla

(cherry picked from commit 4d8de7ab69)
This commit is contained in:
Jason Lowe 2018-11-28 14:54:59 -06:00
parent d9457df989
commit e7fa638fe8
2 changed files with 51 additions and 10 deletions

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -400,7 +401,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
if (algorithmVersion == 1) { if (algorithmVersion == 1) {
for (FileStatus stat: getAllCommittedTaskPaths(context)) { for (FileStatus stat: getAllCommittedTaskPaths(context)) {
mergePaths(fs, stat, finalOutput); mergePaths(fs, stat, finalOutput, context);
} }
} }
@ -451,10 +452,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
* @throws IOException on any error * @throws IOException on any error
*/ */
private void mergePaths(FileSystem fs, final FileStatus from, private void mergePaths(FileSystem fs, final FileStatus from,
final Path to) throws IOException { final Path to, JobContext context) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Merging data from " + from + " to " + to); LOG.debug("Merging data from " + from + " to " + to);
} }
reportProgress(context);
FileStatus toStat; FileStatus toStat;
try { try {
toStat = fs.getFileStatus(to); toStat = fs.getFileStatus(to);
@ -478,22 +480,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
if (!fs.delete(to, true)) { if (!fs.delete(to, true)) {
throw new IOException("Failed to delete " + to); throw new IOException("Failed to delete " + to);
} }
renameOrMerge(fs, from, to); renameOrMerge(fs, from, to, context);
} else { } else {
//It is a directory so merge everything in the directories //It is a directory so merge everything in the directories
for (FileStatus subFrom : fs.listStatus(from.getPath())) { for (FileStatus subFrom : fs.listStatus(from.getPath())) {
Path subTo = new Path(to, subFrom.getPath().getName()); Path subTo = new Path(to, subFrom.getPath().getName());
mergePaths(fs, subFrom, subTo); mergePaths(fs, subFrom, subTo, context);
} }
} }
} else { } else {
renameOrMerge(fs, from, to); renameOrMerge(fs, from, to, context);
} }
} }
} }
private void renameOrMerge(FileSystem fs, FileStatus from, Path to) private void reportProgress(JobContext context) {
throws IOException { if (context instanceof Progressable) {
((Progressable) context).progress();
}
}
private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
JobContext context) throws IOException {
if (algorithmVersion == 1) { if (algorithmVersion == 1) {
if (!fs.rename(from.getPath(), to)) { if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to); throw new IOException("Failed to rename " + from + " to " + to);
@ -502,7 +510,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
fs.mkdirs(to); fs.mkdirs(to);
for (FileStatus subFrom : fs.listStatus(from.getPath())) { for (FileStatus subFrom : fs.listStatus(from.getPath())) {
Path subTo = new Path(to, subFrom.getPath().getName()); Path subTo = new Path(to, subFrom.getPath().getName());
mergePaths(fs, subFrom, subTo); mergePaths(fs, subFrom, subTo, context);
} }
} }
} }
@ -594,7 +602,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
committedTaskPath); committedTaskPath);
} else { } else {
// directly merge everything from taskAttemptPath to output directory // directly merge everything from taskAttemptPath to output directory
mergePaths(fs, taskAttemptDirStatus, outputPath); mergePaths(fs, taskAttemptDirStatus, outputPath, context);
LOG.info("Saved output of task '" + attemptId + "' to " + LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath); outputPath);
@ -718,7 +726,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
FileStatus from = fs.getFileStatus(previousCommittedTaskPath); FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
LOG.info("Recovering task for upgrading scenario, moving files from " LOG.info("Recovering task for upgrading scenario, moving files from "
+ previousCommittedTaskPath + " to " + outputPath); + previousCommittedTaskPath + " to " + outputPath);
mergePaths(fs, from, outputPath); mergePaths(fs, from, outputPath, context);
} catch (FileNotFoundException ignored) { } catch (FileNotFoundException ignored) {
} }
LOG.info("Done recovering task " + attemptId); LOG.info("Done recovering task " + attemptId);

View File

@ -58,6 +58,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class TestFileOutputCommitter { public class TestFileOutputCommitter {
private static final Path outDir = new Path( private static final Path outDir = new Path(
@ -434,6 +438,35 @@ public class TestFileOutputCommitter {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
@Test
public void testProgressDuringMerge() throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
2);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = spy(new TaskAttemptContextImpl(conf, taskID));
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeMapFileOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
//make sure progress flag was set.
// The first time it is set is during commit but ensure that
// mergePaths call makes it go again.
verify(tContext, atLeast(2)).progress();
}
@Test @Test
public void testCommitterRepeatableV1() throws Exception { public void testCommitterRepeatableV1() throws Exception {
testCommitterRetryInternal(1); testCommitterRetryInternal(1);