HDFS-13209. DistributedFileSystem.create should allow an option to provide StoragePolicy. Contributed by Ayush Saxena.

This commit is contained in:
Surendra Singh Lilhore 2019-02-14 22:13:14 +05:30
parent 080a421911
commit 0d7a5ac5f5
25 changed files with 159 additions and 59 deletions

View File

@ -1211,13 +1211,31 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
long blockSize, Progressable progress, int buffersize, long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName) throws IOException { String ecPolicyName) throws IOException {
return create(src, permission, flag, createParent, replication, blockSize,
progress, buffersize, checksumOpt, favoredNodes, ecPolicyName, null);
}
/**
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
* addition of Progressable, int, ChecksumOpt, InetSocketAddress[], String)}
* with the storagePolicy that is used to specify a specific storage policy
* instead of inheriting any policy from this new file's parent directory.
* This policy will be persisted in HDFS. A value of null means inheriting
* parent groups' whatever policy.
*/
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName, String storagePolicy)
throws IOException {
checkOpen(); checkOpen();
final FsPermission masked = applyUMask(permission); final FsPermission masked = applyUMask(permission);
LOG.debug("{}: masked={}", src, masked); LOG.debug("{}: masked={}", src, masked);
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress, src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt), dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes), ecPolicyName); getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy);
beginFileLease(result.getFileId(), result); beginFileLease(result.getFileId(), result);
return result; return result;
} }
@ -1271,7 +1289,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
result = DFSOutputStream.newStreamForCreate(this, src, absPermission, result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
flag, createParent, replication, blockSize, progress, checksum, flag, createParent, replication, blockSize, progress, checksum,
null, null); null, null, null);
} }
beginFileLease(result.getFileId(), result); beginFileLease(result.getFileId(), result);
return result; return result;

View File

