MAPREDUCE-7317. Add latency information in FileOutputCommitter.mergePaths. (#2624)

Contributed by Jungtaek Lim.

Change-Id: Iaff2f55e5378c22ce8a92ae776f5aba3f0fc304e
This commit is contained in:
Jungtaek Lim 2021-01-28 04:08:08 +09:00 committed by Steve Loughran
parent 2d124f2f5e
commit ebdacedc83
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
1 changed files with 33 additions and 31 deletions

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -454,43 +455,44 @@ public class FileOutputCommitter extends PathOutputCommitter {
*/ */
private void mergePaths(FileSystem fs, final FileStatus from, private void mergePaths(FileSystem fs, final FileStatus from,
final Path to, JobContext context) throws IOException { final Path to, JobContext context) throws IOException {
if (LOG.isDebugEnabled()) { try (DurationInfo d = new DurationInfo(LOG,
LOG.debug("Merging data from " + from + " to " + to); false,
} "Merging data from %s to %s", from, to)) {
reportProgress(context); reportProgress(context);
FileStatus toStat; FileStatus toStat;
try { try {
toStat = fs.getFileStatus(to); toStat = fs.getFileStatus(to);
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
toStat = null; toStat = null;
}
if (from.isFile()) {
if (toStat != null) {
if (!fs.delete(to, true)) {
throw new IOException("Failed to delete " + to);
}
} }
if (!fs.rename(from.getPath(), to)) { if (from.isFile()) {
throw new IOException("Failed to rename " + from + " to " + to); if (toStat != null) {
}
} else if (from.isDirectory()) {
if (toStat != null) {
if (!toStat.isDirectory()) {
if (!fs.delete(to, true)) { if (!fs.delete(to, true)) {
throw new IOException("Failed to delete " + to); throw new IOException("Failed to delete " + to);
} }
renameOrMerge(fs, from, to, context);
} else {
//It is a directory so merge everything in the directories
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
Path subTo = new Path(to, subFrom.getPath().getName());
mergePaths(fs, subFrom, subTo, context);
}
} }
} else {
renameOrMerge(fs, from, to, context); if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to);
}
} else if (from.isDirectory()) {
if (toStat != null) {
if (!toStat.isDirectory()) {
if (!fs.delete(to, true)) {
throw new IOException("Failed to delete " + to);
}
renameOrMerge(fs, from, to, context);
} else {
//It is a directory so merge everything in the directories
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
Path subTo = new Path(to, subFrom.getPath().getName());
mergePaths(fs, subFrom, subTo, context);
}
}
} else {
renameOrMerge(fs, from, to, context);
}
} }
} }
} }