diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 2c21fd3a2c5..b5163cc0c0c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -222,6 +222,9 @@ Branch-2 ( Unreleased changes ) HADOOP-7754. Expose file descriptors from Hadoop-wrapped local FileSystems (todd and ahmed via tucu) + HADOOP-8240. Add a new API to allow users to specify a checksum type + on FileSystem.create(..). (Kihwal Lee via szetszwo) + IMPROVEMENTS HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index cbcce217b61..d9eda445800 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.FsPermission; @@ -46,6 +47,7 @@ import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; /** @@ -467,6 +469,7 @@ public abstract class AbstractFileSystem { short replication = -1; long blockSize = -1; int bytesPerChecksum = -1; + ChecksumOpt checksumOpt = null; FsPermission permission = null; Progressable progress = null; Boolean createParent = null; @@ -496,6 +499,12 @@ public abstract class AbstractFileSystem { "BytesPerChecksum option is set multiple times"); } bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue(); + } else if (CreateOpts.ChecksumParam.class.isInstance(iOpt)) { + if (checksumOpt != null) { + throw new HadoopIllegalArgumentException( + "CreateChecksumType option is set multiple times"); + } + checksumOpt = ((CreateOpts.ChecksumParam) iOpt).getValue(); } else if (CreateOpts.Perms.class.isInstance(iOpt)) { if (permission != null) { throw new HadoopIllegalArgumentException( @@ -533,9 +542,16 @@ public abstract class AbstractFileSystem { if (blockSize == -1) { blockSize = ssDef.getBlockSize(); } - if (bytesPerChecksum == -1) { - bytesPerChecksum = ssDef.getBytesPerChecksum(); - } + + // Create a checksum option honoring user input as much as possible. + // If bytesPerChecksum is specified, it will override the one set in + // checksumOpt. Any missing value will be filled in using the default. + ChecksumOpt defaultOpt = new ChecksumOpt( + ssDef.getChecksumType(), + ssDef.getBytesPerChecksum()); + checksumOpt = ChecksumOpt.processChecksumOpt(defaultOpt, + checksumOpt, bytesPerChecksum); + if (bufferSize == -1) { bufferSize = ssDef.getFileBufferSize(); } @@ -552,7 +568,7 @@ public abstract class AbstractFileSystem { } return this.createInternal(f, createFlag, permission, bufferSize, - replication, blockSize, progress, bytesPerChecksum, createParent); + replication, blockSize, progress, checksumOpt, createParent); } /** @@ -563,7 +579,7 @@ public abstract class AbstractFileSystem { public abstract FSDataOutputStream createInternal(Path f, EnumSet flag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum, boolean createParent) + ChecksumOpt checksumOpt, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, UnresolvedLinkException, IOException; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java index e1d4ea6d635..47849919827 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.PureJavaCrc32; @@ -324,13 +325,17 @@ public abstract class ChecksumFs extends FilterFs { final EnumSet createFlag, final FsPermission absolutePermission, final int bufferSize, final short replication, final long blockSize, - final Progressable progress, final int bytesPerChecksum, + final Progressable progress, final ChecksumOpt checksumOpt, final boolean createParent) throws IOException { super(new PureJavaCrc32(), fs.getBytesPerSum(), 4); + // checksumOpt is passed down to the raw fs. Unless it implements + // checksum impelemts internally, checksumOpt will be ignored. + // If the raw fs does checksum internally, we will end up with + // two layers of checksumming. i.e. checksumming checksum file. this.datas = fs.getRawFs().createInternal(file, createFlag, absolutePermission, bufferSize, replication, blockSize, progress, - bytesPerChecksum, createParent); + checksumOpt, createParent); // Now create the chekcsumfile; adjust the buffsize int bytesPerSum = fs.getBytesPerSum(); @@ -338,7 +343,7 @@ public abstract class ChecksumFs extends FilterFs { this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), absolutePermission, sumBufferSize, replication, blockSize, progress, - bytesPerChecksum, createParent); + checksumOpt, createParent); sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length); sums.writeInt(bytesPerSum); } @@ -361,12 +366,11 @@ public abstract class ChecksumFs extends FilterFs { public FSDataOutputStream createInternal(Path f, EnumSet createFlag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum, boolean createParent) throws IOException { - + ChecksumOpt checksumOpt, boolean createParent) throws IOException { final FSDataOutputStream out = new FSDataOutputStream( new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission, bufferSize, replication, blockSize, progress, - bytesPerChecksum, createParent), null); + checksumOpt, createParent), null); return out; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java index 1619c02fa25..962847154ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.token.Token; @@ -62,7 +63,7 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem { public FSDataOutputStream createInternal (Path f, EnumSet flag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum, boolean createParent) throws IOException { + ChecksumOpt checksumOpt, boolean createParent) throws IOException { checkPath(f); // Default impl assumes that permissions do not matter @@ -81,8 +82,8 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem { } // parent does exist - go ahead with create of file. } - return fsImpl.primitiveCreate(f, absolutePermission, flag, - bufferSize, replication, blockSize, progress, bytesPerChecksum); + return fsImpl.primitiveCreate(f, absolutePermission, flag, + bufferSize, replication, blockSize, progress, checksumOpt); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index 409eabd56f9..e7591c710de 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -127,7 +127,8 @@ import org.apache.hadoop.util.ShutdownHookManager; *
  • replication factor *
  • block size *
  • buffer size - *
  • bytesPerChecksum (if used). + *
  • encryptDataTransfer + *
  • checksum option. (checksumType and bytesPerChecksum) * * *

    @@ -613,7 +614,8 @@ public final class FileContext { *

  • BufferSize - buffersize used in FSDataOutputStream *
  • Blocksize - block size for file blocks *
  • ReplicationFactor - replication for blocks - *
  • BytesPerChecksum - bytes per checksum + *
  • ChecksumParam - Checksum parameters. server default is used + * if not specified. * * * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 13881c776f0..dd848ada5ff 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -45,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.MultipleIOException; @@ -54,6 +55,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; @@ -656,14 +658,17 @@ public abstract class FileSystem extends Configured implements Closeable { @Deprecated public FsServerDefaults getServerDefaults() throws IOException { Configuration conf = getConf(); + // CRC32 is chosen as default as it is available in all + // releases that support checksum. + // The client trash configuration is ignored. return new FsServerDefaults(getDefaultBlockSize(), conf.getInt("io.bytes.per.checksum", 512), 64 * 1024, getDefaultReplication(), conf.getInt("io.file.buffer.size", 4096), false, - // NB: ignoring the client trash configuration - CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT); + CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT, + DataChecksum.Type.CRC32); } /** @@ -889,11 +894,40 @@ public abstract class FileSystem extends Configured implements Closeable { short replication, long blockSize, Progressable progress) throws IOException { - // only DFS support this - return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress); + return create(f, permission, flags, bufferSize, replication, + blockSize, progress, null); } - + /** + * Create an FSDataOutputStream at the indicated Path with a custom + * checksum option + * @param f the file name to open + * @param permission + * @param flags {@link CreateFlag}s to use for this stream. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @param checksumOpt checksum parameter. If null, the values + * found in conf will be used. + * @throws IOException + * @see #setPermission(Path, FsPermission) + */ + public FSDataOutputStream create(Path f, + FsPermission permission, + EnumSet flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress, + ChecksumOpt checksumOpt) throws IOException { + // Checksum options are ignored by default. The file systems that + // implement checksum need to override this method. The full + // support is currently only available in DFS. + return create(f, permission, flags.contains(CreateFlag.OVERWRITE), + bufferSize, replication, blockSize, progress); + } + /*. * This create has been added to support the FileContext that processes * the permission @@ -905,7 +939,7 @@ public abstract class FileSystem extends Configured implements Closeable { protected FSDataOutputStream primitiveCreate(Path f, FsPermission absolutePermission, EnumSet flag, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum) throws IOException { + ChecksumOpt checksumOpt) throws IOException { boolean pathExists = exists(f); CreateFlag.validate(f, pathExists, flag); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 0e2135340af..c2ecd20b5a4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.util.Progressable; /**************************************************************** @@ -410,10 +411,11 @@ public class FilterFileSystem extends FileSystem { @Override protected FSDataOutputStream primitiveCreate(Path f, FsPermission absolutePermission, EnumSet flag, - int bufferSize, short replication, long blockSize, Progressable progress, int bytesPerChecksum) + int bufferSize, short replication, long blockSize, + Progressable progress, ChecksumOpt checksumOpt) throws IOException { return fs.primitiveCreate(f, absolutePermission, flag, - bufferSize, replication, blockSize, progress, bytesPerChecksum); + bufferSize, replication, blockSize, progress, checksumOpt); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java index 721b76fd7dd..6cfc11b1faa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -81,11 +82,11 @@ public abstract class FilterFs extends AbstractFileSystem { public FSDataOutputStream createInternal(Path f, EnumSet flag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum, boolean createParent) + ChecksumOpt checksumOpt, boolean createParent) throws IOException, UnresolvedLinkException { checkPath(f); return myFs.createInternal(f, flag, absolutePermission, bufferSize, - replication, blockSize, progress, bytesPerChecksum, createParent); + replication, blockSize, progress, checksumOpt, createParent); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java index 274311e6682..637697b83df 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java @@ -26,6 +26,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableFactory; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.DataChecksum; /**************************************************** * Provides server default configuration values to clients. @@ -50,13 +52,15 @@ public class FsServerDefaults implements Writable { private int fileBufferSize; private boolean encryptDataTransfer; private long trashInterval; + private DataChecksum.Type checksumType; public FsServerDefaults() { } public FsServerDefaults(long blockSize, int bytesPerChecksum, int writePacketSize, short replication, int fileBufferSize, - boolean encryptDataTransfer, long trashInterval) { + boolean encryptDataTransfer, long trashInterval, + DataChecksum.Type checksumType) { this.blockSize = blockSize; this.bytesPerChecksum = bytesPerChecksum; this.writePacketSize = writePacketSize; @@ -64,6 +68,7 @@ public class FsServerDefaults implements Writable { this.fileBufferSize = fileBufferSize; this.encryptDataTransfer = encryptDataTransfer; this.trashInterval = trashInterval; + this.checksumType = checksumType; } public long getBlockSize() { @@ -94,6 +99,10 @@ public class FsServerDefaults implements Writable { return trashInterval; } + public DataChecksum.Type getChecksumType() { + return checksumType; + } + // ///////////////////////////////////////// // Writable // ///////////////////////////////////////// @@ -104,6 +113,7 @@ public class FsServerDefaults implements Writable { out.writeInt(writePacketSize); out.writeShort(replication); out.writeInt(fileBufferSize); + WritableUtils.writeEnum(out, checksumType); } @InterfaceAudience.Private @@ -113,5 +123,6 @@ public class FsServerDefaults implements Writable { writePacketSize = in.readInt(); replication = in.readShort(); fileBufferSize = in.readInt(); + checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java index 43e768cba40..173e16ea413 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java @@ -20,7 +20,9 @@ package org.apache.hadoop.fs; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.HadoopIllegalArgumentException; /** * This class contains options related to file system operations. @@ -46,6 +48,10 @@ public final class Options { public static BytesPerChecksum bytesPerChecksum(short crc) { return new BytesPerChecksum(crc); } + public static ChecksumParam checksumParam( + ChecksumOpt csumOpt) { + return new ChecksumParam(csumOpt); + } public static Perms perms(FsPermission perm) { return new Perms(perm); } @@ -91,7 +97,8 @@ public final class Options { } public int getValue() { return bufferSize; } } - + + /** This is not needed if ChecksumParam is specified. **/ public static class BytesPerChecksum extends CreateOpts { private final int bytesPerChecksum; protected BytesPerChecksum(short bpc) { @@ -103,6 +110,14 @@ public final class Options { } public int getValue() { return bytesPerChecksum; } } + + public static class ChecksumParam extends CreateOpts { + private final ChecksumOpt checksumOpt; + protected ChecksumParam(ChecksumOpt csumOpt) { + checksumOpt = csumOpt; + } + public ChecksumOpt getValue() { return checksumOpt; } + } public static class Perms extends CreateOpts { private final FsPermission permissions; @@ -206,4 +221,116 @@ public final class Options { return code; } } + + /** + * This is used in FileSystem and FileContext to specify checksum options. + */ + public static class ChecksumOpt { + private final int crcBlockSize; + private final DataChecksum.Type crcType; + + /** + * Create a uninitialized one + */ + public ChecksumOpt() { + crcBlockSize = -1; + crcType = DataChecksum.Type.DEFAULT; + } + + /** + * Normal ctor + * @param type checksum type + * @param size bytes per checksum + */ + public ChecksumOpt(DataChecksum.Type type, int size) { + crcBlockSize = size; + crcType = type; + } + + public int getBytesPerChecksum() { + return crcBlockSize; + } + + public DataChecksum.Type getChecksumType() { + return crcType; + } + + /** + * Create a ChecksumOpts that disables checksum + */ + public static ChecksumOpt createDisabled() { + return new ChecksumOpt(DataChecksum.Type.NULL, -1); + } + + /** + * A helper method for processing user input and default value to + * create a combined checksum option. This is a bit complicated because + * bytesPerChecksum is kept for backward compatibility. + * + * @param defaultOpt Default checksum option + * @param userOpt User-specified checksum option. Ignored if null. + * @param userBytesPerChecksum User-specified bytesPerChecksum + * Ignored if < 0. + */ + public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt, + ChecksumOpt userOpt, int userBytesPerChecksum) { + // The following is done to avoid unnecessary creation of new objects. + // tri-state variable: 0 default, 1 userBytesPerChecksum, 2 userOpt + short whichSize; + // true default, false userOpt + boolean useDefaultType; + + // bytesPerChecksum - order of preference + // user specified value in bytesPerChecksum + // user specified value in checksumOpt + // default. + if (userBytesPerChecksum > 0) { + whichSize = 1; // userBytesPerChecksum + } else if (userOpt != null && userOpt.getBytesPerChecksum() > 0) { + whichSize = 2; // userOpt + } else { + whichSize = 0; // default + } + + // checksum type - order of preference + // user specified value in checksumOpt + // default. + if (userOpt != null && + userOpt.getChecksumType() != DataChecksum.Type.DEFAULT) { + useDefaultType = false; + } else { + useDefaultType = true; + } + + // Short out the common and easy cases + if (whichSize == 0 && useDefaultType) { + return defaultOpt; + } else if (whichSize == 2 && !useDefaultType) { + return userOpt; + } + + // Take care of the rest of combinations + DataChecksum.Type type = useDefaultType ? defaultOpt.getChecksumType() : + userOpt.getChecksumType(); + if (whichSize == 0) { + return new ChecksumOpt(type, defaultOpt.getBytesPerChecksum()); + } else if (whichSize == 1) { + return new ChecksumOpt(type, userBytesPerChecksum); + } else { + return new ChecksumOpt(type, userOpt.getBytesPerChecksum()); + } + } + + /** + * A helper method for processing user input and default value to + * create a combined checksum option. + * + * @param defaultOpt Default checksum option + * @param userOpt User-specified checksum option + */ + public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt, + ChecksumOpt userOpt) { + return processChecksumOpt(defaultOpt, userOpt, -1); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java index 0bb5de7faee..2313a1436d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java @@ -23,10 +23,16 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.util.DataChecksum; /** * This class contains constants for configuration keys used * in the ftp file system. + * + * Note that the settings for unimplemented features are ignored. + * E.g. checksum related settings are just place holders. Even when + * wrapped with {@link ChecksumFileSystem}, these settings are not + * used. */ @InterfaceAudience.Private @InterfaceStability.Unstable @@ -46,6 +52,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys { public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false; public static final long FS_TRASH_INTERVAL_DEFAULT = 0; + public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT = + DataChecksum.Type.CRC32; protected static FsServerDefaults getServerDefaults() throws IOException { return new FsServerDefaults( @@ -55,7 +63,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys { REPLICATION_DEFAULT, STREAM_BUFFER_SIZE_DEFAULT, ENCRYPT_DATA_TRANSFER_DEFAULT, - FS_TRASH_INTERVAL_DEFAULT); + FS_TRASH_INTERVAL_DEFAULT, + CHECKSUM_TYPE_DEFAULT); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java index 76626c3aa03..d1ebca2deb4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java @@ -24,11 +24,18 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.util.DataChecksum; /** * This class contains constants for configuration keys used * in the local file system, raw local fs and checksum fs. + * + * Note that the settings for unimplemented features are ignored. + * E.g. checksum related settings are just place holders. Even when + * wrapped with {@link ChecksumFileSystem}, these settings are not + * used. */ + @InterfaceAudience.Private @InterfaceStability.Unstable public class LocalConfigKeys extends CommonConfigurationKeys { @@ -45,7 +52,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys { public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false; public static final long FS_TRASH_INTERVAL_DEFAULT = 0; - + public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT = + DataChecksum.Type.CRC32; public static FsServerDefaults getServerDefaults() throws IOException { return new FsServerDefaults( BLOCK_SIZE_DEFAULT, @@ -54,7 +62,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys { REPLICATION_DEFAULT, STREAM_BUFFER_SIZE_DEFAULT, ENCRYPT_DATA_TRANSFER_DEFAULT, - FS_TRASH_INTERVAL_DEFAULT); + FS_TRASH_INTERVAL_DEFAULT, + CHECKSUM_TYPE_DEFAULT); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java index f6e27d28151..c99ce3be13b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; @@ -159,11 +160,11 @@ class ChRootedFs extends AbstractFileSystem { public FSDataOutputStream createInternal(final Path f, final EnumSet flag, final FsPermission absolutePermission, final int bufferSize, final short replication, final long blockSize, - final Progressable progress, final int bytesPerChecksum, + final Progressable progress, final ChecksumOpt checksumOpt, final boolean createParent) throws IOException, UnresolvedLinkException { return myFs.createInternal(fullPath(f), flag, absolutePermission, bufferSize, - replication, blockSize, progress, bytesPerChecksum, createParent); + replication, blockSize, progress, checksumOpt, createParent); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java index c2774ee0f26..e62610f8d96 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -265,7 +266,7 @@ public class ViewFs extends AbstractFileSystem { public FSDataOutputStream createInternal(final Path f, final EnumSet flag, final FsPermission absolutePermission, final int bufferSize, final short replication, final long blockSize, - final Progressable progress, final int bytesPerChecksum, + final Progressable progress, final ChecksumOpt checksumOpt, final boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, @@ -283,7 +284,7 @@ public class ViewFs extends AbstractFileSystem { assert(res.remainingPath != null); return res.targetFileSystem.createInternal(res.remainingPath, flag, absolutePermission, bufferSize, replication, - blockSize, progress, bytesPerChecksum, + blockSize, progress, checksumOpt, createParent); } @@ -632,7 +633,7 @@ public class ViewFs extends AbstractFileSystem { public FSDataOutputStream createInternal(final Path f, final EnumSet flag, final FsPermission absolutePermission, final int bufferSize, final short replication, final long blockSize, - final Progressable progress, final int bytesPerChecksum, + final Progressable progress, final ChecksumOpt checksumOpt, final boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java index 27a3c400a40..2d41f82cd39 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java @@ -43,12 +43,14 @@ public class DataChecksum implements Checksum { public static final int CHECKSUM_NULL = 0; public static final int CHECKSUM_CRC32 = 1; public static final int CHECKSUM_CRC32C = 2; + public static final int CHECKSUM_DEFAULT = 3; /** The checksum types */ public static enum Type { NULL (CHECKSUM_NULL, 0), CRC32 (CHECKSUM_CRC32, 4), - CRC32C(CHECKSUM_CRC32C, 4); + CRC32C(CHECKSUM_CRC32C, 4), + DEFAULT(CHECKSUM_DEFAULT, 0); // This cannot be used to create DataChecksum public final int id; public final int size; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestAfsCheckPath.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestAfsCheckPath.java index 406d6d1c081..3bd14f1495a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestAfsCheckPath.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestAfsCheckPath.java @@ -24,6 +24,7 @@ import java.net.URISyntaxException; import java.util.EnumSet; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.util.Progressable; import org.junit.Test; @@ -76,7 +77,7 @@ public class TestAfsCheckPath { @Override public FSDataOutputStream createInternal(Path f, EnumSet flag, FsPermission absolutePermission, int bufferSize, short replication, - long blockSize, Progressable progress, int bytesPerChecksum, + long blockSize, Progressable progress, ChecksumOpt checksumOpt, boolean createParent) throws IOException { // deliberately empty return null; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index 9f66ae204c0..3d029ff5fdc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -32,6 +32,7 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.security.Credentials; @@ -80,6 +81,11 @@ public class TestFilterFileSystem { Progressable progress) throws IOException { return null; } + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet flags, int bufferSize, short replication, long blockSize, + Progressable progress, ChecksumOpt checksumOpt) throws IOException { + return null; + } public boolean mkdirs(Path f) { return false; } public FSDataInputStream open(Path f) { return null; } public FSDataOutputStream create(Path f) { return null; } @@ -138,6 +144,16 @@ public class TestFilterFileSystem { Progressable progress) throws IOException { return null; } + public FSDataOutputStream create(Path f, + FsPermission permission, + EnumSet flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress, + ChecksumOpt checksumOpt) throws IOException { + return null; + } public String getName() { return null; } public boolean delete(Path f) { return false; } public short getReplication(Path src) { return 0 ; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsOptions.java new file mode 100644 index 00000000000..c66b4fa9017 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsOptions.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.util.DataChecksum; + +import org.junit.Test; + +public class TestFsOptions { + + @Test + public void testProcessChecksumOpt() { + ChecksumOpt defaultOpt = new ChecksumOpt(DataChecksum.Type.CRC32, 512); + ChecksumOpt finalOpt; + + // Give a null + finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, null); + checkParams(defaultOpt, finalOpt); + + // null with bpc + finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, null, 1024); + checkParams(DataChecksum.Type.CRC32, 1024, finalOpt); + + ChecksumOpt myOpt = new ChecksumOpt(); + + // custom with unspecified parameters + finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt); + checkParams(defaultOpt, finalOpt); + + myOpt = new ChecksumOpt(DataChecksum.Type.CRC32C, 2048); + + // custom config + finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt); + checkParams(DataChecksum.Type.CRC32C, 2048, finalOpt); + + // custom config + bpc + finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt, 4096); + checkParams(DataChecksum.Type.CRC32C, 4096, finalOpt); + } + + private void checkParams(ChecksumOpt expected, ChecksumOpt obtained) { + assertEquals(expected.getChecksumType(), obtained.getChecksumType()); + assertEquals(expected.getBytesPerChecksum(), obtained.getBytesPerChecksum()); + } + + private void checkParams(DataChecksum.Type type, int bpc, ChecksumOpt obtained) { + assertEquals(type, obtained.getChecksumType()); + assertEquals(bpc, obtained.getBytesPerChecksum()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java index b31960c9741..2386c841304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.hdfs.CorruptFileBlockIterator; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSUtil; @@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSystem { public HdfsDataOutputStream createInternal(Path f, EnumSet createFlag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum, boolean createParent) throws IOException { + ChecksumOpt checksumOpt, boolean createParent) throws IOException { return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f), absolutePermission, createFlag, createParent, replication, blockSize, - progress, bufferSize, bytesPerChecksum), getStatistics()); + progress, bufferSize, checksumOpt), getStatistics()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 467b612620f..2835f31bc0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -91,6 +91,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; @@ -203,8 +204,7 @@ public class DFSClient implements java.io.Closeable { final int maxBlockAcquireFailures; final int confTime; final int ioBufferSize; - final DataChecksum.Type checksumType; - final int bytesPerChecksum; + final ChecksumOpt defaultChecksumOpt; final int writePacketSize; final int socketTimeout; final int socketCacheCapacity; @@ -243,9 +243,7 @@ public class DFSClient implements java.io.Closeable { ioBufferSize = conf.getInt( CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); - checksumType = getChecksumType(conf); - bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, - DFS_BYTES_PER_CHECKSUM_DEFAULT); + defaultChecksumOpt = getChecksumOptFromConf(conf); socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT); /** dfs.write.packet.size is an internal config variable */ @@ -300,9 +298,32 @@ public class DFSClient implements java.io.Closeable { } } - private DataChecksum createChecksum() { - return DataChecksum.newDataChecksum( - checksumType, bytesPerChecksum); + // Construct a checksum option from conf + private ChecksumOpt getChecksumOptFromConf(Configuration conf) { + DataChecksum.Type type = getChecksumType(conf); + int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, + DFS_BYTES_PER_CHECKSUM_DEFAULT); + return new ChecksumOpt(type, bytesPerChecksum); + } + + // create a DataChecksum with the default option. + private DataChecksum createChecksum() throws IOException { + return createChecksum(null); + } + + private DataChecksum createChecksum(ChecksumOpt userOpt) + throws IOException { + // Fill in any missing field with the default. + ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt( + defaultChecksumOpt, userOpt); + DataChecksum dataChecksum = DataChecksum.newDataChecksum( + myOpt.getChecksumType(), + myOpt.getBytesPerChecksum()); + if (dataChecksum == null) { + throw new IOException("Invalid checksum type specified: " + + myOpt.getChecksumType().name()); + } + return dataChecksum; } } @@ -1143,12 +1164,13 @@ public class DFSClient implements java.io.Closeable { return create(src, FsPermission.getDefault(), overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress, - buffersize); + buffersize, null); } /** * Call {@link #create(String, FsPermission, EnumSet, boolean, short, - * long, Progressable, int)} with createParent set to true. + * long, Progressable, int, ChecksumOpt)} with createParent + * set to true. */ public DFSOutputStream create(String src, FsPermission permission, @@ -1156,10 +1178,11 @@ public class DFSClient implements java.io.Closeable { short replication, long blockSize, Progressable progress, - int buffersize) + int buffersize, + ChecksumOpt checksumOpt) throws IOException { return create(src, permission, flag, true, - replication, blockSize, progress, buffersize); + replication, blockSize, progress, buffersize, checksumOpt); } /** @@ -1177,6 +1200,7 @@ public class DFSClient implements java.io.Closeable { * @param blockSize maximum block size * @param progress interface for reporting client progress * @param buffersize underlying buffer size + * @param checksumOpts checksum options * * @return output stream * @@ -1190,8 +1214,8 @@ public class DFSClient implements java.io.Closeable { short replication, long blockSize, Progressable progress, - int buffersize) - throws IOException { + int buffersize, + ChecksumOpt checksumOpt) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getDefault(); @@ -1202,7 +1226,7 @@ public class DFSClient implements java.io.Closeable { } final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, - buffersize, dfsClientConf.createChecksum()); + buffersize, dfsClientConf.createChecksum(checksumOpt)); beginFileLease(src, result); return result; } @@ -1240,15 +1264,13 @@ public class DFSClient implements java.io.Closeable { long blockSize, Progressable progress, int buffersize, - int bytesPerChecksum) + ChecksumOpt checksumOpt) throws IOException, UnresolvedLinkException { checkOpen(); CreateFlag.validate(flag); DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress); if (result == null) { - DataChecksum checksum = DataChecksum.newDataChecksum( - dfsClientConf.checksumType, - bytesPerChecksum); + DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); result = DFSOutputStream.newStreamForCreate(this, src, absPermission, flag, createParent, replication, blockSize, progress, buffersize, checksum); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index f207c8cd8b2..12a1ecc08fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; @@ -258,19 +259,19 @@ public class DistributedFileSystem extends FileSystem { public HdfsDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - return create(f, permission, + return this.create(f, permission, overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, - blockSize, progress); + blockSize, progress, null); } @Override public HdfsDataOutputStream create(Path f, FsPermission permission, EnumSet cflags, int bufferSize, short replication, long blockSize, - Progressable progress) throws IOException { + Progressable progress, ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags, - replication, blockSize, progress, bufferSize); + replication, blockSize, progress, bufferSize, checksumOpt); return new HdfsDataOutputStream(out, statistics); } @@ -279,11 +280,11 @@ public class DistributedFileSystem extends FileSystem { protected HdfsDataOutputStream primitiveCreate(Path f, FsPermission absolutePermission, EnumSet flag, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum) throws IOException { + ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f), absolutePermission, flag, true, replication, blockSize, - progress, bufferSize, bytesPerChecksum),statistics); + progress, bufferSize, checksumOpt),statistics); } /** @@ -298,7 +299,8 @@ public class DistributedFileSystem extends FileSystem { flag.add(CreateFlag.CREATE); } return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag, - false, replication, blockSize, progress, bufferSize), statistics); + false, replication, blockSize, progress, + bufferSize, null), statistics); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 1361c47afc2..ed02e5dbb93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; @@ -134,6 +135,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.security.token.Token; import com.google.protobuf.ByteString; @@ -1003,7 +1005,8 @@ public class PBHelper { fs.getWritePacketSize(), (short) fs.getReplication(), fs.getFileBufferSize(), fs.getEncryptDataTransfer(), - fs.getTrashInterval()); + fs.getTrashInterval(), + DataChecksum.Type.valueOf(fs.getChecksumType().name())); } public static FsServerDefaultsProto convert(FsServerDefaults fs) { @@ -1015,7 +1018,9 @@ public class PBHelper { .setReplication(fs.getReplication()) .setFileBufferSize(fs.getFileBufferSize()) .setEncryptDataTransfer(fs.getEncryptDataTransfer()) - .setTrashInterval(fs.getTrashInterval()).build(); + .setTrashInterval(fs.getTrashInterval()) + .setChecksumType(ChecksumTypeProto.valueOf(fs.getChecksumType().name())) + .build(); } public static FsPermissionProto convert(FsPermission p) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index 2aaf157e6f5..262b66f9bcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -215,7 +215,7 @@ public class DatanodeWebHdfsMethods { fullpath, permission.getFsPermission(), overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), - replication.getValue(conf), blockSize.getValue(conf), null, b), null); + replication.getValue(conf), blockSize.getValue(conf), null, b, null), null); IOUtils.copyBytes(in, out, b); out.close(); out = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 14f4e0b114b..ba5ec3db193 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; @@ -195,6 +197,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; @@ -476,6 +479,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats, "must not be specified if HA is not enabled."); } + // Get the checksum type from config + String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT); + DataChecksum.Type checksumType; + try { + checksumType = DataChecksum.Type.valueOf(checksumTypeStr); + } catch (IllegalArgumentException iae) { + throw new IOException("Invalid checksum type in " + + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr); + } + this.serverDefaults = new FsServerDefaults( conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT), conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT), @@ -483,7 +496,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT), conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT), - conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)); + conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT), + checksumType); this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, DFS_NAMENODE_MAX_OBJECTS_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 019fb58558e..4c1fab59b6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -178,6 +178,15 @@ message HdfsFileStatusProto { optional LocatedBlocksProto locations = 12; // suppled only if asked by client } +/** + * Checksum algorithms/types used in HDFS + */ +enum ChecksumTypeProto { + NULL = 0; + CRC32 = 1; + CRC32C = 2; +} + /** * HDFS Server Defaults */ @@ -189,6 +198,7 @@ message FsServerDefaultsProto { required uint32 fileBufferSize = 5; optional bool encryptDataTransfer = 6 [default = false]; optional uint64 trashInterval = 7 [default = 0]; + optional ChecksumTypeProto checksumType = 8 [default = CRC32]; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 76263082b46..4a044d894ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.Arrays; +import java.util.EnumSet; import java.util.Random; import org.apache.commons.lang.ArrayUtils; @@ -36,16 +37,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.Test; @@ -664,4 +668,54 @@ public class TestDistributedFileSystem { (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid())); } } + + @Test + public void testCreateWithCustomChecksum() throws Exception { + Configuration conf = getTestConfiguration(); + final long grace = 1000L; + MiniDFSCluster cluster = null; + Path testBasePath = new Path("/test/csum"); + // create args + Path path1 = new Path(testBasePath, "file_wtih_crc1"); + Path path2 = new Path(testBasePath, "file_with_crc2"); + ChecksumOpt opt1 = new ChecksumOpt(DataChecksum.Type.CRC32C, 512); + ChecksumOpt opt2 = new ChecksumOpt(DataChecksum.Type.CRC32, 512); + + // common args + FsPermission perm = FsPermission.getDefault().applyUMask( + FsPermission.getUMask(conf)); + EnumSet flags = EnumSet.of(CreateFlag.OVERWRITE, + CreateFlag.CREATE); + short repl = 1; + + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + FileSystem dfs = cluster.getFileSystem(); + + dfs.mkdirs(testBasePath); + + // create two files with different checksum types + FSDataOutputStream out1 = dfs.create(path1, perm, flags, 4096, repl, + 131072L, null, opt1); + FSDataOutputStream out2 = dfs.create(path2, perm, flags, 4096, repl, + 131072L, null, opt2); + + for (int i = 0; i < 1024; i++) { + out1.write(i); + out2.write(i); + } + out1.close(); + out2.close(); + + // the two checksums must be different. + FileChecksum sum1 = dfs.getFileChecksum(path1); + FileChecksum sum2 = dfs.getFileChecksum(path2); + assertFalse(sum1.equals(sum2)); + } finally { + if (cluster != null) { + cluster.getFileSystem().delete(testBasePath, true); + cluster.shutdown(); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 76713761ad0..d8e56c5c5d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataOutputBuffer; @@ -430,7 +431,7 @@ public class TestResourceLocalizationService { new FSDataOutputStream(new DataOutputBuffer(), null); doReturn(out).when(spylfs).createInternal(isA(Path.class), isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), - anyLong(), isA(Progressable.class), anyInt(), anyBoolean()); + anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean()); final LocalResource resource = getPrivateMockedResource(r); final LocalResourceRequest req = new LocalResourceRequest(resource); Map> rsrcs =