@ -260,7 +260,8 @@ public class DFSOutputStream extends FSOutputSummer
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, short replication, long blockSize, Progressable progress,
DataChecksum checksum, String[] favoredNodes, String ecPolicyName) DataChecksum checksum, String[] favoredNodes, String ecPolicyName,
String storagePolicy)
throws IOException { throws IOException {
try (TraceScope ignored = try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForCreate", src)) { dfsClient.newPathTraceScope("newStreamForCreate", src)) {
@ -275,7 +276,8 @@ public class DFSOutputStream extends FSOutputSummer
try { try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication, new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName); blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName,
storagePolicy);
break; break;
} catch (RemoteException re) { } catch (RemoteException re) {
IOException e = re.unwrapRemoteException( IOException e = re.unwrapRemoteException(

View File

@ -563,13 +563,17 @@ public class DistributedFileSystem extends FileSystem
* replication policy from its ancestor (the default). * replication policy from its ancestor (the default).
* ecPolicyName and SHOULD_REPLICATE CreateFlag are mutually exclusive. It's * ecPolicyName and SHOULD_REPLICATE CreateFlag are mutually exclusive. It's
* invalid to set both SHOULD_REPLICATE and a non-null ecPolicyName. * invalid to set both SHOULD_REPLICATE and a non-null ecPolicyName.
* The third addition is storagePolicyName. A non-null storage Policy
* specifies an explicit storage policy for this file, overriding the
* inherited policy.
* *
*/ */
private HdfsDataOutputStream create(final Path f, private HdfsDataOutputStream create(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag, final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize, final int bufferSize, final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt, final Progressable progress, final ChecksumOpt checksumOpt,
final InetSocketAddress[] favoredNodes, final String ecPolicyName) final InetSocketAddress[] favoredNodes, final String ecPolicyName,
final String storagePolicy)
throws IOException { throws IOException {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE); storageStatistics.incrementOpCounter(OpType.CREATE);
@ -579,7 +583,7 @@ public class DistributedFileSystem extends FileSystem
public HdfsDataOutputStream doCall(final Path p) throws IOException { public HdfsDataOutputStream doCall(final Path p) throws IOException {
final DFSOutputStream out = dfs.create(getPathName(f), permission, final DFSOutputStream out = dfs.create(getPathName(f), permission,
flag, true, replication, blockSize, progress, bufferSize, flag, true, replication, blockSize, progress, bufferSize,
checksumOpt, favoredNodes, ecPolicyName); checksumOpt, favoredNodes, ecPolicyName, storagePolicy);
return dfs.createWrappedOutputStream(out, statistics); return dfs.createWrappedOutputStream(out, statistics);
} }
@Override @Override
@ -588,7 +592,8 @@ public class DistributedFileSystem extends FileSystem
if (fs instanceof DistributedFileSystem) { if (fs instanceof DistributedFileSystem) {
DistributedFileSystem myDfs = (DistributedFileSystem)fs; DistributedFileSystem myDfs = (DistributedFileSystem)fs;
return myDfs.create(p, permission, flag, bufferSize, replication, return myDfs.create(p, permission, flag, bufferSize, replication,
blockSize, progress, checksumOpt, favoredNodes, ecPolicyName); blockSize, progress, checksumOpt, favoredNodes, ecPolicyName,
storagePolicy);
} }
throw new UnsupportedOperationException("Cannot create with" + throw new UnsupportedOperationException("Cannot create with" +
" favoredNodes through a symlink to a non-DistributedFileSystem: " " favoredNodes through a symlink to a non-DistributedFileSystem: "
@ -619,14 +624,15 @@ public class DistributedFileSystem extends FileSystem
* *
* @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable, * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable,
* ChecksumOpt, InetSocketAddress[], String) for the descriptions of * ChecksumOpt, InetSocketAddress[], String) for the descriptions of
* additional parameters, i.e., favoredNodes and ecPolicyName. * additional parameters, i.e., favoredNodes, ecPolicyName and
* storagePolicyName.
*/ */
private HdfsDataOutputStream createNonRecursive(final Path f, private HdfsDataOutputStream createNonRecursive(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag, final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize, final int bufferSize, final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt, final Progressable progress, final ChecksumOpt checksumOpt,
final InetSocketAddress[] favoredNodes, final String ecPolicyName) final InetSocketAddress[] favoredNodes, final String ecPolicyName,
throws IOException { final String storagePolicyName) throws IOException {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE); storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = fixRelativePart(f); Path absF = fixRelativePart(f);
@ -635,7 +641,7 @@ public class DistributedFileSystem extends FileSystem
public HdfsDataOutputStream doCall(final Path p) throws IOException { public HdfsDataOutputStream doCall(final Path p) throws IOException {
final DFSOutputStream out = dfs.create(getPathName(f), permission, final DFSOutputStream out = dfs.create(getPathName(f), permission,
flag, false, replication, blockSize, progress, bufferSize, flag, false, replication, blockSize, progress, bufferSize,
checksumOpt, favoredNodes, ecPolicyName); checksumOpt, favoredNodes, ecPolicyName, storagePolicyName);
return dfs.createWrappedOutputStream(out, statistics); return dfs.createWrappedOutputStream(out, statistics);
} }
@Override @Override
@ -645,7 +651,7 @@ public class DistributedFileSystem extends FileSystem
DistributedFileSystem myDfs = (DistributedFileSystem)fs; DistributedFileSystem myDfs = (DistributedFileSystem)fs;
return myDfs.createNonRecursive(p, permission, flag, bufferSize, return myDfs.createNonRecursive(p, permission, flag, bufferSize,
replication, blockSize, progress, checksumOpt, favoredNodes, replication, blockSize, progress, checksumOpt, favoredNodes,
ecPolicyName); ecPolicyName, storagePolicyName);
} }
throw new UnsupportedOperationException("Cannot create with" + throw new UnsupportedOperationException("Cannot create with" +
" favoredNodes through a symlink to a non-DistributedFileSystem: " " favoredNodes through a symlink to a non-DistributedFileSystem: "
@ -3183,6 +3189,7 @@ public class DistributedFileSystem extends FileSystem
private final DistributedFileSystem dfs; private final DistributedFileSystem dfs;
private InetSocketAddress[] favoredNodes = null; private InetSocketAddress[] favoredNodes = null;
private String ecPolicyName = null; private String ecPolicyName = null;
private String storagePolicyName = null;
/** /**
* Construct a HdfsDataOutputStream builder for a file. * Construct a HdfsDataOutputStream builder for a file.
@ -3254,6 +3261,22 @@ public class DistributedFileSystem extends FileSystem
return this; return this;
} }
@VisibleForTesting
String getStoragePolicyName() {
return storagePolicyName;
}
/**
* Enforce a file to follow the specified storage policy irrespective of the
* storage policy of its parent directory.
*/
public HdfsDataOutputStreamBuilder storagePolicyName(
@Nonnull final String policyName) {
Preconditions.checkNotNull(policyName);
storagePolicyName = policyName;
return this;
}
@VisibleForTesting @VisibleForTesting
String getEcPolicyName() { String getEcPolicyName() {
return ecPolicyName; return ecPolicyName;
@ -3320,11 +3343,12 @@ public class DistributedFileSystem extends FileSystem
return dfs.create(getPath(), getPermission(), getFlags(), return dfs.create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getBufferSize(), getReplication(), getBlockSize(),
getProgress(), getChecksumOpt(), getFavoredNodes(), getProgress(), getChecksumOpt(), getFavoredNodes(),
getEcPolicyName()); getEcPolicyName(), getStoragePolicyName());
} else { } else {
return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getProgress(), getBufferSize(), getReplication(), getBlockSize(), getProgress(),
getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); getChecksumOpt(), getFavoredNodes(), getEcPolicyName(),
getStoragePolicyName());
} }
} else if (getFlags().contains(CreateFlag.APPEND)) { } else if (getFlags().contains(CreateFlag.APPEND)) {
return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(), return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(),

View File

@ -176,6 +176,7 @@ public interface ClientProtocol {
* policy. ecPolicyName and SHOULD_REPLICATE CreateFlag * policy. ecPolicyName and SHOULD_REPLICATE CreateFlag
* are mutually exclusive. It's invalid to set both * are mutually exclusive. It's invalid to set both
* SHOULD_REPLICATE flag and a non-null ecPolicyName. * SHOULD_REPLICATE flag and a non-null ecPolicyName.
*@param storagePolicy the name of the storage policy.
* *
* @return the status of the created file, it could be null if the server * @return the status of the created file, it could be null if the server
* doesn't support returning the file status * doesn't support returning the file status
@ -209,7 +210,8 @@ public interface ClientProtocol {
HdfsFileStatus create(String src, FsPermission masked, HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName) CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
String storagePolicy)
throws IOException; throws IOException;
/** /**

View File

@ -345,7 +345,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
public HdfsFileStatus create(String src, FsPermission masked, public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName) CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
String storagePolicy)
throws IOException { throws IOException {
CreateRequestProto.Builder builder = CreateRequestProto.newBuilder() CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
.setSrc(src) .setSrc(src)
@ -358,6 +359,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
if (ecPolicyName != null) { if (ecPolicyName != null) {
builder.setEcPolicyName(ecPolicyName); builder.setEcPolicyName(ecPolicyName);
} }
if (storagePolicy != null) {
builder.setStoragePolicy(storagePolicy);
}
FsPermission unmasked = masked.getUnmasked(); FsPermission unmasked = masked.getUnmasked();
if (unmasked != null) { if (unmasked != null) {
builder.setUnmasked(PBHelperClient.convert(unmasked)); builder.setUnmasked(PBHelperClient.convert(unmasked));

View File

@ -83,6 +83,7 @@ message CreateRequestProto {
repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8; repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8;
optional FsPermissionProto unmasked = 9; optional FsPermissionProto unmasked = 9;
optional string ecPolicyName = 10; optional string ecPolicyName = 10;
optional string storagePolicy = 11;
} }
message CreateResponseProto { message CreateResponseProto {

View File

@ -192,7 +192,8 @@ public class RouterClientProtocol implements ClientProtocol {
public HdfsFileStatus create(String src, FsPermission masked, public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName) CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
String storagePolicy)
throws IOException { throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE); rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
@ -213,9 +214,9 @@ public class RouterClientProtocol implements ClientProtocol {
new Class<?>[] {String.class, FsPermission.class, String.class, new Class<?>[] {String.class, FsPermission.class, String.class,
EnumSetWritable.class, boolean.class, short.class, EnumSetWritable.class, boolean.class, short.class,
long.class, CryptoProtocolVersion[].class, long.class, CryptoProtocolVersion[].class,
String.class}, String.class, String.class},
createLocation.getDest(), masked, clientName, flag, createParent, createLocation.getDest(), masked, clientName, flag, createParent,
replication, blockSize, supportedVersions, ecPolicyName); replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
} }

View File

@ -500,10 +500,11 @@ public class RouterRpcServer extends AbstractService
public HdfsFileStatus create(String src, FsPermission masked, public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName) CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
String storagePolicy)
throws IOException { throws IOException {
return clientProto.create(src, masked, clientName, flag, createParent, return clientProto.create(src, masked, clientName, flag, createParent,
replication, blockSize, supportedVersions, ecPolicyName); replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
} }
/** /**

View File

@ -884,7 +884,7 @@ public class TestRouterRpc {
HdfsFileStatus status = routerProtocol.create( HdfsFileStatus status = routerProtocol.create(
newRouterFile, new FsPermission("777"), clientName, newRouterFile, new FsPermission("777"), clientName,
new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1, new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1,
(long) 1024, CryptoProtocolVersion.supported(), null); (long) 1024, CryptoProtocolVersion.supported(), null, null);
// Add a block via router (requires client to have same lease) // Add a block via router (requires client to have same lease)
LocatedBlock block = routerProtocol.addBlock( LocatedBlock block = routerProtocol.addBlock(

View File

@ -478,7 +478,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
(short) req.getReplication(), req.getBlockSize(), (short) req.getReplication(), req.getBlockSize(),
PBHelperClient.convertCryptoProtocolVersions( PBHelperClient.convertCryptoProtocolVersions(
req.getCryptoProtocolVersionList()), req.getCryptoProtocolVersionList()),
req.getEcPolicyName()); req.getEcPolicyName(), req.getStoragePolicy());
if (result != null) { if (result != null) {
return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result)) return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result))

View File

@ -361,7 +361,8 @@ class FSDirWriteFileOp {
EnumSet<CreateFlag> flag, boolean createParent, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, short replication, long blockSize,
FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks, FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry) boolean shouldReplicate, String ecPolicyName, String storagePolicy,
boolean logRetryEntry)
throws IOException { throws IOException {
assert fsn.hasWriteLock(); assert fsn.hasWriteLock();
boolean overwrite = flag.contains(CreateFlag.OVERWRITE); boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
@ -396,7 +397,7 @@ class FSDirWriteFileOp {
if (parent != null) { if (parent != null) {
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine, shouldReplicate, replication, blockSize, holder, clientMachine, shouldReplicate,
ecPolicyName); ecPolicyName, storagePolicy);
newNode = iip != null ? iip.getLastINode().asFile() : null; newNode = iip != null ? iip.getLastINode().asFile() : null;
} }
if (newNode == null) { if (newNode == null) {
@ -540,7 +541,7 @@ class FSDirWriteFileOp {
FSDirectory fsd, INodesInPath existing, byte[] localName, FSDirectory fsd, INodesInPath existing, byte[] localName,
PermissionStatus permissions, short replication, long preferredBlockSize, PermissionStatus permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine, boolean shouldReplicate, String clientName, String clientMachine, boolean shouldReplicate,
String ecPolicyName) throws IOException { String ecPolicyName, String storagePolicy) throws IOException {
Preconditions.checkNotNull(existing); Preconditions.checkNotNull(existing);
long modTime = now(); long modTime = now();
@ -549,6 +550,16 @@ class FSDirWriteFileOp {
try { try {
boolean isStriped = false; boolean isStriped = false;
ErasureCodingPolicy ecPolicy = null; ErasureCodingPolicy ecPolicy = null;
byte storagepolicyid = 0;
if (storagePolicy != null && !storagePolicy.isEmpty()) {
BlockStoragePolicy policy =
fsd.getBlockManager().getStoragePolicy(storagePolicy);
if (policy == null) {
throw new HadoopIllegalArgumentException(
"Cannot find a block policy with the name " + storagePolicy);
}
storagepolicyid = policy.getId();
}
if (!shouldReplicate) { if (!shouldReplicate) {
ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
fsd.getFSNamesystem(), ecPolicyName, existing); fsd.getFSNamesystem(), ecPolicyName, existing);
@ -562,7 +573,7 @@ class FSDirWriteFileOp {
final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null); final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize, modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
blockType); storagepolicyid, blockType);
newNode.setLocalName(localName); newNode.setLocalName(localName);
newNode.toUnderConstruction(clientName, clientMachine); newNode.toUnderConstruction(clientName, clientMachine);
newiip = fsd.addINode(existing, newNode, permissions.getPermission()); newiip = fsd.addINode(existing, newNode, permissions.getPermission());
@ -740,13 +751,6 @@ class FSDirWriteFileOp {
storagePolicyId, blockType); storagePolicyId, blockType);
} }
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
long mtime, long atime, Short replication, Byte ecPolicyID,
long preferredBlockSize, BlockType blockType) {
return newINodeFile(id, permissions, mtime, atime, replication, ecPolicyID,
preferredBlockSize, (byte)0, blockType);
}
/** /**
* Persist the new block (the last block of the given file). * Persist the new block (the last block of the given file).
*/ */

View File

@ -2441,13 +2441,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String holder, String clientMachine, EnumSet<CreateFlag> flag, String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName, CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
boolean logRetryCache) throws IOException { String storagePolicy, boolean logRetryCache) throws IOException {
HdfsFileStatus status; HdfsFileStatus status;
try { try {
status = startFileInt(src, permissions, holder, clientMachine, flag, status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions, ecPolicyName, createParent, replication, blockSize, supportedVersions, ecPolicyName,
logRetryCache); storagePolicy, logRetryCache);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "create", src); logAuditEvent(false, "create", src);
throw e; throw e;
@ -2460,7 +2460,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
PermissionStatus permissions, String holder, String clientMachine, PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication, EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions, long blockSize, CryptoProtocolVersion[] supportedVersions,
String ecPolicyName, boolean logRetryCache) throws IOException { String ecPolicyName, String storagePolicy, boolean logRetryCache)
throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("DIR* NameSystem.startFile: src=").append(src) builder.append("DIR* NameSystem.startFile: src=").append(src)
@ -2549,7 +2550,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try { try {
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder, stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
clientMachine, flag, createParent, replication, blockSize, feInfo, clientMachine, flag, createParent, replication, blockSize, feInfo,
toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache); toRemoveBlocks, shouldReplicate, ecPolicyName, storagePolicy,
logRetryCache);
} catch (IOException e) { } catch (IOException e) {
skipSync = e instanceof StandbyException; skipSync = e instanceof StandbyException;
throw e; throw e;

View File

@ -780,7 +780,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public HdfsFileStatus create(String src, FsPermission masked, public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize, boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName) CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
String storagePolicy)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
String clientMachine = getClientMachine(); String clientMachine = getClientMachine();
@ -804,7 +805,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
.getShortUserName(), null, masked); .getShortUserName(), null, masked);
status = namesystem.startFile(src, perm, clientName, clientMachine, status = namesystem.startFile(src, perm, clientName, clientMachine,
flag.get(), createParent, replication, blockSize, supportedVersions, flag.get(), createParent, replication, blockSize, supportedVersions,
ecPolicyName, cacheEntry != null); ecPolicyName, storagePolicy, cacheEntry != null);
} finally { } finally {
RetryCache.setState(cacheEntry, status != null, status); RetryCache.setState(cacheEntry, status != null, status);
} }

View File

@ -2137,7 +2137,7 @@ public class DFSTestUtil {
.create(file.toString(), new FsPermission((short)0755), .create(file.toString(), new FsPermission((short)0755),
dfs.getClient().getClientName(), dfs.getClient().getClientName(),
new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)),
false, (short)1, 128*1024*1024L, null, null); false, (short) 1, 128 * 1024 * 1024L, null, null, null);
FSNamesystem ns = cluster.getNamesystem(); FSNamesystem ns = cluster.getNamesystem();
FSDirectory fsdir = ns.getFSDirectory(); FSDirectory fsdir = ns.getFSDirectory();

View File

@ -276,7 +276,7 @@ public class TestDFSClientRetries {
.build()) .build())
.when(mockNN) .when(mockNN)
.create(anyString(), any(), anyString(), any(), anyBoolean(), .create(anyString(), any(), anyString(), any(), anyBoolean(),
anyShort(), anyLong(), any(), any()); anyShort(), anyLong(), any(), any(), any());
final DFSClient client = new DFSClient(null, mockNN, conf, null); final DFSClient client = new DFSClient(null, mockNN, conf, null);
OutputStream os = client.create("testfile", true); OutputStream os = client.create("testfile", true);

View File

@ -1450,6 +1450,45 @@ public class TestDistributedFileSystem {
} }
} }
@Test
public void testCreateWithStoragePolicy() throws Throwable {
Configuration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.storageTypes(
new StorageType[] {StorageType.DISK, StorageType.ARCHIVE,
StorageType.SSD}).storagesPerDatanode(3).build()) {
DistributedFileSystem fs = cluster.getFileSystem();
Path file1 = new Path("/tmp/file1");
Path file2 = new Path("/tmp/file2");
fs.mkdirs(new Path("/tmp"));
fs.setStoragePolicy(new Path("/tmp"), "ALL_SSD");
FSDataOutputStream outputStream = fs.createFile(file1)
.storagePolicyName("COLD").build();
outputStream.write(1);
outputStream.close();
assertEquals(StorageType.ARCHIVE, DFSTestUtil.getAllBlocks(fs, file1)
.get(0).getStorageTypes()[0]);
assertEquals(fs.getStoragePolicy(file1).getName(), "COLD");
// Check with storage policy not specified.
outputStream = fs.createFile(file2).build();
outputStream.write(1);
outputStream.close();
assertEquals(StorageType.SSD, DFSTestUtil.getAllBlocks(fs, file2).get(0)
.getStorageTypes()[0]);
assertEquals(fs.getStoragePolicy(file2).getName(), "ALL_SSD");
// Check with default storage policy.
outputStream = fs.createFile(new Path("/default")).build();
outputStream.write(1);
outputStream.close();
assertEquals(StorageType.DISK,
DFSTestUtil.getAllBlocks(fs, new Path("/default")).get(0)
.getStorageTypes()[0]);
assertEquals(fs.getStoragePolicy(new Path("/default")).getName(), "HOT");
}
}
@Test(timeout=60000) @Test(timeout=60000)
public void testListFiles() throws IOException { public void testListFiles() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();

View File

@ -963,7 +963,7 @@ public class TestEncryptionZones {
.build()) .build())
.when(mcp) .when(mcp)
.create(anyString(), any(), anyString(), any(), anyBoolean(), .create(anyString(), any(), anyString(), any(), anyBoolean(),
anyShort(), anyLong(), any(), any()); anyShort(), anyLong(), any(), any(), any());
} }
// This test only uses mocks. Called from the end of an existing test to // This test only uses mocks. Called from the end of an existing test to

View File

@ -1246,7 +1246,7 @@ public class TestFileCreation {
try { try {
nnrpc.create(pathStr, new FsPermission((short)0755), "client", nnrpc.create(pathStr, new FsPermission((short)0755), "client",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short)1, 128*1024*1024L, null, null); true, (short) 1, 128 * 1024 * 1024L, null, null, null);
fail("Should have thrown exception when creating '" fail("Should have thrown exception when creating '"
+ pathStr + "'" + " by " + method); + pathStr + "'" + " by " + method);
} catch (InvalidPathException ipe) { } catch (InvalidPathException ipe) {

View File

@ -372,7 +372,7 @@ public class TestLease {
.build()) .build())
.when(mcp) .when(mcp)
.create(anyString(), any(), anyString(), .create(anyString(), any(), anyString(),
any(), anyBoolean(), anyShort(), anyLong(), any(), any()); any(), anyBoolean(), anyShort(), anyLong(), any(), any(), any());
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
final DFSClient c1 = createDFSClientAs(ugi[0], conf); final DFSClient c1 = createDFSClientAs(ugi[0], conf);

View File

@ -593,7 +593,8 @@ public class NNThroughputBenchmark implements Tool {
FsPermission.getDefault(), clientName, FsPermission.getDefault(), clientName,
new EnumSetWritable<CreateFlag>(EnumSet new EnumSetWritable<CreateFlag>(EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true,
replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null); replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null,
null);
long end = Time.now(); long end = Time.now();
for (boolean written = !closeUponCreate; !written; for (boolean written = !closeUponCreate; !written;
written = clientProto.complete(fileNames[daemonId][inputIdx], written = clientProto.complete(fileNames[daemonId][inputIdx],
@ -1143,7 +1144,7 @@ public class NNThroughputBenchmark implements Tool {
String fileName = nameGenerator.getNextFileName("ThroughputBench"); String fileName = nameGenerator.getNextFileName("ThroughputBench");
clientProto.create(fileName, FsPermission.getDefault(), clientName, clientProto.create(fileName, FsPermission.getDefault(), clientName,
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
BLOCK_SIZE, CryptoProtocolVersion.supported(), null); BLOCK_SIZE, CryptoProtocolVersion.supported(), null, null);
ExtendedBlock lastBlock = addBlocks(fileName, clientName); ExtendedBlock lastBlock = addBlocks(fileName, clientName);
clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID); clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID);
} }

View File

@ -86,7 +86,7 @@ public class TestAddBlockRetry {
nn.create(src, FsPermission.getFileDefault(), nn.create(src, FsPermission.getFileDefault(),
"clientName", "clientName",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short)3, 1024, null, null); true, (short) 3, 1024, null, null, null);
// start first addBlock() // start first addBlock()
LOG.info("Starting first addBlock for " + src); LOG.info("Starting first addBlock for " + src);
@ -158,7 +158,7 @@ public class TestAddBlockRetry {
// create file // create file
nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName", nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 3, 1024, null, null); (short) 3, 1024, null, null, null);
// start first addBlock() // start first addBlock()
LOG.info("Starting first addBlock for " + src); LOG.info("Starting first addBlock for " + src);
LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,

View File

@ -109,7 +109,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
// Create the file with client machine // Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
replication, DEFAULT_BLOCK_SIZE, null, null, false); replication, DEFAULT_BLOCK_SIZE, null, null, null, false);
//test chooseTarget for new file //test chooseTarget for new file
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
@ -139,7 +139,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
// Create the file with client machine // Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
(short) 20, DEFAULT_BLOCK_SIZE, null, null, false); (short) 20, DEFAULT_BLOCK_SIZE, null, null, null, false);
//test chooseTarget for new file //test chooseTarget for new file
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,

