From 7b82c4ab4e84256bcdee6256564f394dcc4e81ab Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Mon, 26 Jan 2015 12:58:38 +0900 Subject: [PATCH] HADOOP-11450. Cleanup DistCpV1 not to use deprecated methods and fix javadocs. Contributed by Varun Saxena. --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/tools/DistCpV1.java | 152 ++++++++---------- 2 files changed, 68 insertions(+), 87 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8618e38b3d3..662f580f229 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -499,6 +499,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11419 Improve hadoop-maven-plugins. (Herve Boutemy via stevel) + HADOOP-11450. Cleanup DistCpV1 not to use deprecated methods and fix + javadocs. (Varun Saxena via ozawa) + OPTIMIZATIONS HADOOP-11323. WritableComparator#compare keeps reference to byte array. diff --git a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java index c44b67b7328..f46c421607a 100644 --- a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java +++ b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java @@ -51,9 +51,11 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -73,6 +75,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; /** * A Map-reduce program to recursively copy directories between @@ -283,9 +286,8 @@ public InputSplit[] getSplits(JobConf job, int numSplits) long last = 0L; long acc = 0L; long cbrem = srcst.getLen(); - SequenceFile.Reader sl = null; - try { - sl = new SequenceFile.Reader(fs, src, job); + try (SequenceFile.Reader sl = + new SequenceFile.Reader(job, Reader.file(src))) { for (; sl.next(key, value); last = sl.getPosition()) { // if adding this split would put this split past the target size, // cut the last split and put this next file in the next split. @@ -299,9 +301,6 @@ public InputSplit[] getSplits(JobConf job, int numSplits) acc += key.get(); } } - finally { - checkAndClose(sl); - } if (cbrem != 0) { splits.add(new FileSplit(src, pos, cbrem, (String[])null)); } @@ -438,32 +437,28 @@ private boolean skipCopyFile(FileStatus srcstat, Path absdst, */ private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst, Reporter reporter) throws IOException { - FSDataInputStream in = null; - FSDataOutputStream out = null; long bytesCopied = 0L; - try { - Path srcPath = srcstat.getPath(); - // open src file - in = srcPath.getFileSystem(job).open(srcPath); + Path srcPath = srcstat.getPath(); + // open src file + try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) { reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); // open tmp file - out = create(tmpfile, reporter, srcstat); - LOG.info("Copying file " + srcPath + " of size " + - srcstat.getLen() + " bytes..."); + try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) { + LOG.info("Copying file " + srcPath + " of size " + + srcstat.getLen() + " bytes..."); - // copy file - for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) { - out.write(buffer, 0, bytesRead); - bytesCopied += bytesRead; - reporter.setStatus( - String.format("%.2f ", bytesCopied*100.0/srcstat.getLen()) - + absdst + " [ " + - StringUtils.humanReadableInt(bytesCopied) + " / " + - StringUtils.humanReadableInt(srcstat.getLen()) + " ]"); + // copy file + for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) { + out.write(buffer, 0, bytesRead); + bytesCopied += bytesRead; + reporter.setStatus( + String.format("%.2f ", bytesCopied*100.0/srcstat.getLen()) + + absdst + " [ " + + TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / " + + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1) + + " ]"); + } } - } finally { - checkAndClose(in); - checkAndClose(out); } return bytesCopied; } @@ -471,7 +466,8 @@ private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst, /** * Copy a file to a destination. * @param srcstat src path and metadata - * @param dstpath dst path + * @param relativedst relative dst path + * @param outc Log of skipped files * @param reporter * @throws IOException if copy fails(even if the validation of copy fails) */ @@ -570,7 +566,8 @@ private void updateDestStatus(FileStatus src, FileStatus dst } static String bytesString(long b) { - return b + " bytes (" + StringUtils.humanReadableInt(b) + ")"; + return b + " bytes (" + + TraditionalBinaryPrefix.long2String(b, "", 1) + ")"; } /** @@ -762,6 +759,7 @@ private static void checkSrcPath(JobConf jobConf, List srcPaths) /** * Driver to copy srcPath to destPath depending on required protocol. + * @param conf configuration * @param args arguments */ static void copy(final Configuration conf, final Arguments args @@ -838,10 +836,8 @@ static private void finalize(Configuration conf, JobConf jobconf, FileSystem dstfs = destPath.getFileSystem(conf); Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL)); - SequenceFile.Reader in = null; - try { - in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf), - dstdirlist, jobconf); + try (SequenceFile.Reader in = + new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) { Text dsttext = new Text(); FilePair pair = new FilePair(); for(; in.next(dsttext, pair); ) { @@ -849,8 +845,6 @@ static private void finalize(Configuration conf, JobConf jobconf, updateDestStatus(pair.input, dstfs.getFileStatus(absdst), preseved, dstfs); } - } finally { - checkAndClose(in); } } @@ -876,6 +870,8 @@ static class Arguments { * @param preservedAttributes Preserved attributes * @param filelimit File limit * @param sizelimit Size limit + * @param mapredSslConf ssl configuration + * @param dryrun */ Arguments(List srcs, Path basedir, Path dst, Path log, EnumSet flags, String preservedAttributes, @@ -1266,15 +1262,18 @@ static boolean setup(Configuration conf, JobConf jobConf, long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L, skipFileCount = 0L, skipByteCount = 0L; try ( - SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, - jobConf, srcfilelist, LongWritable.class, FilePair.class, - SequenceFile.CompressionType.NONE); - SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, - jobConf, dstfilelist, Text.class, Text.class, - SequenceFile.CompressionType.NONE); - SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, - jobConf, dstdirlist, Text.class, FilePair.class, - SequenceFile.CompressionType.NONE) + SequenceFile.Writer src_writer = SequenceFile.createWriter(jobConf, + Writer.file(srcfilelist), Writer.keyClass(LongWritable.class), + Writer.valueClass(FilePair.class), Writer.compression( + SequenceFile.CompressionType.NONE)); + SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobConf, + Writer.file(dstfilelist), Writer.keyClass(Text.class), + Writer.valueClass(Text.class), Writer.compression( + SequenceFile.CompressionType.NONE)); + SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobConf, + Writer.file(dstdirlist), Writer.keyClass(Text.class), + Writer.valueClass(FilePair.class), Writer.compression( + SequenceFile.CompressionType.NONE)); ) { // handle the case where the destination directory doesn't exist // and we've only a single src directory OR we're updating/overwriting @@ -1286,7 +1285,8 @@ static boolean setup(Configuration conf, JobConf jobConf, HashSet parentDirsToCopy = new HashSet(); if (args.basedir != null) { FileSystem basefs = args.basedir.getFileSystem(conf); - basedir = args.basedir.makeQualified(basefs); + basedir = args.basedir.makeQualified( + basefs.getUri(), basefs.getWorkingDirectory()); if (!basefs.isDirectory(basedir)) { throw new IOException("Basedir " + basedir + " is not a directory."); } @@ -1307,7 +1307,8 @@ static boolean setup(Configuration conf, JobConf jobConf, if (basedir != null) { root = basedir; - Path parent = src.getParent().makeQualified(srcfs); + Path parent = src.getParent().makeQualified( + srcfs.getUri(), srcfs.getWorkingDirectory()); while (parent != null && !parent.equals(basedir)) { if (!parentDirsToCopy.contains(parent)){ parentDirsToCopy.add(parent); @@ -1427,11 +1428,12 @@ static boolean setup(Configuration conf, JobConf jobConf, } LOG.info("sourcePathsCount(files+directories)=" + srcCount); LOG.info("filesToCopyCount=" + fileCount); - LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount)); + LOG.info("bytesToCopyCount=" + + TraditionalBinaryPrefix.long2String(byteCount, "", 1)); if (update) { LOG.info("filesToSkipCopyCount=" + skipFileCount); LOG.info("bytesToSkipCopyCount=" + - StringUtils.humanReadableInt(skipByteCount)); + TraditionalBinaryPrefix.long2String(skipByteCount, "", 1)); } if (args.dryrun) { return false; @@ -1475,7 +1477,8 @@ static boolean setup(Configuration conf, JobConf jobConf, LOG.info("sourcePathsCount=" + srcCount); LOG.info("filesToCopyCount=" + fileCount); - LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount)); + LOG.info("bytesToCopyCount=" + + TraditionalBinaryPrefix.long2String(byteCount, "", 1)); jobConf.setInt(SRC_COUNT_LABEL, srcCount); jobConf.setLong(TOTAL_SIZE_LABEL, byteCount); @@ -1559,10 +1562,10 @@ static private long deleteNonexisting( //write dst lsr results final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr"); - final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf, - dstlsr, Text.class, NullWritable.class, - SequenceFile.CompressionType.NONE); - try { + try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf, + Writer.file(dstlsr), Writer.keyClass(Text.class), + Writer.valueClass(NullWritable.class), Writer.compression( + SequenceFile.CompressionType.NONE))) { //do lsr to get all file statuses in dstroot final Stack lsrstack = new Stack(); for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) { @@ -1575,8 +1578,6 @@ static private long deleteNonexisting( } } } - } finally { - checkAndClose(writer); } //sort lsr results @@ -1586,13 +1587,11 @@ static private long deleteNonexisting( sorter.sort(dstlsr, sortedlsr); //compare lsr list and dst list - SequenceFile.Reader lsrin = null; - SequenceFile.Reader dstin = null; long deletedPathsCount = 0; - try { - lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf); - dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf); - + try (SequenceFile.Reader lsrin = + new SequenceFile.Reader(jobconf, Reader.file(sortedlsr)); + SequenceFile.Reader dstin = + new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) { //compare sorted lsr list and sorted dst list final Text lsrpath = new Text(); final Text dstpath = new Text(); @@ -1623,9 +1622,6 @@ static private long deleteNonexisting( } } } - } finally { - checkAndClose(lsrin); - checkAndClose(dstin); } return deletedPathsCount; } @@ -1644,13 +1640,11 @@ static private boolean isAncestorPath(Path xp, Path yp) { /** Check whether the file list have duplication. */ static private void checkDuplication(FileSystem fs, Path file, Path sorted, Configuration conf) throws IOException { - SequenceFile.Reader in = null; - try { - SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, - new Text.Comparator(), Text.class, Text.class, conf); - sorter.sort(file, sorted); - in = new SequenceFile.Reader(fs, sorted, conf); - + SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, + new Text.Comparator(), Text.class, Text.class, conf); + sorter.sort(file, sorted); + try (SequenceFile.Reader in = + new SequenceFile.Reader(conf, Reader.file(sorted))) { Text prevdst = null, curdst = new Text(); Text prevsrc = null, cursrc = new Text(); for(; in.next(curdst, cursrc); ) { @@ -1665,24 +1659,8 @@ static private void checkDuplication(FileSystem fs, Path file, Path sorted, cursrc = new Text(); } } - finally { - checkAndClose(in); - } } - static boolean checkAndClose(java.io.Closeable io) { - if (io != null) { - try { - io.close(); - } - catch(IOException ioe) { - LOG.warn(StringUtils.stringifyException(ioe)); - return false; - } - } - return true; - } - /** An exception class for duplicated source files. */ public static class DuplicationException extends IOException { private static final long serialVersionUID = 1L;