HDFS-7081. Add new DistributedFileSystem API for getting all the existing storage policies. Contributed by Jing Zhao.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
Jing Zhao 2014-09-24 10:05:40 -07:00 committed by Tsz-Wo Nicholas Sze
parent ba3182c60c
commit f83096d49c
36 changed files with 497 additions and 317 deletions

View File

@ -700,6 +700,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7095. TestStorageMover often fails in Jenkins. (jing9)
HDFS-7081. Add new DistributedFileSystem API for getting all the existing
storage policies. (jing9)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
</configuration>

View File

@ -16,7 +16,6 @@
<!-- Put site-specific property overrides in this file. -->
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<xi:include href="blockStoragePolicy-site.xml" />
<configuration>
</configuration>

View File

@ -139,6 +139,7 @@
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
@ -1779,6 +1780,13 @@ public void setStoragePolicy(String src, String policyName)
}
}
/**
* @return All the existing storage policies
*/
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
return namenode.getStoragePolicySuite();
}
/**
* Rename file or directory.
* @see ClientProtocol#rename(String, String)

View File

@ -444,14 +444,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
public static final int DFS_REPLICATION_MAX_DEFAULT = 512;
public static final String DFS_BLOCK_STORAGE_POLICIES_KEY
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICIES_KEY;
public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX;
public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX;
public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX;
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
public static final int DFS_DF_INTERVAL_DEFAULT = 60000;

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@ -504,6 +505,12 @@ public Void next(final FileSystem fs, final Path p)
}.resolve(this, absF);
}
/** Get all the existing storage policies */
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
statistics.incrementReadOps(1);
return dfs.getStoragePolicySuite();
}
/**
* Move blocks from srcs to trg and delete srcs afterwards.
* The file block sizes must be the same.

View File

@ -325,4 +325,15 @@ public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
throws IOException {
return dfs.getInotifyEventStream(lastReadTxid);
}
/**
* Set the source path to the specified storage policy.
*
* @param src The source path referring to either a directory or a file.
* @param policyName The name of the storage policy.
*/
public void setStoragePolicy(final Path src, final String policyName)
throws IOException {
dfs.setStoragePolicy(src, policyName);
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.protocol;
import java.util.Arrays;
import java.util.EnumSet;
@ -24,12 +24,10 @@
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttr.NameSpace;
import org.apache.hadoop.hdfs.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A block storage policy describes how to select the storage types
@ -37,67 +35,8 @@
*/
@InterfaceAudience.Private
public class BlockStoragePolicy {
public static final Log LOG = LogFactory.getLog(BlockStoragePolicy.class);
public static final String DFS_BLOCK_STORAGE_POLICIES_KEY
= "dfs.block.storage.policies";
public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX
= "dfs.block.storage.policy.";
public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX
= "dfs.block.storage.policy.creation-fallback.";
public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
= "dfs.block.storage.policy.replication-fallback.";
public static final String STORAGE_POLICY_XATTR_NAME = "bsp";
/** set the namespace to TRUSTED so that only privilege users can access */
public static final NameSpace XAttrNS = NameSpace.TRUSTED;
public static final int ID_BIT_LENGTH = 4;
public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1;
public static final byte ID_UNSPECIFIED = 0;
private static final Suite DEFAULT_SUITE = createDefaultSuite();
private static Suite createDefaultSuite() {
final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH];
final StorageType[] storageTypes = {StorageType.DISK};
final byte defaultPolicyId = 12;
policies[defaultPolicyId] = new BlockStoragePolicy(defaultPolicyId, "HOT",
storageTypes, StorageType.EMPTY_ARRAY, StorageType.EMPTY_ARRAY);
return new Suite(defaultPolicyId, policies);
}
/** A block storage policy suite. */
public static class Suite {
private final byte defaultPolicyID;
private final BlockStoragePolicy[] policies;
private Suite(byte defaultPolicyID, BlockStoragePolicy[] policies) {
this.defaultPolicyID = defaultPolicyID;
this.policies = policies;
}
/** @return the corresponding policy. */
public BlockStoragePolicy getPolicy(byte id) {
// id == 0 means policy not specified.
return id == 0? getDefaultPolicy(): policies[id];
}
/** @return the default policy. */
public BlockStoragePolicy getDefaultPolicy() {
return getPolicy(defaultPolicyID);
}
public BlockStoragePolicy getPolicy(String policyName) {
if (policies != null) {
for (BlockStoragePolicy policy : policies) {
if (policy != null && policy.name.equals(policyName)) {
return policy;
}
}
}
return null;
}
}
public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy
.class);
/** A 4-bit policy ID */
private final byte id;
@ -160,7 +99,7 @@ private List<StorageType> chooseStorageTypes(final short replication,
/**
* Choose the storage types for storing the remaining replicas, given the
* replication number, the storage types of the chosen replicas and
* the unavailable storage types. It uses fallback storage in case that
* the unavailable storage types. It uses fallback storage in case that
* the desired storage type is unavailable.
*
* @param replication the replication number.
@ -195,7 +134,7 @@ public List<StorageType> chooseStorageTypes(final short replication,
// remove excess storage types after fallback replacement.
diff(storageTypes, excess, null);
if (storageTypes.size() < expectedSize) {
LOG.warn("Failed to place enough replicas: expected size is " + expectedSize
LOG.warn("Failed to place enough replicas: expected size is " + expectedSize
+ " but only " + storageTypes.size() + " storage types can be selected "
+ "(replication=" + replication
+ ", selected=" + storageTypes
@ -207,7 +146,8 @@ public List<StorageType> chooseStorageTypes(final short replication,
}
/**
* Compute the list difference t = t - c.
* Compute the difference between two lists t and c so that after the diff
* computation we have: t = t - c;
* Further, if e is not null, set e = e + c - t;
*/
private static void diff(List<StorageType> t, Iterable<StorageType> c,
@ -242,7 +182,7 @@ public List<StorageType> chooseExcess(final short replication,
public StorageType getCreationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, creationFallbacks);
}
/** @return the fallback {@link StorageType} for replication. */
public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, replicationFallbacks);
@ -280,6 +220,18 @@ public String getName() {
return name;
}
public StorageType[] getStorageTypes() {
return this.storageTypes;
}
public StorageType[] getCreationFallbacks() {
return this.creationFallbacks;
}
public StorageType[] getReplicationFallbacks() {
return this.replicationFallbacks;
}
private static StorageType getFallback(EnumSet<StorageType> unavailables,
StorageType[] fallbacks) {
for(StorageType fb : fallbacks) {
@ -289,131 +241,4 @@ private static StorageType getFallback(EnumSet<StorageType> unavailables,
}
return null;
}
private static byte parseID(String idString, String element, Configuration conf) {
byte id = 0;
try {
id = Byte.parseByte(idString);
} catch(NumberFormatException nfe) {
throwIllegalArgumentException("Failed to parse policy ID \"" + idString
+ "\" to a " + ID_BIT_LENGTH + "-bit integer", conf);
}
if (id < 0) {
throwIllegalArgumentException("Invalid policy ID: id = " + id
+ " < 1 in \"" + element + "\"", conf);
} else if (id == 0) {
throw new IllegalArgumentException("Policy ID 0 is reserved: " + element);
} else if (id > ID_MAX) {
throwIllegalArgumentException("Invalid policy ID: id = " + id
+ " > MAX = " + ID_MAX + " in \"" + element + "\"", conf);
}
return id;
}
private static StorageType[] parseStorageTypes(String[] strings) {
if (strings == null || strings.length == 0) {
return StorageType.EMPTY_ARRAY;
}
final StorageType[] types = new StorageType[strings.length];
for(int i = 0; i < types.length; i++) {
types[i] = StorageType.valueOf(strings[i].trim().toUpperCase());
}
return types;
}
private static StorageType[] readStorageTypes(byte id, String keyPrefix,
Configuration conf) {
final String key = keyPrefix + id;
final String[] values = conf.getStrings(key);
try {
return parseStorageTypes(values);
} catch(Exception e) {
throw new IllegalArgumentException("Failed to parse " + key
+ " \"" + conf.get(key), e);
}
}
private static BlockStoragePolicy readBlockStoragePolicy(byte id, String name,
Configuration conf) {
final StorageType[] storageTypes = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX, conf);
if (storageTypes.length == 0) {
throw new IllegalArgumentException(
DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX + id + " is missing or is empty.");
}
final StorageType[] creationFallbacks = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX, conf);
final StorageType[] replicationFallbacks = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX, conf);
return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks,
replicationFallbacks);
}
/** Read {@link Suite} from conf. */
public static Suite readBlockStorageSuite(Configuration conf) {
final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH];
final String[] values = conf.getStrings(DFS_BLOCK_STORAGE_POLICIES_KEY);
if (values == null) {
// conf property is missing, use default suite.
return DEFAULT_SUITE;
}
byte firstID = -1;
for(String v : values) {
v = v.trim();
final int i = v.indexOf(':');
if (i < 0) {
throwIllegalArgumentException("Failed to parse element \"" + v
+ "\" (expected format is NAME:ID)", conf);
} else if (i == 0) {
throwIllegalArgumentException("Policy name is missing in \"" + v + "\"", conf);
} else if (i == v.length() - 1) {
throwIllegalArgumentException("Policy ID is missing in \"" + v + "\"", conf);
}
final String name = v.substring(0, i).trim();
for(int j = 1; j < policies.length; j++) {
if (policies[j] != null && policies[j].name.equals(name)) {
throwIllegalArgumentException("Policy name duplication: \""
+ name + "\" appears more than once", conf);
}
}
final byte id = parseID(v.substring(i + 1).trim(), v, conf);
if (policies[id] != null) {
throwIllegalArgumentException("Policy duplication: ID " + id
+ " appears more than once", conf);
}
policies[id] = readBlockStoragePolicy(id, name, conf);
String prefix = "";
if (firstID == -1) {
firstID = id;
prefix = "(default) ";
}
LOG.info(prefix + policies[id]);
}
if (firstID == -1) {
throwIllegalArgumentException("Empty list is not allowed", conf);
}
return new Suite(firstID, policies);
}
public static String buildXAttrName() {
return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME;
}
public static XAttr buildXAttr(byte policyId) {
final String name = buildXAttrName();
return XAttrHelper.buildXAttr(name, new byte[] { policyId });
}
public static boolean isStoragePolicyXAttr(XAttr xattr) {
return xattr != null && xattr.getNameSpace() == BlockStoragePolicy.XAttrNS
&& xattr.getName().equals(BlockStoragePolicy.STORAGE_POLICY_XATTR_NAME);
}
private static void throwIllegalArgumentException(String message,
Configuration conf) {
throw new IllegalArgumentException(message + " in "
+ DFS_BLOCK_STORAGE_POLICIES_KEY + " \""
+ conf.get(DFS_BLOCK_STORAGE_POLICIES_KEY) + "\".");
}
}
}

View File

@ -259,6 +259,13 @@ public boolean setReplication(String src, short replication)
FileNotFoundException, SafeModeException, UnresolvedLinkException,
SnapshotAccessControlException, IOException;
/**
* Get all the available block storage policies.
* @return All the in-use block storage policies currently.
*/
@Idempotent
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException;
/**
* Set the storage policy for a file/directory
* @param src Path of an existing file/directory.

View File

@ -24,8 +24,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
/**
* Metadata about a snapshottable directory
@ -62,7 +62,7 @@ public SnapshottableDirectoryStatus(long modification_time, long access_time,
int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
access_time, permission, owner, group, null, localName, inodeId,
childrenNum, null, BlockStoragePolicy.ID_UNSPECIFIED);
childrenNum, null, BlockStoragePolicySuite.ID_UNSPECIFIED);
this.snapshotNumber = snapshotNumber;
this.snapshotQuota = snapshotQuota;
this.parentFullPath = parentFullPath;

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@ -119,6 +120,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@ -1429,6 +1432,26 @@ public SetStoragePolicyResponseProto setStoragePolicy(
return VOID_SET_STORAGE_POLICY_RESPONSE;
}
@Override
public GetStoragePolicySuiteResponseProto getStoragePolicySuite(
RpcController controller, GetStoragePolicySuiteRequestProto request)
throws ServiceException {
try {
BlockStoragePolicy[] policies = server.getStoragePolicySuite();
GetStoragePolicySuiteResponseProto.Builder builder =
GetStoragePolicySuiteResponseProto.newBuilder();
if (policies == null) {
return builder.build();
}
for (BlockStoragePolicy policy : policies) {
builder.addPolicies(PBHelper.convert(policy));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(RpcController controller,
GetCurrentEditLogTxidRequestProto req) throws ServiceException {
try {

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@ -64,9 +65,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
@ -119,6 +118,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
@ -159,13 +160,13 @@
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@ -225,6 +226,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
VOID_GET_DATA_ENCRYPTIONKEY_REQUEST =
GetDataEncryptionKeyRequestProto.newBuilder().build();
private final static GetStoragePolicySuiteRequestProto
VOID_GET_STORAGE_POLICY_SUITE_REQUEST =
GetStoragePolicySuiteRequestProto.newBuilder().build();
public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
rpcProxy = proxy;
}
@ -1440,8 +1445,7 @@ public void checkAccess(String path, FsAction mode) throws IOException {
@Override
public void setStoragePolicy(String src, String policyName)
throws SnapshotAccessControlException, UnresolvedLinkException,
FileNotFoundException, QuotaExceededException, IOException {
throws IOException {
SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto
.newBuilder().setSrc(src).setPolicyName(policyName).build();
try {
@ -1451,6 +1455,17 @@ public void setStoragePolicy(String src, String policyName)
}
}
@Override
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
try {
GetStoragePolicySuiteResponseProto response = rpcProxy
.getStoragePolicySuite(null, VOID_GET_STORAGE_POLICY_SUITE_REQUEST);
return PBHelper.convertStoragePolicies(response.getPoliciesList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
public long getCurrentEditLogTxid() throws IOException {
GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto
.getDefaultInstance();

View File

@ -44,7 +44,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.inotify.Event;
@ -120,6 +120,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
@ -174,6 +175,7 @@
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -281,6 +283,65 @@ public static NamenodeRoleProto convert(NamenodeRole role) {
return null;
}
public static BlockStoragePolicy[] convertStoragePolicies(
List<BlockStoragePolicyProto> policyProtos) {
if (policyProtos == null || policyProtos.size() == 0) {
return new BlockStoragePolicy[0];
}
BlockStoragePolicy[] policies = new BlockStoragePolicy[policyProtos.size()];
int i = 0;
for (BlockStoragePolicyProto proto : policyProtos) {
policies[i++] = convert(proto);
}
return policies;
}
public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
List<StorageTypeProto> cList = proto.getCreationPolicy()
.getStorageTypesList();
StorageType[] creationTypes = convertStorageTypes(cList, cList.size());
List<StorageTypeProto> cfList = proto.hasCreationFallbackPolicy() ? proto
.getCreationFallbackPolicy().getStorageTypesList() : null;
StorageType[] creationFallbackTypes = cfList == null ? StorageType
.EMPTY_ARRAY : convertStorageTypes(cfList, cfList.size());
List<StorageTypeProto> rfList = proto.hasReplicationFallbackPolicy() ?
proto.getReplicationFallbackPolicy().getStorageTypesList() : null;
StorageType[] replicationFallbackTypes = rfList == null ? StorageType
.EMPTY_ARRAY : convertStorageTypes(rfList, rfList.size());
return new BlockStoragePolicy((byte) proto.getPolicyId(), proto.getName(),
creationTypes, creationFallbackTypes, replicationFallbackTypes);
}
public static BlockStoragePolicyProto convert(BlockStoragePolicy policy) {
BlockStoragePolicyProto.Builder builder = BlockStoragePolicyProto
.newBuilder().setPolicyId(policy.getId()).setName(policy.getName());
// creation storage types
StorageTypesProto creationProto = convert(policy.getStorageTypes());
Preconditions.checkArgument(creationProto != null);
builder.setCreationPolicy(creationProto);
// creation fallback
StorageTypesProto creationFallbackProto = convert(
policy.getCreationFallbacks());
if (creationFallbackProto != null) {
builder.setCreationFallbackPolicy(creationFallbackProto);
}
// replication fallback
StorageTypesProto replicationFallbackProto = convert(
policy.getReplicationFallbacks());
if (replicationFallbackProto != null) {
builder.setReplicationFallbackPolicy(replicationFallbackProto);
}
return builder.build();
}
public static StorageTypesProto convert(StorageType[] types) {
if (types == null || types.length == 0) {
return null;
}
List<StorageTypeProto> list = convertStorageTypes(types);
return StorageTypesProto.newBuilder().addAllStorageTypes(list).build();
}
public static StorageInfoProto convert(StorageInfo info) {
return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
.setCTime(info.getCTime()).setLayoutVersion(info.getLayoutVersion())
@ -1350,7 +1411,7 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
: BlockStoragePolicy.ID_UNSPECIFIED);
: BlockStoragePolicySuite.ID_UNSPECIFIED);
}
public static SnapshottableDirectoryStatus convert(

View File

@ -42,7 +42,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
@ -255,7 +255,7 @@ public int getPendingDataNodeMessageCount() {
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
private final BlockStoragePolicy.Suite storagePolicySuite;
private final BlockStoragePolicySuite storagePolicySuite;
/** Check whether name system is running before terminating */
private boolean checkNSRunning = true;
@ -278,7 +278,7 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
blockplacement = BlockPlacementPolicy.getInstance(
conf, stats, datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap());
storagePolicySuite = BlockStoragePolicy.readBlockStorageSuite(conf);
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@ -402,6 +402,10 @@ public BlockStoragePolicy getStoragePolicy(final String policyName) {
return storagePolicySuite.getPolicy(policyName);
}
public BlockStoragePolicy[] getStoragePolicySuite() {
return storagePolicySuite.getAllPolicies();
}
public void setBlockPoolId(String blockPoolId) {
if (isBlockTokenEnabled()) {
blockTokenSecretManager.setBlockPoolId(blockPoolId);
@ -3602,7 +3606,7 @@ public ReplicationWork(Block block,
}
private void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicy.Suite storagePolicySuite,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
targets = blockplacement.chooseTarget(bc.getName(),
additionalReplRequired, srcNode, liveReplicaStorages, false,

View File

@ -27,7 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/** A collection of block storage policies. */
public class BlockStoragePolicySuite {
static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicySuite
.class);
public static final String STORAGE_POLICY_XATTR_NAME
= "hsm.block.storage.policy.id";
public static final XAttr.NameSpace XAttrNS = XAttr.NameSpace.SYSTEM;
public static final int ID_BIT_LENGTH = 4;
public static final byte ID_UNSPECIFIED = 0;
@VisibleForTesting
public static BlockStoragePolicySuite createDefaultSuite() {
final BlockStoragePolicy[] policies =
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
final byte hotId = 12;
policies[hotId] = new BlockStoragePolicy(hotId, "HOT",
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
new StorageType[]{StorageType.ARCHIVE});
final byte warmId = 8;
policies[warmId] = new BlockStoragePolicy(warmId, "WARM",
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
final byte coldId = 4;
policies[coldId] = new BlockStoragePolicy(coldId, "COLD",
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
StorageType.EMPTY_ARRAY);
return new BlockStoragePolicySuite(hotId, policies);
}
private final byte defaultPolicyID;
private final BlockStoragePolicy[] policies;
public BlockStoragePolicySuite(byte defaultPolicyID,
BlockStoragePolicy[] policies) {
this.defaultPolicyID = defaultPolicyID;
this.policies = policies;
}
/** @return the corresponding policy. */
public BlockStoragePolicy getPolicy(byte id) {
// id == 0 means policy not specified.
return id == 0? getDefaultPolicy(): policies[id];
}
/** @return the default policy. */
public BlockStoragePolicy getDefaultPolicy() {
return getPolicy(defaultPolicyID);
}
public BlockStoragePolicy getPolicy(String policyName) {
if (policies != null) {
for (BlockStoragePolicy policy : policies) {
if (policy != null && policy.getName().equals(policyName)) {
return policy;
}
}
}
return null;
}
public BlockStoragePolicy[] getAllPolicies() {
List<BlockStoragePolicy> list = Lists.newArrayList();
if (policies != null) {
for (BlockStoragePolicy policy : policies) {
if (policy != null) {
list.add(policy);
}
}
}
return list.toArray(new BlockStoragePolicy[list.size()]);
}
public static String buildXAttrName() {
return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME;
}
public static XAttr buildXAttr(byte policyId) {
final String name = buildXAttrName();
return XAttrHelper.buildXAttr(name, new byte[]{policyId});
}
public static boolean isStoragePolicyXAttr(XAttr xattr) {
return xattr != null && xattr.getNameSpace() == XAttrNS
&& xattr.getName().equals(STORAGE_POLICY_XATTR_NAME);
}
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -102,7 +103,7 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
private final StorageMap storages;
private final List<Path> targetPaths;
private final BlockStoragePolicy.Suite blockStoragePolicies;
private final BlockStoragePolicy[] blockStoragePolicies;
Mover(NameNodeConnector nnc, Configuration conf) {
final long movedWinWidth = conf.getLong(
@ -119,11 +120,13 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
maxConcurrentMovesPerNode, conf);
this.storages = new StorageMap();
this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
this.targetPaths = nnc.getTargetPaths();
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
BlockStoragePolicySuite.ID_BIT_LENGTH];
}
void init() throws IOException {
initStoragePolicies();
final List<DatanodeStorageReport> reports = dispatcher.init();
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
@ -137,6 +140,14 @@ void init() throws IOException {
}
}
private void initStoragePolicies() throws IOException {
BlockStoragePolicy[] policies = dispatcher.getDistributedFileSystem()
.getStoragePolicySuite();
for (BlockStoragePolicy policy : policies) {
this.blockStoragePolicies[policy.getId()] = policy;
}
}
private ExitStatus run() {
try {
init();
@ -305,7 +316,7 @@ private boolean processDirRecursively(String parent,
if (!isSnapshotPathInCurrent(fullPath)) {
// the full path is a snapshot path but it is also included in the
// current directory tree, thus ignore it.
hasRemaining = processFile((HdfsLocatedFileStatus)status);
hasRemaining = processFile(fullPath, (HdfsLocatedFileStatus)status);
}
} catch (IOException e) {
LOG.warn("Failed to check the status of " + parent
@ -317,9 +328,17 @@ private boolean processDirRecursively(String parent,
}
/** @return true if it is necessary to run another round of migration */
private boolean processFile(HdfsLocatedFileStatus status) {
final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
status.getStoragePolicy());
private boolean processFile(String fullPath, HdfsLocatedFileStatus status) {
final byte policyId = status.getStoragePolicy();
// currently we ignore files with unspecified storage policy
if (policyId == BlockStoragePolicySuite.ID_UNSPECIFIED) {
return false;
}
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
if (policy == null) {
LOG.warn("Failed to get the storage policy of file " + fullPath);
return false;
}
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());

View File

@ -53,7 +53,6 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -79,6 +78,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@ -1037,7 +1037,7 @@ void unprotectedSetStoragePolicy(String src, byte policyId)
private void setDirStoragePolicy(INodeDirectory inode, byte policyId,
int latestSnapshotId) throws IOException {
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
XAttr xAttr = BlockStoragePolicy.buildXAttr(policyId);
XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId);
List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, Arrays.asList(xAttr),
EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
@ -1375,7 +1375,7 @@ private static void checkSnapshot(INode target,
}
private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
return inodePolicy != BlockStoragePolicy.ID_UNSPECIFIED ? inodePolicy :
return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
parentPolicy;
}
@ -1409,7 +1409,7 @@ DirectoryListing getListing(String src, byte[] startAfter,
if (targetNode == null)
return null;
byte parentStoragePolicy = isSuperUser ?
targetNode.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
targetNode.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
if (!targetNode.isDirectory()) {
return new DirectoryListing(
@ -1429,7 +1429,8 @@ DirectoryListing getListing(String src, byte[] startAfter,
for (int i=0; i<numOfListing && locationBudget>0; i++) {
INode cur = contents.get(startChild+i);
byte curPolicy = isSuperUser && !cur.isSymlink()?
cur.getLocalStoragePolicyID(): BlockStoragePolicy.ID_UNSPECIFIED;
cur.getLocalStoragePolicyID():
BlockStoragePolicySuite.ID_UNSPECIFIED;
listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation,
getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot,
isRawPath, inodesInPath);
@ -1483,7 +1484,7 @@ private DirectoryListing getSnapshotsListing(String src, byte[] startAfter)
for (int i = 0; i < numOfListing; i++) {
Root sRoot = snapshots.get(i + skipSize).getRoot();
listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
false, null);
}
return new DirectoryListing(
@ -1511,7 +1512,7 @@ HdfsFileStatus getFileInfo(String src, boolean resolveLink,
final INode[] inodes = inodesInPath.getINodes();
final INode i = inodes[inodes.length - 1];
byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
i.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
policyId, inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath);
} finally {
@ -1530,7 +1531,8 @@ private HdfsFileStatus getFileInfo4DotSnapshot(String src)
throws UnresolvedLinkException {
if (getINode4DotSnapshot(src) != null) {
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
HdfsFileStatus.EMPTY_NAME, -1L, 0, null, BlockStoragePolicy.ID_UNSPECIFIED);
HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
BlockStoragePolicySuite.ID_UNSPECIFIED);
}
return null;
}

View File

@ -34,7 +34,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -374,7 +374,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
if (toAddRetryCache) {
HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
HdfsFileStatus.EMPTY_NAME, newFile,
BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
false, iip);
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, stat);

View File

@ -17,8 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
.EncryptedKeyVersion;
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@ -166,7 +165,7 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
@ -2305,6 +2304,21 @@ private void setStoragePolicyInt(String src, final String policyName)
logAuditEvent(true, "setStoragePolicy", src, null, fileStat);
}
/**
* @return All the existing block storage policies
*/
BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
checkOperation(OperationCategory.READ);
waitForLoadingFSImage();
readLock();
try {
checkOperation(OperationCategory.READ);
return blockManager.getStoragePolicySuite();
} finally {
readUnlock();
}
}
long getPreferredBlockSize(String filename)
throws IOException, UnresolvedLinkException {
FSPermissionChecker pc = getPermissionChecker();

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@ -695,7 +695,7 @@ public final INode setAccessTime(long accessTime, int latestSnapshotId)
/**
* @return the storage policy directly specified on the INode. Return
* {@link BlockStoragePolicy#ID_UNSPECIFIED} if no policy has
* {@link BlockStoragePolicySuite#ID_UNSPECIFIED} if no policy has
* been specified.
*/
public abstract byte getLocalStoragePolicyID();

View File

@ -28,10 +28,11 @@
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
@ -112,22 +113,22 @@ public byte getLocalStoragePolicyID() {
ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
.getXAttrs();
for (XAttr xattr : xattrs) {
if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
if (BlockStoragePolicySuite.isStoragePolicyXAttr(xattr)) {
return (xattr.getValue())[0];
}
}
return BlockStoragePolicy.ID_UNSPECIFIED;
return BlockStoragePolicySuite.ID_UNSPECIFIED;
}
@Override
public byte getStoragePolicyID() {
byte id = getLocalStoragePolicyID();
if (id != BlockStoragePolicy.ID_UNSPECIFIED) {
if (id != BlockStoragePolicySuite.ID_UNSPECIFIED) {
return id;
}
// if it is unspecified, check its parent
return getParent() != null ? getParent().getStoragePolicyID() :
BlockStoragePolicy.ID_UNSPECIFIED;
BlockStoragePolicySuite.ID_UNSPECIFIED;
}
void setQuota(long nsQuota, long dsQuota) {

View File

@ -18,12 +18,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**
* The attributes of an inode.

View File

@ -28,12 +28,12 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
@ -79,7 +79,8 @@ public static INodeFile valueOf(INode inode, String path, boolean acceptNull)
static enum HeaderFormat {
PREFERRED_BLOCK_SIZE(null, 48, 1),
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicy.ID_BIT_LENGTH, 0);
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
0);
private final LongBitFormat BITS;
@ -374,7 +375,7 @@ public byte getLocalStoragePolicyID() {
@Override
public byte getStoragePolicyID() {
byte id = getLocalStoragePolicyID();
if (id == BlockStoragePolicy.ID_UNSPECIFIED) {
if (id == BlockStoragePolicySuite.ID_UNSPECIFIED) {
return this.getParent() != null ?
this.getParent().getStoragePolicyID() : id;
}

View File

@ -22,8 +22,9 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
@ -125,12 +126,12 @@ public Counts cleanSubtree(int snapshotId, int priorSnapshotId,
@Override
public byte getStoragePolicyID(){
return BlockStoragePolicy.ID_UNSPECIFIED;
return BlockStoragePolicySuite.ID_UNSPECIFIED;
}
@Override
public byte getLocalStoragePolicyID() {
return BlockStoragePolicy.ID_UNSPECIFIED;
return BlockStoragePolicySuite.ID_UNSPECIFIED;
}
};

View File

@ -72,6 +72,7 @@
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@ -587,6 +588,11 @@ public void setStoragePolicy(String src, String policyName)
namesystem.setStoragePolicy(src, policyName);
}
@Override
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
return namesystem.getStoragePolicySuite();
}
@Override // ClientProtocol
public void setPermission(String src, FsPermission permissions)
throws IOException {

View File

@ -49,7 +49,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.Command;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -68,6 +68,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.ipc.GenericRefreshProtocol;
@ -615,15 +616,18 @@ public int getStoragePolicy(String[] argv) throws IOException {
+ argv[1]);
}
byte storagePolicyId = status.getStoragePolicy();
BlockStoragePolicy.Suite suite = BlockStoragePolicy
.readBlockStorageSuite(getConf());
BlockStoragePolicy policy = suite.getPolicy(storagePolicyId);
if (policy != null) {
System.out.println("The storage policy of " + argv[1] + ":\n" + policy);
if (storagePolicyId == BlockStoragePolicySuite.ID_UNSPECIFIED) {
System.out.println("The storage policy of " + argv[1] + " is unspecified");
return 0;
} else {
throw new IOException("Cannot identify the storage policy for " + argv[1]);
}
BlockStoragePolicy[] policies = dfs.getStoragePolicySuite();
for (BlockStoragePolicy p : policies) {
if (p.getId() == storagePolicyId) {
System.out.println("The storage policy of " + argv[1] + ":\n" + p);
return 0;
}
}
throw new IOException("Cannot identify the storage policy for " + argv[1]);
}
/**

View File

@ -21,13 +21,14 @@
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.Token;
@ -264,7 +265,7 @@ public static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includes
: childrenNumLong.intValue();
final byte storagePolicy = m.containsKey("storagePolicy") ?
(byte) (long) (Long) m.get("storagePolicy") :
BlockStoragePolicy.ID_UNSPECIFIED;
BlockStoragePolicySuite.ID_UNSPECIFIED;
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
blockSize, mTime, aTime, permission, owner, group, symlink,
DFSUtil.string2Bytes(localName), fileId, childrenNum, null, storagePolicy);

View File

@ -108,6 +108,13 @@ message SetStoragePolicyRequestProto {
message SetStoragePolicyResponseProto { // void response
}
message GetStoragePolicySuiteRequestProto { // void request
}
message GetStoragePolicySuiteResponseProto {
repeated BlockStoragePolicyProto policies = 1;
}
message SetPermissionRequestProto {
required string src = 1;
required FsPermissionProto permission = 2;
@ -699,6 +706,8 @@ service ClientNamenodeProtocol {
returns(SetReplicationResponseProto);
rpc setStoragePolicy(SetStoragePolicyRequestProto)
returns(SetStoragePolicyResponseProto);
rpc getStoragePolicySuite(GetStoragePolicySuiteRequestProto)
returns(GetStoragePolicySuiteResponseProto);
rpc setPermission(SetPermissionRequestProto)
returns(SetPermissionResponseProto);
rpc setOwner(SetOwnerRequestProto) returns(SetOwnerResponseProto);

View File

@ -168,6 +168,20 @@ message StorageTypesProto {
repeated StorageTypeProto storageTypes = 1;
}
/**
* Block replica storage policy.
*/
message BlockStoragePolicyProto {
required uint32 policyId = 1;
required string name = 2;
// a list of storage types for storing the block replicas when creating a
// block.
required StorageTypesProto creationPolicy = 3;
// A list of storage types for creation fallback storage.
optional StorageTypesProto creationFallbackPolicy = 4;
optional StorageTypesProto replicationFallbackPolicy = 5;
}
/**
* A list of storage IDs.
*/

View File

@ -22,8 +22,7 @@
<!-- wish to modify from this file into hdfs-site.xml and change them -->
<!-- there. If hdfs-site.xml does not already exist, create it. -->
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<xi:include href="blockStoragePolicy-default.xml" />
<configuration>
<property>
<name>hadoop.hdfs.configuration.version</name>

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
import static org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite.ID_UNSPECIFIED;
import java.io.File;
import java.io.FileNotFoundException;
@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@ -44,7 +45,7 @@
/** Test {@link BlockStoragePolicy} */
public class TestBlockStoragePolicy {
public static final BlockStoragePolicy.Suite POLICY_SUITE;
public static final BlockStoragePolicySuite POLICY_SUITE;
public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY;
public static final Configuration conf;
@ -52,7 +53,7 @@ public class TestBlockStoragePolicy {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf);
POLICY_SUITE = BlockStoragePolicySuite.createDefaultSuite();
DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
}
@ -948,7 +949,7 @@ private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
Assert.assertTrue(typeList.isEmpty());
}
private void testIncreaseFileRep(String policyName, byte policyId,
private void testChangeFileRep(String policyName, byte policyId,
StorageType[] before,
StorageType[] after) throws Exception {
final int numDataNodes = 5;
@ -965,8 +966,6 @@ private void testIncreaseFileRep(String policyName, byte policyId,
final Path foo = new Path(dir, "foo");
DFSTestUtil.createFile(fs, foo, FILE_LEN, REPLICATION, 0L);
// the storage policy of foo should be WARM, and the replicas
// should be stored in DISK and ARCHIE
HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
@ -984,7 +983,24 @@ private void testIncreaseFileRep(String policyName, byte policyId,
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
fooStatus = (HdfsLocatedFileStatus) status[0];
checkLocatedBlocks(fooStatus, 1, 5, after);
checkLocatedBlocks(fooStatus, 1, numDataNodes, after);
// change the replication factor back to 3
fs.setReplication(foo, REPLICATION);
Thread.sleep(1000);
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerHeartbeat(dn);
}
Thread.sleep(1000);
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(dn);
}
Thread.sleep(1000);
status = fs.getClient().listPaths(foo.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
fooStatus = (HdfsLocatedFileStatus) status[0];
checkLocatedBlocks(fooStatus, 1, REPLICATION, before);
} finally {
cluster.shutdown();
}
@ -995,11 +1011,12 @@ private void testIncreaseFileRep(String policyName, byte policyId,
* that file from 3 to 5. Make sure all replications are created in DISKS.
*/
@Test
public void testIncreaseHotFileRep() throws Exception {
testIncreaseFileRep("HOT", HOT, new StorageType[]{StorageType.DISK,
StorageType.DISK, StorageType.DISK},
public void testChangeHotFileRep() throws Exception {
testChangeFileRep("HOT", HOT,
new StorageType[]{StorageType.DISK, StorageType.DISK,
StorageType.DISK, StorageType.DISK, StorageType.DISK});
StorageType.DISK},
new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK,
StorageType.DISK, StorageType.DISK});
}
/**
@ -1008,9 +1025,10 @@ public void testIncreaseHotFileRep() throws Exception {
* and ARCHIVE.
*/
@Test
public void testIncreaseWarmRep() throws Exception {
testIncreaseFileRep("WARM", WARM, new StorageType[]{StorageType.DISK,
StorageType.ARCHIVE, StorageType.ARCHIVE},
public void testChangeWarmRep() throws Exception {
testChangeFileRep("WARM", WARM,
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
}
@ -1020,9 +1038,10 @@ public void testIncreaseWarmRep() throws Exception {
* that file from 3 to 5. Make sure all replicas are created in ARCHIVE.
*/
@Test
public void testIncreaseColdRep() throws Exception {
testIncreaseFileRep("COLD", COLD, new StorageType[]{StorageType.ARCHIVE,
StorageType.ARCHIVE, StorageType.ARCHIVE},
public void testChangeColdRep() throws Exception {
testChangeFileRep("COLD", COLD,
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
StorageType.ARCHIVE},
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
}
@ -1072,4 +1091,28 @@ public void testChooseTargetWithTopology() throws Exception {
System.out.println(Arrays.asList(targets));
Assert.assertEquals(3, targets.length);
}
/**
* Test getting all the storage policies from the namenode
*/
@Test
public void testGetAllStoragePolicies() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
BlockStoragePolicy[] policies = fs.getStoragePolicySuite();
Assert.assertEquals(3, policies.length);
Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
policies[0].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
policies[1].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(),
policies[2].toString());
} finally {
IOUtils.cleanup(null, fs);
cluster.shutdown();
}
}
}

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -60,6 +62,11 @@ public void testSetAndGetStoragePolicy() throws Exception {
final Path bar = new Path(foo, "bar");
DFSTestUtil.createFile(fs, bar, SIZE, REPL, 0);
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo", 0,
"The storage policy of " + foo.toString() + " is unspecified", conf);
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo/bar", 0,
"The storage policy of " + bar.toString() + " is unspecified", conf);
DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo WARM", 0,
"Set storage policy WARM on " + foo.toString(), conf);
DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo/bar COLD", 0,
@ -67,8 +74,8 @@ public void testSetAndGetStoragePolicy() throws Exception {
DFSTestUtil.DFSAdminRun("-setStoragePolicy /fooz WARM", -1,
"File/Directory does not exist: /fooz", conf);
final BlockStoragePolicy.Suite suite = BlockStoragePolicy
.readBlockStorageSuite(conf);
final BlockStoragePolicySuite suite = BlockStoragePolicySuite
.createDefaultSuite();
final BlockStoragePolicy warm = suite.getPolicy("WARM");
final BlockStoragePolicy cold = suite.getPolicy("COLD");
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo", 0,

View File

@ -36,7 +36,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -87,7 +88,7 @@ public class TestStorageMover {
private static final short REPL = 3;
private static final int NUM_DATANODES = 6;
private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
private static final BlockStoragePolicy.Suite DEFAULT_POLICIES;
private static final BlockStoragePolicySuite DEFAULT_POLICIES;
private static final BlockStoragePolicy HOT;
private static final BlockStoragePolicy WARM;
private static final BlockStoragePolicy COLD;
@ -99,7 +100,7 @@ public class TestStorageMover {
2L);
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF);
DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
HOT = DEFAULT_POLICIES.getPolicy("HOT");
WARM = DEFAULT_POLICIES.getPolicy("WARM");
COLD = DEFAULT_POLICIES.getPolicy("COLD");
@ -192,13 +193,21 @@ class MigrationTest {
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
private final BlockStoragePolicy.Suite policies;
private final BlockStoragePolicySuite policies;
MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) {
this.clusterScheme = cScheme;
this.nsScheme = nsScheme;
this.conf = clusterScheme.conf;
this.policies = BlockStoragePolicy.readBlockStorageSuite(conf);
this.policies = DEFAULT_POLICIES;
}
MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme,
BlockStoragePolicySuite policies) {
this.clusterScheme = cScheme;
this.nsScheme = nsScheme;
this.conf = clusterScheme.conf;
this.policies = policies;
}
/**

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;