diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 09aa3a12bf2..fec26f42090 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -700,6 +700,9 @@ Release 2.6.0 - UNRELEASED HDFS-7095. TestStorageMover often fails in Jenkins. (jing9) + HDFS-7081. Add new DistributedFileSystem API for getting all the existing + storage policies. (jing9) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/blockStoragePolicy-site.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/blockStoragePolicy-site.xml deleted file mode 100644 index 04142ad80d0..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/blockStoragePolicy-site.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - - - - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml index 3a0b0ed88fa..50ec1460bd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml @@ -16,7 +16,6 @@ - - + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 6f943701837..03f5670a2b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -139,6 +139,7 @@ import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.AclException; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; @@ -1779,6 +1780,13 @@ public void setStoragePolicy(String src, String policyName) } } + /** + * @return All the existing storage policies + */ + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + return namenode.getStoragePolicySuite(); + } + /** * Rename file or directory. * @see ClientProtocol#rename(String, String) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 878a74181fb..ead82143b23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -444,14 +444,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final Class DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final int DFS_REPLICATION_MAX_DEFAULT = 512; - public static final String DFS_BLOCK_STORAGE_POLICIES_KEY - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICIES_KEY; - public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX; - public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX; - public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final int DFS_DF_INTERVAL_DEFAULT = 60000; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 79f2ef5dafb..3069289a902 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -504,6 +505,12 @@ public Void next(final FileSystem fs, final Path p) }.resolve(this, absF); } + /** Get all the existing storage policies */ + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + statistics.incrementReadOps(1); + return dfs.getStoragePolicySuite(); + } + /** * Move blocks from srcs to trg and delete srcs afterwards. * The file block sizes must be the same. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index fdc466a0658..6280d675d01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -325,4 +325,15 @@ public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) throws IOException { return dfs.getInotifyEventStream(lastReadTxid); } + + /** + * Set the source path to the specified storage policy. + * + * @param src The source path referring to either a directory or a file. + * @param policyName The name of the storage policy. + */ + public void setStoragePolicy(final Path src, final String policyName) + throws IOException { + dfs.setStoragePolicy(src, policyName); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java similarity index 50% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java index efbf8a00d6f..35bef516307 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hdfs; +package org.apache.hadoop.hdfs.protocol; import java.util.Arrays; import java.util.EnumSet; @@ -24,12 +24,10 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.XAttr; -import org.apache.hadoop.fs.XAttr.NameSpace; +import org.apache.hadoop.hdfs.StorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A block storage policy describes how to select the storage types @@ -37,67 +35,8 @@ */ @InterfaceAudience.Private public class BlockStoragePolicy { - public static final Log LOG = LogFactory.getLog(BlockStoragePolicy.class); - - public static final String DFS_BLOCK_STORAGE_POLICIES_KEY - = "dfs.block.storage.policies"; - public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX - = "dfs.block.storage.policy."; - public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX - = "dfs.block.storage.policy.creation-fallback."; - public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX - = "dfs.block.storage.policy.replication-fallback."; - public static final String STORAGE_POLICY_XATTR_NAME = "bsp"; - /** set the namespace to TRUSTED so that only privilege users can access */ - public static final NameSpace XAttrNS = NameSpace.TRUSTED; - - public static final int ID_BIT_LENGTH = 4; - public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1; - public static final byte ID_UNSPECIFIED = 0; - - private static final Suite DEFAULT_SUITE = createDefaultSuite(); - - private static Suite createDefaultSuite() { - final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH]; - final StorageType[] storageTypes = {StorageType.DISK}; - final byte defaultPolicyId = 12; - policies[defaultPolicyId] = new BlockStoragePolicy(defaultPolicyId, "HOT", - storageTypes, StorageType.EMPTY_ARRAY, StorageType.EMPTY_ARRAY); - return new Suite(defaultPolicyId, policies); - } - - /** A block storage policy suite. */ - public static class Suite { - private final byte defaultPolicyID; - private final BlockStoragePolicy[] policies; - - private Suite(byte defaultPolicyID, BlockStoragePolicy[] policies) { - this.defaultPolicyID = defaultPolicyID; - this.policies = policies; - } - - /** @return the corresponding policy. */ - public BlockStoragePolicy getPolicy(byte id) { - // id == 0 means policy not specified. - return id == 0? getDefaultPolicy(): policies[id]; - } - - /** @return the default policy. */ - public BlockStoragePolicy getDefaultPolicy() { - return getPolicy(defaultPolicyID); - } - - public BlockStoragePolicy getPolicy(String policyName) { - if (policies != null) { - for (BlockStoragePolicy policy : policies) { - if (policy != null && policy.name.equals(policyName)) { - return policy; - } - } - } - return null; - } - } + public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy + .class); /** A 4-bit policy ID */ private final byte id; @@ -160,7 +99,7 @@ private List chooseStorageTypes(final short replication, /** * Choose the storage types for storing the remaining replicas, given the * replication number, the storage types of the chosen replicas and - * the unavailable storage types. It uses fallback storage in case that + * the unavailable storage types. It uses fallback storage in case that * the desired storage type is unavailable. * * @param replication the replication number. @@ -195,7 +134,7 @@ public List chooseStorageTypes(final short replication, // remove excess storage types after fallback replacement. diff(storageTypes, excess, null); if (storageTypes.size() < expectedSize) { - LOG.warn("Failed to place enough replicas: expected size is " + expectedSize + LOG.warn("Failed to place enough replicas: expected size is " + expectedSize + " but only " + storageTypes.size() + " storage types can be selected " + "(replication=" + replication + ", selected=" + storageTypes @@ -207,7 +146,8 @@ public List chooseStorageTypes(final short replication, } /** - * Compute the list difference t = t - c. + * Compute the difference between two lists t and c so that after the diff + * computation we have: t = t - c; * Further, if e is not null, set e = e + c - t; */ private static void diff(List t, Iterable c, @@ -242,7 +182,7 @@ public List chooseExcess(final short replication, public StorageType getCreationFallback(EnumSet unavailables) { return getFallback(unavailables, creationFallbacks); } - + /** @return the fallback {@link StorageType} for replication. */ public StorageType getReplicationFallback(EnumSet unavailables) { return getFallback(unavailables, replicationFallbacks); @@ -280,6 +220,18 @@ public String getName() { return name; } + public StorageType[] getStorageTypes() { + return this.storageTypes; + } + + public StorageType[] getCreationFallbacks() { + return this.creationFallbacks; + } + + public StorageType[] getReplicationFallbacks() { + return this.replicationFallbacks; + } + private static StorageType getFallback(EnumSet unavailables, StorageType[] fallbacks) { for(StorageType fb : fallbacks) { @@ -289,131 +241,4 @@ private static StorageType getFallback(EnumSet unavailables, } return null; } - - private static byte parseID(String idString, String element, Configuration conf) { - byte id = 0; - try { - id = Byte.parseByte(idString); - } catch(NumberFormatException nfe) { - throwIllegalArgumentException("Failed to parse policy ID \"" + idString - + "\" to a " + ID_BIT_LENGTH + "-bit integer", conf); - } - if (id < 0) { - throwIllegalArgumentException("Invalid policy ID: id = " + id - + " < 1 in \"" + element + "\"", conf); - } else if (id == 0) { - throw new IllegalArgumentException("Policy ID 0 is reserved: " + element); - } else if (id > ID_MAX) { - throwIllegalArgumentException("Invalid policy ID: id = " + id - + " > MAX = " + ID_MAX + " in \"" + element + "\"", conf); - } - return id; - } - - private static StorageType[] parseStorageTypes(String[] strings) { - if (strings == null || strings.length == 0) { - return StorageType.EMPTY_ARRAY; - } - final StorageType[] types = new StorageType[strings.length]; - for(int i = 0; i < types.length; i++) { - types[i] = StorageType.valueOf(strings[i].trim().toUpperCase()); - } - return types; - } - - private static StorageType[] readStorageTypes(byte id, String keyPrefix, - Configuration conf) { - final String key = keyPrefix + id; - final String[] values = conf.getStrings(key); - try { - return parseStorageTypes(values); - } catch(Exception e) { - throw new IllegalArgumentException("Failed to parse " + key - + " \"" + conf.get(key), e); - } - } - - private static BlockStoragePolicy readBlockStoragePolicy(byte id, String name, - Configuration conf) { - final StorageType[] storageTypes = readStorageTypes(id, - DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX, conf); - if (storageTypes.length == 0) { - throw new IllegalArgumentException( - DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX + id + " is missing or is empty."); - } - final StorageType[] creationFallbacks = readStorageTypes(id, - DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX, conf); - final StorageType[] replicationFallbacks = readStorageTypes(id, - DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX, conf); - return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks, - replicationFallbacks); - } - - /** Read {@link Suite} from conf. */ - public static Suite readBlockStorageSuite(Configuration conf) { - final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH]; - final String[] values = conf.getStrings(DFS_BLOCK_STORAGE_POLICIES_KEY); - if (values == null) { - // conf property is missing, use default suite. - return DEFAULT_SUITE; - } - byte firstID = -1; - for(String v : values) { - v = v.trim(); - final int i = v.indexOf(':'); - if (i < 0) { - throwIllegalArgumentException("Failed to parse element \"" + v - + "\" (expected format is NAME:ID)", conf); - } else if (i == 0) { - throwIllegalArgumentException("Policy name is missing in \"" + v + "\"", conf); - } else if (i == v.length() - 1) { - throwIllegalArgumentException("Policy ID is missing in \"" + v + "\"", conf); - } - final String name = v.substring(0, i).trim(); - for(int j = 1; j < policies.length; j++) { - if (policies[j] != null && policies[j].name.equals(name)) { - throwIllegalArgumentException("Policy name duplication: \"" - + name + "\" appears more than once", conf); - } - } - - final byte id = parseID(v.substring(i + 1).trim(), v, conf); - if (policies[id] != null) { - throwIllegalArgumentException("Policy duplication: ID " + id - + " appears more than once", conf); - } - policies[id] = readBlockStoragePolicy(id, name, conf); - String prefix = ""; - if (firstID == -1) { - firstID = id; - prefix = "(default) "; - } - LOG.info(prefix + policies[id]); - } - if (firstID == -1) { - throwIllegalArgumentException("Empty list is not allowed", conf); - } - return new Suite(firstID, policies); - } - - public static String buildXAttrName() { - return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME; - } - - public static XAttr buildXAttr(byte policyId) { - final String name = buildXAttrName(); - return XAttrHelper.buildXAttr(name, new byte[] { policyId }); - } - - public static boolean isStoragePolicyXAttr(XAttr xattr) { - return xattr != null && xattr.getNameSpace() == BlockStoragePolicy.XAttrNS - && xattr.getName().equals(BlockStoragePolicy.STORAGE_POLICY_XATTR_NAME); - } - - private static void throwIllegalArgumentException(String message, - Configuration conf) { - throw new IllegalArgumentException(message + " in " - + DFS_BLOCK_STORAGE_POLICIES_KEY + " \"" - + conf.get(DFS_BLOCK_STORAGE_POLICIES_KEY) + "\"."); - } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 4be83f2dd25..7e16febdd35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -259,6 +259,13 @@ public boolean setReplication(String src, short replication) FileNotFoundException, SafeModeException, UnresolvedLinkException, SnapshotAccessControlException, IOException; + /** + * Get all the available block storage policies. + * @return All the in-use block storage policies currently. + */ + @Idempotent + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException; + /** * Set the storage policy for a file/directory * @param src Path of an existing file/directory. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java index 13acc7a76b6..31feb1e53e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java @@ -24,8 +24,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; /** * Metadata about a snapshottable directory @@ -62,7 +62,7 @@ public SnapshottableDirectoryStatus(long modification_time, long access_time, int snapshotNumber, int snapshotQuota, byte[] parentFullPath) { this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time, access_time, permission, owner, group, null, localName, inodeId, - childrenNum, null, BlockStoragePolicy.ID_UNSPECIFIED); + childrenNum, null, BlockStoragePolicySuite.ID_UNSPECIFIED); this.snapshotNumber = snapshotNumber; this.snapshotQuota = snapshotQuota; this.parentFullPath = parentFullPath; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 9d0d13cff34..26a97629c99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -119,6 +120,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; @@ -1429,6 +1432,26 @@ public SetStoragePolicyResponseProto setStoragePolicy( return VOID_SET_STORAGE_POLICY_RESPONSE; } + @Override + public GetStoragePolicySuiteResponseProto getStoragePolicySuite( + RpcController controller, GetStoragePolicySuiteRequestProto request) + throws ServiceException { + try { + BlockStoragePolicy[] policies = server.getStoragePolicySuite(); + GetStoragePolicySuiteResponseProto.Builder builder = + GetStoragePolicySuiteResponseProto.newBuilder(); + if (policies == null) { + return builder.build(); + } + for (BlockStoragePolicy policy : policies) { + builder.addPolicies(PBHelper.convert(policy)); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(RpcController controller, GetCurrentEditLogTxidRequestProto req) throws ServiceException { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 1279f7c24e4..22238b43bab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -64,9 +65,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; -import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto; @@ -119,6 +118,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; @@ -159,13 +160,13 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; @@ -225,6 +226,10 @@ public class ClientNamenodeProtocolTranslatorPB implements VOID_GET_DATA_ENCRYPTIONKEY_REQUEST = GetDataEncryptionKeyRequestProto.newBuilder().build(); + private final static GetStoragePolicySuiteRequestProto + VOID_GET_STORAGE_POLICY_SUITE_REQUEST = + GetStoragePolicySuiteRequestProto.newBuilder().build(); + public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) { rpcProxy = proxy; } @@ -1440,8 +1445,7 @@ public void checkAccess(String path, FsAction mode) throws IOException { @Override public void setStoragePolicy(String src, String policyName) - throws SnapshotAccessControlException, UnresolvedLinkException, - FileNotFoundException, QuotaExceededException, IOException { + throws IOException { SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto .newBuilder().setSrc(src).setPolicyName(policyName).build(); try { @@ -1451,6 +1455,17 @@ public void setStoragePolicy(String src, String policyName) } } + @Override + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + try { + GetStoragePolicySuiteResponseProto response = rpcProxy + .getStoragePolicySuite(null, VOID_GET_STORAGE_POLICY_SUITE_REQUEST); + return PBHelper.convertStoragePolicies(response.getPoliciesList()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + public long getCurrentEditLogTxid() throws IOException { GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto .getDefaultInstance(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 81bb2318e10..140b47a937c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.inotify.Event; @@ -120,6 +120,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; @@ -174,6 +175,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -281,6 +283,65 @@ public static NamenodeRoleProto convert(NamenodeRole role) { return null; } + public static BlockStoragePolicy[] convertStoragePolicies( + List policyProtos) { + if (policyProtos == null || policyProtos.size() == 0) { + return new BlockStoragePolicy[0]; + } + BlockStoragePolicy[] policies = new BlockStoragePolicy[policyProtos.size()]; + int i = 0; + for (BlockStoragePolicyProto proto : policyProtos) { + policies[i++] = convert(proto); + } + return policies; + } + + public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) { + List cList = proto.getCreationPolicy() + .getStorageTypesList(); + StorageType[] creationTypes = convertStorageTypes(cList, cList.size()); + List cfList = proto.hasCreationFallbackPolicy() ? proto + .getCreationFallbackPolicy().getStorageTypesList() : null; + StorageType[] creationFallbackTypes = cfList == null ? StorageType + .EMPTY_ARRAY : convertStorageTypes(cfList, cfList.size()); + List rfList = proto.hasReplicationFallbackPolicy() ? + proto.getReplicationFallbackPolicy().getStorageTypesList() : null; + StorageType[] replicationFallbackTypes = rfList == null ? StorageType + .EMPTY_ARRAY : convertStorageTypes(rfList, rfList.size()); + return new BlockStoragePolicy((byte) proto.getPolicyId(), proto.getName(), + creationTypes, creationFallbackTypes, replicationFallbackTypes); + } + + public static BlockStoragePolicyProto convert(BlockStoragePolicy policy) { + BlockStoragePolicyProto.Builder builder = BlockStoragePolicyProto + .newBuilder().setPolicyId(policy.getId()).setName(policy.getName()); + // creation storage types + StorageTypesProto creationProto = convert(policy.getStorageTypes()); + Preconditions.checkArgument(creationProto != null); + builder.setCreationPolicy(creationProto); + // creation fallback + StorageTypesProto creationFallbackProto = convert( + policy.getCreationFallbacks()); + if (creationFallbackProto != null) { + builder.setCreationFallbackPolicy(creationFallbackProto); + } + // replication fallback + StorageTypesProto replicationFallbackProto = convert( + policy.getReplicationFallbacks()); + if (replicationFallbackProto != null) { + builder.setReplicationFallbackPolicy(replicationFallbackProto); + } + return builder.build(); + } + + public static StorageTypesProto convert(StorageType[] types) { + if (types == null || types.length == 0) { + return null; + } + List list = convertStorageTypes(types); + return StorageTypesProto.newBuilder().addAllStorageTypes(list).build(); + } + public static StorageInfoProto convert(StorageInfo info) { return StorageInfoProto.newBuilder().setClusterID(info.getClusterID()) .setCTime(info.getCTime()).setLayoutVersion(info.getLayoutVersion()) @@ -1350,7 +1411,7 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) { fs.hasChildrenNum() ? fs.getChildrenNum() : -1, fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null, fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy() - : BlockStoragePolicy.ID_UNSPECIFIED); + : BlockStoragePolicySuite.ID_UNSPECIFIED); } public static SnapshottableDirectoryStatus convert( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7c18444d44d..80a1883970a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -42,7 +42,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -255,7 +255,7 @@ public int getPendingDataNodeMessageCount() { /** for block replicas placement */ private BlockPlacementPolicy blockplacement; - private final BlockStoragePolicy.Suite storagePolicySuite; + private final BlockStoragePolicySuite storagePolicySuite; /** Check whether name system is running before terminating */ private boolean checkNSRunning = true; @@ -278,7 +278,7 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats, blockplacement = BlockPlacementPolicy.getInstance( conf, stats, datanodeManager.getNetworkTopology(), datanodeManager.getHost2DatanodeMap()); - storagePolicySuite = BlockStoragePolicy.readBlockStorageSuite(conf); + storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite(); pendingReplications = new PendingReplicationBlocks(conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); @@ -402,6 +402,10 @@ public BlockStoragePolicy getStoragePolicy(final String policyName) { return storagePolicySuite.getPolicy(policyName); } + public BlockStoragePolicy[] getStoragePolicySuite() { + return storagePolicySuite.getAllPolicies(); + } + public void setBlockPoolId(String blockPoolId) { if (isBlockTokenEnabled()) { blockTokenSecretManager.setBlockPoolId(blockPoolId); @@ -3602,7 +3606,7 @@ public ReplicationWork(Block block, } private void chooseTargets(BlockPlacementPolicy blockplacement, - BlockStoragePolicy.Suite storagePolicySuite, + BlockStoragePolicySuite storagePolicySuite, Set excludedNodes) { targets = blockplacement.chooseTarget(bc.getName(), additionalReplRequired, srcNode, liveReplicaStorages, false, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index af58127a6e0..f744eff73f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index a0e67013649..19c30756a90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -23,7 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java new file mode 100644 index 00000000000..1d162a0b3c6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.XAttrHelper; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** A collection of block storage policies. */ +public class BlockStoragePolicySuite { + static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicySuite + .class); + + public static final String STORAGE_POLICY_XATTR_NAME + = "hsm.block.storage.policy.id"; + public static final XAttr.NameSpace XAttrNS = XAttr.NameSpace.SYSTEM; + + public static final int ID_BIT_LENGTH = 4; + public static final byte ID_UNSPECIFIED = 0; + + @VisibleForTesting + public static BlockStoragePolicySuite createDefaultSuite() { + final BlockStoragePolicy[] policies = + new BlockStoragePolicy[1 << ID_BIT_LENGTH]; + final byte hotId = 12; + policies[hotId] = new BlockStoragePolicy(hotId, "HOT", + new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY, + new StorageType[]{StorageType.ARCHIVE}); + final byte warmId = 8; + policies[warmId] = new BlockStoragePolicy(warmId, "WARM", + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}); + final byte coldId = 4; + policies[coldId] = new BlockStoragePolicy(coldId, "COLD", + new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY, + StorageType.EMPTY_ARRAY); + return new BlockStoragePolicySuite(hotId, policies); + } + + private final byte defaultPolicyID; + private final BlockStoragePolicy[] policies; + + public BlockStoragePolicySuite(byte defaultPolicyID, + BlockStoragePolicy[] policies) { + this.defaultPolicyID = defaultPolicyID; + this.policies = policies; + } + + /** @return the corresponding policy. */ + public BlockStoragePolicy getPolicy(byte id) { + // id == 0 means policy not specified. + return id == 0? getDefaultPolicy(): policies[id]; + } + + /** @return the default policy. */ + public BlockStoragePolicy getDefaultPolicy() { + return getPolicy(defaultPolicyID); + } + + public BlockStoragePolicy getPolicy(String policyName) { + if (policies != null) { + for (BlockStoragePolicy policy : policies) { + if (policy != null && policy.getName().equals(policyName)) { + return policy; + } + } + } + return null; + } + + public BlockStoragePolicy[] getAllPolicies() { + List list = Lists.newArrayList(); + if (policies != null) { + for (BlockStoragePolicy policy : policies) { + if (policy != null) { + list.add(policy); + } + } + } + return list.toArray(new BlockStoragePolicy[list.size()]); + } + + public static String buildXAttrName() { + return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME; + } + + public static XAttr buildXAttr(byte policyId) { + final String name = buildXAttrName(); + return XAttrHelper.buildXAttr(name, new byte[]{policyId}); + } + + public static boolean isStoragePolicyXAttr(XAttr xattr) { + return xattr != null && xattr.getNameSpace() == XAttrNS + && xattr.getName().equals(STORAGE_POLICY_XATTR_NAME); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 858db1d6846..c2221818fc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.balancer.ExitStatus; import org.apache.hadoop.hdfs.server.balancer.Matcher; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -102,7 +103,7 @@ private List getTargetStorages(StorageType t) { private final StorageMap storages; private final List targetPaths; - private final BlockStoragePolicy.Suite blockStoragePolicies; + private final BlockStoragePolicy[] blockStoragePolicies; Mover(NameNodeConnector nnc, Configuration conf) { final long movedWinWidth = conf.getLong( @@ -119,11 +120,13 @@ private List getTargetStorages(StorageType t) { Collections. emptySet(), movedWinWidth, moverThreads, 0, maxConcurrentMovesPerNode, conf); this.storages = new StorageMap(); - this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf); this.targetPaths = nnc.getTargetPaths(); + this.blockStoragePolicies = new BlockStoragePolicy[1 << + BlockStoragePolicySuite.ID_BIT_LENGTH]; } void init() throws IOException { + initStoragePolicies(); final List reports = dispatcher.init(); for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); @@ -137,6 +140,14 @@ void init() throws IOException { } } + private void initStoragePolicies() throws IOException { + BlockStoragePolicy[] policies = dispatcher.getDistributedFileSystem() + .getStoragePolicySuite(); + for (BlockStoragePolicy policy : policies) { + this.blockStoragePolicies[policy.getId()] = policy; + } + } + private ExitStatus run() { try { init(); @@ -305,7 +316,7 @@ private boolean processDirRecursively(String parent, if (!isSnapshotPathInCurrent(fullPath)) { // the full path is a snapshot path but it is also included in the // current directory tree, thus ignore it. - hasRemaining = processFile((HdfsLocatedFileStatus)status); + hasRemaining = processFile(fullPath, (HdfsLocatedFileStatus)status); } } catch (IOException e) { LOG.warn("Failed to check the status of " + parent @@ -317,9 +328,17 @@ private boolean processDirRecursively(String parent, } /** @return true if it is necessary to run another round of migration */ - private boolean processFile(HdfsLocatedFileStatus status) { - final BlockStoragePolicy policy = blockStoragePolicies.getPolicy( - status.getStoragePolicy()); + private boolean processFile(String fullPath, HdfsLocatedFileStatus status) { + final byte policyId = status.getStoragePolicy(); + // currently we ignore files with unspecified storage policy + if (policyId == BlockStoragePolicySuite.ID_UNSPECIFIED) { + return false; + } + final BlockStoragePolicy policy = blockStoragePolicies[policyId]; + if (policy == null) { + LOG.warn("Failed to get the storage policy of file " + fullPath); + return false; + } final List types = policy.chooseStorageTypes( status.getReplication()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index cc5e9958434..4f35846c381 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -53,7 +53,6 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -79,6 +78,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -1037,7 +1037,7 @@ void unprotectedSetStoragePolicy(String src, byte policyId) private void setDirStoragePolicy(INodeDirectory inode, byte policyId, int latestSnapshotId) throws IOException { List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); - XAttr xAttr = BlockStoragePolicy.buildXAttr(policyId); + XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId); List newXAttrs = setINodeXAttrs(existingXAttrs, Arrays.asList(xAttr), EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE)); XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId); @@ -1375,7 +1375,7 @@ private static void checkSnapshot(INode target, } private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) { - return inodePolicy != BlockStoragePolicy.ID_UNSPECIFIED ? inodePolicy : + return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy : parentPolicy; } @@ -1409,7 +1409,7 @@ DirectoryListing getListing(String src, byte[] startAfter, if (targetNode == null) return null; byte parentStoragePolicy = isSuperUser ? - targetNode.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED; + targetNode.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED; if (!targetNode.isDirectory()) { return new DirectoryListing( @@ -1429,7 +1429,8 @@ DirectoryListing getListing(String src, byte[] startAfter, for (int i=0; i0; i++) { INode cur = contents.get(startChild+i); byte curPolicy = isSuperUser && !cur.isSymlink()? - cur.getLocalStoragePolicyID(): BlockStoragePolicy.ID_UNSPECIFIED; + cur.getLocalStoragePolicyID(): + BlockStoragePolicySuite.ID_UNSPECIFIED; listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation, getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot, isRawPath, inodesInPath); @@ -1483,7 +1484,7 @@ private DirectoryListing getSnapshotsListing(String src, byte[] startAfter) for (int i = 0; i < numOfListing; i++) { Root sRoot = snapshots.get(i + skipSize).getRoot(); listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot, - BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, + BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, false, null); } return new DirectoryListing( @@ -1511,7 +1512,7 @@ HdfsFileStatus getFileInfo(String src, boolean resolveLink, final INode[] inodes = inodesInPath.getINodes(); final INode i = inodes[inodes.length - 1]; byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ? - i.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED; + i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED; return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i, policyId, inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath); } finally { @@ -1530,7 +1531,8 @@ private HdfsFileStatus getFileInfo4DotSnapshot(String src) throws UnresolvedLinkException { if (getINode4DotSnapshot(src) != null) { return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null, - HdfsFileStatus.EMPTY_NAME, -1L, 0, null, BlockStoragePolicy.ID_UNSPECIFIED); + HdfsFileStatus.EMPTY_NAME, -1L, 0, null, + BlockStoragePolicySuite.ID_UNSPECIFIED); } return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 83761bd401b..7b240afa880 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -34,7 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.XAttrSetFlag; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -374,7 +374,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir, if (toAddRetryCache) { HdfsFileStatus stat = fsNamesys.dir.createFileStatus( HdfsFileStatus.EMPTY_NAME, newFile, - BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, + BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, false, iip); fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId, addCloseOp.rpcCallId, stat); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b7ed16b773d..955ff083d9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -17,8 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension - .EncryptedKeyVersion; +import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; @@ -166,7 +165,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.ServiceFailedException; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -2305,6 +2304,21 @@ private void setStoragePolicyInt(String src, final String policyName) logAuditEvent(true, "setStoragePolicy", src, null, fileStat); } + /** + * @return All the existing block storage policies + */ + BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + checkOperation(OperationCategory.READ); + waitForLoadingFSImage(); + readLock(); + try { + checkOperation(OperationCategory.READ); + return blockManager.getStoragePolicySuite(); + } finally { + readUnlock(); + } + } + long getPreferredBlockSize(String filename) throws IOException, UnresolvedLinkException { FSPermissionChecker pc = getPermissionChecker(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index 307f507d508..44549303f92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; @@ -695,7 +695,7 @@ public final INode setAccessTime(long accessTime, int latestSnapshotId) /** * @return the storage policy directly specified on the INode. Return - * {@link BlockStoragePolicy#ID_UNSPECIFIED} if no policy has + * {@link BlockStoragePolicySuite#ID_UNSPECIFIED} if no policy has * been specified. */ public abstract byte getLocalStoragePolicyID(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java index f5579ee8901..a75323017fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java @@ -28,10 +28,11 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature; @@ -112,22 +113,22 @@ public byte getLocalStoragePolicyID() { ImmutableList xattrs = f == null ? ImmutableList. of() : f .getXAttrs(); for (XAttr xattr : xattrs) { - if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) { + if (BlockStoragePolicySuite.isStoragePolicyXAttr(xattr)) { return (xattr.getValue())[0]; } } - return BlockStoragePolicy.ID_UNSPECIFIED; + return BlockStoragePolicySuite.ID_UNSPECIFIED; } @Override public byte getStoragePolicyID() { byte id = getLocalStoragePolicyID(); - if (id != BlockStoragePolicy.ID_UNSPECIFIED) { + if (id != BlockStoragePolicySuite.ID_UNSPECIFIED) { return id; } // if it is unspecified, check its parent return getParent() != null ? getParent().getStoragePolicyID() : - BlockStoragePolicy.ID_UNSPECIFIED; + BlockStoragePolicySuite.ID_UNSPECIFIED; } void setQuota(long nsQuota, long dsQuota) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java index f0f58a92668..26a6678cdcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java @@ -18,12 +18,9 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; /** * The attributes of an inode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 7af2b713eac..583c193af8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -28,12 +28,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff; @@ -79,7 +79,8 @@ public static INodeFile valueOf(INode inode, String path, boolean acceptNull) static enum HeaderFormat { PREFERRED_BLOCK_SIZE(null, 48, 1), REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1), - STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicy.ID_BIT_LENGTH, 0); + STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH, + 0); private final LongBitFormat BITS; @@ -374,7 +375,7 @@ public byte getLocalStoragePolicyID() { @Override public byte getStoragePolicyID() { byte id = getLocalStoragePolicyID(); - if (id == BlockStoragePolicy.ID_UNSPECIFIED) { + if (id == BlockStoragePolicySuite.ID_UNSPECIFIED) { return this.getParent() != null ? this.getParent().getStoragePolicyID() : id; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java index 87e47151fd8..5009c58dc10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java @@ -22,8 +22,9 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.Quota.Counts; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; @@ -125,12 +126,12 @@ public Counts cleanSubtree(int snapshotId, int priorSnapshotId, @Override public byte getStoragePolicyID(){ - return BlockStoragePolicy.ID_UNSPECIFIED; + return BlockStoragePolicySuite.ID_UNSPECIFIED; } @Override public byte getLocalStoragePolicyID() { - return BlockStoragePolicy.ID_UNSPECIFIED; + return BlockStoragePolicySuite.ID_UNSPECIFIED; } }; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 006c803d14e..bb2fb8351cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -587,6 +588,11 @@ public void setStoragePolicy(String src, String policyName) namesystem.setStoragePolicy(src, policyName); } + @Override + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + return namesystem.getStoragePolicySuite(); + } + @Override // ClientProtocol public void setPermission(String src, FsPermission permissions) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 6164ee9f40a..17e6cc949a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -49,7 +49,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.shell.Command; import org.apache.hadoop.fs.shell.CommandFormat; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.ipc.GenericRefreshProtocol; @@ -615,15 +616,18 @@ public int getStoragePolicy(String[] argv) throws IOException { + argv[1]); } byte storagePolicyId = status.getStoragePolicy(); - BlockStoragePolicy.Suite suite = BlockStoragePolicy - .readBlockStorageSuite(getConf()); - BlockStoragePolicy policy = suite.getPolicy(storagePolicyId); - if (policy != null) { - System.out.println("The storage policy of " + argv[1] + ":\n" + policy); + if (storagePolicyId == BlockStoragePolicySuite.ID_UNSPECIFIED) { + System.out.println("The storage policy of " + argv[1] + " is unspecified"); return 0; - } else { - throw new IOException("Cannot identify the storage policy for " + argv[1]); } + BlockStoragePolicy[] policies = dfs.getStoragePolicySuite(); + for (BlockStoragePolicy p : policies) { + if (p.getId() == storagePolicyId) { + System.out.println("The storage policy of " + argv[1] + ":\n" + p); + return 0; + } + } + throw new IOException("Cannot identify the storage policy for " + argv[1]); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java index 29e8f8d60d7..b9186fa4e4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java @@ -21,13 +21,14 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.token.Token; @@ -264,7 +265,7 @@ public static HdfsFileStatus toFileStatus(final Map json, boolean includes : childrenNumLong.intValue(); final byte storagePolicy = m.containsKey("storagePolicy") ? (byte) (long) (Long) m.get("storagePolicy") : - BlockStoragePolicy.ID_UNSPECIFIED; + BlockStoragePolicySuite.ID_UNSPECIFIED; return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication, blockSize, mTime, aTime, permission, owner, group, symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null, storagePolicy); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 082e5bd8145..ce7bf1cde8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -108,6 +108,13 @@ message SetStoragePolicyRequestProto { message SetStoragePolicyResponseProto { // void response } +message GetStoragePolicySuiteRequestProto { // void request +} + +message GetStoragePolicySuiteResponseProto { + repeated BlockStoragePolicyProto policies = 1; +} + message SetPermissionRequestProto { required string src = 1; required FsPermissionProto permission = 2; @@ -699,6 +706,8 @@ service ClientNamenodeProtocol { returns(SetReplicationResponseProto); rpc setStoragePolicy(SetStoragePolicyRequestProto) returns(SetStoragePolicyResponseProto); + rpc getStoragePolicySuite(GetStoragePolicySuiteRequestProto) + returns(GetStoragePolicySuiteResponseProto); rpc setPermission(SetPermissionRequestProto) returns(SetPermissionResponseProto); rpc setOwner(SetOwnerRequestProto) returns(SetOwnerResponseProto); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index d1ba68f6dca..c57e308bf2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -168,6 +168,20 @@ message StorageTypesProto { repeated StorageTypeProto storageTypes = 1; } +/** + * Block replica storage policy. + */ +message BlockStoragePolicyProto { + required uint32 policyId = 1; + required string name = 2; + // a list of storage types for storing the block replicas when creating a + // block. + required StorageTypesProto creationPolicy = 3; + // A list of storage types for creation fallback storage. + optional StorageTypesProto creationFallbackPolicy = 4; + optional StorageTypesProto replicationFallbackPolicy = 5; +} + /** * A list of storage IDs. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 9123d419f64..0ecaaa7b25e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -22,8 +22,7 @@ - - + hadoop.hdfs.configuration.version diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index 158c22547cb..38ffceef969 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED; +import static org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite.ID_UNSPECIFIED; import java.io.File; import java.io.FileNotFoundException; @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; @@ -44,7 +45,7 @@ /** Test {@link BlockStoragePolicy} */ public class TestBlockStoragePolicy { - public static final BlockStoragePolicy.Suite POLICY_SUITE; + public static final BlockStoragePolicySuite POLICY_SUITE; public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY; public static final Configuration conf; @@ -52,7 +53,7 @@ public class TestBlockStoragePolicy { conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); - POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf); + POLICY_SUITE = BlockStoragePolicySuite.createDefaultSuite(); DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy(); } @@ -948,7 +949,7 @@ private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum, Assert.assertTrue(typeList.isEmpty()); } - private void testIncreaseFileRep(String policyName, byte policyId, + private void testChangeFileRep(String policyName, byte policyId, StorageType[] before, StorageType[] after) throws Exception { final int numDataNodes = 5; @@ -965,8 +966,6 @@ private void testIncreaseFileRep(String policyName, byte policyId, final Path foo = new Path(dir, "foo"); DFSTestUtil.createFile(fs, foo, FILE_LEN, REPLICATION, 0L); - // the storage policy of foo should be WARM, and the replicas - // should be stored in DISK and ARCHIE HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(), HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); checkDirectoryListing(status, policyId); @@ -984,7 +983,24 @@ private void testIncreaseFileRep(String policyName, byte policyId, HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); checkDirectoryListing(status, policyId); fooStatus = (HdfsLocatedFileStatus) status[0]; - checkLocatedBlocks(fooStatus, 1, 5, after); + checkLocatedBlocks(fooStatus, 1, numDataNodes, after); + + // change the replication factor back to 3 + fs.setReplication(foo, REPLICATION); + Thread.sleep(1000); + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.triggerHeartbeat(dn); + } + Thread.sleep(1000); + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.triggerBlockReport(dn); + } + Thread.sleep(1000); + status = fs.getClient().listPaths(foo.toString(), + HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); + checkDirectoryListing(status, policyId); + fooStatus = (HdfsLocatedFileStatus) status[0]; + checkLocatedBlocks(fooStatus, 1, REPLICATION, before); } finally { cluster.shutdown(); } @@ -995,11 +1011,12 @@ private void testIncreaseFileRep(String policyName, byte policyId, * that file from 3 to 5. Make sure all replications are created in DISKS. */ @Test - public void testIncreaseHotFileRep() throws Exception { - testIncreaseFileRep("HOT", HOT, new StorageType[]{StorageType.DISK, - StorageType.DISK, StorageType.DISK}, + public void testChangeHotFileRep() throws Exception { + testChangeFileRep("HOT", HOT, new StorageType[]{StorageType.DISK, StorageType.DISK, - StorageType.DISK, StorageType.DISK, StorageType.DISK}); + StorageType.DISK}, + new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK, + StorageType.DISK, StorageType.DISK}); } /** @@ -1008,9 +1025,10 @@ public void testIncreaseHotFileRep() throws Exception { * and ARCHIVE. */ @Test - public void testIncreaseWarmRep() throws Exception { - testIncreaseFileRep("WARM", WARM, new StorageType[]{StorageType.DISK, - StorageType.ARCHIVE, StorageType.ARCHIVE}, + public void testChangeWarmRep() throws Exception { + testChangeFileRep("WARM", WARM, + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE, + StorageType.ARCHIVE}, new StorageType[]{StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE}); } @@ -1020,9 +1038,10 @@ public void testIncreaseWarmRep() throws Exception { * that file from 3 to 5. Make sure all replicas are created in ARCHIVE. */ @Test - public void testIncreaseColdRep() throws Exception { - testIncreaseFileRep("COLD", COLD, new StorageType[]{StorageType.ARCHIVE, - StorageType.ARCHIVE, StorageType.ARCHIVE}, + public void testChangeColdRep() throws Exception { + testChangeFileRep("COLD", COLD, + new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE, + StorageType.ARCHIVE}, new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE}); } @@ -1072,4 +1091,28 @@ public void testChooseTargetWithTopology() throws Exception { System.out.println(Arrays.asList(targets)); Assert.assertEquals(3, targets.length); } + + /** + * Test getting all the storage policies from the namenode + */ + @Test + public void testGetAllStoragePolicies() throws Exception { + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0).build(); + cluster.waitActive(); + final DistributedFileSystem fs = cluster.getFileSystem(); + try { + BlockStoragePolicy[] policies = fs.getStoragePolicySuite(); + Assert.assertEquals(3, policies.length); + Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(), + policies[0].toString()); + Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(), + policies[1].toString()); + Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(), + policies[2].toString()); + } finally { + IOUtils.cleanup(null, fs); + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStoragePolicyCommands.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStoragePolicyCommands.java index d6ead093764..d80356a8803 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStoragePolicyCommands.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStoragePolicyCommands.java @@ -21,6 +21,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -60,6 +62,11 @@ public void testSetAndGetStoragePolicy() throws Exception { final Path bar = new Path(foo, "bar"); DFSTestUtil.createFile(fs, bar, SIZE, REPL, 0); + DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo", 0, + "The storage policy of " + foo.toString() + " is unspecified", conf); + DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo/bar", 0, + "The storage policy of " + bar.toString() + " is unspecified", conf); + DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo WARM", 0, "Set storage policy WARM on " + foo.toString(), conf); DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo/bar COLD", 0, @@ -67,8 +74,8 @@ public void testSetAndGetStoragePolicy() throws Exception { DFSTestUtil.DFSAdminRun("-setStoragePolicy /fooz WARM", -1, "File/Directory does not exist: /fooz", conf); - final BlockStoragePolicy.Suite suite = BlockStoragePolicy - .readBlockStorageSuite(conf); + final BlockStoragePolicySuite suite = BlockStoragePolicySuite + .createDefaultSuite(); final BlockStoragePolicy warm = suite.getPolicy("WARM"); final BlockStoragePolicy cold = suite.getPolicy("COLD"); DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo", 0, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java index e40f142e5ae..0001e3b5a75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -36,7 +36,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.balancer.ExitStatus; import org.apache.hadoop.hdfs.server.balancer.TestBalancer; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -87,7 +88,7 @@ public class TestStorageMover { private static final short REPL = 3; private static final int NUM_DATANODES = 6; private static final Configuration DEFAULT_CONF = new HdfsConfiguration(); - private static final BlockStoragePolicy.Suite DEFAULT_POLICIES; + private static final BlockStoragePolicySuite DEFAULT_POLICIES; private static final BlockStoragePolicy HOT; private static final BlockStoragePolicy WARM; private static final BlockStoragePolicy COLD; @@ -99,7 +100,7 @@ public class TestStorageMover { 2L); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L); - DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF); + DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite(); HOT = DEFAULT_POLICIES.getPolicy("HOT"); WARM = DEFAULT_POLICIES.getPolicy("WARM"); COLD = DEFAULT_POLICIES.getPolicy("COLD"); @@ -192,13 +193,21 @@ class MigrationTest { private MiniDFSCluster cluster; private DistributedFileSystem dfs; - private final BlockStoragePolicy.Suite policies; + private final BlockStoragePolicySuite policies; MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) { this.clusterScheme = cScheme; this.nsScheme = nsScheme; this.conf = clusterScheme.conf; - this.policies = BlockStoragePolicy.readBlockStorageSuite(conf); + this.policies = DEFAULT_POLICIES; + } + + MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme, + BlockStoragePolicySuite policies) { + this.clusterScheme = cScheme; + this.nsScheme = nsScheme; + this.conf = clusterScheme.conf; + this.policies = policies; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java index b0f6b6ae010..267821f85be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil;