svn merge -c 1374696 from trunk for HADOOP-8240. Add a new API to allow users to specify a checksum type on FileSystem.create(..).
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1374697 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b082909c32
commit
fa76c71e62
|
@ -31,6 +31,9 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
HADOOP-7754. Expose file descriptors from Hadoop-wrapped local
|
HADOOP-7754. Expose file descriptors from Hadoop-wrapped local
|
||||||
FileSystems (todd and ahmed via tucu)
|
FileSystems (todd and ahmed via tucu)
|
||||||
|
|
||||||
|
HADOOP-8240. Add a new API to allow users to specify a checksum type
|
||||||
|
on FileSystem.create(..). (Kihwal Lee via szetszwo)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual
|
HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
@ -46,6 +47,7 @@ import org.apache.hadoop.fs.InvalidPathException;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -467,6 +469,7 @@ public abstract class AbstractFileSystem {
|
||||||
short replication = -1;
|
short replication = -1;
|
||||||
long blockSize = -1;
|
long blockSize = -1;
|
||||||
int bytesPerChecksum = -1;
|
int bytesPerChecksum = -1;
|
||||||
|
ChecksumOpt checksumOpt = null;
|
||||||
FsPermission permission = null;
|
FsPermission permission = null;
|
||||||
Progressable progress = null;
|
Progressable progress = null;
|
||||||
Boolean createParent = null;
|
Boolean createParent = null;
|
||||||
|
@ -496,6 +499,12 @@ public abstract class AbstractFileSystem {
|
||||||
"BytesPerChecksum option is set multiple times");
|
"BytesPerChecksum option is set multiple times");
|
||||||
}
|
}
|
||||||
bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue();
|
bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue();
|
||||||
|
} else if (CreateOpts.ChecksumParam.class.isInstance(iOpt)) {
|
||||||
|
if (checksumOpt != null) {
|
||||||
|
throw new HadoopIllegalArgumentException(
|
||||||
|
"CreateChecksumType option is set multiple times");
|
||||||
|
}
|
||||||
|
checksumOpt = ((CreateOpts.ChecksumParam) iOpt).getValue();
|
||||||
} else if (CreateOpts.Perms.class.isInstance(iOpt)) {
|
} else if (CreateOpts.Perms.class.isInstance(iOpt)) {
|
||||||
if (permission != null) {
|
if (permission != null) {
|
||||||
throw new HadoopIllegalArgumentException(
|
throw new HadoopIllegalArgumentException(
|
||||||
|
@ -533,9 +542,16 @@ public abstract class AbstractFileSystem {
|
||||||
if (blockSize == -1) {
|
if (blockSize == -1) {
|
||||||
blockSize = ssDef.getBlockSize();
|
blockSize = ssDef.getBlockSize();
|
||||||
}
|
}
|
||||||
if (bytesPerChecksum == -1) {
|
|
||||||
bytesPerChecksum = ssDef.getBytesPerChecksum();
|
// Create a checksum option honoring user input as much as possible.
|
||||||
}
|
// If bytesPerChecksum is specified, it will override the one set in
|
||||||
|
// checksumOpt. Any missing value will be filled in using the default.
|
||||||
|
ChecksumOpt defaultOpt = new ChecksumOpt(
|
||||||
|
ssDef.getChecksumType(),
|
||||||
|
ssDef.getBytesPerChecksum());
|
||||||
|
checksumOpt = ChecksumOpt.processChecksumOpt(defaultOpt,
|
||||||
|
checksumOpt, bytesPerChecksum);
|
||||||
|
|
||||||
if (bufferSize == -1) {
|
if (bufferSize == -1) {
|
||||||
bufferSize = ssDef.getFileBufferSize();
|
bufferSize = ssDef.getFileBufferSize();
|
||||||
}
|
}
|
||||||
|
@ -552,7 +568,7 @@ public abstract class AbstractFileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.createInternal(f, createFlag, permission, bufferSize,
|
return this.createInternal(f, createFlag, permission, bufferSize,
|
||||||
replication, blockSize, progress, bytesPerChecksum, createParent);
|
replication, blockSize, progress, checksumOpt, createParent);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -563,7 +579,7 @@ public abstract class AbstractFileSystem {
|
||||||
public abstract FSDataOutputStream createInternal(Path f,
|
public abstract FSDataOutputStream createInternal(Path f,
|
||||||
EnumSet<CreateFlag> flag, FsPermission absolutePermission,
|
EnumSet<CreateFlag> flag, FsPermission absolutePermission,
|
||||||
int bufferSize, short replication, long blockSize, Progressable progress,
|
int bufferSize, short replication, long blockSize, Progressable progress,
|
||||||
int bytesPerChecksum, boolean createParent)
|
ChecksumOpt checksumOpt, boolean createParent)
|
||||||
throws AccessControlException, FileAlreadyExistsException,
|
throws AccessControlException, FileAlreadyExistsException,
|
||||||
FileNotFoundException, ParentNotDirectoryException,
|
FileNotFoundException, ParentNotDirectoryException,
|
||||||
UnsupportedFileSystemException, UnresolvedLinkException, IOException;
|
UnsupportedFileSystemException, UnresolvedLinkException, IOException;
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.PureJavaCrc32;
|
import org.apache.hadoop.util.PureJavaCrc32;
|
||||||
|
@ -324,13 +325,17 @@ public abstract class ChecksumFs extends FilterFs {
|
||||||
final EnumSet<CreateFlag> createFlag,
|
final EnumSet<CreateFlag> createFlag,
|
||||||
final FsPermission absolutePermission, final int bufferSize,
|
final FsPermission absolutePermission, final int bufferSize,
|
||||||
final short replication, final long blockSize,
|
final short replication, final long blockSize,
|
||||||
final Progressable progress, final int bytesPerChecksum,
|
final Progressable progress, final ChecksumOpt checksumOpt,
|
||||||
final boolean createParent) throws IOException {
|
final boolean createParent) throws IOException {
|
||||||
super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
|
super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
|
||||||
|
|
||||||
|
// checksumOpt is passed down to the raw fs. Unless it implements
|
||||||
|
// checksum impelemts internally, checksumOpt will be ignored.
|
||||||
|
// If the raw fs does checksum internally, we will end up with
|
||||||
|
// two layers of checksumming. i.e. checksumming checksum file.
|
||||||
this.datas = fs.getRawFs().createInternal(file, createFlag,
|
this.datas = fs.getRawFs().createInternal(file, createFlag,
|
||||||
absolutePermission, bufferSize, replication, blockSize, progress,
|
absolutePermission, bufferSize, replication, blockSize, progress,
|
||||||
bytesPerChecksum, createParent);
|
checksumOpt, createParent);
|
||||||
|
|
||||||
// Now create the chekcsumfile; adjust the buffsize
|
// Now create the chekcsumfile; adjust the buffsize
|
||||||
int bytesPerSum = fs.getBytesPerSum();
|
int bytesPerSum = fs.getBytesPerSum();
|
||||||
|
@ -338,7 +343,7 @@ public abstract class ChecksumFs extends FilterFs {
|
||||||
this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
|
this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
|
||||||
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
||||||
absolutePermission, sumBufferSize, replication, blockSize, progress,
|
absolutePermission, sumBufferSize, replication, blockSize, progress,
|
||||||
bytesPerChecksum, createParent);
|
checksumOpt, createParent);
|
||||||
sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
|
sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
|
||||||
sums.writeInt(bytesPerSum);
|
sums.writeInt(bytesPerSum);
|
||||||
}
|
}
|
||||||
|
@ -361,12 +366,11 @@ public abstract class ChecksumFs extends FilterFs {
|
||||||
public FSDataOutputStream createInternal(Path f,
|
public FSDataOutputStream createInternal(Path f,
|
||||||
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
|
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
|
||||||
int bufferSize, short replication, long blockSize, Progressable progress,
|
int bufferSize, short replication, long blockSize, Progressable progress,
|
||||||
int bytesPerChecksum, boolean createParent) throws IOException {
|
ChecksumOpt checksumOpt, boolean createParent) throws IOException {
|
||||||
|
|
||||||
final FSDataOutputStream out = new FSDataOutputStream(
|
final FSDataOutputStream out = new FSDataOutputStream(
|
||||||
new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
|
new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
|
||||||
bufferSize, replication, blockSize, progress,
|
bufferSize, replication, blockSize, progress,
|
||||||
bytesPerChecksum, createParent), null);
|
checksumOpt, createParent), null);
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.List;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -62,7 +63,7 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
|
||||||
public FSDataOutputStream createInternal (Path f,
|
public FSDataOutputStream createInternal (Path f,
|
||||||
EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
|
EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
|
||||||
short replication, long blockSize, Progressable progress,
|
short replication, long blockSize, Progressable progress,
|
||||||
int bytesPerChecksum, boolean createParent) throws IOException {
|
ChecksumOpt checksumOpt, boolean createParent) throws IOException {
|
||||||
checkPath(f);
|
checkPath(f);
|
||||||
|
|
||||||
// Default impl assumes that permissions do not matter
|
// Default impl assumes that permissions do not matter
|
||||||
|
@ -81,8 +82,8 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
|
||||||
}
|
}
|
||||||
// parent does exist - go ahead with create of file.
|
// parent does exist - go ahead with create of file.
|
||||||
}
|
}
|
||||||
return fsImpl.primitiveCreate(f, absolutePermission, flag,
|
return fsImpl.primitiveCreate(f, absolutePermission, flag,
|
||||||
bufferSize, replication, blockSize, progress, bytesPerChecksum);
|
bufferSize, replication, blockSize, progress, checksumOpt);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -127,7 +127,8 @@ import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
* <li> replication factor
|
* <li> replication factor
|
||||||
* <li> block size
|
* <li> block size
|
||||||
* <li> buffer size
|
* <li> buffer size
|
||||||
* <li> bytesPerChecksum (if used).
|
* <li> encryptDataTransfer
|
||||||
|
* <li> checksum option. (checksumType and bytesPerChecksum)
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -613,7 +614,8 @@ public final class FileContext {
|
||||||
* <li>BufferSize - buffersize used in FSDataOutputStream
|
* <li>BufferSize - buffersize used in FSDataOutputStream
|
||||||
* <li>Blocksize - block size for file blocks
|
* <li>Blocksize - block size for file blocks
|
||||||
* <li>ReplicationFactor - replication for blocks
|
* <li>ReplicationFactor - replication for blocks
|
||||||
* <li>BytesPerChecksum - bytes per checksum
|
* <li>ChecksumParam - Checksum parameters. server default is used
|
||||||
|
* if not specified.
|
||||||
* </ul>
|
* </ul>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.MultipleIOException;
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
|
@ -54,6 +55,7 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
|
@ -656,14 +658,17 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public FsServerDefaults getServerDefaults() throws IOException {
|
public FsServerDefaults getServerDefaults() throws IOException {
|
||||||
Configuration conf = getConf();
|
Configuration conf = getConf();
|
||||||
|
// CRC32 is chosen as default as it is available in all
|
||||||
|
// releases that support checksum.
|
||||||
|
// The client trash configuration is ignored.
|
||||||
return new FsServerDefaults(getDefaultBlockSize(),
|
return new FsServerDefaults(getDefaultBlockSize(),
|
||||||
conf.getInt("io.bytes.per.checksum", 512),
|
conf.getInt("io.bytes.per.checksum", 512),
|
||||||
64 * 1024,
|
64 * 1024,
|
||||||
getDefaultReplication(),
|
getDefaultReplication(),
|
||||||
conf.getInt("io.file.buffer.size", 4096),
|
conf.getInt("io.file.buffer.size", 4096),
|
||||||
false,
|
false,
|
||||||
// NB: ignoring the client trash configuration
|
CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
|
||||||
CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT);
|
DataChecksum.Type.CRC32);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -889,11 +894,40 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
short replication,
|
short replication,
|
||||||
long blockSize,
|
long blockSize,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
// only DFS support this
|
return create(f, permission, flags, bufferSize, replication,
|
||||||
return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
|
blockSize, progress, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an FSDataOutputStream at the indicated Path with a custom
|
||||||
|
* checksum option
|
||||||
|
* @param f the file name to open
|
||||||
|
* @param permission
|
||||||
|
* @param flags {@link CreateFlag}s to use for this stream.
|
||||||
|
* @param bufferSize the size of the buffer to be used.
|
||||||
|
* @param replication required block replication for the file.
|
||||||
|
* @param blockSize
|
||||||
|
* @param progress
|
||||||
|
* @param checksumOpt checksum parameter. If null, the values
|
||||||
|
* found in conf will be used.
|
||||||
|
* @throws IOException
|
||||||
|
* @see #setPermission(Path, FsPermission)
|
||||||
|
*/
|
||||||
|
public FSDataOutputStream create(Path f,
|
||||||
|
FsPermission permission,
|
||||||
|
EnumSet<CreateFlag> flags,
|
||||||
|
int bufferSize,
|
||||||
|
short replication,
|
||||||
|
long blockSize,
|
||||||
|
Progressable progress,
|
||||||
|
ChecksumOpt checksumOpt) throws IOException {
|
||||||
|
// Checksum options are ignored by default. The file systems that
|
||||||
|
// implement checksum need to override this method. The full
|
||||||
|
// support is currently only available in DFS.
|
||||||
|
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
|
||||||
|
bufferSize, replication, blockSize, progress);
|
||||||
|
}
|
||||||
|
|
||||||
/*.
|
/*.
|
||||||
* This create has been added to support the FileContext that processes
|
* This create has been added to support the FileContext that processes
|
||||||
* the permission
|
* the permission
|
||||||
|
@ -905,7 +939,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
protected FSDataOutputStream primitiveCreate(Path f,
|
protected FSDataOutputStream primitiveCreate(Path f,
|
||||||
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
|
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
|
||||||
short replication, long blockSize, Progressable progress,
|
short replication, long blockSize, Progressable progress,
|
||||||
int bytesPerChecksum) throws IOException {
|
ChecksumOpt checksumOpt) throws IOException {
|
||||||
|
|
||||||
boolean pathExists = exists(f);
|
boolean pathExists = exists(f);
|
||||||
CreateFlag.validate(f, pathExists, flag);
|
CreateFlag.validate(f, pathExists, flag);
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
/****************************************************************
|
/****************************************************************
|
||||||
|
@ -410,10 +411,11 @@ public class FilterFileSystem extends FileSystem {
|
||||||
@Override
|
@Override
|
||||||
protected FSDataOutputStream primitiveCreate(Path f,
|
protected FSDataOutputStream primitiveCreate(Path f,
|
||||||
FsPermission absolutePermission, EnumSet<CreateFlag> flag,
|
FsPermission absolutePermission, EnumSet<CreateFlag> flag,
|
||||||
int bufferSize, short replication, long blockSize, Progressable progress, int bytesPerChecksum)
|
int bufferSize, short replication, long blockSize,
|
||||||
|
Progressable progress, ChecksumOpt checksumOpt)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return fs.primitiveCreate(f, absolutePermission, flag,
|
return fs.primitiveCreate(f, absolutePermission, flag,
|
||||||
bufferSize, replication, blockSize, progress, bytesPerChecksum);
|
bufferSize, replication, blockSize, progress, checksumOpt);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
@ -81,11 +82,11 @@ public abstract class FilterFs extends AbstractFileSystem {
|
||||||
public FSDataOutputStream createInternal(Path f,
|
public FSDataOutputStream createInternal(Path f,
|
||||||
EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
|
EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
|
||||||
short replication, long blockSize, Progressable progress,
|
short replication, long blockSize, Progressable progress,
|
||||||
int bytesPerChecksum, boolean createParent)
|
ChecksumOpt checksumOpt, boolean createParent)
|
||||||
throws IOException, UnresolvedLinkException {
|
throws IOException, UnresolvedLinkException {
|
||||||
checkPath(f);
|
checkPath(f);
|
||||||
return myFs.createInternal(f, flag, absolutePermission, bufferSize,
|
return myFs.createInternal(f, flag, absolutePermission, bufferSize,
|
||||||
replication, blockSize, progress, bytesPerChecksum, createParent);
|
replication, blockSize, progress, checksumOpt, createParent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableFactories;
|
import org.apache.hadoop.io.WritableFactories;
|
||||||
import org.apache.hadoop.io.WritableFactory;
|
import org.apache.hadoop.io.WritableFactory;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
/****************************************************
|
/****************************************************
|
||||||
* Provides server default configuration values to clients.
|
* Provides server default configuration values to clients.
|
||||||
|
@ -50,13 +52,15 @@ public class FsServerDefaults implements Writable {
|
||||||
private int fileBufferSize;
|
private int fileBufferSize;
|
||||||
private boolean encryptDataTransfer;
|
private boolean encryptDataTransfer;
|
||||||
private long trashInterval;
|
private long trashInterval;
|
||||||
|
private DataChecksum.Type checksumType;
|
||||||
|
|
||||||
public FsServerDefaults() {
|
public FsServerDefaults() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public FsServerDefaults(long blockSize, int bytesPerChecksum,
|
public FsServerDefaults(long blockSize, int bytesPerChecksum,
|
||||||
int writePacketSize, short replication, int fileBufferSize,
|
int writePacketSize, short replication, int fileBufferSize,
|
||||||
boolean encryptDataTransfer, long trashInterval) {
|
boolean encryptDataTransfer, long trashInterval,
|
||||||
|
DataChecksum.Type checksumType) {
|
||||||
this.blockSize = blockSize;
|
this.blockSize = blockSize;
|
||||||
this.bytesPerChecksum = bytesPerChecksum;
|
this.bytesPerChecksum = bytesPerChecksum;
|
||||||
this.writePacketSize = writePacketSize;
|
this.writePacketSize = writePacketSize;
|
||||||
|
@ -64,6 +68,7 @@ public class FsServerDefaults implements Writable {
|
||||||
this.fileBufferSize = fileBufferSize;
|
this.fileBufferSize = fileBufferSize;
|
||||||
this.encryptDataTransfer = encryptDataTransfer;
|
this.encryptDataTransfer = encryptDataTransfer;
|
||||||
this.trashInterval = trashInterval;
|
this.trashInterval = trashInterval;
|
||||||
|
this.checksumType = checksumType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getBlockSize() {
|
public long getBlockSize() {
|
||||||
|
@ -94,6 +99,10 @@ public class FsServerDefaults implements Writable {
|
||||||
return trashInterval;
|
return trashInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DataChecksum.Type getChecksumType() {
|
||||||
|
return checksumType;
|
||||||
|
}
|
||||||
|
|
||||||
// /////////////////////////////////////////
|
// /////////////////////////////////////////
|
||||||
// Writable
|
// Writable
|
||||||
// /////////////////////////////////////////
|
// /////////////////////////////////////////
|
||||||
|
@ -104,6 +113,7 @@ public class FsServerDefaults implements Writable {
|
||||||
out.writeInt(writePacketSize);
|
out.writeInt(writePacketSize);
|
||||||
out.writeShort(replication);
|
out.writeShort(replication);
|
||||||
out.writeInt(fileBufferSize);
|
out.writeInt(fileBufferSize);
|
||||||
|
WritableUtils.writeEnum(out, checksumType);
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -113,5 +123,6 @@ public class FsServerDefaults implements Writable {
|
||||||
writePacketSize = in.readInt();
|
writePacketSize = in.readInt();
|
||||||
replication = in.readShort();
|
replication = in.readShort();
|
||||||
fileBufferSize = in.readInt();
|
fileBufferSize = in.readInt();
|
||||||
|
checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,9 @@ package org.apache.hadoop.fs;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class contains options related to file system operations.
|
* This class contains options related to file system operations.
|
||||||
|
@ -46,6 +48,10 @@ public final class Options {
|
||||||
public static BytesPerChecksum bytesPerChecksum(short crc) {
|
public static BytesPerChecksum bytesPerChecksum(short crc) {
|
||||||
return new BytesPerChecksum(crc);
|
return new BytesPerChecksum(crc);
|
||||||
}
|
}
|
||||||
|
public static ChecksumParam checksumParam(
|
||||||
|
ChecksumOpt csumOpt) {
|
||||||
|
return new ChecksumParam(csumOpt);
|
||||||
|
}
|
||||||
public static Perms perms(FsPermission perm) {
|
public static Perms perms(FsPermission perm) {
|
||||||
return new Perms(perm);
|
return new Perms(perm);
|
||||||
}
|
}
|
||||||
|
@ -91,7 +97,8 @@ public final class Options {
|
||||||
}
|
}
|
||||||
public int getValue() { return bufferSize; }
|
public int getValue() { return bufferSize; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** This is not needed if ChecksumParam is specified. **/
|
||||||
public static class BytesPerChecksum extends CreateOpts {
|
public static class BytesPerChecksum extends CreateOpts {
|
||||||
private final int bytesPerChecksum;
|
private final int bytesPerChecksum;
|
||||||
protected BytesPerChecksum(short bpc) {
|
protected BytesPerChecksum(short bpc) {
|
||||||
|
@ -103,6 +110,14 @@ public final class Options {
|
||||||
}
|
}
|
||||||
public int getValue() { return bytesPerChecksum; }
|
public int getValue() { return bytesPerChecksum; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class ChecksumParam extends CreateOpts {
|
||||||
|
private final ChecksumOpt checksumOpt;
|
||||||
|
protected ChecksumParam(ChecksumOpt csumOpt) {
|
||||||
|
checksumOpt = csumOpt;
|
||||||
|
}
|
||||||
|
public ChecksumOpt getValue() { return checksumOpt; }
|
||||||
|
}
|
||||||
|
|
||||||
public static class Perms extends CreateOpts {
|
public static class Perms extends CreateOpts {
|
||||||
private final FsPermission permissions;
|
private final FsPermission permissions;
|
||||||
|
@ -206,4 +221,116 @@ public final class Options {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is used in FileSystem and FileContext to specify checksum options.
|
||||||
|
*/
|
||||||
|
public static class ChecksumOpt {
|
||||||
|
private final int crcBlockSize;
|
||||||
|
private final DataChecksum.Type crcType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a uninitialized one
|
||||||
|
*/
|
||||||
|
public ChecksumOpt() {
|
||||||
|
crcBlockSize = -1;
|
||||||
|
crcType = DataChecksum.Type.DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Normal ctor
|
||||||
|
* @param type checksum type
|
||||||
|
* @param size bytes per checksum
|
||||||
|
*/
|
||||||
|
public ChecksumOpt(DataChecksum.Type type, int size) {
|
||||||
|
crcBlockSize = size;
|
||||||
|
crcType = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getBytesPerChecksum() {
|
||||||
|
return crcBlockSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DataChecksum.Type getChecksumType() {
|
||||||
|
return crcType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a ChecksumOpts that disables checksum
|
||||||
|
*/
|
||||||
|
public static ChecksumOpt createDisabled() {
|
||||||
|
return new ChecksumOpt(DataChecksum.Type.NULL, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper method for processing user input and default value to
|
||||||
|
* create a combined checksum option. This is a bit complicated because
|
||||||
|
* bytesPerChecksum is kept for backward compatibility.
|
||||||
|
*
|
||||||
|
* @param defaultOpt Default checksum option
|
||||||
|
* @param userOpt User-specified checksum option. Ignored if null.
|
||||||
|
* @param userBytesPerChecksum User-specified bytesPerChecksum
|
||||||
|
* Ignored if < 0.
|
||||||
|
*/
|
||||||
|
public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt,
|
||||||
|
ChecksumOpt userOpt, int userBytesPerChecksum) {
|
||||||
|
// The following is done to avoid unnecessary creation of new objects.
|
||||||
|
// tri-state variable: 0 default, 1 userBytesPerChecksum, 2 userOpt
|
||||||
|
short whichSize;
|
||||||
|
// true default, false userOpt
|
||||||
|
boolean useDefaultType;
|
||||||
|
|
||||||
|
// bytesPerChecksum - order of preference
|
||||||
|
// user specified value in bytesPerChecksum
|
||||||
|
// user specified value in checksumOpt
|
||||||
|
// default.
|
||||||
|
if (userBytesPerChecksum > 0) {
|
||||||
|
whichSize = 1; // userBytesPerChecksum
|
||||||
|
} else if (userOpt != null && userOpt.getBytesPerChecksum() > 0) {
|
||||||
|
whichSize = 2; // userOpt
|
||||||
|
} else {
|
||||||
|
whichSize = 0; // default
|
||||||
|
}
|
||||||
|
|
||||||
|
// checksum type - order of preference
|
||||||
|
// user specified value in checksumOpt
|
||||||
|
// default.
|
||||||
|
if (userOpt != null &&
|
||||||
|
userOpt.getChecksumType() != DataChecksum.Type.DEFAULT) {
|
||||||
|
useDefaultType = false;
|
||||||
|
} else {
|
||||||
|
useDefaultType = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Short out the common and easy cases
|
||||||
|
if (whichSize == 0 && useDefaultType) {
|
||||||
|
return defaultOpt;
|
||||||
|
} else if (whichSize == 2 && !useDefaultType) {
|
||||||
|
return userOpt;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take care of the rest of combinations
|
||||||
|
DataChecksum.Type type = useDefaultType ? defaultOpt.getChecksumType() :
|
||||||
|
userOpt.getChecksumType();
|
||||||
|
if (whichSize == 0) {
|
||||||
|
return new ChecksumOpt(type, defaultOpt.getBytesPerChecksum());
|
||||||
|
} else if (whichSize == 1) {
|
||||||
|
return new ChecksumOpt(type, userBytesPerChecksum);
|
||||||
|
} else {
|
||||||
|
return new ChecksumOpt(type, userOpt.getBytesPerChecksum());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper method for processing user input and default value to
|
||||||
|
* create a combined checksum option.
|
||||||
|
*
|
||||||
|
* @param defaultOpt Default checksum option
|
||||||
|
* @param userOpt User-specified checksum option
|
||||||
|
*/
|
||||||
|
public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt,
|
||||||
|
ChecksumOpt userOpt) {
|
||||||
|
return processChecksumOpt(defaultOpt, userOpt, -1);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,10 +23,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FsServerDefaults;
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class contains constants for configuration keys used
|
* This class contains constants for configuration keys used
|
||||||
* in the ftp file system.
|
* in the ftp file system.
|
||||||
|
*
|
||||||
|
* Note that the settings for unimplemented features are ignored.
|
||||||
|
* E.g. checksum related settings are just place holders. Even when
|
||||||
|
* wrapped with {@link ChecksumFileSystem}, these settings are not
|
||||||
|
* used.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
|
@ -46,6 +52,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
||||||
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
||||||
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
|
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
|
||||||
|
public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
|
||||||
|
DataChecksum.Type.CRC32;
|
||||||
|
|
||||||
protected static FsServerDefaults getServerDefaults() throws IOException {
|
protected static FsServerDefaults getServerDefaults() throws IOException {
|
||||||
return new FsServerDefaults(
|
return new FsServerDefaults(
|
||||||
|
@ -55,7 +63,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
|
||||||
REPLICATION_DEFAULT,
|
REPLICATION_DEFAULT,
|
||||||
STREAM_BUFFER_SIZE_DEFAULT,
|
STREAM_BUFFER_SIZE_DEFAULT,
|
||||||
ENCRYPT_DATA_TRANSFER_DEFAULT,
|
ENCRYPT_DATA_TRANSFER_DEFAULT,
|
||||||
FS_TRASH_INTERVAL_DEFAULT);
|
FS_TRASH_INTERVAL_DEFAULT,
|
||||||
|
CHECKSUM_TYPE_DEFAULT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,11 +24,18 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FsServerDefaults;
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class contains constants for configuration keys used
|
* This class contains constants for configuration keys used
|
||||||
* in the local file system, raw local fs and checksum fs.
|
* in the local file system, raw local fs and checksum fs.
|
||||||
|
*
|
||||||
|
* Note that the settings for unimplemented features are ignored.
|
||||||
|
* E.g. checksum related settings are just place holders. Even when
|
||||||
|
* wrapped with {@link ChecksumFileSystem}, these settings are not
|
||||||
|
* used.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class LocalConfigKeys extends CommonConfigurationKeys {
|
public class LocalConfigKeys extends CommonConfigurationKeys {
|
||||||
|
@ -45,7 +52,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
||||||
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
public static final boolean ENCRYPT_DATA_TRANSFER_DEFAULT = false;
|
||||||
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
|
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
|
||||||
|
public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
|
||||||
|
DataChecksum.Type.CRC32;
|
||||||
public static FsServerDefaults getServerDefaults() throws IOException {
|
public static FsServerDefaults getServerDefaults() throws IOException {
|
||||||
return new FsServerDefaults(
|
return new FsServerDefaults(
|
||||||
BLOCK_SIZE_DEFAULT,
|
BLOCK_SIZE_DEFAULT,
|
||||||
|
@ -54,7 +62,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
|
||||||
REPLICATION_DEFAULT,
|
REPLICATION_DEFAULT,
|
||||||
STREAM_BUFFER_SIZE_DEFAULT,
|
STREAM_BUFFER_SIZE_DEFAULT,
|
||||||
ENCRYPT_DATA_TRANSFER_DEFAULT,
|
ENCRYPT_DATA_TRANSFER_DEFAULT,
|
||||||
FS_TRASH_INTERVAL_DEFAULT);
|
FS_TRASH_INTERVAL_DEFAULT,
|
||||||
|
CHECKSUM_TYPE_DEFAULT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileChecksum;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FsServerDefaults;
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
import org.apache.hadoop.fs.FsStatus;
|
import org.apache.hadoop.fs.FsStatus;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
@ -159,11 +160,11 @@ class ChRootedFs extends AbstractFileSystem {
|
||||||
public FSDataOutputStream createInternal(final Path f,
|
public FSDataOutputStream createInternal(final Path f,
|
||||||
final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
|
final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
|
||||||
final int bufferSize, final short replication, final long blockSize,
|
final int bufferSize, final short replication, final long blockSize,
|
||||||
final Progressable progress, final int bytesPerChecksum,
|
final Progressable progress, final ChecksumOpt checksumOpt,
|
||||||
final boolean createParent) throws IOException, UnresolvedLinkException {
|
final boolean createParent) throws IOException, UnresolvedLinkException {
|
||||||
return myFs.createInternal(fullPath(f), flag,
|
return myFs.createInternal(fullPath(f), flag,
|
||||||
absolutePermission, bufferSize,
|
absolutePermission, bufferSize,
|
||||||
replication, blockSize, progress, bytesPerChecksum, createParent);
|
replication, blockSize, progress, checksumOpt, createParent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FsConstants;
|
import org.apache.hadoop.fs.FsConstants;
|
||||||
import org.apache.hadoop.fs.FsServerDefaults;
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
import org.apache.hadoop.fs.FsStatus;
|
import org.apache.hadoop.fs.FsStatus;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
@ -265,7 +266,7 @@ public class ViewFs extends AbstractFileSystem {
|
||||||
public FSDataOutputStream createInternal(final Path f,
|
public FSDataOutputStream createInternal(final Path f,
|
||||||
final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
|
final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
|
||||||
final int bufferSize, final short replication, final long blockSize,
|
final int bufferSize, final short replication, final long blockSize,
|
||||||
final Progressable progress, final int bytesPerChecksum,
|
final Progressable progress, final ChecksumOpt checksumOpt,
|
||||||
final boolean createParent) throws AccessControlException,
|
final boolean createParent) throws AccessControlException,
|
||||||
FileAlreadyExistsException, FileNotFoundException,
|
FileAlreadyExistsException, FileNotFoundException,
|
||||||
ParentNotDirectoryException, UnsupportedFileSystemException,
|
ParentNotDirectoryException, UnsupportedFileSystemException,
|
||||||
|
@ -283,7 +284,7 @@ public class ViewFs extends AbstractFileSystem {
|
||||||
assert(res.remainingPath != null);
|
assert(res.remainingPath != null);
|
||||||
return res.targetFileSystem.createInternal(res.remainingPath, flag,
|
return res.targetFileSystem.createInternal(res.remainingPath, flag,
|
||||||
absolutePermission, bufferSize, replication,
|
absolutePermission, bufferSize, replication,
|
||||||
blockSize, progress, bytesPerChecksum,
|
blockSize, progress, checksumOpt,
|
||||||
createParent);
|
createParent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -632,7 +633,7 @@ public class ViewFs extends AbstractFileSystem {
|
||||||
public FSDataOutputStream createInternal(final Path f,
|
public FSDataOutputStream createInternal(final Path f,
|
||||||
final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
|
final EnumSet<CreateFlag> flag, final FsPermission absolutePermission,
|
||||||
final int bufferSize, final short replication, final long blockSize,
|
final int bufferSize, final short replication, final long blockSize,
|
||||||
final Progressable progress, final int bytesPerChecksum,
|
final Progressable progress, final ChecksumOpt checksumOpt,
|
||||||
final boolean createParent) throws AccessControlException,
|
final boolean createParent) throws AccessControlException,
|
||||||
FileAlreadyExistsException, FileNotFoundException,
|
FileAlreadyExistsException, FileNotFoundException,
|
||||||
ParentNotDirectoryException, UnsupportedFileSystemException,
|
ParentNotDirectoryException, UnsupportedFileSystemException,
|
||||||
|
|
|
@ -43,12 +43,14 @@ public class DataChecksum implements Checksum {
|
||||||
public static final int CHECKSUM_NULL = 0;
|
public static final int CHECKSUM_NULL = 0;
|
||||||
public static final int CHECKSUM_CRC32 = 1;
|
public static final int CHECKSUM_CRC32 = 1;
|
||||||
public static final int CHECKSUM_CRC32C = 2;
|
public static final int CHECKSUM_CRC32C = 2;
|
||||||
|
public static final int CHECKSUM_DEFAULT = 3;
|
||||||
|
|
||||||
/** The checksum types */
|
/** The checksum types */
|
||||||
public static enum Type {
|
public static enum Type {
|
||||||
NULL (CHECKSUM_NULL, 0),
|
NULL (CHECKSUM_NULL, 0),
|
||||||
CRC32 (CHECKSUM_CRC32, 4),
|
CRC32 (CHECKSUM_CRC32, 4),
|
||||||
CRC32C(CHECKSUM_CRC32C, 4);
|
CRC32C(CHECKSUM_CRC32C, 4),
|
||||||
|
DEFAULT(CHECKSUM_DEFAULT, 0); // This cannot be used to create DataChecksum
|
||||||
|
|
||||||
public final int id;
|
public final int id;
|
||||||
public final int size;
|
public final int size;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.net.URISyntaxException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -76,7 +77,7 @@ public class TestAfsCheckPath {
|
||||||
@Override
|
@Override
|
||||||
public FSDataOutputStream createInternal(Path f, EnumSet<CreateFlag> flag,
|
public FSDataOutputStream createInternal(Path f, EnumSet<CreateFlag> flag,
|
||||||
FsPermission absolutePermission, int bufferSize, short replication,
|
FsPermission absolutePermission, int bufferSize, short replication,
|
||||||
long blockSize, Progressable progress, int bytesPerChecksum,
|
long blockSize, Progressable progress, ChecksumOpt checksumOpt,
|
||||||
boolean createParent) throws IOException {
|
boolean createParent) throws IOException {
|
||||||
// deliberately empty
|
// deliberately empty
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Iterator;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
@ -80,6 +81,11 @@ public class TestFilterFileSystem {
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
|
||||||
|
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
|
||||||
|
Progressable progress, ChecksumOpt checksumOpt) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
public boolean mkdirs(Path f) { return false; }
|
public boolean mkdirs(Path f) { return false; }
|
||||||
public FSDataInputStream open(Path f) { return null; }
|
public FSDataInputStream open(Path f) { return null; }
|
||||||
public FSDataOutputStream create(Path f) { return null; }
|
public FSDataOutputStream create(Path f) { return null; }
|
||||||
|
@ -138,6 +144,16 @@ public class TestFilterFileSystem {
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
public FSDataOutputStream create(Path f,
|
||||||
|
FsPermission permission,
|
||||||
|
EnumSet<CreateFlag> flags,
|
||||||
|
int bufferSize,
|
||||||
|
short replication,
|
||||||
|
long blockSize,
|
||||||
|
Progressable progress,
|
||||||
|
ChecksumOpt checksumOpt) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
public String getName() { return null; }
|
public String getName() { return null; }
|
||||||
public boolean delete(Path f) { return false; }
|
public boolean delete(Path f) { return false; }
|
||||||
public short getReplication(Path src) { return 0 ; }
|
public short getReplication(Path src) { return 0 ; }
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestFsOptions {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcessChecksumOpt() {
|
||||||
|
ChecksumOpt defaultOpt = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
|
||||||
|
ChecksumOpt finalOpt;
|
||||||
|
|
||||||
|
// Give a null
|
||||||
|
finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, null);
|
||||||
|
checkParams(defaultOpt, finalOpt);
|
||||||
|
|
||||||
|
// null with bpc
|
||||||
|
finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, null, 1024);
|
||||||
|
checkParams(DataChecksum.Type.CRC32, 1024, finalOpt);
|
||||||
|
|
||||||
|
ChecksumOpt myOpt = new ChecksumOpt();
|
||||||
|
|
||||||
|
// custom with unspecified parameters
|
||||||
|
finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt);
|
||||||
|
checkParams(defaultOpt, finalOpt);
|
||||||
|
|
||||||
|
myOpt = new ChecksumOpt(DataChecksum.Type.CRC32C, 2048);
|
||||||
|
|
||||||
|
// custom config
|
||||||
|
finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt);
|
||||||
|
checkParams(DataChecksum.Type.CRC32C, 2048, finalOpt);
|
||||||
|
|
||||||
|
// custom config + bpc
|
||||||
|
finalOpt = ChecksumOpt.processChecksumOpt(defaultOpt, myOpt, 4096);
|
||||||
|
checkParams(DataChecksum.Type.CRC32C, 4096, finalOpt);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkParams(ChecksumOpt expected, ChecksumOpt obtained) {
|
||||||
|
assertEquals(expected.getChecksumType(), obtained.getChecksumType());
|
||||||
|
assertEquals(expected.getBytesPerChecksum(), obtained.getBytesPerChecksum());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkParams(DataChecksum.Type type, int bpc, ChecksumOpt obtained) {
|
||||||
|
assertEquals(type, obtained.getChecksumType());
|
||||||
|
assertEquals(bpc, obtained.getBytesPerChecksum());
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
|
import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSystem {
|
||||||
public HdfsDataOutputStream createInternal(Path f,
|
public HdfsDataOutputStream createInternal(Path f,
|
||||||
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
|
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
|
||||||
int bufferSize, short replication, long blockSize, Progressable progress,
|
int bufferSize, short replication, long blockSize, Progressable progress,
|
||||||
int bytesPerChecksum, boolean createParent) throws IOException {
|
ChecksumOpt checksumOpt, boolean createParent) throws IOException {
|
||||||
return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f),
|
return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f),
|
||||||
absolutePermission, createFlag, createParent, replication, blockSize,
|
absolutePermission, createFlag, createParent, replication, blockSize,
|
||||||
progress, bufferSize, bytesPerChecksum), getStatistics());
|
progress, bufferSize, checksumOpt), getStatistics());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -93,6 +93,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation;
|
||||||
import org.apache.hadoop.fs.InvalidPathException;
|
import org.apache.hadoop.fs.InvalidPathException;
|
||||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
|
@ -205,8 +206,7 @@ public class DFSClient implements java.io.Closeable {
|
||||||
final int maxBlockAcquireFailures;
|
final int maxBlockAcquireFailures;
|
||||||
final int confTime;
|
final int confTime;
|
||||||
final int ioBufferSize;
|
final int ioBufferSize;
|
||||||
final DataChecksum.Type checksumType;
|
final ChecksumOpt defaultChecksumOpt;
|
||||||
final int bytesPerChecksum;
|
|
||||||
final int writePacketSize;
|
final int writePacketSize;
|
||||||
final int socketTimeout;
|
final int socketTimeout;
|
||||||
final int socketCacheCapacity;
|
final int socketCacheCapacity;
|
||||||
|
@ -245,9 +245,7 @@ public class DFSClient implements java.io.Closeable {
|
||||||
ioBufferSize = conf.getInt(
|
ioBufferSize = conf.getInt(
|
||||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
||||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||||
checksumType = getChecksumType(conf);
|
defaultChecksumOpt = getChecksumOptFromConf(conf);
|
||||||
bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
|
|
||||||
DFS_BYTES_PER_CHECKSUM_DEFAULT);
|
|
||||||
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||||
HdfsServerConstants.READ_TIMEOUT);
|
HdfsServerConstants.READ_TIMEOUT);
|
||||||
/** dfs.write.packet.size is an internal config variable */
|
/** dfs.write.packet.size is an internal config variable */
|
||||||
|
@ -302,9 +300,32 @@ public class DFSClient implements java.io.Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataChecksum createChecksum() {
|
// Construct a checksum option from conf
|
||||||
return DataChecksum.newDataChecksum(
|
private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
|
||||||
checksumType, bytesPerChecksum);
|
DataChecksum.Type type = getChecksumType(conf);
|
||||||
|
int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
|
||||||
|
DFS_BYTES_PER_CHECKSUM_DEFAULT);
|
||||||
|
return new ChecksumOpt(type, bytesPerChecksum);
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a DataChecksum with the default option.
|
||||||
|
private DataChecksum createChecksum() throws IOException {
|
||||||
|
return createChecksum(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DataChecksum createChecksum(ChecksumOpt userOpt)
|
||||||
|
throws IOException {
|
||||||
|
// Fill in any missing field with the default.
|
||||||
|
ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
|
||||||
|
defaultChecksumOpt, userOpt);
|
||||||
|
DataChecksum dataChecksum = DataChecksum.newDataChecksum(
|
||||||
|
myOpt.getChecksumType(),
|
||||||
|
myOpt.getBytesPerChecksum());
|
||||||
|
if (dataChecksum == null) {
|
||||||
|
throw new IOException("Invalid checksum type specified: "
|
||||||
|
+ myOpt.getChecksumType().name());
|
||||||
|
}
|
||||||
|
return dataChecksum;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1181,12 +1202,13 @@ public class DFSClient implements java.io.Closeable {
|
||||||
return create(src, FsPermission.getDefault(),
|
return create(src, FsPermission.getDefault(),
|
||||||
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
||||||
: EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
|
: EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
|
||||||
buffersize);
|
buffersize, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
|
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
|
||||||
* long, Progressable, int)} with <code>createParent</code> set to true.
|
* long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
|
||||||
|
* set to true.
|
||||||
*/
|
*/
|
||||||
public DFSOutputStream create(String src,
|
public DFSOutputStream create(String src,
|
||||||
FsPermission permission,
|
FsPermission permission,
|
||||||
|
@ -1194,10 +1216,11 @@ public class DFSClient implements java.io.Closeable {
|
||||||
short replication,
|
short replication,
|
||||||
long blockSize,
|
long blockSize,
|
||||||
Progressable progress,
|
Progressable progress,
|
||||||
int buffersize)
|
int buffersize,
|
||||||
|
ChecksumOpt checksumOpt)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return create(src, permission, flag, true,
|
return create(src, permission, flag, true,
|
||||||
replication, blockSize, progress, buffersize);
|
replication, blockSize, progress, buffersize, checksumOpt);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1215,6 +1238,7 @@ public class DFSClient implements java.io.Closeable {
|
||||||
* @param blockSize maximum block size
|
* @param blockSize maximum block size
|
||||||
* @param progress interface for reporting client progress
|
* @param progress interface for reporting client progress
|
||||||
* @param buffersize underlying buffer size
|
* @param buffersize underlying buffer size
|
||||||
|
* @param checksumOpts checksum options
|
||||||
*
|
*
|
||||||
* @return output stream
|
* @return output stream
|
||||||
*
|
*
|
||||||
|
@ -1228,8 +1252,8 @@ public class DFSClient implements java.io.Closeable {
|
||||||
short replication,
|
short replication,
|
||||||
long blockSize,
|
long blockSize,
|
||||||
Progressable progress,
|
Progressable progress,
|
||||||
int buffersize)
|
int buffersize,
|
||||||
throws IOException {
|
ChecksumOpt checksumOpt) throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
if (permission == null) {
|
if (permission == null) {
|
||||||
permission = FsPermission.getDefault();
|
permission = FsPermission.getDefault();
|
||||||
|
@ -1240,7 +1264,7 @@ public class DFSClient implements java.io.Closeable {
|
||||||
}
|
}
|
||||||
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
||||||
src, masked, flag, createParent, replication, blockSize, progress,
|
src, masked, flag, createParent, replication, blockSize, progress,
|
||||||
buffersize, dfsClientConf.createChecksum());
|
buffersize, dfsClientConf.createChecksum(checksumOpt));
|
||||||
beginFileLease(src, result);
|
beginFileLease(src, result);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -1278,15 +1302,13 @@ public class DFSClient implements java.io.Closeable {
|
||||||
long blockSize,
|
long blockSize,
|
||||||
Progressable progress,
|
Progressable progress,
|
||||||
int buffersize,
|
int buffersize,
|
||||||
int bytesPerChecksum)
|
ChecksumOpt checksumOpt)
|
||||||
throws IOException, UnresolvedLinkException {
|
throws IOException, UnresolvedLinkException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
CreateFlag.validate(flag);
|
CreateFlag.validate(flag);
|
||||||
DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
|
DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
DataChecksum checksum = DataChecksum.newDataChecksum(
|
DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
|
||||||
dfsClientConf.checksumType,
|
|
||||||
bytesPerChecksum);
|
|
||||||
result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
|
result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
|
||||||
flag, createParent, replication, blockSize, progress, buffersize,
|
flag, createParent, replication, blockSize, progress, buffersize,
|
||||||
checksum);
|
checksum);
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FsStatus;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathFilter;
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
@ -258,19 +259,19 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
public HdfsDataOutputStream create(Path f, FsPermission permission,
|
public HdfsDataOutputStream create(Path f, FsPermission permission,
|
||||||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
return create(f, permission,
|
return this.create(f, permission,
|
||||||
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
||||||
: EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
|
: EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
|
||||||
blockSize, progress);
|
blockSize, progress, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HdfsDataOutputStream create(Path f, FsPermission permission,
|
public HdfsDataOutputStream create(Path f, FsPermission permission,
|
||||||
EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
|
EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress, ChecksumOpt checksumOpt) throws IOException {
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
|
final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
|
||||||
replication, blockSize, progress, bufferSize);
|
replication, blockSize, progress, bufferSize, checksumOpt);
|
||||||
return new HdfsDataOutputStream(out, statistics);
|
return new HdfsDataOutputStream(out, statistics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,11 +280,11 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
protected HdfsDataOutputStream primitiveCreate(Path f,
|
protected HdfsDataOutputStream primitiveCreate(Path f,
|
||||||
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
|
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
|
||||||
short replication, long blockSize, Progressable progress,
|
short replication, long blockSize, Progressable progress,
|
||||||
int bytesPerChecksum) throws IOException {
|
ChecksumOpt checksumOpt) throws IOException {
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
|
return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
|
||||||
absolutePermission, flag, true, replication, blockSize,
|
absolutePermission, flag, true, replication, blockSize,
|
||||||
progress, bufferSize, bytesPerChecksum),statistics);
|
progress, bufferSize, checksumOpt),statistics);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -298,7 +299,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
flag.add(CreateFlag.CREATE);
|
flag.add(CreateFlag.CREATE);
|
||||||
}
|
}
|
||||||
return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag,
|
return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag,
|
||||||
false, replication, blockSize, progress, bufferSize), statistics);
|
false, replication, blockSize, progress,
|
||||||
|
bufferSize, null), statistics);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
|
||||||
|
@ -134,6 +135,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
@ -1003,7 +1005,8 @@ public class PBHelper {
|
||||||
fs.getWritePacketSize(), (short) fs.getReplication(),
|
fs.getWritePacketSize(), (short) fs.getReplication(),
|
||||||
fs.getFileBufferSize(),
|
fs.getFileBufferSize(),
|
||||||
fs.getEncryptDataTransfer(),
|
fs.getEncryptDataTransfer(),
|
||||||
fs.getTrashInterval());
|
fs.getTrashInterval(),
|
||||||
|
DataChecksum.Type.valueOf(fs.getChecksumType().name()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
|
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
|
||||||
|
@ -1015,7 +1018,9 @@ public class PBHelper {
|
||||||
.setReplication(fs.getReplication())
|
.setReplication(fs.getReplication())
|
||||||
.setFileBufferSize(fs.getFileBufferSize())
|
.setFileBufferSize(fs.getFileBufferSize())
|
||||||
.setEncryptDataTransfer(fs.getEncryptDataTransfer())
|
.setEncryptDataTransfer(fs.getEncryptDataTransfer())
|
||||||
.setTrashInterval(fs.getTrashInterval()).build();
|
.setTrashInterval(fs.getTrashInterval())
|
||||||
|
.setChecksumType(ChecksumTypeProto.valueOf(fs.getChecksumType().name()))
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static FsPermissionProto convert(FsPermission p) {
|
public static FsPermissionProto convert(FsPermission p) {
|
||||||
|
|
|
@ -215,7 +215,7 @@ public class DatanodeWebHdfsMethods {
|
||||||
fullpath, permission.getFsPermission(),
|
fullpath, permission.getFsPermission(),
|
||||||
overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
||||||
: EnumSet.of(CreateFlag.CREATE),
|
: EnumSet.of(CreateFlag.CREATE),
|
||||||
replication.getValue(conf), blockSize.getValue(conf), null, b), null);
|
replication.getValue(conf), blockSize.getValue(conf), null, b, null), null);
|
||||||
IOUtils.copyBytes(in, out, b);
|
IOUtils.copyBytes(in, out, b);
|
||||||
out.close();
|
out.close();
|
||||||
out = null;
|
out = null;
|
||||||
|
|
|
@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
|
||||||
|
@ -195,6 +197,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
@ -461,6 +464,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
"must not be specified if HA is not enabled.");
|
"must not be specified if HA is not enabled.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the checksum type from config
|
||||||
|
String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
|
||||||
|
DataChecksum.Type checksumType;
|
||||||
|
try {
|
||||||
|
checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
throw new IOException("Invalid checksum type in "
|
||||||
|
+ DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
|
||||||
|
}
|
||||||
|
|
||||||
this.serverDefaults = new FsServerDefaults(
|
this.serverDefaults = new FsServerDefaults(
|
||||||
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
|
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
|
||||||
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
|
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
|
||||||
|
@ -468,7 +481,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
|
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
|
||||||
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
|
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
|
||||||
conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
|
conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
|
||||||
conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT));
|
conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
|
||||||
|
checksumType);
|
||||||
|
|
||||||
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
|
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
|
||||||
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
|
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
|
||||||
|
|
|
@ -178,6 +178,15 @@ message HdfsFileStatusProto {
|
||||||
optional LocatedBlocksProto locations = 12; // suppled only if asked by client
|
optional LocatedBlocksProto locations = 12; // suppled only if asked by client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checksum algorithms/types used in HDFS
|
||||||
|
*/
|
||||||
|
enum ChecksumTypeProto {
|
||||||
|
NULL = 0;
|
||||||
|
CRC32 = 1;
|
||||||
|
CRC32C = 2;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HDFS Server Defaults
|
* HDFS Server Defaults
|
||||||
*/
|
*/
|
||||||
|
@ -189,6 +198,7 @@ message FsServerDefaultsProto {
|
||||||
required uint32 fileBufferSize = 5;
|
required uint32 fileBufferSize = 5;
|
||||||
optional bool encryptDataTransfer = 6 [default = false];
|
optional bool encryptDataTransfer = 6 [default = false];
|
||||||
optional uint64 trashInterval = 7 [default = 0];
|
optional uint64 trashInterval = 7 [default = 0];
|
||||||
|
optional ChecksumTypeProto checksumType = 8 [default = CRC32];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.lang.ArrayUtils;
|
import org.apache.commons.lang.ArrayUtils;
|
||||||
|
@ -36,16 +37,19 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.BlockStorageLocation;
|
import org.apache.hadoop.fs.BlockStorageLocation;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.VolumeId;
|
import org.apache.hadoop.fs.VolumeId;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -664,4 +668,54 @@ public class TestDistributedFileSystem {
|
||||||
(l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
|
(l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateWithCustomChecksum() throws Exception {
|
||||||
|
Configuration conf = getTestConfiguration();
|
||||||
|
final long grace = 1000L;
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
Path testBasePath = new Path("/test/csum");
|
||||||
|
// create args
|
||||||
|
Path path1 = new Path(testBasePath, "file_wtih_crc1");
|
||||||
|
Path path2 = new Path(testBasePath, "file_with_crc2");
|
||||||
|
ChecksumOpt opt1 = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
|
||||||
|
ChecksumOpt opt2 = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
|
||||||
|
|
||||||
|
// common args
|
||||||
|
FsPermission perm = FsPermission.getDefault().applyUMask(
|
||||||
|
FsPermission.getUMask(conf));
|
||||||
|
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.OVERWRITE,
|
||||||
|
CreateFlag.CREATE);
|
||||||
|
short repl = 1;
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
FileSystem dfs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
dfs.mkdirs(testBasePath);
|
||||||
|
|
||||||
|
// create two files with different checksum types
|
||||||
|
FSDataOutputStream out1 = dfs.create(path1, perm, flags, 4096, repl,
|
||||||
|
131072L, null, opt1);
|
||||||
|
FSDataOutputStream out2 = dfs.create(path2, perm, flags, 4096, repl,
|
||||||
|
131072L, null, opt2);
|
||||||
|
|
||||||
|
for (int i = 0; i < 1024; i++) {
|
||||||
|
out1.write(i);
|
||||||
|
out2.write(i);
|
||||||
|
}
|
||||||
|
out1.close();
|
||||||
|
out2.close();
|
||||||
|
|
||||||
|
// the two checksums must be different.
|
||||||
|
FileChecksum sum1 = dfs.getFileChecksum(path1);
|
||||||
|
FileChecksum sum2 = dfs.getFileChecksum(path2);
|
||||||
|
assertFalse(sum1.equals(sum2));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.getFileSystem().delete(testBasePath, true);
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
@ -430,7 +431,7 @@ public class TestResourceLocalizationService {
|
||||||
new FSDataOutputStream(new DataOutputBuffer(), null);
|
new FSDataOutputStream(new DataOutputBuffer(), null);
|
||||||
doReturn(out).when(spylfs).createInternal(isA(Path.class),
|
doReturn(out).when(spylfs).createInternal(isA(Path.class),
|
||||||
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
|
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
|
||||||
anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
|
anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
|
||||||
final LocalResource resource = getPrivateMockedResource(r);
|
final LocalResource resource = getPrivateMockedResource(r);
|
||||||
final LocalResourceRequest req = new LocalResourceRequest(resource);
|
final LocalResourceRequest req = new LocalResourceRequest(resource);
|
||||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
||||||
|
|
Loading…
Reference in New Issue