HBASE-19858 Backport HBASE-14061 (Support CF-level Storage Policy) to branch-1
HBASE-14061 Support CF-level Storage Policy
HBASE-14061 Support CF-level Storage Policy (addendum)
HBASE-14061 Support CF-level Storage Policy (addendum2)
HBASE-15172 Support setting storage policy in bulkload
HBASE-17538 HDFS.setStoragePolicy() logs errors on local fs
HBASE-18015 Storage class aware block placement for procedure v2 WALs
HBASE-18017 Reduce frequency of setStoragePolicy failure warnings
Default storage policy if not configured cannot be "NONE"
HBASE-19016 Coordinate storage policy property name for table schema and bulkload
Fix checkstyle warnings
Addressed additional review feedback on backport
parent 1a930cff6d
commit f35bcd2fe3
@@ -127,6 +127,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   public static final String DFS_REPLICATION = "DFS_REPLICATION";
   public static final short DEFAULT_DFS_REPLICATION = 0;
 
+  public static final String STORAGE_POLICY = "STORAGE_POLICY";
+
   /**
    * Default compression type.
    */
@@ -1567,4 +1569,24 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
     setValue(DFS_REPLICATION, Short.toString(replication));
     return this;
   }
+
+  /**
+   * Return the storage policy in use by this family
+   * <p/>
+   * Not using {@code enum} here because HDFS is not using {@code enum} for storage policy, see
+   * org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite for more details
+   */
+  public String getStoragePolicy() {
+    return getValue(STORAGE_POLICY);
+  }
+
+  /**
+   * Set the storage policy for use with this family
+   * @param policy the policy to set, valid setting includes: <i>"LAZY_PERSIST"</i>,
+   *          <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>, <i>"COLD"</i>
+   */
+  public HColumnDescriptor setStoragePolicy(String policy) {
+    setValue(STORAGE_POLICY, policy);
+    return this;
+  }
 }

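Usage sketch (not part of the diff): setting a per-family policy through the new HColumnDescriptor API. The Connection `conn`, table name and family name are placeholders. The shell change in admin.rb at the end of this patch accepts the same attribute, e.g. create 't1', {NAME => 'cf', STORAGE_POLICY => 'ONE_SSD'}.

    HColumnDescriptor hcd = new HColumnDescriptor("cf");
    // Valid values mirror HDFS policies: LAZY_PERSIST, ALL_SSD, ONE_SSD, HOT, WARM, COLD
    hcd.setStoragePolicy("ONE_SSD");
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
    htd.addFamily(hcd);
    try (Admin admin = conn.getAdmin()) {
      admin.createTable(htd);
    }
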
@@ -1086,14 +1086,10 @@ public final class HConstants {
       "hbase.regionserver.wal.enablecompression";
 
   /** Configuration name of WAL storage policy
-   * Valid values are:
-   *  NONE: no preference in destination of block replicas
-   *  ONE_SSD: place only one block replica in SSD and the remaining in default storage
-   *  and ALL_SSD: place all block replicas on SSD
-   *
-   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html*/
+   * Valid values are: HOT, COLD, WARM, ALL_SSD, ONE_SSD, LAZY_PERSIST
+   * See http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html*/
   public static final String WAL_STORAGE_POLICY = "hbase.wal.storage.policy";
-  public static final String DEFAULT_WAL_STORAGE_POLICY = "NONE";
+  public static final String DEFAULT_WAL_STORAGE_POLICY = "HOT";
 
   /** Region in Transition metrics threshold time */
   public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =

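Configuration sketch (not part of the diff): overriding the WAL policy via the key above; ONE_SSD is only an example value. The default is now HOT rather than NONE.

    Configuration conf = HBaseConfiguration.create();
    // "hbase.wal.storage.policy"
    conf.set(HConstants.WAL_STORAGE_POLICY, "ONE_SSD");
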
@@ -26,6 +26,7 @@ import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.charset.Charset;
 
 import org.apache.commons.logging.Log;
@@ -188,4 +189,37 @@ public class ReflectionUtils {
     return id + " (" + name + ")";
   }
+
+  /**
+   * Get and invoke the target method from the given object with given parameters
+   * @param obj the object to get and invoke method from
+   * @param methodName the name of the method to invoke
+   * @param params the parameters for the method to invoke
+   * @return the return value of the method invocation
+   */
+  public static Object invokeMethod(Object obj, String methodName, Object... params) {
+    Method m;
+    try {
+      m = obj.getClass().getMethod(methodName, getParameterTypes(params));
+      m.setAccessible(true);
+      return m.invoke(obj, params);
+    } catch (NoSuchMethodException e) {
+      throw new UnsupportedOperationException("Cannot find specified method " + methodName, e);
+    } catch (IllegalAccessException e) {
+      throw new UnsupportedOperationException("Unable to access specified method " + methodName, e);
+    } catch (IllegalArgumentException e) {
+      throw new UnsupportedOperationException("Illegal arguments supplied for method " + methodName,
+          e);
+    } catch (InvocationTargetException e) {
+      throw new UnsupportedOperationException("Method threw an exception for " + methodName, e);
+    }
+  }
+
+  private static Class<?>[] getParameterTypes(Object[] params) {
+    Class<?>[] parameterTypes = new Class<?>[params.length];
+    for (int i = 0; i < params.length; i++) {
+      parameterTypes[i] = params[i].getClass();
+    }
+    return parameterTypes;
+  }
+
 }

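Usage sketch (not part of the diff): this is how the helper is used later in this patch, where HFileSystem calls FileSystem#getStoragePolicy reflectively so the code still compiles against pre-2.8 Hadoop. `fs` and `path` are assumed to be in scope; invokeMethod wraps failures in UnsupportedOperationException.

    Object policySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
    String policyName = (String) ReflectionUtils.invokeMethod(policySpi, "getName");
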
@@ -41,16 +41,20 @@ import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
 
@@ -67,6 +71,7 @@ public class HFileSystem extends FilterFileSystem {
 
   private final FileSystem noChecksumFs;  // read hfile data from storage
   private final boolean useHBaseChecksum;
+  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
 
   /**
    * Create a FileSystem object for HBase regionservers.
@@ -180,7 +185,7 @@ public class HFileSystem extends FilterFileSystem {
     Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
     if (clazz != null) {
       // This will be true for Hadoop 1.0, or 0.20.
-      fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       fs.initialize(uri, conf);
     } else {
       // For Hadoop 2.0, we have to go through FileSystem for the filesystem
@@ -421,4 +426,72 @@ public class HFileSystem extends FilterFileSystem {
     return fs.createNonRecursive(f, overwrite, bufferSize, replication,
         blockSize, progress);
   }
+
+  /**
+   * Set the source path (directory/file) to the specified storage policy.
+   * @param path The source path (directory/file).
+   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
+   *          See see hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
+   *          'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+   */
+  public void setStoragePolicy(Path path, String policyName) {
+    FSUtils.setStoragePolicy(this.fs, path, policyName);
+  }
+
+  /**
+   * Get the storage policy of the source path (directory/file).
+   * @param path The source path (directory/file).
+   * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or
+   *         exception thrown when trying to get policy
+   */
+  public String getStoragePolicyName(Path path) {
+    try {
+      Object blockStoragePolicySpi =
+          ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
+      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
+    } catch (Exception e) {
+      // Maybe fail because of using old HDFS version, try the old way
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Failed to get policy directly", e);
+      }
+      return getStoragePolicyForOldHDFSVersion(path);
+    }
+  }
+
+  /**
+   * Before Hadoop 2.8.0, there's no getStoragePolicy method for FileSystem interface, and we need
+   * to keep compatible with it. See HADOOP-12161 for more details.
+   * @param path Path to get storage policy against
+   * @return the storage policy name
+   */
+  private String getStoragePolicyForOldHDFSVersion(Path path) {
+    try {
+      if (this.fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
+        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+        if (null != status) {
+          if (unspecifiedStoragePolicyId < 0) {
+            // Get the unspecified id field through reflection to avoid compilation error.
+            // In later version BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
+            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
+            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
+            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
+          }
+          byte storagePolicyId = status.getStoragePolicy();
+          if (storagePolicyId != unspecifiedStoragePolicyId) {
+            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+            for (BlockStoragePolicy policy : policies) {
+              if (policy.getId() == storagePolicyId) {
+                return policy.getName();
+              }
+            }
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("failed to get block storage policy of [" + path + "]", e);
+    }
+
+    return null;
+  }
 }

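Usage sketch (not part of the diff): reading back and setting a policy through HFileSystem; the path is a placeholder. On an older HDFS the getter falls back to the DFSClient route shown above and returns null if no policy can be determined.

    HFileSystem hfs = new HFileSystem(conf, true);
    Path cfDir = new Path("/hbase/data/default/t1/region/cf");  // placeholder path
    hfs.setStoragePolicy(cfDir, "ALL_SSD");
    String policy = hfs.getStoragePolicyName(cfDir);            // "ALL_SSD", or null
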
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -124,6 +125,9 @@ public class HFileOutputFormat2
   private static final String OUTPUT_TABLE_NAME_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.table.name";
 
+  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
+  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -190,7 +194,9 @@ public class HFileOutputFormat2
 
       // If this is a new column family, verify that the directory exists
       if (wl == null) {
-        fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+        Path cfPath = new Path(outputdir, Bytes.toString(family));
+        fs.mkdirs(cfPath);
+        configureStoragePolicy(conf, fs, family, cfPath);
       }
 
       // If any of the HFiles for the column families has reached
@@ -351,6 +357,21 @@ public class HFileOutputFormat2
     };
   }
 
+  /**
+   * Configure block storage policy for CF after the directory is created.
+   */
+  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+      byte[] family, Path cfPath) {
+    if (null == conf || null == fs || null == family || null == cfPath) {
+      return;
+    }
+    String policy =
+        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+          conf.get(STORAGE_POLICY_PROPERTY));
+
+    FSUtils.setStoragePolicy(fs, cfPath, policy);
+  }
+
   /*
    * Data structure to hold a Writer and amount of data written on it.
    */

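Configuration sketch (not part of the diff): for bulk loads the output policy can be set per job or per family through the properties above (HBASE-15172/HBASE-19016). The family name "cf1" is only an example.

    Job job = Job.getInstance(conf, "bulkload");
    // job-wide default; same key as HStore.BLOCK_STORAGE_POLICY_KEY
    job.getConfiguration().set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
    // per-family override, i.e. "hbase.hstore.block.storage.policy.cf1"
    job.getConfiguration().set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "cf1", "ALL_SSD");
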
@@ -48,6 +48,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.CoordinatedStateException;
@@ -1304,7 +1305,19 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
 
-    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
+    final FileSystem walFs = walDir.getFileSystem(conf);
+
+    // Create the log directory for the procedure store
+    if (!walFs.exists(walDir)) {
+      if (!walFs.mkdirs(walDir)) {
+        throw new IOException("Unable to mkdir " + walDir);
+      }
+    }
+    // Now that it exists, set the log policy
+    FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
+        HConstants.DEFAULT_WAL_STORAGE_POLICY);
+
+    procedureStore = new WALProcedureStore(conf, walFs, walDir,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,

@@ -178,6 +178,36 @@ public class HRegionFileSystem {
     return storeDir;
   }
 
+  /**
+   * Set storage policy for a given column family.
+   * <p>
+   * If we're running on a version of HDFS that doesn't support the given storage policy
+   * (or storage policies at all), then we'll issue a log message and continue.
+   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+   * for possible list e.g 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+   *
+   * @param familyName The name of column family.
+   * @param policyName The name of the storage policy
+   */
+  public void setStoragePolicy(String familyName, String policyName) {
+    FSUtils.setStoragePolicy(this.fs, getStoreDir(familyName), policyName);
+  }
+
+  /**
+   * Get the storage policy of the directory of CF.
+   * @param familyName The name of column family.
+   * @return Storage policy name, or {@code null} if not using {@link HFileSystem} or exception
+   *         thrown when trying to get policy
+   */
+  public String getStoragePolicyName(String familyName) {
+    if (this.fs instanceof HFileSystem) {
+      Path storeDir = getStoreDir(familyName);
+      return ((HFileSystem) this.fs).getStoragePolicyName(storeDir);
+    }
+
+    return null;
+  }
+
   /**
    * Returns the store files available for the family.
    * This methods performs the filtering based on the valid store files.

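Usage sketch (not part of the diff): per-family get/set against an existing HRegionFileSystem instance; see TestHRegionFileSystem further down for how `regionFs` is built. The family name is a placeholder.

    regionFs.setStoragePolicy("cf1", "ONE_SSD");
    String policy = regionFs.getStoragePolicyName("cf1");  // null unless backed by an HFileSystem
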
@@ -124,6 +124,9 @@ public class HStore implements Store {
   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
       "hbase.server.compactchecker.interval.multiplier";
   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
+  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
+  // keep in accordance with HDFS default storage policy
+  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
 
@@ -237,6 +240,13 @@ public class HStore implements Store {
         .addWritableMap(family.getValues());
     this.blocksize = family.getBlocksize();
 
+    // set block storage policy for store directory
+    String policyName = family.getStoragePolicy();
+    if (null == policyName) {
+      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
+    }
+    this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
+
     this.dataBlockEncoder =
         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
 
@@ -1078,11 +1088,12 @@ public class HStore implements Store {
       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
           region.getRegionInfo().getEncodedName());
     }
+    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
         cryptoContext);
     StoreFile.WriterBuilder builder = new StoreFile.WriterBuilder(conf, writerCacheConf,
         this.getFileSystem())
-            .withFilePath(fs.createTempName())
+            .withOutputDir(familyTempDir)
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)

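Configuration sketch (not part of the diff): the store resolves its policy as shown in the constructor hunk above, with the family-level setting winning over the cluster-wide default. WARM is only an example value; a family that calls HColumnDescriptor#setStoragePolicy still overrides it.

    // Cluster-wide default, used only when the column family sets no STORAGE_POLICY of its own
    conf.set(HStore.BLOCK_STORAGE_POLICY_KEY, "WARM");  // "hbase.hstore.block.storage.policy"
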
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.WritableUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -726,6 +728,14 @@ public class StoreFile {
         HRegionFileSystem.mkdirs(fs, conf, dir);
       }
 
+      // set block storage policy for temp path
+      String policyName = this.conf.get(HColumnDescriptor.STORAGE_POLICY);
+      if (null == policyName) {
+        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY,
+          HStore.DEFAULT_BLOCK_STORAGE_POLICY);
+      }
+      FSUtils.setStoragePolicy(this.fs, dir, policyName);
+
       if (filePath == null) {
         filePath = getUniqueFile(fs, dir);
         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {

@@ -141,59 +141,122 @@ public abstract class FSUtils {
   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
       final Path path, final String policyKey, final String defaultPolicy) {
     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
-    if (storagePolicy.equals(defaultPolicy)) {
+    setStoragePolicy(fs, path, storagePolicy);
+  }
+
+  private static final Map<FileSystem, Boolean> warningMap =
+      new ConcurrentHashMap<FileSystem, Boolean>();
+
+  /**
+   * Sets storage policy for given path.
+   * <p>
+   * If the passed path is a directory, we'll set the storage policy for all files
+   * created in the future in said directory. Note that this change in storage
+   * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
+   * If we're running on a version of HDFS that doesn't support the given storage policy
+   * (or storage policies at all), then we'll issue a log message and continue.
+   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+   * for possible list e.g 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+   *
+   * @param fs We only do anything if an instance of DistributedFileSystem
+   * @param path the Path whose storage policy is to be set
+   * @param storagePolicy Policy to set on <code>path</code>
+   */
+  public static void setStoragePolicy(final FileSystem fs, final Path path,
+      final String storagePolicy) {
+    if (storagePolicy == null) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
+        LOG.trace("We were passed a null storagePolicy, exiting early.");
       }
       return;
     }
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
-      // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
-      Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
-      Method m = null;
-      try {
-        m = dfsClass.getDeclaredMethod("setStoragePolicy",
-            new Class<?>[] { Path.class, String.class });
-        m.setAccessible(true);
-      } catch (NoSuchMethodException e) {
-        LOG.info("FileSystem doesn't support"
-            + " setStoragePolicy; --HDFS-6584 not available");
-      } catch (SecurityException e) {
-        LOG.info("Doesn't have access to setStoragePolicy on "
-            + "FileSystems --HDFS-6584 not available", e);
-        m = null; // could happen on setAccessible()
+    final String trimmedStoragePolicy = storagePolicy.trim();
+    if (trimmedStoragePolicy.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("We were passed an empty storagePolicy, exiting early.");
      }
-      if (m != null) {
-        try {
-          m.invoke(dfs, path, storagePolicy);
-          LOG.info("set " + storagePolicy + " for " + path);
-        } catch (Exception e) {
-          // check for lack of HDFS-7228
-          boolean probablyBadPolicy = false;
-          if (e instanceof InvocationTargetException) {
-            final Throwable exception = e.getCause();
-            if (exception instanceof RemoteException &&
-                HadoopIllegalArgumentException.class.getName().equals(
-                  ((RemoteException)exception).getClassName())) {
-              LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
-                "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
-                "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
-                "more information see the 'ArchivalStorage' docs for your Hadoop release.");
-              LOG.debug("More information about the invalid storage policy.", exception);
-              probablyBadPolicy = true;
+      return;
    }
+    boolean distributed = false;
+    try {
+      distributed = isDistributedFileSystem(fs);
+    } catch (IOException ioe) {
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+            + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+            + " on path=" + path);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+            + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+            + " on path=" + path);
      }
+      return;
+    }
+    if (distributed) {
+      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
+    }
+  }
+
+  /*
+   * All args have been checked and are good. Run the setStoragePolicy invocation.
+   */
+  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
+      final String storagePolicy) {
+    Method m = null;
+    try {
+      m = fs.getClass().getDeclaredMethod("setStoragePolicy",
+        new Class<?>[] { Path.class, String.class });
+      m.setAccessible(true);
+    } catch (NoSuchMethodException e) {
+      final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available";
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn(msg, e);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg, e);
+      }
+      m = null;
+    } catch (SecurityException e) {
+      final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available";
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn(msg, e);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg, e);
+      }
+      m = null; // could happen on setAccessible()
+    }
+    if (m != null) {
+      try {
+        m.invoke(fs, path, storagePolicy);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
+        }
+      } catch (Exception e) {
+        // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
+        // misuse than a runtime problem with HDFS.
+        if (!warningMap.containsKey(fs)) {
+          warningMap.put(fs, true);
+          LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+        }
+        // check for lack of HDFS-7228
+        if (e instanceof InvocationTargetException) {
+          final Throwable exception = e.getCause();
+          if (exception instanceof RemoteException &&
+              HadoopIllegalArgumentException.class.getName().equals(
+                ((RemoteException)exception).getClassName())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
+                "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
+                "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
+                "more information see the 'ArchivalStorage' docs for your Hadoop release.");
            }
          }
-          if (!probablyBadPolicy) {
-            // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
-            // misuse than a runtime problem with HDFS.
-            LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
-          }
        }
      }
    }
-    } else {
-      LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
-        "support setStoragePolicy.");
  }
-  }
 
@@ -210,7 +273,7 @@ public abstract class FSUtils {
     }
     return fileSystem instanceof DistributedFileSystem;
   }
-
+
   /**
    * Compare of path component. Does not consider schema; i.e. if schemas
    * different but <code>path</code> starts with <code>rootPath</code>,

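Usage sketch (not part of the diff): calling the utility directly; `fs`, `conf` and `walDir` are assumed in scope and the path is a placeholder. On non-HDFS filesystems the call logs once (then at DEBUG) and continues.

    FSUtils.setStoragePolicy(fs, new Path("/hbase/MasterProcWALs"), "ONE_SSD");  // explicit policy
    FSUtils.setStoragePolicy(fs, conf, walDir,
        HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);   // resolved from config
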
@@ -186,10 +186,13 @@ public class TestCompaction {
       Store s = r.stores.get(COLUMN_FAMILY);
       assertEquals(compactionThreshold, s.getStorefilesCount());
       assertTrue(s.getStorefilesSize() > 15*1000);
       // and no new store files persisted past compactStores()
+      // only one empty dir exists in temp dir
       FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+      assertEquals(1, ls.length);
+      Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
+      assertTrue(r.getFilesystem().exists(storeTempDir));
+      ls = r.getFilesystem().listStatus(storeTempDir);
       assertEquals(0, ls.length);
 
     } finally {
       // don't mess up future tests
       r.writestate.writesEnabled = true;

@@ -912,7 +912,7 @@ public class TestHRegion {
     assertEquals(3, region.getStore(family).getStorefilesCount());
 
     // now find the compacted file, and manually add it to the recovered edits
-    Path tmpDir = region.getRegionFileSystem().getTempDir();
+    Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
     FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
     String errorMsg = "Expected to find 1 file in the region temp directory "
         + "from the compaction, could not find any";

@@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
@@ -37,9 +40,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.Progressable;
 
@@ -50,6 +60,126 @@ import org.junit.experimental.categories.Category;
 public class TestHRegionFileSystem {
   private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
+  private static final byte[][] FAMILIES = {
+      Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+      Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+  private static final TableName TABLE_NAME = TableName.valueOf("TestTable");
 
+  @Test
+  public void testBlockStoragePolicy() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniCluster();
+    HTable table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
+    assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table));
+    HRegionFileSystem regionFs = getHRegionFS(table, conf);
+    // the original block storage policy would be HOT
+    String spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+    String spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+    LOG.debug("Storage policy of cf 0: [" + spA + "].");
+    LOG.debug("Storage policy of cf 1: [" + spB + "].");
+    assertEquals("HOT", spA);
+    assertEquals("HOT", spB);
+
+    // Recreate table and make sure storage policy could be set through configuration
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "WARM");
+    TEST_UTIL.startMiniCluster();
+    table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
+    regionFs = getHRegionFS(table, conf);
+
+    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
+      spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+      spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertEquals("WARM", spA);
+      assertEquals("WARM", spB);
+
+      // alter table cf schema to change storage policies
+      // and make sure it could override settings in conf
+      HColumnDescriptor hcdA = new HColumnDescriptor(Bytes.toString(FAMILIES[0]));
+      // alter through setting HStore#BLOCK_STORAGE_POLICY_KEY in HColumnDescriptor
+      hcdA.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ONE_SSD");
+      admin.modifyColumn(TABLE_NAME, hcdA);
+      while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+          .isRegionsInTransition()) {
+        Thread.sleep(200);
+        LOG.debug("Waiting on table to finish schema altering");
+      }
+      // alter through HColumnDescriptor#setStoragePolicy
+      HColumnDescriptor hcdB = new HColumnDescriptor(Bytes.toString(FAMILIES[1]));
+      hcdB.setStoragePolicy("ALL_SSD");
+      admin.modifyColumn(TABLE_NAME, hcdB);
+      while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+          .isRegionsInTransition()) {
+        Thread.sleep(200);
+        LOG.debug("Waiting on table to finish schema altering");
+      }
+      spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+      spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNotNull(spA);
+      assertEquals("ONE_SSD", spA);
+      assertNotNull(spB);
+      assertEquals("ALL_SSD", spB);
+
+      // flush memstore snapshot into 3 files
+      for (long i = 0; i < 3; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.addColumn(FAMILIES[0], Bytes.toBytes(i), Bytes.toBytes(i));
+        table.put(put);
+        admin.flush(TABLE_NAME);
+      }
+      // there should be 3 files in store dir
+      FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+      Path storePath = regionFs.getStoreDir(Bytes.toString(FAMILIES[0]));
+      FileStatus[] storeFiles = FSUtils.listStatus(fs, storePath);
+      assertNotNull(storeFiles);
+      assertEquals(3, storeFiles.length);
+      // store temp dir still exists but empty
+      Path storeTempDir = new Path(regionFs.getTempDir(), Bytes.toString(FAMILIES[0]));
+      assertTrue(fs.exists(storeTempDir));
+      FileStatus[] tempFiles = FSUtils.listStatus(fs, storeTempDir);
+      assertNull(tempFiles);
+      // storage policy of cf temp dir and 3 store files should be ONE_SSD
+      assertEquals("ONE_SSD",
+          ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(storeTempDir));
+      for (FileStatus status : storeFiles) {
+        assertEquals("ONE_SSD",
+            ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(status.getPath()));
+      }
+
+      // change storage policies by calling raw api directly
+      regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD");
+      regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD");
+      spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+      spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNotNull(spA);
+      assertEquals("ALL_SSD", spA);
+      assertNotNull(spB);
+      assertEquals("ONE_SSD", spB);
+    } finally {
+      table.close();
+      TEST_UTIL.deleteTable(TABLE_NAME);
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private HRegionFileSystem getHRegionFS(HTable table, Configuration conf) throws IOException {
+    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    Path tableDir = FSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName());
+    List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
+    assertEquals(1, regionDirs.size());
+    List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDirs.get(0));
+    assertEquals(2, familyDirs.size());
+    HRegionInfo hri = table.getRegionLocator().getAllRegionLocations().get(0).getRegionInfo();
+    HRegionFileSystem regionFs = new HRegionFileSystem(conf, new HFileSystem(fs), tableDir, hri);
+    return regionFs;
+  }
 
   @Test
   public void testOnDiskRegionCreation() throws IOException {

@@ -859,6 +859,10 @@ module Hbase
             algorithm))
           end
         end
+        if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
+          storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
+          family.setStoragePolicy(storage_policy)
+        end
 
         set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
        set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]