HADOOP-11450. Cleanup DistCpV1 not to use deprecated methods and fix javadocs. Contributed by Varun Saxena.

This commit is contained in:
Tsuyoshi Ozawa 2015-01-26 12:58:38 +09:00
parent 0d6bd62102
commit 7b82c4ab4e
2 changed files with 68 additions and 87 deletions

View File

@ -499,6 +499,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11419 Improve hadoop-maven-plugins. (Herve Boutemy via stevel) HADOOP-11419 Improve hadoop-maven-plugins. (Herve Boutemy via stevel)
HADOOP-11450. Cleanup DistCpV1 not to use deprecated methods and fix
javadocs. (Varun Saxena via ozawa)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11323. WritableComparator#compare keeps reference to byte array. HADOOP-11323. WritableComparator#compare keeps reference to byte array.

View File

@ -51,9 +51,11 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.FileSplit;
@ -73,6 +75,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
/** /**
* A Map-reduce program to recursively copy directories between * A Map-reduce program to recursively copy directories between
@ -283,9 +286,8 @@ public class DistCpV1 implements Tool {
long last = 0L; long last = 0L;
long acc = 0L; long acc = 0L;
long cbrem = srcst.getLen(); long cbrem = srcst.getLen();
SequenceFile.Reader sl = null; try (SequenceFile.Reader sl =
try { new SequenceFile.Reader(job, Reader.file(src))) {
sl = new SequenceFile.Reader(fs, src, job);
for (; sl.next(key, value); last = sl.getPosition()) { for (; sl.next(key, value); last = sl.getPosition()) {
// if adding this split would put this split past the target size, // if adding this split would put this split past the target size,
// cut the last split and put this next file in the next split. // cut the last split and put this next file in the next split.
@ -299,9 +301,6 @@ public class DistCpV1 implements Tool {
acc += key.get(); acc += key.get();
} }
} }
finally {
checkAndClose(sl);
}
if (cbrem != 0) { if (cbrem != 0) {
splits.add(new FileSplit(src, pos, cbrem, (String[])null)); splits.add(new FileSplit(src, pos, cbrem, (String[])null));
} }
@ -438,32 +437,28 @@ public class DistCpV1 implements Tool {
*/ */
private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst, private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
Reporter reporter) throws IOException { Reporter reporter) throws IOException {
FSDataInputStream in = null;
FSDataOutputStream out = null;
long bytesCopied = 0L; long bytesCopied = 0L;
try { Path srcPath = srcstat.getPath();
Path srcPath = srcstat.getPath(); // open src file
// open src file try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) {
in = srcPath.getFileSystem(job).open(srcPath);
reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
// open tmp file // open tmp file
out = create(tmpfile, reporter, srcstat); try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) {
LOG.info("Copying file " + srcPath + " of size " + LOG.info("Copying file " + srcPath + " of size " +
srcstat.getLen() + " bytes..."); srcstat.getLen() + " bytes...");
// copy file // copy file
for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) { for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
out.write(buffer, 0, bytesRead); out.write(buffer, 0, bytesRead);
bytesCopied += bytesRead; bytesCopied += bytesRead;
reporter.setStatus( reporter.setStatus(
String.format("%.2f ", bytesCopied*100.0/srcstat.getLen()) String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
+ absdst + " [ " + + absdst + " [ " +
StringUtils.humanReadableInt(bytesCopied) + " / " + TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / "
StringUtils.humanReadableInt(srcstat.getLen()) + " ]"); + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1)
+ " ]");
}
} }
} finally {
checkAndClose(in);
checkAndClose(out);
} }
return bytesCopied; return bytesCopied;
} }
@ -471,7 +466,8 @@ public class DistCpV1 implements Tool {
/** /**
* Copy a file to a destination. * Copy a file to a destination.
* @param srcstat src path and metadata * @param srcstat src path and metadata
* @param dstpath dst path * @param relativedst relative dst path
* @param outc Log of skipped files
* @param reporter * @param reporter
* @throws IOException if copy fails (even if the validation of copy fails) * @throws IOException if copy fails (even if the validation of copy fails)
*/ */
@ -570,7 +566,8 @@ public class DistCpV1 implements Tool {
} }
static String bytesString(long b) { static String bytesString(long b) {
return b + " bytes (" + StringUtils.humanReadableInt(b) + ")"; return b + " bytes (" +
TraditionalBinaryPrefix.long2String(b, "", 1) + ")";
} }
/** /**
@ -762,6 +759,7 @@ public class DistCpV1 implements Tool {
/** /**
* Driver to copy srcPath to destPath depending on required protocol. * Driver to copy srcPath to destPath depending on required protocol.
* @param conf configuration
* @param args arguments * @param args arguments
*/ */
static void copy(final Configuration conf, final Arguments args static void copy(final Configuration conf, final Arguments args
@ -838,10 +836,8 @@ public class DistCpV1 implements Tool {
FileSystem dstfs = destPath.getFileSystem(conf); FileSystem dstfs = destPath.getFileSystem(conf);
Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL)); Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
SequenceFile.Reader in = null; try (SequenceFile.Reader in =
try { new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf),
dstdirlist, jobconf);
Text dsttext = new Text(); Text dsttext = new Text();
FilePair pair = new FilePair(); FilePair pair = new FilePair();
for(; in.next(dsttext, pair); ) { for(; in.next(dsttext, pair); ) {
@ -849,8 +845,6 @@ public class DistCpV1 implements Tool {
updateDestStatus(pair.input, dstfs.getFileStatus(absdst), updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
preseved, dstfs); preseved, dstfs);
} }
} finally {
checkAndClose(in);
} }
} }
@ -876,6 +870,8 @@ public class DistCpV1 implements Tool {
* @param preservedAttributes Preserved attributes * @param preservedAttributes Preserved attributes
* @param filelimit File limit * @param filelimit File limit
* @param sizelimit Size limit * @param sizelimit Size limit
* @param mapredSslConf ssl configuration
* @param dryrun
*/ */
Arguments(List<Path> srcs, Path basedir, Path dst, Path log, Arguments(List<Path> srcs, Path basedir, Path dst, Path log,
EnumSet<Options> flags, String preservedAttributes, EnumSet<Options> flags, String preservedAttributes,
@ -1266,15 +1262,18 @@ public class DistCpV1 implements Tool {
long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L, long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L,
skipFileCount = 0L, skipByteCount = 0L; skipFileCount = 0L, skipByteCount = 0L;
try ( try (
SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, SequenceFile.Writer src_writer = SequenceFile.createWriter(jobConf,
jobConf, srcfilelist, LongWritable.class, FilePair.class, Writer.file(srcfilelist), Writer.keyClass(LongWritable.class),
SequenceFile.CompressionType.NONE); Writer.valueClass(FilePair.class), Writer.compression(
SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, SequenceFile.CompressionType.NONE));
jobConf, dstfilelist, Text.class, Text.class, SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobConf,
SequenceFile.CompressionType.NONE); Writer.file(dstfilelist), Writer.keyClass(Text.class),
SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, Writer.valueClass(Text.class), Writer.compression(
jobConf, dstdirlist, Text.class, FilePair.class, SequenceFile.CompressionType.NONE));
SequenceFile.CompressionType.NONE) SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobConf,
Writer.file(dstdirlist), Writer.keyClass(Text.class),
Writer.valueClass(FilePair.class), Writer.compression(
SequenceFile.CompressionType.NONE));
) { ) {
// handle the case where the destination directory doesn't exist // handle the case where the destination directory doesn't exist
// and we've only a single src directory OR we're updating/overwriting // and we've only a single src directory OR we're updating/overwriting
@ -1286,7 +1285,8 @@ public class DistCpV1 implements Tool {
HashSet<Path> parentDirsToCopy = new HashSet<Path>(); HashSet<Path> parentDirsToCopy = new HashSet<Path>();
if (args.basedir != null) { if (args.basedir != null) {
FileSystem basefs = args.basedir.getFileSystem(conf); FileSystem basefs = args.basedir.getFileSystem(conf);
basedir = args.basedir.makeQualified(basefs); basedir = args.basedir.makeQualified(
basefs.getUri(), basefs.getWorkingDirectory());
if (!basefs.isDirectory(basedir)) { if (!basefs.isDirectory(basedir)) {
throw new IOException("Basedir " + basedir + " is not a directory."); throw new IOException("Basedir " + basedir + " is not a directory.");
} }
@ -1307,7 +1307,8 @@ public class DistCpV1 implements Tool {
if (basedir != null) { if (basedir != null) {
root = basedir; root = basedir;
Path parent = src.getParent().makeQualified(srcfs); Path parent = src.getParent().makeQualified(
srcfs.getUri(), srcfs.getWorkingDirectory());
while (parent != null && !parent.equals(basedir)) { while (parent != null && !parent.equals(basedir)) {
if (!parentDirsToCopy.contains(parent)){ if (!parentDirsToCopy.contains(parent)){
parentDirsToCopy.add(parent); parentDirsToCopy.add(parent);
@ -1427,11 +1428,12 @@ public class DistCpV1 implements Tool {
} }
LOG.info("sourcePathsCount(files+directories)=" + srcCount); LOG.info("sourcePathsCount(files+directories)=" + srcCount);
LOG.info("filesToCopyCount=" + fileCount); LOG.info("filesToCopyCount=" + fileCount);
LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount)); LOG.info("bytesToCopyCount=" +
TraditionalBinaryPrefix.long2String(byteCount, "", 1));
if (update) { if (update) {
LOG.info("filesToSkipCopyCount=" + skipFileCount); LOG.info("filesToSkipCopyCount=" + skipFileCount);
LOG.info("bytesToSkipCopyCount=" + LOG.info("bytesToSkipCopyCount=" +
StringUtils.humanReadableInt(skipByteCount)); TraditionalBinaryPrefix.long2String(skipByteCount, "", 1));
} }
if (args.dryrun) { if (args.dryrun) {
return false; return false;
@ -1475,7 +1477,8 @@ public class DistCpV1 implements Tool {
LOG.info("sourcePathsCount=" + srcCount); LOG.info("sourcePathsCount=" + srcCount);
LOG.info("filesToCopyCount=" + fileCount); LOG.info("filesToCopyCount=" + fileCount);
LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount)); LOG.info("bytesToCopyCount=" +
TraditionalBinaryPrefix.long2String(byteCount, "", 1));
jobConf.setInt(SRC_COUNT_LABEL, srcCount); jobConf.setInt(SRC_COUNT_LABEL, srcCount);
jobConf.setLong(TOTAL_SIZE_LABEL, byteCount); jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
@ -1559,10 +1562,10 @@ public class DistCpV1 implements Tool {
//write dst lsr results //write dst lsr results
final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr"); final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf, try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
dstlsr, Text.class, NullWritable.class, Writer.file(dstlsr), Writer.keyClass(Text.class),
SequenceFile.CompressionType.NONE); Writer.valueClass(NullWritable.class), Writer.compression(
try { SequenceFile.CompressionType.NONE))) {
//do lsr to get all file statuses in dstroot //do lsr to get all file statuses in dstroot
final Stack<FileStatus> lsrstack = new Stack<FileStatus>(); final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) { for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
@ -1575,8 +1578,6 @@ public class DistCpV1 implements Tool {
} }
} }
} }
} finally {
checkAndClose(writer);
} }
//sort lsr results //sort lsr results
@ -1586,13 +1587,11 @@ public class DistCpV1 implements Tool {
sorter.sort(dstlsr, sortedlsr); sorter.sort(dstlsr, sortedlsr);
//compare lsr list and dst list //compare lsr list and dst list
SequenceFile.Reader lsrin = null;
SequenceFile.Reader dstin = null;
long deletedPathsCount = 0; long deletedPathsCount = 0;
try { try (SequenceFile.Reader lsrin =
lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf); new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf); SequenceFile.Reader dstin =
new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
//compare sorted lsr list and sorted dst list //compare sorted lsr list and sorted dst list
final Text lsrpath = new Text(); final Text lsrpath = new Text();
final Text dstpath = new Text(); final Text dstpath = new Text();
@ -1623,9 +1622,6 @@ public class DistCpV1 implements Tool {
} }
} }
} }
} finally {
checkAndClose(lsrin);
checkAndClose(dstin);
} }
return deletedPathsCount; return deletedPathsCount;
} }
@ -1644,13 +1640,11 @@ public class DistCpV1 implements Tool {
/** Check whether the file list contains duplicates. */ /** Check whether the file list contains duplicates. */
static private void checkDuplication(FileSystem fs, Path file, Path sorted, static private void checkDuplication(FileSystem fs, Path file, Path sorted,
Configuration conf) throws IOException { Configuration conf) throws IOException {
SequenceFile.Reader in = null; SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
try { new Text.Comparator(), Text.class, Text.class, conf);
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, sorter.sort(file, sorted);
new Text.Comparator(), Text.class, Text.class, conf); try (SequenceFile.Reader in =
sorter.sort(file, sorted); new SequenceFile.Reader(conf, Reader.file(sorted))) {
in = new SequenceFile.Reader(fs, sorted, conf);
Text prevdst = null, curdst = new Text(); Text prevdst = null, curdst = new Text();
Text prevsrc = null, cursrc = new Text(); Text prevsrc = null, cursrc = new Text();
for(; in.next(curdst, cursrc); ) { for(; in.next(curdst, cursrc); ) {
@ -1665,24 +1659,8 @@ public class DistCpV1 implements Tool {
cursrc = new Text(); cursrc = new Text();
} }
} }
finally {
checkAndClose(in);
}
} }
static boolean checkAndClose(java.io.Closeable io) {
if (io != null) {
try {
io.close();
}
catch(IOException ioe) {
LOG.warn(StringUtils.stringifyException(ioe));
return false;
}
}
return true;
}
/** An exception class for duplicated source files. */ /** An exception class for duplicated source files. */
public static class DuplicationException extends IOException { public static class DuplicationException extends IOException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;