From 0d7a5ac5f526801367a9ec963e6d72783b637d55 Mon Sep 17 00:00:00 2001 From: Surendra Singh Lilhore Date: Thu, 14 Feb 2019 22:13:14 +0530 Subject: [PATCH] HDFS-13209. DistributedFileSystem.create should allow an option to provide StoragePolicy. Contributed by Ayush Saxena. --- .../org/apache/hadoop/hdfs/DFSClient.java | 22 +++++++++- .../apache/hadoop/hdfs/DFSOutputStream.java | 6 ++- .../hadoop/hdfs/DistributedFileSystem.java | 44 ++++++++++++++----- .../hadoop/hdfs/protocol/ClientProtocol.java | 4 +- .../ClientNamenodeProtocolTranslatorPB.java | 6 ++- .../main/proto/ClientNamenodeProtocol.proto | 1 + .../router/RouterClientProtocol.java | 7 +-- .../federation/router/RouterRpcServer.java | 5 ++- .../federation/router/TestRouterRpc.java | 2 +- ...amenodeProtocolServerSideTranslatorPB.java | 2 +- .../server/namenode/FSDirWriteFileOp.java | 26 ++++++----- .../hdfs/server/namenode/FSNamesystem.java | 10 +++-- .../server/namenode/NameNodeRpcServer.java | 5 ++- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- .../hadoop/hdfs/TestDFSClientRetries.java | 2 +- .../hdfs/TestDistributedFileSystem.java | 41 ++++++++++++++++- .../hadoop/hdfs/TestEncryptionZones.java | 2 +- .../apache/hadoop/hdfs/TestFileCreation.java | 2 +- .../org/apache/hadoop/hdfs/TestLease.java | 2 +- .../namenode/NNThroughputBenchmark.java | 5 ++- .../server/namenode/TestAddBlockRetry.java | 4 +- ...BlockPlacementPolicyRackFaultTolerant.java | 4 +- .../TestDefaultBlockPlacementPolicy.java | 4 +- .../namenode/TestNamenodeRetryCache.java | 8 ++-- .../namenode/ha/TestRetryCacheWithHA.java | 2 +- 25 files changed, 159 insertions(+), 59 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d96101bfb7a..56280f3a8bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1211,13 +1211,31 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, String ecPolicyName) throws IOException { + return create(src, permission, flag, createParent, replication, blockSize, + progress, buffersize, checksumOpt, favoredNodes, ecPolicyName, null); + } + + /** + * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, + * addition of Progressable, int, ChecksumOpt, InetSocketAddress[], String)} + * with the storagePolicy that is used to specify a specific storage policy + * instead of inheriting any policy from this new file's parent directory. + * This policy will be persisted in HDFS. A value of null means inheriting + * parent groups' whatever policy. + */ + public DFSOutputStream create(String src, FsPermission permission, + EnumSet flag, boolean createParent, short replication, + long blockSize, Progressable progress, int buffersize, + ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, + String ecPolicyName, String storagePolicy) + throws IOException { checkOpen(); final FsPermission masked = applyUMask(permission); LOG.debug("{}: masked={}", src, masked); final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, dfsClientConf.createChecksum(checksumOpt), - getFavoredNodesStr(favoredNodes), ecPolicyName); + getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy); beginFileLease(result.getFileId(), result); return result; } @@ -1271,7 +1289,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); result = DFSOutputStream.newStreamForCreate(this, src, absPermission, flag, createParent, replication, blockSize, progress, checksum, - null, null); + null, null, null); } beginFileLease(result.getFileId(), result); return result; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index b8aae970302..aaef8ad909a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -260,7 +260,8 @@ public class DFSOutputStream extends FSOutputSummer static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, - DataChecksum checksum, String[] favoredNodes, String ecPolicyName) + DataChecksum checksum, String[] favoredNodes, String ecPolicyName, + String storagePolicy) throws IOException { try (TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src)) { @@ -275,7 +276,8 @@ public class DFSOutputStream extends FSOutputSummer try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable<>(flag), createParent, replication, - blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName); + blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName, + storagePolicy); break; } catch (RemoteException re) { IOException e = re.unwrapRemoteException( diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index ed37f1dcdfa..7956d8eda97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -563,13 +563,17 @@ public class DistributedFileSystem extends FileSystem * replication policy from its ancestor (the default). * ecPolicyName and SHOULD_REPLICATE CreateFlag are mutually exclusive. It's * invalid to set both SHOULD_REPLICATE and a non-null ecPolicyName. + * The third addition is storagePolicyName. A non-null storage Policy + * specifies an explicit storage policy for this file, overriding the + * inherited policy. * */ private HdfsDataOutputStream create(final Path f, final FsPermission permission, final EnumSet flag, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt, - final InetSocketAddress[] favoredNodes, final String ecPolicyName) + final InetSocketAddress[] favoredNodes, final String ecPolicyName, + final String storagePolicy) throws IOException { statistics.incrementWriteOps(1); storageStatistics.incrementOpCounter(OpType.CREATE); @@ -579,7 +583,7 @@ public class DistributedFileSystem extends FileSystem public HdfsDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream out = dfs.create(getPathName(f), permission, flag, true, replication, blockSize, progress, bufferSize, - checksumOpt, favoredNodes, ecPolicyName); + checksumOpt, favoredNodes, ecPolicyName, storagePolicy); return dfs.createWrappedOutputStream(out, statistics); } @Override @@ -588,7 +592,8 @@ public class DistributedFileSystem extends FileSystem if (fs instanceof DistributedFileSystem) { DistributedFileSystem myDfs = (DistributedFileSystem)fs; return myDfs.create(p, permission, flag, bufferSize, replication, - blockSize, progress, checksumOpt, favoredNodes, ecPolicyName); + blockSize, progress, checksumOpt, favoredNodes, ecPolicyName, + storagePolicy); } throw new UnsupportedOperationException("Cannot create with" + " favoredNodes through a symlink to a non-DistributedFileSystem: " @@ -619,14 +624,15 @@ public class DistributedFileSystem extends FileSystem * * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable, * ChecksumOpt, InetSocketAddress[], String) for the descriptions of - * additional parameters, i.e., favoredNodes and ecPolicyName. + * additional parameters, i.e., favoredNodes, ecPolicyName and + * storagePolicyName. */ private HdfsDataOutputStream createNonRecursive(final Path f, final FsPermission permission, final EnumSet flag, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt, - final InetSocketAddress[] favoredNodes, final String ecPolicyName) - throws IOException { + final InetSocketAddress[] favoredNodes, final String ecPolicyName, + final String storagePolicyName) throws IOException { statistics.incrementWriteOps(1); storageStatistics.incrementOpCounter(OpType.CREATE); Path absF = fixRelativePart(f); @@ -635,7 +641,7 @@ public class DistributedFileSystem extends FileSystem public HdfsDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream out = dfs.create(getPathName(f), permission, flag, false, replication, blockSize, progress, bufferSize, - checksumOpt, favoredNodes, ecPolicyName); + checksumOpt, favoredNodes, ecPolicyName, storagePolicyName); return dfs.createWrappedOutputStream(out, statistics); } @Override @@ -645,7 +651,7 @@ public class DistributedFileSystem extends FileSystem DistributedFileSystem myDfs = (DistributedFileSystem)fs; return myDfs.createNonRecursive(p, permission, flag, bufferSize, replication, blockSize, progress, checksumOpt, favoredNodes, - ecPolicyName); + ecPolicyName, storagePolicyName); } throw new UnsupportedOperationException("Cannot create with" + " favoredNodes through a symlink to a non-DistributedFileSystem: " @@ -3183,6 +3189,7 @@ public class DistributedFileSystem extends FileSystem private final DistributedFileSystem dfs; private InetSocketAddress[] favoredNodes = null; private String ecPolicyName = null; + private String storagePolicyName = null; /** * Construct a HdfsDataOutputStream builder for a file. @@ -3254,6 +3261,22 @@ public class DistributedFileSystem extends FileSystem return this; } + @VisibleForTesting + String getStoragePolicyName() { + return storagePolicyName; + } + + /** + * Enforce a file to follow the specified storage policy irrespective of the + * storage policy of its parent directory. + */ + public HdfsDataOutputStreamBuilder storagePolicyName( + @Nonnull final String policyName) { + Preconditions.checkNotNull(policyName); + storagePolicyName = policyName; + return this; + } + @VisibleForTesting String getEcPolicyName() { return ecPolicyName; @@ -3320,11 +3343,12 @@ public class DistributedFileSystem extends FileSystem return dfs.create(getPath(), getPermission(), getFlags(), getBufferSize(), getReplication(), getBlockSize(), getProgress(), getChecksumOpt(), getFavoredNodes(), - getEcPolicyName()); + getEcPolicyName(), getStoragePolicyName()); } else { return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), getBufferSize(), getReplication(), getBlockSize(), getProgress(), - getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); + getChecksumOpt(), getFavoredNodes(), getEcPolicyName(), + getStoragePolicyName()); } } else if (getFlags().contains(CreateFlag.APPEND)) { return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(), diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 953e48a932c..da937075511 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -176,6 +176,7 @@ public interface ClientProtocol { * policy. ecPolicyName and SHOULD_REPLICATE CreateFlag * are mutually exclusive. It's invalid to set both * SHOULD_REPLICATE flag and a non-null ecPolicyName. + *@param storagePolicy the name of the storage policy. * * @return the status of the created file, it could be null if the server * doesn't support returning the file status @@ -209,7 +210,8 @@ public interface ClientProtocol { HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 65ebc2cc897..a23ae48de30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -345,7 +345,8 @@ public class ClientNamenodeProtocolTranslatorPB implements public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { CreateRequestProto.Builder builder = CreateRequestProto.newBuilder() .setSrc(src) @@ -358,6 +359,9 @@ public class ClientNamenodeProtocolTranslatorPB implements if (ecPolicyName != null) { builder.setEcPolicyName(ecPolicyName); } + if (storagePolicy != null) { + builder.setStoragePolicy(storagePolicy); + } FsPermission unmasked = masked.getUnmasked(); if (unmasked != null) { builder.setUnmasked(PBHelperClient.convert(unmasked)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 7343997be21..d08ad9b4f8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -83,6 +83,7 @@ message CreateRequestProto { repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8; optional FsPermissionProto unmasked = 9; optional string ecPolicyName = 10; + optional string storagePolicy = 11; } message CreateResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 3c8465b7da3..344401f4f3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -192,7 +192,8 @@ public class RouterClientProtocol implements ClientProtocol { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); @@ -213,9 +214,9 @@ public class RouterClientProtocol implements ClientProtocol { new Class[] {String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, - String.class}, + String.class, String.class}, createLocation.getDest(), masked, clientName, flag, createParent, - replication, blockSize, supportedVersions, ecPolicyName); + replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 4e2bb82f076..36d3c81e169 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -500,10 +500,11 @@ public class RouterRpcServer extends AbstractService public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { return clientProto.create(src, masked, clientName, flag, createParent, - replication, blockSize, supportedVersions, ecPolicyName); + replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index a2a7b189a60..a32cba147df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -884,7 +884,7 @@ public class TestRouterRpc { HdfsFileStatus status = routerProtocol.create( newRouterFile, new FsPermission("777"), clientName, new EnumSetWritable(createFlag), true, (short) 1, - (long) 1024, CryptoProtocolVersion.supported(), null); + (long) 1024, CryptoProtocolVersion.supported(), null, null); // Add a block via router (requires client to have same lease) LocatedBlock block = routerProtocol.addBlock( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index e4a2f0b6553..6673baa58eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -478,7 +478,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements (short) req.getReplication(), req.getBlockSize(), PBHelperClient.convertCryptoProtocolVersions( req.getCryptoProtocolVersionList()), - req.getEcPolicyName()); + req.getEcPolicyName(), req.getStoragePolicy()); if (result != null) { return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result)) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 2875708b72d..9b0a64d75ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -361,7 +361,8 @@ class FSDirWriteFileOp { EnumSet flag, boolean createParent, short replication, long blockSize, FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks, - boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry) + boolean shouldReplicate, String ecPolicyName, String storagePolicy, + boolean logRetryEntry) throws IOException { assert fsn.hasWriteLock(); boolean overwrite = flag.contains(CreateFlag.OVERWRITE); @@ -396,7 +397,7 @@ class FSDirWriteFileOp { if (parent != null) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, replication, blockSize, holder, clientMachine, shouldReplicate, - ecPolicyName); + ecPolicyName, storagePolicy); newNode = iip != null ? iip.getLastINode().asFile() : null; } if (newNode == null) { @@ -540,7 +541,7 @@ class FSDirWriteFileOp { FSDirectory fsd, INodesInPath existing, byte[] localName, PermissionStatus permissions, short replication, long preferredBlockSize, String clientName, String clientMachine, boolean shouldReplicate, - String ecPolicyName) throws IOException { + String ecPolicyName, String storagePolicy) throws IOException { Preconditions.checkNotNull(existing); long modTime = now(); @@ -549,6 +550,16 @@ class FSDirWriteFileOp { try { boolean isStriped = false; ErasureCodingPolicy ecPolicy = null; + byte storagepolicyid = 0; + if (storagePolicy != null && !storagePolicy.isEmpty()) { + BlockStoragePolicy policy = + fsd.getBlockManager().getStoragePolicy(storagePolicy); + if (policy == null) { + throw new HadoopIllegalArgumentException( + "Cannot find a block policy with the name " + storagePolicy); + } + storagepolicyid = policy.getId(); + } if (!shouldReplicate) { ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( fsd.getFSNamesystem(), ecPolicyName, existing); @@ -562,7 +573,7 @@ class FSDirWriteFileOp { final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null); INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize, - blockType); + storagepolicyid, blockType); newNode.setLocalName(localName); newNode.toUnderConstruction(clientName, clientMachine); newiip = fsd.addINode(existing, newNode, permissions.getPermission()); @@ -740,13 +751,6 @@ class FSDirWriteFileOp { storagePolicyId, blockType); } - private static INodeFile newINodeFile(long id, PermissionStatus permissions, - long mtime, long atime, Short replication, Byte ecPolicyID, - long preferredBlockSize, BlockType blockType) { - return newINodeFile(id, permissions, mtime, atime, replication, ecPolicyID, - preferredBlockSize, (byte)0, blockType); - } - /** * Persist the new block (the last block of the given file). */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ea7db4d04b4..8659ea4f7b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2441,13 +2441,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, String ecPolicyName, - boolean logRetryCache) throws IOException { + String storagePolicy, boolean logRetryCache) throws IOException { HdfsFileStatus status; try { status = startFileInt(src, permissions, holder, clientMachine, flag, createParent, replication, blockSize, supportedVersions, ecPolicyName, - logRetryCache); + storagePolicy, logRetryCache); } catch (AccessControlException e) { logAuditEvent(false, "create", src); throw e; @@ -2460,7 +2460,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, - String ecPolicyName, boolean logRetryCache) throws IOException { + String ecPolicyName, String storagePolicy, boolean logRetryCache) + throws IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); builder.append("DIR* NameSystem.startFile: src=").append(src) @@ -2549,7 +2550,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, try { stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder, clientMachine, flag, createParent, replication, blockSize, feInfo, - toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache); + toRemoveBlocks, shouldReplicate, ecPolicyName, storagePolicy, + logRetryCache); } catch (IOException e) { skipSync = e instanceof StandbyException; throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 0da8099f10a..f50648d9b66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -780,7 +780,8 @@ public class NameNodeRpcServer implements NamenodeProtocols { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + CryptoProtocolVersion[] supportedVersions, String ecPolicyName, + String storagePolicy) throws IOException { checkNNStartup(); String clientMachine = getClientMachine(); @@ -804,7 +805,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { .getShortUserName(), null, masked); status = namesystem.startFile(src, perm, clientName, clientMachine, flag.get(), createParent, replication, blockSize, supportedVersions, - ecPolicyName, cacheEntry != null); + ecPolicyName, storagePolicy, cacheEntry != null); } finally { RetryCache.setState(cacheEntry, status != null, status); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index e3cab7a38d6..8886eee74b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -2137,7 +2137,7 @@ public class DFSTestUtil { .create(file.toString(), new FsPermission((short)0755), dfs.getClient().getClientName(), new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)), - false, (short)1, 128*1024*1024L, null, null); + false, (short) 1, 128 * 1024 * 1024L, null, null, null); FSNamesystem ns = cluster.getNamesystem(); FSDirectory fsdir = ns.getFSDirectory(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 6dfd86c68b2..a27bfc1003f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -276,7 +276,7 @@ public class TestDFSClientRetries { .build()) .when(mockNN) .create(anyString(), any(), anyString(), any(), anyBoolean(), - anyShort(), anyLong(), any(), any()); + anyShort(), anyLong(), any(), any(), any()); final DFSClient client = new DFSClient(null, mockNN, conf, null); OutputStream os = client.create("testfile", true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 629abd7d9dd..60ff61406db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -1449,7 +1449,46 @@ public class TestDistributedFileSystem { cluster.shutdown(); } } - + + @Test + public void testCreateWithStoragePolicy() throws Throwable { + Configuration conf = new HdfsConfiguration(); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .storageTypes( + new StorageType[] {StorageType.DISK, StorageType.ARCHIVE, + StorageType.SSD}).storagesPerDatanode(3).build()) { + DistributedFileSystem fs = cluster.getFileSystem(); + Path file1 = new Path("/tmp/file1"); + Path file2 = new Path("/tmp/file2"); + fs.mkdirs(new Path("/tmp")); + fs.setStoragePolicy(new Path("/tmp"), "ALL_SSD"); + FSDataOutputStream outputStream = fs.createFile(file1) + .storagePolicyName("COLD").build(); + outputStream.write(1); + outputStream.close(); + assertEquals(StorageType.ARCHIVE, DFSTestUtil.getAllBlocks(fs, file1) + .get(0).getStorageTypes()[0]); + assertEquals(fs.getStoragePolicy(file1).getName(), "COLD"); + + // Check with storage policy not specified. + outputStream = fs.createFile(file2).build(); + outputStream.write(1); + outputStream.close(); + assertEquals(StorageType.SSD, DFSTestUtil.getAllBlocks(fs, file2).get(0) + .getStorageTypes()[0]); + assertEquals(fs.getStoragePolicy(file2).getName(), "ALL_SSD"); + + // Check with default storage policy. + outputStream = fs.createFile(new Path("/default")).build(); + outputStream.write(1); + outputStream.close(); + assertEquals(StorageType.DISK, + DFSTestUtil.getAllBlocks(fs, new Path("/default")).get(0) + .getStorageTypes()[0]); + assertEquals(fs.getStoragePolicy(new Path("/default")).getName(), "HOT"); + } + } + @Test(timeout=60000) public void testListFiles() throws IOException { Configuration conf = new HdfsConfiguration(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java index 7ad25b7d0c7..d401380cebf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java @@ -963,7 +963,7 @@ public class TestEncryptionZones { .build()) .when(mcp) .create(anyString(), any(), anyString(), any(), anyBoolean(), - anyShort(), anyLong(), any(), any()); + anyShort(), anyLong(), any(), any(), any()); } // This test only uses mocks. Called from the end of an existing test to diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 89aa9ba0899..93687b680a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -1246,7 +1246,7 @@ public class TestFileCreation { try { nnrpc.create(pathStr, new FsPermission((short)0755), "client", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short)1, 128*1024*1024L, null, null); + true, (short) 1, 128 * 1024 * 1024L, null, null, null); fail("Should have thrown exception when creating '" + pathStr + "'" + " by " + method); } catch (InvalidPathException ipe) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index 19a4bb6d595..909a072d340 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -372,7 +372,7 @@ public class TestLease { .build()) .when(mcp) .create(anyString(), any(), anyString(), - any(), anyBoolean(), anyShort(), anyLong(), any(), any()); + any(), anyBoolean(), anyShort(), anyLong(), any(), any(), any()); final Configuration conf = new Configuration(); final DFSClient c1 = createDFSClientAs(ugi[0], conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 654a8a5c8e1..e5d9826c3eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -593,7 +593,8 @@ public class NNThroughputBenchmark implements Tool { FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, - replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null); + replication, BLOCK_SIZE, CryptoProtocolVersion.supported(), null, + null); long end = Time.now(); for (boolean written = !closeUponCreate; !written; written = clientProto.complete(fileNames[daemonId][inputIdx], @@ -1143,7 +1144,7 @@ public class NNThroughputBenchmark implements Tool { String fileName = nameGenerator.getNextFileName("ThroughputBench"); clientProto.create(fileName, FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, - BLOCK_SIZE, CryptoProtocolVersion.supported(), null); + BLOCK_SIZE, CryptoProtocolVersion.supported(), null, null); ExtendedBlock lastBlock = addBlocks(fileName, clientName); clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index 13cd16f71a7..088a47e8936 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -86,7 +86,7 @@ public class TestAddBlockRetry { nn.create(src, FsPermission.getFileDefault(), "clientName", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short)3, 1024, null, null); + true, (short) 3, 1024, null, null, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); @@ -158,7 +158,7 @@ public class TestAddBlockRetry { // create file nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 3, 1024, null, null); + (short) 3, 1024, null, null, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java index 7cef64be225..f1e59f5c65b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java @@ -109,7 +109,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - replication, DEFAULT_BLOCK_SIZE, null, null, false); + replication, DEFAULT_BLOCK_SIZE, null, null, null, false); //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, @@ -139,7 +139,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - (short) 20, DEFAULT_BLOCK_SIZE, null, null, false); + (short) 20, DEFAULT_BLOCK_SIZE, null, null, null, false); //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java index 205593f597e..c93339bd846 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java @@ -131,7 +131,7 @@ public class TestDefaultBlockPlacementPolicy { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, - DEFAULT_BLOCK_SIZE, null, null, false); + DEFAULT_BLOCK_SIZE, null, null, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, null, fileStatus.getFileId(), null, null); @@ -184,7 +184,7 @@ public class TestDefaultBlockPlacementPolicy { // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, false); + REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, null, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, null, fileStatus.getFileId(), null, null); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java index 0995f135d97..7f6f3990237 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java @@ -232,19 +232,19 @@ public class TestNamenodeRetryCache { newCall(); HdfsFileStatus status = nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 1, BlockSize, null, null); + (short) 1, BlockSize, null, null, null); Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 1, BlockSize, null, null)); + (short) 1, BlockSize, null, null, null)); Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), true, - (short) 1, BlockSize, null, null)); + (short) 1, BlockSize, null, null, null)); // A non-retried call fails newCall(); try { nnRpc.create(src, perm, "holder", new EnumSetWritable(EnumSet.of(CreateFlag.CREATE)), - true, (short) 1, BlockSize, null, null); + true, (short) 1, BlockSize, null, null, null); Assert.fail("testCreate - expected exception is not thrown"); } catch (IOException e) { // expected diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index 3014778993a..eac3659710c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -414,7 +414,7 @@ public class TestRetryCacheWithHA { new EnumSetWritable(createFlag), false, DataNodes, BlockSize, new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES}, - null); + null, null); } @Override