HADOOP-10295. Allow distcp to automatically identify the checksum type of source files and use it for the target. Contributed by Jing Zhao and Laurent Goujon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563019 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-01-30 23:53:23 +00:00
parent 1cac66ce68
commit 067d52b98c
11 changed files with 257 additions and 111 deletions

View File

@ -304,6 +304,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10139. Update and improve the Single Cluster Setup document. HADOOP-10139. Update and improve the Single Cluster Setup document.
(Akira Ajisaka via Arpit Agarwal) (Akira Ajisaka via Arpit Agarwal)
HADOOP-10295. Allow distcp to automatically identify the checksum type of
source files and use it for the target. (jing9 and Laurent Goujon)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -21,21 +21,26 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
/** An abstract class representing file checksums for files. */ /** An abstract class representing file checksums for files. */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public abstract class FileChecksum implements Writable { public abstract class FileChecksum implements Writable {
/** The checksum algorithm name */ /** The checksum algorithm name */
public abstract String getAlgorithmName(); public abstract String getAlgorithmName();
/** The length of the checksum in bytes */ /** The length of the checksum in bytes */
public abstract int getLength(); public abstract int getLength();
/** The value of the checksum in bytes */ /** The value of the checksum in bytes */
public abstract byte[] getBytes(); public abstract byte[] getBytes();
public ChecksumOpt getChecksumOpt() {
return null;
}
/** Return true if both the algorithms and the values are the same. */ /** Return true if both the algorithms and the values are the same. */
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
@ -50,7 +55,7 @@ public abstract class FileChecksum implements Writable {
return this.getAlgorithmName().equals(that.getAlgorithmName()) return this.getAlgorithmName().equals(that.getAlgorithmName())
&& Arrays.equals(this.getBytes(), that.getBytes()); && Arrays.equals(this.getBytes(), that.getBytes());
} }
@Override @Override
public int hashCode() { public int hashCode() {
return getAlgorithmName().hashCode() ^ Arrays.hashCode(getBytes()); return getAlgorithmName().hashCode() ^ Arrays.hashCode(getBytes());

View File

@ -56,7 +56,7 @@ public class MD5MD5CRC32FileChecksum extends FileChecksum {
this.crcPerBlock = crcPerBlock; this.crcPerBlock = crcPerBlock;
this.md5 = md5; this.md5 = md5;
} }
@Override @Override
public String getAlgorithmName() { public String getAlgorithmName() {
return "MD5-of-" + crcPerBlock + "MD5-of-" + bytesPerCRC + return "MD5-of-" + crcPerBlock + "MD5-of-" + bytesPerCRC +
@ -73,10 +73,10 @@ public class MD5MD5CRC32FileChecksum extends FileChecksum {
throw new IOException("Unknown checksum type in " + algorithm); throw new IOException("Unknown checksum type in " + algorithm);
} }
@Override @Override
public int getLength() {return LENGTH;} public int getLength() {return LENGTH;}
@Override @Override
public byte[] getBytes() { public byte[] getBytes() {
return WritableUtils.toByteArray(this); return WritableUtils.toByteArray(this);
@ -88,6 +88,7 @@ public class MD5MD5CRC32FileChecksum extends FileChecksum {
return DataChecksum.Type.CRC32; return DataChecksum.Type.CRC32;
} }
@Override
public ChecksumOpt getChecksumOpt() { public ChecksumOpt getChecksumOpt() {
return new ChecksumOpt(getCrcType(), bytesPerCRC); return new ChecksumOpt(getCrcType(), bytesPerCRC);
} }
@ -98,12 +99,12 @@ public class MD5MD5CRC32FileChecksum extends FileChecksum {
crcPerBlock = in.readLong(); crcPerBlock = in.readLong();
md5 = MD5Hash.read(in); md5 = MD5Hash.read(in);
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
out.writeInt(bytesPerCRC); out.writeInt(bytesPerCRC);
out.writeLong(crcPerBlock); out.writeLong(crcPerBlock);
md5.write(out); md5.write(out);
} }
/** Write that object to xml output. */ /** Write that object to xml output. */
@ -157,11 +158,11 @@ public class MD5MD5CRC32FileChecksum extends FileChecksum {
} }
} catch (Exception e) { } catch (Exception e) {
throw new SAXException("Invalid attributes: bytesPerCRC=" + bytesPerCRC throw new SAXException("Invalid attributes: bytesPerCRC=" + bytesPerCRC
+ ", crcPerBlock=" + crcPerBlock + ", crcType=" + crcType + ", crcPerBlock=" + crcPerBlock + ", crcType=" + crcType
+ ", md5=" + md5, e); + ", md5=" + md5, e);
} }
} }
@Override @Override
public String toString() { public String toString() {
return getAlgorithmName() + ":" + md5; return getAlgorithmName() + ":" + md5;

View File

@ -37,15 +37,16 @@ public enum DistCpOptionSwitch {
/** /**
* Preserves status of file/path in the target. * Preserves status of file/path in the target.
* Default behavior with -p, is to preserve replication, * Default behavior with -p, is to preserve replication,
* block size, user, group and permission on the target file * block size, user, group, permission and checksum type on the target file.
* Note that when preserving checksum type, block size is also preserved.
* *
* If any of the optional switches are present among rbugp, then * If any of the optional switches are present among rbugpc, then
* only the corresponding file attribute is preserved * only the corresponding file attribute is preserved.
* *
*/ */
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
new Option("p", true, "preserve status (rbugp)" + new Option("p", true, "preserve status (rbugpc)" +
"(replication, block-size, user, group, permission)")), "(replication, block-size, user, group, permission, checksum-type)")),
/** /**
* Update target location by copying only files that are missing * Update target location by copying only files that are missing
@ -53,7 +54,7 @@ public enum DistCpOptionSwitch {
* across source and target. Typically used with DELETE_MISSING * across source and target. Typically used with DELETE_MISSING
* Incompatible with ATOMIC_COMMIT * Incompatible with ATOMIC_COMMIT
*/ */
SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
new Option("update", false, "Update target, copying only missing" + new Option("update", false, "Update target, copying only missing" +
"files or directories")), "files or directories")),
@ -80,7 +81,7 @@ public enum DistCpOptionSwitch {
* Max number of maps to use during copy. DistCp will split work * Max number of maps to use during copy. DistCp will split work
* as equally as possible among these maps * as equally as possible among these maps
*/ */
MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS, MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
new Option("m", true, "Max number of concurrent maps to use for copy")), new Option("m", true, "Max number of concurrent maps to use for copy")),
/** /**

View File

@ -61,7 +61,7 @@ public class DistCpOptions {
private Path targetPath; private Path targetPath;
public static enum FileAttribute{ public static enum FileAttribute{
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION; REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE;
public static FileAttribute getAttribute(char symbol) { public static FileAttribute getAttribute(char symbol) {
for (FileAttribute attribute : values()) { for (FileAttribute attribute : values()) {

View File

@ -34,7 +34,7 @@ public class OptionsParser {
private static final Log LOG = LogFactory.getLog(OptionsParser.class); private static final Log LOG = LogFactory.getLog(OptionsParser.class);
private static final Options cliOptions = new Options(); private static final Options cliOptions = new Options();
static { static {
for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) { for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
@ -50,7 +50,7 @@ public class OptionsParser {
protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) { protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
for (int index = 0; index < arguments.length; index++) { for (int index = 0; index < arguments.length; index++) {
if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) { if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
arguments[index] = "-prbugp"; arguments[index] = "-prbugpc";
} }
} }
return super.flatten(options, arguments, stopAtNonOption); return super.flatten(options, arguments, stopAtNonOption);
@ -125,7 +125,7 @@ public class OptionsParser {
option.setAtomicWorkPath(new Path(workPath)); option.setAtomicWorkPath(new Path(workPath));
} }
} else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) { } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic"); throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
} }
if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {

View File

@ -111,7 +111,7 @@ public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
*/ */
private void initializeSSLConf(Context context) throws IOException { private void initializeSSLConf(Context context) throws IOException {
LOG.info("Initializing SSL configuration"); LOG.info("Initializing SSL configuration");
String workDir = conf.get(JobContext.JOB_LOCAL_DIR) + "/work"; String workDir = conf.get(JobContext.JOB_LOCAL_DIR) + "/work";
Path[] cacheFiles = context.getLocalCacheFiles(); Path[] cacheFiles = context.getLocalCacheFiles();
@ -294,7 +294,7 @@ public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
RetriableFileCopyCommand.CopyReadException) { RetriableFileCopyCommand.CopyReadException) {
incrementCounter(context, Counter.FAIL, 1); incrementCounter(context, Counter.FAIL, 1);
incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen()); incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen());
context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " + context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " +
StringUtils.stringifyException(exception))); StringUtils.stringifyException(exception)));
} }
else else
@ -322,7 +322,7 @@ public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
targetFileStatus.getLen() != source.getLen() targetFileStatus.getLen() != source.getLen()
|| (!skipCrc && || (!skipCrc &&
!DistCpUtils.checksumsAreEqual(sourceFS, !DistCpUtils.checksumsAreEqual(sourceFS,
source.getPath(), targetFS, target)) source.getPath(), null, targetFS, target))
|| (source.getBlockSize() != targetFileStatus.getBlockSize() && || (source.getBlockSize() != targetFileStatus.getBlockSize() &&
preserve.contains(FileAttribute.BLOCKSIZE)) preserve.contains(FileAttribute.BLOCKSIZE))
); );

View File

@ -18,23 +18,33 @@
package org.apache.hadoop.tools.mapred; package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.tools.util.RetriableCommand; import java.io.BufferedInputStream;
import org.apache.hadoop.tools.util.ThrottledInputStream; import java.io.BufferedOutputStream;
import org.apache.hadoop.tools.util.DistCpUtils; import java.io.IOException;
import org.apache.hadoop.tools.DistCpOptions.*; import java.io.InputStream;
import org.apache.hadoop.tools.DistCpConstants; import java.io.OutputStream;
import org.apache.hadoop.fs.*; import java.util.EnumSet;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.tools.util.ThrottledInputStream;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.*;
import java.util.EnumSet;
/** /**
* This class extends RetriableCommand to implement the copy of files, * This class extends RetriableCommand to implement the copy of files,
* with retries on failure. * with retries on failure.
@ -44,7 +54,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class); private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
private static int BUFFER_SIZE = 8 * 1024; private static int BUFFER_SIZE = 8 * 1024;
private boolean skipCrc = false; private boolean skipCrc = false;
/** /**
* Constructor, taking a description of the action. * Constructor, taking a description of the action.
* @param description Verbose description of the copy operation. * @param description Verbose description of the copy operation.
@ -52,7 +62,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
public RetriableFileCopyCommand(String description) { public RetriableFileCopyCommand(String description) {
super(description); super(description);
} }
/** /**
* Create a RetriableFileCopyCommand. * Create a RetriableFileCopyCommand.
* *
@ -99,15 +109,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target); LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
LOG.debug("Tmp-file path: " + tmpTargetPath); LOG.debug("Tmp-file path: " + tmpTargetPath);
} }
FileSystem sourceFS = sourceFileStatus.getPath().getFileSystem( final Path sourcePath = sourceFileStatus.getPath();
configuration); final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus, final FileChecksum sourceChecksum = fileAttributes
context, fileAttributes); .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
.getFileChecksum(sourcePath) : null;
compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, bytesRead); long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
context, fileAttributes, sourceChecksum);
compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
bytesRead);
//At this point, src&dest lengths are same. if length==0, we skip checksum //At this point, src&dest lengths are same. if length==0, we skip checksum
if ((bytesRead != 0) && (!skipCrc)) { if ((bytesRead != 0) && (!skipCrc)) {
compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS, tmpTargetPath); compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
targetFS, tmpTargetPath);
} }
promoteTmpToTarget(tmpTargetPath, target, targetFS); promoteTmpToTarget(tmpTargetPath, target, targetFS);
return bytesRead; return bytesRead;
@ -118,14 +134,33 @@ public class RetriableFileCopyCommand extends RetriableCommand {
} }
} }
/**
* @return the checksum spec of the source checksum if checksum type should be
* preserved
*/
private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
FileChecksum sourceChecksum) {
if (fileAttributes.contains(FileAttribute.CHECKSUMTYPE)
&& sourceChecksum != null) {
return sourceChecksum.getChecksumOpt();
}
return null;
}
private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS, private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
FileStatus sourceFileStatus, Mapper.Context context, FileStatus sourceFileStatus, Mapper.Context context,
EnumSet<FileAttribute> fileAttributes) EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
throws IOException { throws IOException {
OutputStream outStream = new BufferedOutputStream(targetFS.create( FsPermission permission = FsPermission.getFileDefault().applyUMask(
tmpTargetPath, true, BUFFER_SIZE, FsPermission.getUMask(targetFS.getConf()));
getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), OutputStream outStream = new BufferedOutputStream(
getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context)); targetFS.create(tmpTargetPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
tmpTargetPath),
getBlockSize(fileAttributes, sourceFileStatus, targetFS,
tmpTargetPath),
context, getChecksumOpt(fileAttributes, sourceChecksum)));
return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context); return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
} }
@ -140,9 +175,10 @@ public class RetriableFileCopyCommand extends RetriableCommand {
} }
private void compareCheckSums(FileSystem sourceFS, Path source, private void compareCheckSums(FileSystem sourceFS, Path source,
FileSystem targetFS, Path target) FileChecksum sourceChecksum, FileSystem targetFS, Path target)
throws IOException { throws IOException {
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) { if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target)) {
StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ") StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
.append(source).append(" and ").append(target).append("."); .append(source).append(" and ").append(target).append(".");
if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) { if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
@ -249,11 +285,18 @@ public class RetriableFileCopyCommand extends RetriableCommand {
sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath); sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath);
} }
/**
* @return the block size of the source file if we need to preserve either
* the block size or the checksum type. Otherwise the default block
* size of the target FS.
*/
private static long getBlockSize( private static long getBlockSize(
EnumSet<FileAttribute> fileAttributes, EnumSet<FileAttribute> fileAttributes,
FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) { FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
return fileAttributes.contains(FileAttribute.BLOCKSIZE)? boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE)
sourceFile.getBlockSize() : targetFS.getDefaultBlockSize(tmpTargetPath); || fileAttributes.contains(FileAttribute.CHECKSUMTYPE);
return preserve ? sourceFile.getBlockSize() : targetFS
.getDefaultBlockSize(tmpTargetPath);
} }
/** /**
@ -261,7 +304,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
* failures from other kinds of IOExceptions. * failures from other kinds of IOExceptions.
* The failure to read from source is dealt with specially, in the CopyMapper. * The failure to read from source is dealt with specially, in the CopyMapper.
* Such failures may be skipped if the DistCpOptions indicate so. * Such failures may be skipped if the DistCpOptions indicate so.
* Write failures are intolerable, and amount to CopyMapper failure. * Write failures are intolerable, and amount to CopyMapper failure.
*/ */
public static class CopyReadException extends IOException { public static class CopyReadException extends IOException {
public CopyReadException(Throwable rootCause) { public CopyReadException(Throwable rootCause) {

View File

@ -125,7 +125,7 @@ public class DistCpUtils {
* @param sourceRootPath - Source root path * @param sourceRootPath - Source root path
* @param childPath - Path for which relative path is required * @param childPath - Path for which relative path is required
* @return - Relative portion of the child path (always prefixed with / * @return - Relative portion of the child path (always prefixed with /
* unless it is empty * unless it is empty
*/ */
public static String getRelativePath(Path sourceRootPath, Path childPath) { public static String getRelativePath(Path sourceRootPath, Path childPath) {
String childPathString = childPath.toUri().getPath(); String childPathString = childPath.toUri().getPath();
@ -277,9 +277,11 @@ public class DistCpUtils {
* If checksums's can't be retrieved, it doesn't fail the test * If checksums's can't be retrieved, it doesn't fail the test
* Only time the comparison would fail is when checksums are * Only time the comparison would fail is when checksums are
* available and they don't match * available and they don't match
* *
* @param sourceFS FileSystem for the source path. * @param sourceFS FileSystem for the source path.
* @param source The source path. * @param source The source path.
* @param sourceChecksum The checksum of the source file. If it is null we
* still need to retrieve it through sourceFS.
* @param targetFS FileSystem for the target path. * @param targetFS FileSystem for the target path.
* @param target The target path. * @param target The target path.
* @return If either checksum couldn't be retrieved, the function returns * @return If either checksum couldn't be retrieved, the function returns
@ -288,12 +290,12 @@ public class DistCpUtils {
* @throws IOException if there's an exception while retrieving checksums. * @throws IOException if there's an exception while retrieving checksums.
*/ */
public static boolean checksumsAreEqual(FileSystem sourceFS, Path source, public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
FileSystem targetFS, Path target) FileChecksum sourceChecksum, FileSystem targetFS, Path target)
throws IOException { throws IOException {
FileChecksum sourceChecksum = null;
FileChecksum targetChecksum = null; FileChecksum targetChecksum = null;
try { try {
sourceChecksum = sourceFS.getFileChecksum(source); sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS
.getFileChecksum(source);
targetChecksum = targetFS.getFileChecksum(target); targetChecksum = targetFS.getFileChecksum(target);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Unable to retrieve checksum for " + source + " or " + target, e); LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);

View File

@ -110,7 +110,7 @@ public class TestOptionsParser {
"hdfs://localhost:8020/target/"}); "hdfs://localhost:8020/target/"});
Assert.assertEquals(options.getMapBandwidth(), 11); Assert.assertEquals(options.getMapBandwidth(), 11);
} }
@Test(expected=IllegalArgumentException.class) @Test(expected=IllegalArgumentException.class)
public void testParseNonPositiveBandwidth() { public void testParseNonPositiveBandwidth() {
OptionsParser.parse(new String[] { OptionsParser.parse(new String[] {
@ -119,7 +119,7 @@ public class TestOptionsParser {
"hdfs://localhost:8020/source/first", "hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"}); "hdfs://localhost:8020/target/"});
} }
@Test(expected=IllegalArgumentException.class) @Test(expected=IllegalArgumentException.class)
public void testParseZeroBandwidth() { public void testParseZeroBandwidth() {
OptionsParser.parse(new String[] { OptionsParser.parse(new String[] {
@ -397,6 +397,7 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER)); Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] { options = OptionsParser.parse(new String[] {
"-p", "-p",
@ -408,6 +409,7 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER)); Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] { options = OptionsParser.parse(new String[] {
"-p", "-p",
@ -418,6 +420,7 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER)); Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] { options = OptionsParser.parse(new String[] {
"-pbr", "-pbr",
@ -429,6 +432,7 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER)); Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] { options = OptionsParser.parse(new String[] {
"-pbrgup", "-pbrgup",
@ -440,6 +444,31 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER)); Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pbrgupc",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pc",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE));
Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] { options = OptionsParser.parse(new String[] {
"-p", "-p",
@ -452,7 +481,7 @@ public class TestOptionsParser {
attribIterator.next(); attribIterator.next();
i++; i++;
} }
Assert.assertEquals(i, 5); Assert.assertEquals(i, 6);
try { try {
OptionsParser.parse(new String[] { OptionsParser.parse(new String[] {

View File

@ -18,29 +18,6 @@
package org.apache.hadoop.tools.mapred; package org.apache.hadoop.tools.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -49,11 +26,38 @@ import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCopyMapper { public class TestCopyMapper {
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class); private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
private static List<Path> pathList = new ArrayList<Path>(); private static List<Path> pathList = new ArrayList<Path>();
private static int nFiles = 0; private static int nFiles = 0;
private static final int DEFAULT_FILE_SIZE = 1024; private static final int DEFAULT_FILE_SIZE = 1024;
private static final long NON_DEFAULT_BLOCK_SIZE = 4096;
private static MiniDFSCluster cluster; private static MiniDFSCluster cluster;
@ -119,12 +123,27 @@ public class TestCopyMapper {
mkdirs(SOURCE_PATH + "/2/3/4"); mkdirs(SOURCE_PATH + "/2/3/4");
mkdirs(SOURCE_PATH + "/2/3"); mkdirs(SOURCE_PATH + "/2/3");
mkdirs(SOURCE_PATH + "/5"); mkdirs(SOURCE_PATH + "/5");
touchFile(SOURCE_PATH + "/5/6", true); touchFile(SOURCE_PATH + "/5/6", true, null);
mkdirs(SOURCE_PATH + "/7"); mkdirs(SOURCE_PATH + "/7");
mkdirs(SOURCE_PATH + "/7/8"); mkdirs(SOURCE_PATH + "/7/8");
touchFile(SOURCE_PATH + "/7/8/9"); touchFile(SOURCE_PATH + "/7/8/9");
} }
private static void createSourceDataWithDifferentChecksumType()
throws Exception {
mkdirs(SOURCE_PATH + "/1");
mkdirs(SOURCE_PATH + "/2");
mkdirs(SOURCE_PATH + "/2/3/4");
mkdirs(SOURCE_PATH + "/2/3");
mkdirs(SOURCE_PATH + "/5");
touchFile(SOURCE_PATH + "/5/6", new ChecksumOpt(DataChecksum.Type.CRC32,
512));
mkdirs(SOURCE_PATH + "/7");
mkdirs(SOURCE_PATH + "/7/8");
touchFile(SOURCE_PATH + "/7/8/9", new ChecksumOpt(DataChecksum.Type.CRC32C,
512));
}
private static void mkdirs(String path) throws Exception { private static void mkdirs(String path) throws Exception {
FileSystem fileSystem = cluster.getFileSystem(); FileSystem fileSystem = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(), final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@ -134,21 +153,31 @@ public class TestCopyMapper {
} }
private static void touchFile(String path) throws Exception { private static void touchFile(String path) throws Exception {
touchFile(path, false); touchFile(path, false, null);
} }
private static void touchFile(String path, boolean createMultipleBlocks) throws Exception { private static void touchFile(String path, ChecksumOpt checksumOpt)
final long NON_DEFAULT_BLOCK_SIZE = 4096; throws Exception {
// create files with specific checksum opt and non-default block size
touchFile(path, true, checksumOpt);
}
private static void touchFile(String path, boolean createMultipleBlocks,
ChecksumOpt checksumOpt) throws Exception {
FileSystem fs; FileSystem fs;
DataOutputStream outputStream = null; DataOutputStream outputStream = null;
try { try {
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(), final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
fs.getWorkingDirectory()); fs.getWorkingDirectory());
final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2; final long blockSize = createMultipleBlocks ? NON_DEFAULT_BLOCK_SIZE : fs
outputStream = fs.create(qualifiedPath, true, 0, .getDefaultBlockSize(qualifiedPath) * 2;
(short)(fs.getDefaultReplication(qualifiedPath)*2), FsPermission permission = FsPermission.getFileDefault().applyUMask(
blockSize); FsPermission.getUMask(fs.getConf()));
outputStream = fs.create(qualifiedPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 0,
(short) (fs.getDefaultReplication(qualifiedPath) * 2), blockSize,
null, checksumOpt);
byte[] bytes = new byte[DEFAULT_FILE_SIZE]; byte[] bytes = new byte[DEFAULT_FILE_SIZE];
outputStream.write(bytes); outputStream.write(bytes);
long fileSize = DEFAULT_FILE_SIZE; long fileSize = DEFAULT_FILE_SIZE;
@ -171,17 +200,40 @@ public class TestCopyMapper {
} }
} }
@Test
public void testCopyWithDifferentChecksumType() throws Exception {
testCopy(true);
}
@Test(timeout=40000) @Test(timeout=40000)
public void testRun() { public void testRun() {
testCopy(false);
}
private void testCopy(boolean preserveChecksum) {
try { try {
deleteState(); deleteState();
createSourceData(); if (preserveChecksum) {
createSourceDataWithDifferentChecksumType();
} else {
createSourceData();
}
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper(); CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0); StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context Mapper<Text, FileStatus, Text, Text>.Context context
= stubContext.getContext(); = stubContext.getContext();
Configuration configuration = context.getConfiguration();
EnumSet<DistCpOptions.FileAttribute> fileAttributes
= EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
if (preserveChecksum) {
fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
}
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
DistCpUtils.packAttributes(fileAttributes));
copyMapper.setup(context); copyMapper.setup(context);
for (Path path: pathList) { for (Path path: pathList) {
@ -195,19 +247,29 @@ public class TestCopyMapper {
.replaceAll(SOURCE_PATH, TARGET_PATH)); .replaceAll(SOURCE_PATH, TARGET_PATH));
Assert.assertTrue(fs.exists(targetPath)); Assert.assertTrue(fs.exists(targetPath));
Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path)); Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
Assert.assertEquals(fs.getFileStatus(path).getReplication(), FileStatus sourceStatus = fs.getFileStatus(path);
fs.getFileStatus(targetPath).getReplication()); FileStatus targetStatus = fs.getFileStatus(targetPath);
Assert.assertEquals(fs.getFileStatus(path).getBlockSize(), Assert.assertEquals(sourceStatus.getReplication(),
fs.getFileStatus(targetPath).getBlockSize()); targetStatus.getReplication());
Assert.assertTrue(!fs.isFile(targetPath) || if (preserveChecksum) {
fs.getFileChecksum(targetPath).equals( Assert.assertEquals(sourceStatus.getBlockSize(),
fs.getFileChecksum(path))); targetStatus.getBlockSize());
}
Assert.assertTrue(!fs.isFile(targetPath)
|| fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
} }
Assert.assertEquals(pathList.size(), Assert.assertEquals(pathList.size(),
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue()); stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, if (!preserveChecksum) {
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue()); Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
.getValue());
} else {
Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
.getValue());
}
testCopyingExistingFiles(fs, copyMapper, context); testCopyingExistingFiles(fs, copyMapper, context);
for (Text value : stubContext.getWriter().values()) { for (Text value : stubContext.getWriter().values()) {
@ -309,7 +371,7 @@ public class TestCopyMapper {
UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest"); UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
final CopyMapper copyMapper = new CopyMapper(); final CopyMapper copyMapper = new CopyMapper();
final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser. final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() { doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
@Override @Override
@ -535,7 +597,7 @@ public class TestCopyMapper {
final Mapper<Text, FileStatus, Text, Text>.Context context final Mapper<Text, FileStatus, Text, Text>.Context context
= stubContext.getContext(); = stubContext.getContext();
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
DistCpUtils.packAttributes(preserveStatus)); DistCpUtils.packAttributes(preserveStatus));