HDFS-7081. Add new DistributedFileSystem API for getting all the existing storage policies. Contributed by Jing Zhao.
This commit is contained in:
parent
7af4c3888b
commit
073bbd805c
|
@ -345,6 +345,9 @@ Trunk (Unreleased)
|
|||
|
||||
HDFS-7095. TestStorageMover often fails in Jenkins. (jing9)
|
||||
|
||||
HDFS-7081. Add new DistributedFileSystem API for getting all the existing
|
||||
storage policies. (jing9)
|
||||
|
||||
Release 2.6.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
<!-- Put site-specific property overrides in this file. -->
|
||||
|
||||
<configuration>
|
||||
|
||||
</configuration>
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
<!-- Put site-specific property overrides in this file. -->
|
||||
|
||||
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
|
||||
<xi:include href="blockStoragePolicy-site.xml" />
|
||||
<configuration>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -139,6 +139,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
|
||||
|
@ -1779,6 +1780,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return All the existing storage policies
|
||||
*/
|
||||
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
|
||||
return namenode.getStoragePolicySuite();
|
||||
}
|
||||
|
||||
/**
|
||||
* Rename file or directory.
|
||||
* @see ClientProtocol#rename(String, String)
|
||||
|
|
|
@ -438,14 +438,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
|
||||
public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
|
||||
public static final int DFS_REPLICATION_MAX_DEFAULT = 512;
|
||||
public static final String DFS_BLOCK_STORAGE_POLICIES_KEY
|
||||
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICIES_KEY;
|
||||
public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX
|
||||
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX;
|
||||
public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX
|
||||
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX;
|
||||
public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
|
||||
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX;
|
||||
|
||||
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
|
||||
public static final int DFS_DF_INTERVAL_DEFAULT = 60000;
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||
|
@ -504,6 +505,12 @@ public class DistributedFileSystem extends FileSystem {
|
|||
}.resolve(this, absF);
|
||||
}
|
||||
|
||||
/** Get all the existing storage policies */
|
||||
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
return dfs.getStoragePolicySuite();
|
||||
}
|
||||
|
||||
/**
|
||||
* Move blocks from srcs to trg and delete srcs afterwards.
|
||||
* The file block sizes must be the same.
|
||||
|
|
|
@ -325,4 +325,15 @@ public class HdfsAdmin {
|
|||
throws IOException {
|
||||
return dfs.getInotifyEventStream(lastReadTxid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the source path to the specified storage policy.
|
||||
*
|
||||
* @param src The source path referring to either a directory or a file.
|
||||
* @param policyName The name of the storage policy.
|
||||
*/
|
||||
public void setStoragePolicy(final Path src, final String policyName)
|
||||
throws IOException {
|
||||
dfs.setStoragePolicy(src, policyName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
|
@ -24,12 +24,10 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.fs.XAttr.NameSpace;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A block storage policy describes how to select the storage types
|
||||
|
@ -37,67 +35,8 @@ import org.apache.hadoop.fs.XAttr.NameSpace;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BlockStoragePolicy {
|
||||
public static final Log LOG = LogFactory.getLog(BlockStoragePolicy.class);
|
||||
|
||||
public static final String DFS_BLOCK_STORAGE_POLICIES_KEY
|
||||
= "dfs.block.storage.policies";
|
||||
public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX
|
||||
= "dfs.block.storage.policy.";
|
||||
public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX
|
||||
= "dfs.block.storage.policy.creation-fallback.";
|
||||
public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
|
||||
= "dfs.block.storage.policy.replication-fallback.";
|
||||
public static final String STORAGE_POLICY_XATTR_NAME = "bsp";
|
||||
/** set the namespace to TRUSTED so that only privilege users can access */
|
||||
public static final NameSpace XAttrNS = NameSpace.TRUSTED;
|
||||
|
||||
public static final int ID_BIT_LENGTH = 4;
|
||||
public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1;
|
||||
public static final byte ID_UNSPECIFIED = 0;
|
||||
|
||||
private static final Suite DEFAULT_SUITE = createDefaultSuite();
|
||||
|
||||
private static Suite createDefaultSuite() {
|
||||
final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
||||
final StorageType[] storageTypes = {StorageType.DISK};
|
||||
final byte defaultPolicyId = 12;
|
||||
policies[defaultPolicyId] = new BlockStoragePolicy(defaultPolicyId, "HOT",
|
||||
storageTypes, StorageType.EMPTY_ARRAY, StorageType.EMPTY_ARRAY);
|
||||
return new Suite(defaultPolicyId, policies);
|
||||
}
|
||||
|
||||
/** A block storage policy suite. */
|
||||
public static class Suite {
|
||||
private final byte defaultPolicyID;
|
||||
private final BlockStoragePolicy[] policies;
|
||||
|
||||
private Suite(byte defaultPolicyID, BlockStoragePolicy[] policies) {
|
||||
this.defaultPolicyID = defaultPolicyID;
|
||||
this.policies = policies;
|
||||
}
|
||||
|
||||
/** @return the corresponding policy. */
|
||||
public BlockStoragePolicy getPolicy(byte id) {
|
||||
// id == 0 means policy not specified.
|
||||
return id == 0? getDefaultPolicy(): policies[id];
|
||||
}
|
||||
|
||||
/** @return the default policy. */
|
||||
public BlockStoragePolicy getDefaultPolicy() {
|
||||
return getPolicy(defaultPolicyID);
|
||||
}
|
||||
|
||||
public BlockStoragePolicy getPolicy(String policyName) {
|
||||
if (policies != null) {
|
||||
for (BlockStoragePolicy policy : policies) {
|
||||
if (policy != null && policy.name.equals(policyName)) {
|
||||
return policy;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy
|
||||
.class);
|
||||
|
||||
/** A 4-bit policy ID */
|
||||
private final byte id;
|
||||
|
@ -160,7 +99,7 @@ public class BlockStoragePolicy {
|
|||
/**
|
||||
* Choose the storage types for storing the remaining replicas, given the
|
||||
* replication number, the storage types of the chosen replicas and
|
||||
* the unavailable storage types. It uses fallback storage in case that
|
||||
* the unavailable storage types. It uses fallback storage in case that
|
||||
* the desired storage type is unavailable.
|
||||
*
|
||||
* @param replication the replication number.
|
||||
|
@ -195,7 +134,7 @@ public class BlockStoragePolicy {
|
|||
// remove excess storage types after fallback replacement.
|
||||
diff(storageTypes, excess, null);
|
||||
if (storageTypes.size() < expectedSize) {
|
||||
LOG.warn("Failed to place enough replicas: expected size is " + expectedSize
|
||||
LOG.warn("Failed to place enough replicas: expected size is " + expectedSize
|
||||
+ " but only " + storageTypes.size() + " storage types can be selected "
|
||||
+ "(replication=" + replication
|
||||
+ ", selected=" + storageTypes
|
||||
|
@ -207,7 +146,8 @@ public class BlockStoragePolicy {
|
|||
}
|
||||
|
||||
/**
|
||||
* Compute the list difference t = t - c.
|
||||
* Compute the difference between two lists t and c so that after the diff
|
||||
* computation we have: t = t - c;
|
||||
* Further, if e is not null, set e = e + c - t;
|
||||
*/
|
||||
private static void diff(List<StorageType> t, Iterable<StorageType> c,
|
||||
|
@ -242,7 +182,7 @@ public class BlockStoragePolicy {
|
|||
public StorageType getCreationFallback(EnumSet<StorageType> unavailables) {
|
||||
return getFallback(unavailables, creationFallbacks);
|
||||
}
|
||||
|
||||
|
||||
/** @return the fallback {@link StorageType} for replication. */
|
||||
public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
|
||||
return getFallback(unavailables, replicationFallbacks);
|
||||
|
@ -280,6 +220,18 @@ public class BlockStoragePolicy {
|
|||
return name;
|
||||
}
|
||||
|
||||
public StorageType[] getStorageTypes() {
|
||||
return this.storageTypes;
|
||||
}
|
||||
|
||||
public StorageType[] getCreationFallbacks() {
|
||||
return this.creationFallbacks;
|
||||
}
|
||||
|
||||
public StorageType[] getReplicationFallbacks() {
|
||||
return this.replicationFallbacks;
|
||||
}
|
||||
|
||||
private static StorageType getFallback(EnumSet<StorageType> unavailables,
|
||||
StorageType[] fallbacks) {
|
||||
for(StorageType fb : fallbacks) {
|
||||
|
@ -289,131 +241,4 @@ public class BlockStoragePolicy {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static byte parseID(String idString, String element, Configuration conf) {
|
||||
byte id = 0;
|
||||
try {
|
||||
id = Byte.parseByte(idString);
|
||||
} catch(NumberFormatException nfe) {
|
||||
throwIllegalArgumentException("Failed to parse policy ID \"" + idString
|
||||
+ "\" to a " + ID_BIT_LENGTH + "-bit integer", conf);
|
||||
}
|
||||
if (id < 0) {
|
||||
throwIllegalArgumentException("Invalid policy ID: id = " + id
|
||||
+ " < 1 in \"" + element + "\"", conf);
|
||||
} else if (id == 0) {
|
||||
throw new IllegalArgumentException("Policy ID 0 is reserved: " + element);
|
||||
} else if (id > ID_MAX) {
|
||||
throwIllegalArgumentException("Invalid policy ID: id = " + id
|
||||
+ " > MAX = " + ID_MAX + " in \"" + element + "\"", conf);
|
||||
}
|
||||
return id;
|
||||
}
|
||||
|
||||
private static StorageType[] parseStorageTypes(String[] strings) {
|
||||
if (strings == null || strings.length == 0) {
|
||||
return StorageType.EMPTY_ARRAY;
|
||||
}
|
||||
final StorageType[] types = new StorageType[strings.length];
|
||||
for(int i = 0; i < types.length; i++) {
|
||||
types[i] = StorageType.valueOf(strings[i].trim().toUpperCase());
|
||||
}
|
||||
return types;
|
||||
}
|
||||
|
||||
private static StorageType[] readStorageTypes(byte id, String keyPrefix,
|
||||
Configuration conf) {
|
||||
final String key = keyPrefix + id;
|
||||
final String[] values = conf.getStrings(key);
|
||||
try {
|
||||
return parseStorageTypes(values);
|
||||
} catch(Exception e) {
|
||||
throw new IllegalArgumentException("Failed to parse " + key
|
||||
+ " \"" + conf.get(key), e);
|
||||
}
|
||||
}
|
||||
|
||||
private static BlockStoragePolicy readBlockStoragePolicy(byte id, String name,
|
||||
Configuration conf) {
|
||||
final StorageType[] storageTypes = readStorageTypes(id,
|
||||
DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX, conf);
|
||||
if (storageTypes.length == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX + id + " is missing or is empty.");
|
||||
}
|
||||
final StorageType[] creationFallbacks = readStorageTypes(id,
|
||||
DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX, conf);
|
||||
final StorageType[] replicationFallbacks = readStorageTypes(id,
|
||||
DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX, conf);
|
||||
return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks,
|
||||
replicationFallbacks);
|
||||
}
|
||||
|
||||
/** Read {@link Suite} from conf. */
|
||||
public static Suite readBlockStorageSuite(Configuration conf) {
|
||||
final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
||||
final String[] values = conf.getStrings(DFS_BLOCK_STORAGE_POLICIES_KEY);
|
||||
if (values == null) {
|
||||
// conf property is missing, use default suite.
|
||||
return DEFAULT_SUITE;
|
||||
}
|
||||
byte firstID = -1;
|
||||
for(String v : values) {
|
||||
v = v.trim();
|
||||
final int i = v.indexOf(':');
|
||||
if (i < 0) {
|
||||
throwIllegalArgumentException("Failed to parse element \"" + v
|
||||
+ "\" (expected format is NAME:ID)", conf);
|
||||
} else if (i == 0) {
|
||||
throwIllegalArgumentException("Policy name is missing in \"" + v + "\"", conf);
|
||||
} else if (i == v.length() - 1) {
|
||||
throwIllegalArgumentException("Policy ID is missing in \"" + v + "\"", conf);
|
||||
}
|
||||
final String name = v.substring(0, i).trim();
|
||||
for(int j = 1; j < policies.length; j++) {
|
||||
if (policies[j] != null && policies[j].name.equals(name)) {
|
||||
throwIllegalArgumentException("Policy name duplication: \""
|
||||
+ name + "\" appears more than once", conf);
|
||||
}
|
||||
}
|
||||
|
||||
final byte id = parseID(v.substring(i + 1).trim(), v, conf);
|
||||
if (policies[id] != null) {
|
||||
throwIllegalArgumentException("Policy duplication: ID " + id
|
||||
+ " appears more than once", conf);
|
||||
}
|
||||
policies[id] = readBlockStoragePolicy(id, name, conf);
|
||||
String prefix = "";
|
||||
if (firstID == -1) {
|
||||
firstID = id;
|
||||
prefix = "(default) ";
|
||||
}
|
||||
LOG.info(prefix + policies[id]);
|
||||
}
|
||||
if (firstID == -1) {
|
||||
throwIllegalArgumentException("Empty list is not allowed", conf);
|
||||
}
|
||||
return new Suite(firstID, policies);
|
||||
}
|
||||
|
||||
public static String buildXAttrName() {
|
||||
return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME;
|
||||
}
|
||||
|
||||
public static XAttr buildXAttr(byte policyId) {
|
||||
final String name = buildXAttrName();
|
||||
return XAttrHelper.buildXAttr(name, new byte[] { policyId });
|
||||
}
|
||||
|
||||
public static boolean isStoragePolicyXAttr(XAttr xattr) {
|
||||
return xattr != null && xattr.getNameSpace() == BlockStoragePolicy.XAttrNS
|
||||
&& xattr.getName().equals(BlockStoragePolicy.STORAGE_POLICY_XATTR_NAME);
|
||||
}
|
||||
|
||||
private static void throwIllegalArgumentException(String message,
|
||||
Configuration conf) {
|
||||
throw new IllegalArgumentException(message + " in "
|
||||
+ DFS_BLOCK_STORAGE_POLICIES_KEY + " \""
|
||||
+ conf.get(DFS_BLOCK_STORAGE_POLICIES_KEY) + "\".");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -259,6 +259,13 @@ public interface ClientProtocol {
|
|||
FileNotFoundException, SafeModeException, UnresolvedLinkException,
|
||||
SnapshotAccessControlException, IOException;
|
||||
|
||||
/**
|
||||
* Get all the available block storage policies.
|
||||
* @return All the in-use block storage policies currently.
|
||||
*/
|
||||
@Idempotent
|
||||
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException;
|
||||
|
||||
/**
|
||||
* Set the storage policy for a file/directory
|
||||
* @param src Path of an existing file/directory.
|
||||
|
|
|
@ -24,8 +24,8 @@ import java.util.Date;
|
|||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
|
||||
/**
|
||||
* Metadata about a snapshottable directory
|
||||
|
@ -62,7 +62,7 @@ public class SnapshottableDirectoryStatus {
|
|||
int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
|
||||
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
|
||||
access_time, permission, owner, group, null, localName, inodeId,
|
||||
childrenNum, null, BlockStoragePolicy.ID_UNSPECIFIED);
|
||||
childrenNum, null, BlockStoragePolicySuite.ID_UNSPECIFIED);
|
||||
this.snapshotNumber = snapshotNumber;
|
||||
this.snapshotQuota = snapshotQuota;
|
||||
this.parentFullPath = parentFullPath;
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
|||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||
|
@ -119,6 +120,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
||||
|
@ -1429,6 +1432,26 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
return VOID_SET_STORAGE_POLICY_RESPONSE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetStoragePolicySuiteResponseProto getStoragePolicySuite(
|
||||
RpcController controller, GetStoragePolicySuiteRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
BlockStoragePolicy[] policies = server.getStoragePolicySuite();
|
||||
GetStoragePolicySuiteResponseProto.Builder builder =
|
||||
GetStoragePolicySuiteResponseProto.newBuilder();
|
||||
if (policies == null) {
|
||||
return builder.build();
|
||||
}
|
||||
for (BlockStoragePolicy policy : policies) {
|
||||
builder.addPolicies(PBHelper.convert(policy));
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(RpcController controller,
|
||||
GetCurrentEditLogTxidRequestProto req) throws ServiceException {
|
||||
try {
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.fs.permission.FsAction;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.inotify.EventsList;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||
|
@ -64,9 +65,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
|
||||
|
@ -119,6 +118,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
|
||||
|
@ -159,13 +160,13 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
|
|||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
|
@ -225,6 +226,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
VOID_GET_DATA_ENCRYPTIONKEY_REQUEST =
|
||||
GetDataEncryptionKeyRequestProto.newBuilder().build();
|
||||
|
||||
private final static GetStoragePolicySuiteRequestProto
|
||||
VOID_GET_STORAGE_POLICY_SUITE_REQUEST =
|
||||
GetStoragePolicySuiteRequestProto.newBuilder().build();
|
||||
|
||||
public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
|
||||
rpcProxy = proxy;
|
||||
}
|
||||
|
@ -1440,8 +1445,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
|
||||
@Override
|
||||
public void setStoragePolicy(String src, String policyName)
|
||||
throws SnapshotAccessControlException, UnresolvedLinkException,
|
||||
FileNotFoundException, QuotaExceededException, IOException {
|
||||
throws IOException {
|
||||
SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto
|
||||
.newBuilder().setSrc(src).setPolicyName(policyName).build();
|
||||
try {
|
||||
|
@ -1451,6 +1455,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
|
||||
try {
|
||||
GetStoragePolicySuiteResponseProto response = rpcProxy
|
||||
.getStoragePolicySuite(null, VOID_GET_STORAGE_POLICY_SUITE_REQUEST);
|
||||
return PBHelper.convertStoragePolicies(response.getPoliciesList());
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public long getCurrentEditLogTxid() throws IOException {
|
||||
GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto
|
||||
.getDefaultInstance();
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.FsAction;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.inotify.Event;
|
||||
|
@ -120,6 +120,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterComm
|
|||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
|
||||
|
@ -174,6 +175,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
|
@ -280,6 +282,65 @@ public class PBHelper {
|
|||
return null;
|
||||
}
|
||||
|
||||
public static BlockStoragePolicy[] convertStoragePolicies(
|
||||
List<BlockStoragePolicyProto> policyProtos) {
|
||||
if (policyProtos == null || policyProtos.size() == 0) {
|
||||
return new BlockStoragePolicy[0];
|
||||
}
|
||||
BlockStoragePolicy[] policies = new BlockStoragePolicy[policyProtos.size()];
|
||||
int i = 0;
|
||||
for (BlockStoragePolicyProto proto : policyProtos) {
|
||||
policies[i++] = convert(proto);
|
||||
}
|
||||
return policies;
|
||||
}
|
||||
|
||||
public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
|
||||
List<StorageTypeProto> cList = proto.getCreationPolicy()
|
||||
.getStorageTypesList();
|
||||
StorageType[] creationTypes = convertStorageTypes(cList, cList.size());
|
||||
List<StorageTypeProto> cfList = proto.hasCreationFallbackPolicy() ? proto
|
||||
.getCreationFallbackPolicy().getStorageTypesList() : null;
|
||||
StorageType[] creationFallbackTypes = cfList == null ? StorageType
|
||||
.EMPTY_ARRAY : convertStorageTypes(cfList, cfList.size());
|
||||
List<StorageTypeProto> rfList = proto.hasReplicationFallbackPolicy() ?
|
||||
proto.getReplicationFallbackPolicy().getStorageTypesList() : null;
|
||||
StorageType[] replicationFallbackTypes = rfList == null ? StorageType
|
||||
.EMPTY_ARRAY : convertStorageTypes(rfList, rfList.size());
|
||||
return new BlockStoragePolicy((byte) proto.getPolicyId(), proto.getName(),
|
||||
creationTypes, creationFallbackTypes, replicationFallbackTypes);
|
||||
}
|
||||
|
||||
public static BlockStoragePolicyProto convert(BlockStoragePolicy policy) {
|
||||
BlockStoragePolicyProto.Builder builder = BlockStoragePolicyProto
|
||||
.newBuilder().setPolicyId(policy.getId()).setName(policy.getName());
|
||||
// creation storage types
|
||||
StorageTypesProto creationProto = convert(policy.getStorageTypes());
|
||||
Preconditions.checkArgument(creationProto != null);
|
||||
builder.setCreationPolicy(creationProto);
|
||||
// creation fallback
|
||||
StorageTypesProto creationFallbackProto = convert(
|
||||
policy.getCreationFallbacks());
|
||||
if (creationFallbackProto != null) {
|
||||
builder.setCreationFallbackPolicy(creationFallbackProto);
|
||||
}
|
||||
// replication fallback
|
||||
StorageTypesProto replicationFallbackProto = convert(
|
||||
policy.getReplicationFallbacks());
|
||||
if (replicationFallbackProto != null) {
|
||||
builder.setReplicationFallbackPolicy(replicationFallbackProto);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static StorageTypesProto convert(StorageType[] types) {
|
||||
if (types == null || types.length == 0) {
|
||||
return null;
|
||||
}
|
||||
List<StorageTypeProto> list = convertStorageTypes(types);
|
||||
return StorageTypesProto.newBuilder().addAllStorageTypes(list).build();
|
||||
}
|
||||
|
||||
public static StorageInfoProto convert(StorageInfo info) {
|
||||
return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
|
||||
.setCTime(info.getCTime()).setLayoutVersion(info.getLayoutVersion())
|
||||
|
@ -1349,7 +1410,7 @@ public class PBHelper {
|
|||
fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
|
||||
fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
|
||||
fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
|
||||
: BlockStoragePolicy.ID_UNSPECIFIED);
|
||||
: BlockStoragePolicySuite.ID_UNSPECIFIED);
|
||||
}
|
||||
|
||||
public static SnapshottableDirectoryStatus convert(
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
|
@ -255,7 +255,7 @@ public class BlockManager {
|
|||
|
||||
/** for block replicas placement */
|
||||
private BlockPlacementPolicy blockplacement;
|
||||
private final BlockStoragePolicy.Suite storagePolicySuite;
|
||||
private final BlockStoragePolicySuite storagePolicySuite;
|
||||
|
||||
/** Check whether name system is running before terminating */
|
||||
private boolean checkNSRunning = true;
|
||||
|
@ -278,7 +278,7 @@ public class BlockManager {
|
|||
blockplacement = BlockPlacementPolicy.getInstance(
|
||||
conf, stats, datanodeManager.getNetworkTopology(),
|
||||
datanodeManager.getHost2DatanodeMap());
|
||||
storagePolicySuite = BlockStoragePolicy.readBlockStorageSuite(conf);
|
||||
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
|
||||
pendingReplications = new PendingReplicationBlocks(conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
|
||||
|
@ -402,6 +402,10 @@ public class BlockManager {
|
|||
return storagePolicySuite.getPolicy(policyName);
|
||||
}
|
||||
|
||||
public BlockStoragePolicy[] getStoragePolicySuite() {
|
||||
return storagePolicySuite.getAllPolicies();
|
||||
}
|
||||
|
||||
public void setBlockPoolId(String blockPoolId) {
|
||||
if (isBlockTokenEnabled()) {
|
||||
blockTokenSecretManager.setBlockPoolId(blockPoolId);
|
||||
|
@ -3599,7 +3603,7 @@ public class BlockManager {
|
|||
}
|
||||
|
||||
private void chooseTargets(BlockPlacementPolicy blockplacement,
|
||||
BlockStoragePolicy.Suite storagePolicySuite,
|
||||
BlockStoragePolicySuite storagePolicySuite,
|
||||
Set<Node> excludedNodes) {
|
||||
targets = blockplacement.chooseTarget(bc.getName(),
|
||||
additionalReplRequired, srcNode, liveReplicaStorages, false,
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.*;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/** A collection of block storage policies. */
|
||||
public class BlockStoragePolicySuite {
|
||||
static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicySuite
|
||||
.class);
|
||||
|
||||
public static final String STORAGE_POLICY_XATTR_NAME
|
||||
= "hsm.block.storage.policy.id";
|
||||
public static final XAttr.NameSpace XAttrNS = XAttr.NameSpace.SYSTEM;
|
||||
|
||||
public static final int ID_BIT_LENGTH = 4;
|
||||
public static final byte ID_UNSPECIFIED = 0;
|
||||
|
||||
@VisibleForTesting
|
||||
public static BlockStoragePolicySuite createDefaultSuite() {
|
||||
final BlockStoragePolicy[] policies =
|
||||
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
||||
final byte hotId = 12;
|
||||
policies[hotId] = new BlockStoragePolicy(hotId, "HOT",
|
||||
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
||||
new StorageType[]{StorageType.ARCHIVE});
|
||||
final byte warmId = 8;
|
||||
policies[warmId] = new BlockStoragePolicy(warmId, "WARM",
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
|
||||
final byte coldId = 4;
|
||||
policies[coldId] = new BlockStoragePolicy(coldId, "COLD",
|
||||
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
|
||||
StorageType.EMPTY_ARRAY);
|
||||
return new BlockStoragePolicySuite(hotId, policies);
|
||||
}
|
||||
|
||||
private final byte defaultPolicyID;
|
||||
private final BlockStoragePolicy[] policies;
|
||||
|
||||
public BlockStoragePolicySuite(byte defaultPolicyID,
|
||||
BlockStoragePolicy[] policies) {
|
||||
this.defaultPolicyID = defaultPolicyID;
|
||||
this.policies = policies;
|
||||
}
|
||||
|
||||
/** @return the corresponding policy. */
|
||||
public BlockStoragePolicy getPolicy(byte id) {
|
||||
// id == 0 means policy not specified.
|
||||
return id == 0? getDefaultPolicy(): policies[id];
|
||||
}
|
||||
|
||||
/** @return the default policy. */
|
||||
public BlockStoragePolicy getDefaultPolicy() {
|
||||
return getPolicy(defaultPolicyID);
|
||||
}
|
||||
|
||||
public BlockStoragePolicy getPolicy(String policyName) {
|
||||
if (policies != null) {
|
||||
for (BlockStoragePolicy policy : policies) {
|
||||
if (policy != null && policy.getName().equals(policyName)) {
|
||||
return policy;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public BlockStoragePolicy[] getAllPolicies() {
|
||||
List<BlockStoragePolicy> list = Lists.newArrayList();
|
||||
if (policies != null) {
|
||||
for (BlockStoragePolicy policy : policies) {
|
||||
if (policy != null) {
|
||||
list.add(policy);
|
||||
}
|
||||
}
|
||||
}
|
||||
return list.toArray(new BlockStoragePolicy[list.size()]);
|
||||
}
|
||||
|
||||
public static String buildXAttrName() {
|
||||
return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME;
|
||||
}
|
||||
|
||||
public static XAttr buildXAttr(byte policyId) {
|
||||
final String name = buildXAttrName();
|
||||
return XAttrHelper.buildXAttr(name, new byte[]{policyId});
|
||||
}
|
||||
|
||||
public static boolean isStoragePolicyXAttr(XAttr xattr) {
|
||||
return xattr != null && xattr.getNameSpace() == XAttrNS
|
||||
&& xattr.getName().equals(STORAGE_POLICY_XATTR_NAME);
|
||||
}
|
||||
}
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
|
|||
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
|
||||
import org.apache.hadoop.hdfs.server.balancer.Matcher;
|
||||
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
|
@ -102,7 +103,7 @@ public class Mover {
|
|||
private final StorageMap storages;
|
||||
private final List<Path> targetPaths;
|
||||
|
||||
private final BlockStoragePolicy.Suite blockStoragePolicies;
|
||||
private final BlockStoragePolicy[] blockStoragePolicies;
|
||||
|
||||
Mover(NameNodeConnector nnc, Configuration conf) {
|
||||
final long movedWinWidth = conf.getLong(
|
||||
|
@ -119,11 +120,13 @@ public class Mover {
|
|||
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
|
||||
maxConcurrentMovesPerNode, conf);
|
||||
this.storages = new StorageMap();
|
||||
this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
|
||||
this.targetPaths = nnc.getTargetPaths();
|
||||
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
|
||||
BlockStoragePolicySuite.ID_BIT_LENGTH];
|
||||
}
|
||||
|
||||
void init() throws IOException {
|
||||
initStoragePolicies();
|
||||
final List<DatanodeStorageReport> reports = dispatcher.init();
|
||||
for(DatanodeStorageReport r : reports) {
|
||||
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
||||
|
@ -137,6 +140,14 @@ public class Mover {
|
|||
}
|
||||
}
|
||||
|
||||
private void initStoragePolicies() throws IOException {
|
||||
BlockStoragePolicy[] policies = dispatcher.getDistributedFileSystem()
|
||||
.getStoragePolicySuite();
|
||||
for (BlockStoragePolicy policy : policies) {
|
||||
this.blockStoragePolicies[policy.getId()] = policy;
|
||||
}
|
||||
}
|
||||
|
||||
private ExitStatus run() {
|
||||
try {
|
||||
init();
|
||||
|
@ -305,7 +316,7 @@ public class Mover {
|
|||
if (!isSnapshotPathInCurrent(fullPath)) {
|
||||
// the full path is a snapshot path but it is also included in the
|
||||
// current directory tree, thus ignore it.
|
||||
hasRemaining = processFile((HdfsLocatedFileStatus)status);
|
||||
hasRemaining = processFile(fullPath, (HdfsLocatedFileStatus)status);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to check the status of " + parent
|
||||
|
@ -317,9 +328,17 @@ public class Mover {
|
|||
}
|
||||
|
||||
/** @return true if it is necessary to run another round of migration */
|
||||
private boolean processFile(HdfsLocatedFileStatus status) {
|
||||
final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
|
||||
status.getStoragePolicy());
|
||||
private boolean processFile(String fullPath, HdfsLocatedFileStatus status) {
|
||||
final byte policyId = status.getStoragePolicy();
|
||||
// currently we ignore files with unspecified storage policy
|
||||
if (policyId == BlockStoragePolicySuite.ID_UNSPECIFIED) {
|
||||
return false;
|
||||
}
|
||||
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
|
||||
if (policy == null) {
|
||||
LOG.warn("Failed to get the storage policy of file " + fullPath);
|
||||
return false;
|
||||
}
|
||||
final List<StorageType> types = policy.chooseStorageTypes(
|
||||
status.getReplication());
|
||||
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.fs.permission.AclEntry;
|
|||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
|
@ -79,6 +78,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
|
@ -1037,7 +1037,7 @@ public class FSDirectory implements Closeable {
|
|||
private void setDirStoragePolicy(INodeDirectory inode, byte policyId,
|
||||
int latestSnapshotId) throws IOException {
|
||||
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
|
||||
XAttr xAttr = BlockStoragePolicy.buildXAttr(policyId);
|
||||
XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId);
|
||||
List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, Arrays.asList(xAttr),
|
||||
EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
|
||||
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
|
||||
|
@ -1375,7 +1375,7 @@ public class FSDirectory implements Closeable {
|
|||
}
|
||||
|
||||
private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
|
||||
return inodePolicy != BlockStoragePolicy.ID_UNSPECIFIED ? inodePolicy :
|
||||
return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
|
||||
parentPolicy;
|
||||
}
|
||||
|
||||
|
@ -1410,7 +1410,7 @@ public class FSDirectory implements Closeable {
|
|||
if (targetNode == null)
|
||||
return null;
|
||||
byte parentStoragePolicy = isSuperUser ?
|
||||
targetNode.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
targetNode.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
|
||||
if (!targetNode.isDirectory()) {
|
||||
return new DirectoryListing(
|
||||
|
@ -1430,7 +1430,8 @@ public class FSDirectory implements Closeable {
|
|||
for (int i=0; i<numOfListing && locationBudget>0; i++) {
|
||||
INode cur = contents.get(startChild+i);
|
||||
byte curPolicy = isSuperUser && !cur.isSymlink()?
|
||||
cur.getLocalStoragePolicyID(): BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
cur.getLocalStoragePolicyID():
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation,
|
||||
getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot,
|
||||
isRawPath, inodesInPath);
|
||||
|
@ -1484,7 +1485,7 @@ public class FSDirectory implements Closeable {
|
|||
for (int i = 0; i < numOfListing; i++) {
|
||||
Root sRoot = snapshots.get(i + skipSize).getRoot();
|
||||
listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
|
||||
BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
|
||||
false, null);
|
||||
}
|
||||
return new DirectoryListing(
|
||||
|
@ -1512,7 +1513,7 @@ public class FSDirectory implements Closeable {
|
|||
final INode[] inodes = inodesInPath.getINodes();
|
||||
final INode i = inodes[inodes.length - 1];
|
||||
byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ?
|
||||
i.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
|
||||
policyId, inodesInPath.getPathSnapshotId(), isRawPath,
|
||||
inodesInPath);
|
||||
|
@ -1532,7 +1533,8 @@ public class FSDirectory implements Closeable {
|
|||
throws UnresolvedLinkException {
|
||||
if (getINode4DotSnapshot(src) != null) {
|
||||
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
|
||||
HdfsFileStatus.EMPTY_NAME, -1L, 0, null, BlockStoragePolicy.ID_UNSPECIFIED);
|
||||
HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -373,7 +373,7 @@ public class FSEditLogLoader {
|
|||
if (toAddRetryCache) {
|
||||
HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
|
||||
HdfsFileStatus.EMPTY_NAME, newFile,
|
||||
BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID,
|
||||
false, iip);
|
||||
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
|
||||
addCloseOp.rpcCallId, stat);
|
||||
|
|
|
@ -17,8 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
|
||||
.EncryptedKeyVersion;
|
||||
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||
|
@ -160,7 +159,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
|
@ -2302,6 +2301,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
logAuditEvent(true, "setStoragePolicy", src, null, fileStat);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return All the existing block storage policies
|
||||
*/
|
||||
BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
|
||||
checkOperation(OperationCategory.READ);
|
||||
waitForLoadingFSImage();
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
return blockManager.getStoragePolicySuite();
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
long getPreferredBlockSize(String filename)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.hadoop.fs.ContentSummary;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
|
@ -695,7 +695,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
|
|||
|
||||
/**
|
||||
* @return the storage policy directly specified on the INode. Return
|
||||
* {@link BlockStoragePolicy#ID_UNSPECIFIED} if no policy has
|
||||
* {@link BlockStoragePolicySuite#ID_UNSPECIFIED} if no policy has
|
||||
* been specified.
|
||||
*/
|
||||
public abstract byte getLocalStoragePolicyID();
|
||||
|
|
|
@ -28,10 +28,11 @@ import java.util.Map;
|
|||
import org.apache.hadoop.fs.PathIsNotDirectoryException;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
|
||||
|
@ -112,22 +113,22 @@ public class INodeDirectory extends INodeWithAdditionalFields
|
|||
ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
|
||||
.getXAttrs();
|
||||
for (XAttr xattr : xattrs) {
|
||||
if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
|
||||
if (BlockStoragePolicySuite.isStoragePolicyXAttr(xattr)) {
|
||||
return (xattr.getValue())[0];
|
||||
}
|
||||
}
|
||||
return BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
return BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getStoragePolicyID() {
|
||||
byte id = getLocalStoragePolicyID();
|
||||
if (id != BlockStoragePolicy.ID_UNSPECIFIED) {
|
||||
if (id != BlockStoragePolicySuite.ID_UNSPECIFIED) {
|
||||
return id;
|
||||
}
|
||||
// if it is unspecified, check its parent
|
||||
return getParent() != null ? getParent().getStoragePolicyID() :
|
||||
BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
}
|
||||
|
||||
void setQuota(long nsQuota, long dsQuota) {
|
||||
|
|
|
@ -18,12 +18,9 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.XAttr;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
/**
|
||||
* The attributes of an inode.
|
||||
|
|
|
@ -28,12 +28,12 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
|
||||
|
@ -79,7 +79,8 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
static enum HeaderFormat {
|
||||
PREFERRED_BLOCK_SIZE(null, 48, 1),
|
||||
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
|
||||
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicy.ID_BIT_LENGTH, 0);
|
||||
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
|
||||
0);
|
||||
|
||||
private final LongBitFormat BITS;
|
||||
|
||||
|
@ -374,7 +375,7 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
@Override
|
||||
public byte getStoragePolicyID() {
|
||||
byte id = getLocalStoragePolicyID();
|
||||
if (id == BlockStoragePolicy.ID_UNSPECIFIED) {
|
||||
if (id == BlockStoragePolicySuite.ID_UNSPECIFIED) {
|
||||
return this.getParent() != null ?
|
||||
this.getParent().getStoragePolicyID() : id;
|
||||
}
|
||||
|
|
|
@ -22,8 +22,9 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
|
||||
import org.apache.hadoop.util.GSet;
|
||||
import org.apache.hadoop.util.LightWeightGSet;
|
||||
|
@ -125,12 +126,12 @@ public class INodeMap {
|
|||
|
||||
@Override
|
||||
public byte getStoragePolicyID(){
|
||||
return BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
return BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getLocalStoragePolicyID() {
|
||||
return BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
return BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.inotify.EventsList;
|
|||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||
|
@ -592,6 +593,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
namesystem.setStoragePolicy(src, policyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockStoragePolicy[] getStoragePolicySuite() throws IOException {
|
||||
return namesystem.getStoragePolicySuite();
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void setPermission(String src, FsPermission permissions)
|
||||
throws IOException {
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.hadoop.fs.FsStatus;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.shell.Command;
|
||||
import org.apache.hadoop.fs.shell.CommandFormat;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.ipc.GenericRefreshProtocol;
|
||||
|
@ -609,15 +610,18 @@ public class DFSAdmin extends FsShell {
|
|||
+ argv[1]);
|
||||
}
|
||||
byte storagePolicyId = status.getStoragePolicy();
|
||||
BlockStoragePolicy.Suite suite = BlockStoragePolicy
|
||||
.readBlockStorageSuite(getConf());
|
||||
BlockStoragePolicy policy = suite.getPolicy(storagePolicyId);
|
||||
if (policy != null) {
|
||||
System.out.println("The storage policy of " + argv[1] + ":\n" + policy);
|
||||
if (storagePolicyId == BlockStoragePolicySuite.ID_UNSPECIFIED) {
|
||||
System.out.println("The storage policy of " + argv[1] + " is unspecified");
|
||||
return 0;
|
||||
} else {
|
||||
throw new IOException("Cannot identify the storage policy for " + argv[1]);
|
||||
}
|
||||
BlockStoragePolicy[] policies = dfs.getStoragePolicySuite();
|
||||
for (BlockStoragePolicy p : policies) {
|
||||
if (p.getId() == storagePolicyId) {
|
||||
System.out.println("The storage policy of " + argv[1] + ":\n" + p);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
throw new IOException("Cannot identify the storage policy for " + argv[1]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,13 +21,14 @@ import org.apache.hadoop.fs.*;
|
|||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
import org.apache.hadoop.hdfs.protocol.*;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
@ -264,7 +265,7 @@ public class JsonUtil {
|
|||
: childrenNumLong.intValue();
|
||||
final byte storagePolicy = m.containsKey("storagePolicy") ?
|
||||
(byte) (long) (Long) m.get("storagePolicy") :
|
||||
BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
|
||||
blockSize, mTime, aTime, permission, owner, group, symlink,
|
||||
DFSUtil.string2Bytes(localName), fileId, childrenNum, null, storagePolicy);
|
||||
|
|
|
@ -108,6 +108,13 @@ message SetStoragePolicyRequestProto {
|
|||
message SetStoragePolicyResponseProto { // void response
|
||||
}
|
||||
|
||||
message GetStoragePolicySuiteRequestProto { // void request
|
||||
}
|
||||
|
||||
message GetStoragePolicySuiteResponseProto {
|
||||
repeated BlockStoragePolicyProto policies = 1;
|
||||
}
|
||||
|
||||
message SetPermissionRequestProto {
|
||||
required string src = 1;
|
||||
required FsPermissionProto permission = 2;
|
||||
|
@ -699,6 +706,8 @@ service ClientNamenodeProtocol {
|
|||
returns(SetReplicationResponseProto);
|
||||
rpc setStoragePolicy(SetStoragePolicyRequestProto)
|
||||
returns(SetStoragePolicyResponseProto);
|
||||
rpc getStoragePolicySuite(GetStoragePolicySuiteRequestProto)
|
||||
returns(GetStoragePolicySuiteResponseProto);
|
||||
rpc setPermission(SetPermissionRequestProto)
|
||||
returns(SetPermissionResponseProto);
|
||||
rpc setOwner(SetOwnerRequestProto) returns(SetOwnerResponseProto);
|
||||
|
|
|
@ -168,6 +168,20 @@ message StorageTypesProto {
|
|||
repeated StorageTypeProto storageTypes = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Block replica storage policy.
|
||||
*/
|
||||
message BlockStoragePolicyProto {
|
||||
required uint32 policyId = 1;
|
||||
required string name = 2;
|
||||
// a list of storage types for storing the block replicas when creating a
|
||||
// block.
|
||||
required StorageTypesProto creationPolicy = 3;
|
||||
// A list of storage types for creation fallback storage.
|
||||
optional StorageTypesProto creationFallbackPolicy = 4;
|
||||
optional StorageTypesProto replicationFallbackPolicy = 5;
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of storage IDs.
|
||||
*/
|
||||
|
|
|
@ -22,8 +22,7 @@
|
|||
<!-- wish to modify from this file into hdfs-site.xml and change them -->
|
||||
<!-- there. If hdfs-site.xml does not already exist, create it. -->
|
||||
|
||||
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
|
||||
<xi:include href="blockStoragePolicy-default.xml" />
|
||||
<configuration>
|
||||
|
||||
<property>
|
||||
<name>hadoop.hdfs.configuration.version</name>
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
|
||||
import static org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.Node;
|
||||
|
@ -44,7 +45,7 @@ import org.junit.Test;
|
|||
|
||||
/** Test {@link BlockStoragePolicy} */
|
||||
public class TestBlockStoragePolicy {
|
||||
public static final BlockStoragePolicy.Suite POLICY_SUITE;
|
||||
public static final BlockStoragePolicySuite POLICY_SUITE;
|
||||
public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY;
|
||||
public static final Configuration conf;
|
||||
|
||||
|
@ -52,7 +53,7 @@ public class TestBlockStoragePolicy {
|
|||
conf = new HdfsConfiguration();
|
||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
|
||||
POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf);
|
||||
POLICY_SUITE = BlockStoragePolicySuite.createDefaultSuite();
|
||||
DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
|
||||
}
|
||||
|
||||
|
@ -948,7 +949,7 @@ public class TestBlockStoragePolicy {
|
|||
Assert.assertTrue(typeList.isEmpty());
|
||||
}
|
||||
|
||||
private void testIncreaseFileRep(String policyName, byte policyId,
|
||||
private void testChangeFileRep(String policyName, byte policyId,
|
||||
StorageType[] before,
|
||||
StorageType[] after) throws Exception {
|
||||
final int numDataNodes = 5;
|
||||
|
@ -965,8 +966,6 @@ public class TestBlockStoragePolicy {
|
|||
final Path foo = new Path(dir, "foo");
|
||||
DFSTestUtil.createFile(fs, foo, FILE_LEN, REPLICATION, 0L);
|
||||
|
||||
// the storage policy of foo should be WARM, and the replicas
|
||||
// should be stored in DISK and ARCHIE
|
||||
HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(),
|
||||
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
|
||||
checkDirectoryListing(status, policyId);
|
||||
|
@ -984,7 +983,24 @@ public class TestBlockStoragePolicy {
|
|||
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
|
||||
checkDirectoryListing(status, policyId);
|
||||
fooStatus = (HdfsLocatedFileStatus) status[0];
|
||||
checkLocatedBlocks(fooStatus, 1, 5, after);
|
||||
checkLocatedBlocks(fooStatus, 1, numDataNodes, after);
|
||||
|
||||
// change the replication factor back to 3
|
||||
fs.setReplication(foo, REPLICATION);
|
||||
Thread.sleep(1000);
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DataNodeTestUtils.triggerBlockReport(dn);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
status = fs.getClient().listPaths(foo.toString(),
|
||||
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
|
||||
checkDirectoryListing(status, policyId);
|
||||
fooStatus = (HdfsLocatedFileStatus) status[0];
|
||||
checkLocatedBlocks(fooStatus, 1, REPLICATION, before);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
@ -995,11 +1011,12 @@ public class TestBlockStoragePolicy {
|
|||
* that file from 3 to 5. Make sure all replications are created in DISKS.
|
||||
*/
|
||||
@Test
|
||||
public void testIncreaseHotFileRep() throws Exception {
|
||||
testIncreaseFileRep("HOT", HOT, new StorageType[]{StorageType.DISK,
|
||||
StorageType.DISK, StorageType.DISK},
|
||||
public void testChangeHotFileRep() throws Exception {
|
||||
testChangeFileRep("HOT", HOT,
|
||||
new StorageType[]{StorageType.DISK, StorageType.DISK,
|
||||
StorageType.DISK, StorageType.DISK, StorageType.DISK});
|
||||
StorageType.DISK},
|
||||
new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK,
|
||||
StorageType.DISK, StorageType.DISK});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1008,9 +1025,10 @@ public class TestBlockStoragePolicy {
|
|||
* and ARCHIVE.
|
||||
*/
|
||||
@Test
|
||||
public void testIncreaseWarmRep() throws Exception {
|
||||
testIncreaseFileRep("WARM", WARM, new StorageType[]{StorageType.DISK,
|
||||
StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||
public void testChangeWarmRep() throws Exception {
|
||||
testChangeFileRep("WARM", WARM,
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
|
||||
StorageType.ARCHIVE},
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
|
||||
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
|
||||
}
|
||||
|
@ -1020,9 +1038,10 @@ public class TestBlockStoragePolicy {
|
|||
* that file from 3 to 5. Make sure all replicas are created in ARCHIVE.
|
||||
*/
|
||||
@Test
|
||||
public void testIncreaseColdRep() throws Exception {
|
||||
testIncreaseFileRep("COLD", COLD, new StorageType[]{StorageType.ARCHIVE,
|
||||
StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||
public void testChangeColdRep() throws Exception {
|
||||
testChangeFileRep("COLD", COLD,
|
||||
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
|
||||
StorageType.ARCHIVE},
|
||||
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
|
||||
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
|
||||
}
|
||||
|
@ -1072,4 +1091,28 @@ public class TestBlockStoragePolicy {
|
|||
System.out.println(Arrays.asList(targets));
|
||||
Assert.assertEquals(3, targets.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test getting all the storage policies from the namenode
|
||||
*/
|
||||
@Test
|
||||
public void testGetAllStoragePolicies() throws Exception {
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(0).build();
|
||||
cluster.waitActive();
|
||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||
try {
|
||||
BlockStoragePolicy[] policies = fs.getStoragePolicySuite();
|
||||
Assert.assertEquals(3, policies.length);
|
||||
Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
|
||||
policies[0].toString());
|
||||
Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
|
||||
policies[1].toString());
|
||||
Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(),
|
||||
policies[2].toString());
|
||||
} finally {
|
||||
IOUtils.cleanup(null, fs);
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -60,6 +62,11 @@ public class TestStoragePolicyCommands {
|
|||
final Path bar = new Path(foo, "bar");
|
||||
DFSTestUtil.createFile(fs, bar, SIZE, REPL, 0);
|
||||
|
||||
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo", 0,
|
||||
"The storage policy of " + foo.toString() + " is unspecified", conf);
|
||||
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo/bar", 0,
|
||||
"The storage policy of " + bar.toString() + " is unspecified", conf);
|
||||
|
||||
DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo WARM", 0,
|
||||
"Set storage policy WARM on " + foo.toString(), conf);
|
||||
DFSTestUtil.DFSAdminRun("-setStoragePolicy /foo/bar COLD", 0,
|
||||
|
@ -67,8 +74,8 @@ public class TestStoragePolicyCommands {
|
|||
DFSTestUtil.DFSAdminRun("-setStoragePolicy /fooz WARM", -1,
|
||||
"File/Directory does not exist: /fooz", conf);
|
||||
|
||||
final BlockStoragePolicy.Suite suite = BlockStoragePolicy
|
||||
.readBlockStorageSuite(conf);
|
||||
final BlockStoragePolicySuite suite = BlockStoragePolicySuite
|
||||
.createDefaultSuite();
|
||||
final BlockStoragePolicy warm = suite.getPolicy("WARM");
|
||||
final BlockStoragePolicy cold = suite.getPolicy("COLD");
|
||||
DFSTestUtil.DFSAdminRun("-getStoragePolicy /foo", 0,
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.conf.ReconfigurationException;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
|
|||
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
|
||||
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
@ -87,7 +88,7 @@ public class TestStorageMover {
|
|||
private static final short REPL = 3;
|
||||
private static final int NUM_DATANODES = 6;
|
||||
private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
|
||||
private static final BlockStoragePolicy.Suite DEFAULT_POLICIES;
|
||||
private static final BlockStoragePolicySuite DEFAULT_POLICIES;
|
||||
private static final BlockStoragePolicy HOT;
|
||||
private static final BlockStoragePolicy WARM;
|
||||
private static final BlockStoragePolicy COLD;
|
||||
|
@ -99,7 +100,7 @@ public class TestStorageMover {
|
|||
2L);
|
||||
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
|
||||
|
||||
DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF);
|
||||
DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
|
||||
HOT = DEFAULT_POLICIES.getPolicy("HOT");
|
||||
WARM = DEFAULT_POLICIES.getPolicy("WARM");
|
||||
COLD = DEFAULT_POLICIES.getPolicy("COLD");
|
||||
|
@ -192,13 +193,21 @@ public class TestStorageMover {
|
|||
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem dfs;
|
||||
private final BlockStoragePolicy.Suite policies;
|
||||
private final BlockStoragePolicySuite policies;
|
||||
|
||||
MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) {
|
||||
this.clusterScheme = cScheme;
|
||||
this.nsScheme = nsScheme;
|
||||
this.conf = clusterScheme.conf;
|
||||
this.policies = BlockStoragePolicy.readBlockStorageSuite(conf);
|
||||
this.policies = DEFAULT_POLICIES;
|
||||
}
|
||||
|
||||
MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme,
|
||||
BlockStoragePolicySuite policies) {
|
||||
this.clusterScheme = cScheme;
|
||||
this.nsScheme = nsScheme;
|
||||
this.conf = clusterScheme.conf;
|
||||
this.policies = policies;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
|
|
Loading…
Reference in New Issue