From ebdacedc83580f6ec92a03129328e13718c12cad Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 28 Jan 2021 04:08:08 +0900 Subject: [PATCH] MAPREDUCE-7317. Add latency information in FileOutputCommitter.mergePaths. (#2624) Contributed by Jungtaek Lim. Change-Id: Iaff2f55e5378c22ce8a92ae776f5aba3f0fc304e --- .../lib/output/FileOutputCommitter.java | 64 ++++++++++--------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index e8f9ec7e8ec..2973fb05f50 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -454,43 +455,44 @@ public class FileOutputCommitter extends PathOutputCommitter { */ private void mergePaths(FileSystem fs, final FileStatus from, final Path to, JobContext context) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Merging data from " + from + " to " + to); - } - reportProgress(context); - FileStatus toStat; - try { - toStat = fs.getFileStatus(to); - } catch (FileNotFoundException fnfe) { - toStat = null; - } - - if (from.isFile()) { - if (toStat != null) { - if (!fs.delete(to, true)) { - throw new IOException("Failed to delete " + to); - } + try (DurationInfo d = new DurationInfo(LOG, + false, + "Merging data from %s to %s", from, to)) { + reportProgress(context); + FileStatus toStat; + try { + toStat = fs.getFileStatus(to); + } catch (FileNotFoundException fnfe) { + toStat = null; } - if (!fs.rename(from.getPath(), to)) { - throw new IOException("Failed to rename " + from + " to " + to); - } - } else if (from.isDirectory()) { - if (toStat != null) { - if (!toStat.isDirectory()) { + if (from.isFile()) { + if (toStat != null) { if (!fs.delete(to, true)) { throw new IOException("Failed to delete " + to); } - renameOrMerge(fs, from, to, context); - } else { - //It is a directory so merge everything in the directories - for (FileStatus subFrom : fs.listStatus(from.getPath())) { - Path subTo = new Path(to, subFrom.getPath().getName()); - mergePaths(fs, subFrom, subTo, context); - } } - } else { - renameOrMerge(fs, from, to, context); + + if (!fs.rename(from.getPath(), to)) { + throw new IOException("Failed to rename " + from + " to " + to); + } + } else if (from.isDirectory()) { + if (toStat != null) { + if (!toStat.isDirectory()) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); + } + renameOrMerge(fs, from, to, context); + } else { + //It is a directory so merge everything in the directories + for (FileStatus subFrom : fs.listStatus(from.getPath())) { + Path subTo = new Path(to, subFrom.getPath().getName()); + mergePaths(fs, subFrom, subTo, context); + } + } + } else { + renameOrMerge(fs, from, to, context); + } } } }