diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 61433de80a0..af143919a8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -5,6 +5,9 @@ HDFS-6584: Archival Storage HDFS-6677. Change INodeFile and FSImage to support storage policy ID. (szetszwo) + HDFS-6670. Add block storage policy support with default HOT, WARM and COLD + policies. (szetszwo) + Trunk (Unreleased) INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java new file mode 100644 index 00000000000..784ba36abf1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs; + +import java.util.Arrays; +import java.util.EnumSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +/** + * A block storage policy describes how to select the storage types + * for the replicas of a block. + */ +@InterfaceAudience.Private +public class BlockStoragePolicy { + public static final Log LOG = LogFactory.getLog(BlockStoragePolicy.class); + + public static final String DFS_BLOCK_STORAGE_POLICIES_KEY + = "dfs.block.storage.policies"; + public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX + = "dfs.block.storage.policy."; + public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX + = "dfs.block.storage.policy.creation-fallback."; + public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX + = "dfs.block.storage.policy.replication-fallback."; + + public static final int ID_BIT_LENGTH = 4; + public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1; + + /** A 4-bit policy ID */ + private final byte id; + /** Policy name */ + private final String name; + + /** The storage types to store the replicas of a new block. */ + private final StorageType[] storageTypes; + /** The fallback storage type for block creation. */ + private final StorageType[] creationFallbacks; + /** The fallback storage type for replication. */ + private final StorageType[] replicationFallbacks; + + BlockStoragePolicy(byte id, String name, StorageType[] storageTypes, + StorageType[] creationFallbacks, StorageType[] replicationFallbacks) { + this.id = id; + this.name = name; + this.storageTypes = storageTypes; + this.creationFallbacks = creationFallbacks; + this.replicationFallbacks = replicationFallbacks; + } + + /** + * @return a list of {@link StorageType}s for storing the replicas of a block. 
+ */ + StorageType[] getStoragteTypes(short replication) { + final StorageType[] types = new StorageType[replication]; + int i = 0; + for(; i < types.length && i < storageTypes.length; i++) { + types[i] = storageTypes[i]; + } + final StorageType last = storageTypes[storageTypes.length - 1]; + for(; i < types.length; i++) { + types[i] = last; + } + return types; + } + + /** @return the fallback {@link StorageType} for creation. */ + StorageType getCreationFallback(EnumSet unavailables) { + return getFallback(unavailables, creationFallbacks); + } + + /** @return the fallback {@link StorageType} for replication. */ + StorageType getReplicationFallback(EnumSet unavailables) { + return getFallback(unavailables, replicationFallbacks); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + name + ":" + id + + ", storageTypes=" + Arrays.asList(storageTypes) + + ", creationFallbacks=" + Arrays.asList(creationFallbacks) + + ", replicationFallbacks=" + Arrays.asList(replicationFallbacks); + } + + private static StorageType getFallback(EnumSet unavailables, + StorageType[] fallbacks) { + for(StorageType fb : fallbacks) { + if (!unavailables.contains(fb)) { + return fb; + } + } + return null; + } + + private static byte parseID(String string) { + final byte id = Byte.parseByte(string); + if (id < 1) { + throw new IllegalArgumentException( + "Invalid block storage policy ID: id = " + id + " < 1"); + } + if (id > 15) { + throw new IllegalArgumentException( + "Invalid block storage policy ID: id = " + id + " > MAX = " + ID_MAX); + } + return id; + } + + private static StorageType[] parseStorageTypes(String[] strings) { + if (strings == null) { + return StorageType.EMPTY_ARRAY; + } + final StorageType[] types = new StorageType[strings.length]; + for(int i = 0; i < types.length; i++) { + types[i] = StorageType.valueOf(strings[i].trim().toUpperCase()); + } + return types; + } + + private static StorageType[] readStorageTypes(byte id, String 
keyPrefix, + Configuration conf) { + final String[] values = conf.getStrings(keyPrefix + id); + return parseStorageTypes(values); + } + + public static BlockStoragePolicy readBlockStoragePolicy(byte id, String name, + Configuration conf) { + final StorageType[] storageTypes = readStorageTypes(id, + DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX, conf); + final StorageType[] creationFallbacks = readStorageTypes(id, + DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX, conf); + final StorageType[] replicationFallbacks = readStorageTypes(id, + DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX, conf); + return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks, + replicationFallbacks); + } + + public static BlockStoragePolicy[] readBlockStoragePolicies( + Configuration conf) { + final BlockStoragePolicy[] policies = new BlockStoragePolicy[ID_MAX + 1]; + final String[] values = conf.getStrings(DFS_BLOCK_STORAGE_POLICIES_KEY); + for(String v : values) { + v = v.trim(); + final int i = v.indexOf(':'); + final String name = v.substring(0, i); + final byte id = parseID(v.substring(i + 1)); + if (policies[id] != null) { + throw new IllegalArgumentException( + "Policy duplication: ID " + id + " appears more than once in " + + DFS_BLOCK_STORAGE_POLICIES_KEY); + } + policies[id] = readBlockStoragePolicy(id, name, conf); + LOG.info(policies[id]); + } + return policies; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index a43106943bb..23920365c10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -421,6 +421,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final Class DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = 
BlockPlacementPolicyDefault.class; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final int DFS_REPLICATION_MAX_DEFAULT = 512; + public static final String DFS_BLOCK_STORAGE_POLICIES_KEY + = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICIES_KEY; + public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX + = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX; + public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX + = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX; + public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX + = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX; + public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final int DFS_DF_INTERVAL_DEFAULT = 60000; public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java index 408f678d650..c19ab6befac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java @@ -29,7 +29,10 @@ @InterfaceStability.Unstable public enum StorageType { DISK, - SSD; + SSD, + ARCHIVE; public static final StorageType DEFAULT = DISK; + + public static final StorageType[] EMPTY_ARRAY = {}; } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index e9435024bed..5d3f2c40c19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -1612,6 +1612,8 @@ private static StorageTypeProto convertStorageType( return StorageTypeProto.DISK; case SSD: return StorageTypeProto.SSD; + case ARCHIVE: + return StorageTypeProto.ARCHIVE; default: throw new IllegalStateException( "BUG: StorageType not found, type=" + type); @@ -1640,6 +1642,8 @@ private static StorageType convertType(StorageTypeProto type) { return StorageType.DISK; case SSD: return StorageType.SSD; + case ARCHIVE: + return StorageType.ARCHIVE; default: throw new IllegalStateException( "BUG: StorageTypeProto not found, type=" + type); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 6fbb5b34205..1bf21bf2984 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -134,6 +134,7 @@ message FsPermissionProto { enum StorageTypeProto { DISK = 1; SSD = 2; + ARCHIVE = 3; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/blockStoragePolicy-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/blockStoragePolicy-default.xml new file mode 100644 index 00000000000..79007d83122 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/blockStoragePolicy-default.xml @@ -0,0 +1,117 @@ + + + + + + + + + + + + dfs.block.storage.policies + HOT:12, WARM:8, COLD:4 + + A list of block storage policy names and IDs. The syntax is + + NAME_1:ID_1, NAME_2:ID_2, ..., NAME_n:ID_n + + where ID is an integer in the range [1,15] and NAME is case insensitive. 
+ + + + + + dfs.block.storage.policy.12 + DISK + + A list of storage types for storing the block replicas such as + + STORAGE_TYPE_1, STORAGE_TYPE_2, ..., STORAGE_TYPE_n + + When creating a block, the i-th replica is stored using the i-th storage type + for i less than or equal to n, and + the j-th replica is stored using the n-th storage type for j greater than n. + + The value cannot be specified as an empty list. + + Examples: + DISK : all replicas are stored using DISK. + DISK, ARCHIVE : the first replica is stored using DISK and all the + remaining are stored using ARCHIVE. + + + + + dfs.block.storage.policy.creation-fallback.12 + + + A list of storage types for creation fallback storage. + + STORAGE_TYPE_1, STORAGE_TYPE_2, ..., STORAGE_TYPE_n + + When creating a block, if a particular storage type specified in the policy + is unavailable, the fallback STORAGE_TYPE_1 is used. Further, if + STORAGE_TYPE_i is also unavailable, the fallback STORAGE_TYPE_(i+1) is used. + If all fallback storages are unavailable, the block will be created + with fewer replicas than the specified replication factor. + + An empty list indicates that there is no fallback storage. + + + + + dfs.block.storage.policy.replication-fallback.12 + ARCHIVE + + Similar to dfs.block.storage.policy.creation-fallback.x but for replication. 
+ + + + + + dfs.block.storage.policy.8 + DISK, ARCHIVE + + + + dfs.block.storage.policy.creation-fallback.8 + DISK, ARCHIVE + + + + dfs.block.storage.policy.replication-fallback.8 + DISK, ARCHIVE + + + + + dfs.block.storage.policy.4 + ARCHIVE + + + + dfs.block.storage.policy.creation-fallback.4 + + + + + dfs.block.storage.policy.replication-fallback.4 + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1a1f22aaf17..3a718b779af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -22,7 +22,8 @@ - + + hadoop.hdfs.configuration.version diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java new file mode 100644 index 00000000000..838e2d3fec2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +/** Test {@link BlockStoragePolicy} */ +public class TestBlockStoragePolicy { + static final EnumSet none = EnumSet.noneOf(StorageType.class); + static final EnumSet archive = EnumSet.of(StorageType.ARCHIVE); + static final EnumSet disk = EnumSet.of(StorageType.DISK); + static final EnumSet both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE); + + static { + HdfsConfiguration.init(); + } + + @Test + public void testDefaultPolicies() throws Exception { + final byte COLD = (byte)4; + final byte WARM = (byte)8; + final byte HOT = (byte)12; + final Map expectedPolicyStrings = new HashMap(); + expectedPolicyStrings.put(COLD, + "BlockStoragePolicy{COLD:4, storageTypes=[ARCHIVE], creationFallbacks=[], replicationFallbacks=[]"); + expectedPolicyStrings.put(WARM, + "BlockStoragePolicy{WARM:8, storageTypes=[DISK, ARCHIVE], creationFallbacks=[DISK, ARCHIVE], replicationFallbacks=[DISK, ARCHIVE]"); + expectedPolicyStrings.put(HOT, + "BlockStoragePolicy{HOT:12, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]"); + + final Configuration conf = new Configuration(); + final BlockStoragePolicy[] policies = BlockStoragePolicy.readBlockStoragePolicies(conf); + for(int i = 0; i < policies.length; i++) { + if (policies[i] != null) { + final String s = policies[i].toString(); + Assert.assertEquals(expectedPolicyStrings.get((byte)i), s); + } + } + + { // check Cold policy + final BlockStoragePolicy cold = policies[COLD]; + for(short replication = 1; replication < 6; replication++) { + final StorageType[] computed = cold.getStoragteTypes(replication); + assertStorageType(computed, replication, StorageType.ARCHIVE); + } + assertCreationFallback(cold, null, null, null); + assertReplicationFallback(cold, null, null, null); + } + + { // check Warm 
policy + final BlockStoragePolicy warm = policies[WARM]; + for(short replication = 1; replication < 6; replication++) { + final StorageType[] computed = warm.getStoragteTypes(replication); + assertStorageType(computed, replication, StorageType.DISK, StorageType.ARCHIVE); + } + assertCreationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); + assertReplicationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); + } + + { // check Hot policy + final BlockStoragePolicy hot = policies[HOT]; + for(short replication = 1; replication < 6; replication++) { + final StorageType[] computed = hot.getStoragteTypes(replication); + assertStorageType(computed, replication, StorageType.DISK); + } + assertCreationFallback(hot, null, null, null); + assertReplicationFallback(hot, StorageType.ARCHIVE, null, StorageType.ARCHIVE); + } + } + + static void assertStorageType(StorageType[] computed, short replication, + StorageType... answers) { + Assert.assertEquals(replication, computed.length); + final StorageType last = answers[answers.length - 1]; + for(int i = 0; i < computed.length; i++) { + final StorageType expected = i < answers.length? 
answers[i]: last; + Assert.assertEquals(expected, computed[i]); + } + } + + static void assertCreationFallback(BlockStoragePolicy policy, StorageType noneExpected, + StorageType archiveExpected, StorageType diskExpected) { + Assert.assertEquals(noneExpected, policy.getCreationFallback(none)); + Assert.assertEquals(archiveExpected, policy.getCreationFallback(archive)); + Assert.assertEquals(diskExpected, policy.getCreationFallback(disk)); + Assert.assertEquals(null, policy.getCreationFallback(both)); + } + + static void assertReplicationFallback(BlockStoragePolicy policy, StorageType noneExpected, + StorageType archiveExpected, StorageType diskExpected) { + Assert.assertEquals(noneExpected, policy.getReplicationFallback(none)); + Assert.assertEquals(archiveExpected, policy.getReplicationFallback(archive)); + Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk)); + Assert.assertEquals(null, policy.getReplicationFallback(both)); + } +}