View File

@ -131,7 +131,7 @@ public class TestDefaultBlockPlacementPolicy {
// Create the file with client machine // Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine,
clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR,
DEFAULT_BLOCK_SIZE, null, null, false); DEFAULT_BLOCK_SIZE, null, null, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null,
null, fileStatus.getFileId(), null, null); null, fileStatus.getFileId(), null, null);
@ -184,7 +184,7 @@ public class TestDefaultBlockPlacementPolicy {
// Create the file with client machine // Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, false); REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null, null); null, null, fileStatus.getFileId(), null, null);

View File

@ -232,19 +232,19 @@ public class TestNamenodeRetryCache {
newCall(); newCall();
HdfsFileStatus status = nnRpc.create(src, perm, "holder", HdfsFileStatus status = nnRpc.create(src, perm, "holder",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 1, BlockSize, null, null); (short) 1, BlockSize, null, null, null);
Assert.assertEquals(status, nnRpc.create(src, perm, "holder", Assert.assertEquals(status, nnRpc.create(src, perm, "holder",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 1, BlockSize, null, null)); (short) 1, BlockSize, null, null, null));
Assert.assertEquals(status, nnRpc.create(src, perm, "holder", Assert.assertEquals(status, nnRpc.create(src, perm, "holder",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 1, BlockSize, null, null)); (short) 1, BlockSize, null, null, null));
// A non-retried call fails // A non-retried call fails
newCall(); newCall();
try { try {
nnRpc.create(src, perm, "holder", nnRpc.create(src, perm, "holder",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short) 1, BlockSize, null, null); true, (short) 1, BlockSize, null, null, null);
Assert.fail("testCreate - expected exception is not thrown"); Assert.fail("testCreate - expected exception is not thrown");
} catch (IOException e) { } catch (IOException e) {
// expected // expected

View File

@ -414,7 +414,7 @@ public class TestRetryCacheWithHA {
new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes, new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes,
BlockSize, BlockSize,
new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES}, new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES},
null); null, null);
} }
@Override @Override