HDFS-15355. Make the default block storage policy ID configurable. Contributed by Yang Yun.
This commit is contained in:
parent
6e04b00df1
commit
f4901d0778
|
@ -50,6 +50,52 @@ public final class HdfsConstants {
|
||||||
public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
|
public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
|
||||||
public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
|
public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This enum wraps above Storage Policy ID and name.
|
||||||
|
* Recommend to use this enum instead of above static variables.
|
||||||
|
* For example,
|
||||||
|
* StoragePolicy.HOT.value() is equal to HOT_STORAGE_POLICY_ID
|
||||||
|
* StoragePolicy.HOT.name() is equal to HOT_STORAGE_POLICY_NAME
|
||||||
|
*/
|
||||||
|
public enum StoragePolicy{
|
||||||
|
PROVIDED(PROVIDED_STORAGE_POLICY_ID),
|
||||||
|
COLD(COLD_STORAGE_POLICY_ID),
|
||||||
|
WARM(WARM_STORAGE_POLICY_ID),
|
||||||
|
HOT(HOT_STORAGE_POLICY_ID),
|
||||||
|
ONE_SSD(ONESSD_STORAGE_POLICY_ID),
|
||||||
|
ALL_SSD(ALLSSD_STORAGE_POLICY_ID),
|
||||||
|
LAZY_PERSIST(MEMORY_STORAGE_POLICY_ID);
|
||||||
|
|
||||||
|
private byte value;
|
||||||
|
StoragePolicy(byte value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static StoragePolicy valueOf(int value) {
|
||||||
|
switch (value) {
|
||||||
|
case 1:
|
||||||
|
return PROVIDED;
|
||||||
|
case 2:
|
||||||
|
return COLD;
|
||||||
|
case 5:
|
||||||
|
return WARM;
|
||||||
|
case 7:
|
||||||
|
return HOT;
|
||||||
|
case 10:
|
||||||
|
return ONE_SSD;
|
||||||
|
case 12:
|
||||||
|
return ALL_SSD;
|
||||||
|
case 15:
|
||||||
|
return LAZY_PERSIST;
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte value() {
|
||||||
|
return this.value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static final int DEFAULT_DATA_SOCKET_SIZE = 0;
|
public static final int DEFAULT_DATA_SOCKET_SIZE = 0;
|
||||||
|
|
||||||
|
|
|
@ -754,6 +754,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.storage.policy.satisfier.retry.max.attempts";
|
"dfs.storage.policy.satisfier.retry.max.attempts";
|
||||||
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
|
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
|
||||||
3;
|
3;
|
||||||
|
public static final String DFS_STORAGE_DEFAULT_POLICY =
|
||||||
|
"dfs.storage.default.policy";
|
||||||
|
public static final HdfsConstants.StoragePolicy
|
||||||
|
DFS_STORAGE_DEFAULT_POLICY_DEFAULT = HdfsConstants.StoragePolicy.HOT;
|
||||||
|
|
||||||
public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
|
public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
|
||||||
"dfs.storage.policy.satisfier.max.outstanding.paths";
|
"dfs.storage.policy.satisfier.max.outstanding.paths";
|
||||||
public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;
|
public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;
|
||||||
|
|
|
@ -493,7 +493,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
conf, datanodeManager.getFSClusterStats(),
|
conf, datanodeManager.getFSClusterStats(),
|
||||||
datanodeManager.getNetworkTopology(),
|
datanodeManager.getNetworkTopology(),
|
||||||
datanodeManager.getHost2DatanodeMap());
|
datanodeManager.getHost2DatanodeMap());
|
||||||
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
|
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite(conf);
|
||||||
pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
|
pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
|
||||||
|
|
|
@ -21,8 +21,10 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.fs.XAttr;
|
import org.apache.hadoop.fs.XAttr;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
@ -45,50 +47,75 @@ public class BlockStoragePolicySuite {
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static BlockStoragePolicySuite createDefaultSuite() {
|
public static BlockStoragePolicySuite createDefaultSuite() {
|
||||||
|
return createDefaultSuite(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static BlockStoragePolicySuite createDefaultSuite(
|
||||||
|
final Configuration conf) {
|
||||||
final BlockStoragePolicy[] policies =
|
final BlockStoragePolicy[] policies =
|
||||||
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
||||||
final byte lazyPersistId = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
|
final byte lazyPersistId =
|
||||||
|
HdfsConstants.StoragePolicy.LAZY_PERSIST.value();
|
||||||
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId,
|
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId,
|
||||||
HdfsConstants.MEMORY_STORAGE_POLICY_NAME,
|
HdfsConstants.StoragePolicy.LAZY_PERSIST.name(),
|
||||||
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
|
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
true); // Cannot be changed on regular files, but inherited.
|
true); // Cannot be changed on regular files, but inherited.
|
||||||
final byte allssdId = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
|
final byte allssdId = HdfsConstants.StoragePolicy.ALL_SSD.value();
|
||||||
policies[allssdId] = new BlockStoragePolicy(allssdId,
|
policies[allssdId] = new BlockStoragePolicy(allssdId,
|
||||||
HdfsConstants.ALLSSD_STORAGE_POLICY_NAME,
|
HdfsConstants.StoragePolicy.ALL_SSD.name(),
|
||||||
new StorageType[]{StorageType.SSD},
|
new StorageType[]{StorageType.SSD},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK});
|
new StorageType[]{StorageType.DISK});
|
||||||
final byte onessdId = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
|
final byte onessdId = HdfsConstants.StoragePolicy.ONE_SSD.value();
|
||||||
policies[onessdId] = new BlockStoragePolicy(onessdId,
|
policies[onessdId] = new BlockStoragePolicy(onessdId,
|
||||||
HdfsConstants.ONESSD_STORAGE_POLICY_NAME,
|
HdfsConstants.StoragePolicy.ONE_SSD.name(),
|
||||||
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.SSD, StorageType.DISK});
|
new StorageType[]{StorageType.SSD, StorageType.DISK});
|
||||||
final byte hotId = HdfsConstants.HOT_STORAGE_POLICY_ID;
|
final byte hotId = HdfsConstants.StoragePolicy.HOT.value();
|
||||||
policies[hotId] = new BlockStoragePolicy(hotId,
|
policies[hotId] = new BlockStoragePolicy(hotId,
|
||||||
HdfsConstants.HOT_STORAGE_POLICY_NAME,
|
HdfsConstants.StoragePolicy.HOT.name(),
|
||||||
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
||||||
new StorageType[]{StorageType.ARCHIVE});
|
new StorageType[]{StorageType.ARCHIVE});
|
||||||
final byte warmId = HdfsConstants.WARM_STORAGE_POLICY_ID;
|
final byte warmId = HdfsConstants.StoragePolicy.WARM.value();
|
||||||
policies[warmId] = new BlockStoragePolicy(warmId,
|
policies[warmId] = new BlockStoragePolicy(warmId,
|
||||||
HdfsConstants.WARM_STORAGE_POLICY_NAME,
|
HdfsConstants.StoragePolicy.WARM.name(),
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
|
||||||
final byte coldId = HdfsConstants.COLD_STORAGE_POLICY_ID;
|
final byte coldId = HdfsConstants.StoragePolicy.COLD.value();
|
||||||
policies[coldId] = new BlockStoragePolicy(coldId,
|
policies[coldId] = new BlockStoragePolicy(coldId,
|
||||||
HdfsConstants.COLD_STORAGE_POLICY_NAME,
|
HdfsConstants.StoragePolicy.COLD.name(),
|
||||||
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
|
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
|
||||||
StorageType.EMPTY_ARRAY);
|
StorageType.EMPTY_ARRAY);
|
||||||
final byte providedId = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
|
final byte providedId = HdfsConstants.StoragePolicy.PROVIDED.value();
|
||||||
policies[providedId] = new BlockStoragePolicy(providedId,
|
policies[providedId] = new BlockStoragePolicy(providedId,
|
||||||
HdfsConstants.PROVIDED_STORAGE_POLICY_NAME,
|
HdfsConstants.StoragePolicy.PROVIDED.name(),
|
||||||
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
|
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
|
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.PROVIDED, StorageType.DISK});
|
new StorageType[]{StorageType.PROVIDED, StorageType.DISK});
|
||||||
return new BlockStoragePolicySuite(hotId, policies);
|
|
||||||
|
return new BlockStoragePolicySuite(getDefaultPolicyID(conf, policies),
|
||||||
|
policies);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte getDefaultPolicyID(
|
||||||
|
final Configuration conf, final BlockStoragePolicy[] policies) {
|
||||||
|
if (conf != null) {
|
||||||
|
HdfsConstants.StoragePolicy defaultPolicy = conf.getEnum(
|
||||||
|
DFSConfigKeys.DFS_STORAGE_DEFAULT_POLICY,
|
||||||
|
DFSConfigKeys.DFS_STORAGE_DEFAULT_POLICY_DEFAULT);
|
||||||
|
for (BlockStoragePolicy policy : policies) {
|
||||||
|
if (policy != null &&
|
||||||
|
policy.getName().equalsIgnoreCase(defaultPolicy.name())) {
|
||||||
|
return policy.getId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DFSConfigKeys.DFS_STORAGE_DEFAULT_POLICY_DEFAULT.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
private final byte defaultPolicyID;
|
private final byte defaultPolicyID;
|
||||||
|
|
|
@ -5935,4 +5935,20 @@
|
||||||
set on fs.protected.directories.
|
set on fs.protected.directories.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.storage.default.policy</name>
|
||||||
|
<value>HOT</value>
|
||||||
|
<description>
|
||||||
|
Set the default Storage Policy name with following value,
|
||||||
|
LAZY_PERSIST: memory storage policy.
|
||||||
|
ALL_SSD : all SSD storage policy.
|
||||||
|
ONE_SSD : one SSD_storage policy.
|
||||||
|
HOT : hot storage policy.
|
||||||
|
WARM : warm policy.
|
||||||
|
COLD : cold_storage policy.
|
||||||
|
PROVIDED : provided storage policy.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -88,6 +88,7 @@ The effective storage policy can be retrieved by the "[`storagepolicies -getStor
|
||||||
### Configuration
|
### Configuration
|
||||||
|
|
||||||
* **dfs.storage.policy.enabled** - for enabling/disabling the storage policy feature. The default value is `true`.
|
* **dfs.storage.policy.enabled** - for enabling/disabling the storage policy feature. The default value is `true`.
|
||||||
|
* **dfs.storage.default.policy** - Set the default storage policy with the policy name. The default value is `HOT`. All possible policies are defined in enum StoragePolicy, including `LAZY_PERSIST` `ALL_SSD` `ONE_SSD` `HOT` `WARM` `COLD` and `PROVIDED`.
|
||||||
* **dfs.datanode.data.dir** - on each data node, the comma-separated storage locations should be tagged with their storage types. This allows storage policies to place the blocks on different storage types according to policy. For example:
|
* **dfs.datanode.data.dir** - on each data node, the comma-separated storage locations should be tagged with their storage types. This allows storage policies to place the blocks on different storage types according to policy. For example:
|
||||||
|
|
||||||
1. A datanode storage location /grid/dn/disk0 on DISK should be configured with `[DISK]file:///grid/dn/disk0`
|
1. A datanode storage location /grid/dn/disk0 on DISK should be configured with `[DISK]file:///grid/dn/disk0`
|
||||||
|
|
|
@ -1546,4 +1546,41 @@ public class TestBlockStoragePolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateDefaultPoliciesFromConf() {
|
||||||
|
BlockStoragePolicySuite suite =
|
||||||
|
BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
Assert.assertEquals(HdfsConstants.StoragePolicy.HOT.value(),
|
||||||
|
suite.getDefaultPolicy().getId());
|
||||||
|
|
||||||
|
Configuration newConf = new Configuration();
|
||||||
|
newConf.setEnum(DFSConfigKeys.DFS_STORAGE_DEFAULT_POLICY,
|
||||||
|
HdfsConstants.StoragePolicy.ONE_SSD);
|
||||||
|
BlockStoragePolicySuite suiteConf =
|
||||||
|
BlockStoragePolicySuite.createDefaultSuite(newConf);
|
||||||
|
Assert.assertEquals(HdfsConstants.StoragePolicy.ONE_SSD.value(),
|
||||||
|
suiteConf.getDefaultPolicy().getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateFileWithConfiguredDefaultPolicies()
|
||||||
|
throws IOException{
|
||||||
|
Configuration newConf = new HdfsConfiguration();
|
||||||
|
newConf.set(DFSConfigKeys.DFS_STORAGE_DEFAULT_POLICY,
|
||||||
|
HdfsConstants.StoragePolicy.WARM.name());
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(newConf)
|
||||||
|
.numDataNodes(0).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final Path fooFile = new Path("/foo");
|
||||||
|
FileSystem newfs = cluster.getFileSystem();
|
||||||
|
DFSTestUtil.createFile(newfs, fooFile, 0, REPLICATION, 0L);
|
||||||
|
|
||||||
|
String policy = newfs.getStoragePolicy(fooFile).getName();
|
||||||
|
Assert.assertEquals(HdfsConstants.StoragePolicy.WARM.name(), policy);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue