diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 560b74b2b66..4a6550b64c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -127,6 +127,8 @@ public class HColumnDescriptor implements WritableComparable public static final String DFS_REPLICATION = "DFS_REPLICATION"; public static final short DEFAULT_DFS_REPLICATION = 0; + public static final String STORAGE_POLICY = "STORAGE_POLICY"; + /** * Default compression type. */ @@ -1567,4 +1569,24 @@ public class HColumnDescriptor implements WritableComparable setValue(DFS_REPLICATION, Short.toString(replication)); return this; } + + /** + * Return the storage policy in use by this family + *
<p>
+ * Not using {@code enum} here because HDFS is not using {@code enum} for storage policy, see + * org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite for more details + */ + public String getStoragePolicy() { + return getValue(STORAGE_POLICY); + } + + /** + * Set the storage policy for use with this family + * @param policy the policy to set, valid setting includes: "LAZY_PERSIST", + * "ALL_SSD", "ONE_SSD", "HOT", "WARM", "COLD" + */ + public HColumnDescriptor setStoragePolicy(String policy) { + setValue(STORAGE_POLICY, policy); + return this; + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c9f9ded530e..63dade1dafa 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1086,14 +1086,10 @@ public final class HConstants { "hbase.regionserver.wal.enablecompression"; /** Configuration name of WAL storage policy - * Valid values are: - * NONE: no preference in destination of block replicas - * ONE_SSD: place only one block replica in SSD and the remaining in default storage - * and ALL_SSD: place all block replicas on SSD - * - * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html*/ + * Valid values are: HOT, COLD, WARM, ALL_SSD, ONE_SSD, LAZY_PERSIST + * See http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html*/ public static final String WAL_STORAGE_POLICY = "hbase.wal.storage.policy"; - public static final String DEFAULT_WAL_STORAGE_POLICY = "NONE"; + public static final String DEFAULT_WAL_STORAGE_POLICY = "HOT"; /** Region in Transition metrics threshold time */ public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java index 15b393041c1..4b50a2ffc2a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java @@ -26,6 +26,7 @@ import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.charset.Charset; import org.apache.commons.logging.Log; @@ -188,4 +189,37 @@ public class ReflectionUtils { return id + " (" + name + ")"; } + /** + * Get and invoke the target method from the given object with given parameters + * @param obj the object to get and invoke method from + * @param methodName the name of the method to invoke + * @param params the parameters for the method to invoke + * @return the return value of the method invocation + */ + public static Object invokeMethod(Object obj, String methodName, Object... 
params) { + Method m; + try { + m = obj.getClass().getMethod(methodName, getParameterTypes(params)); + m.setAccessible(true); + return m.invoke(obj, params); + } catch (NoSuchMethodException e) { + throw new UnsupportedOperationException("Cannot find specified method " + methodName, e); + } catch (IllegalAccessException e) { + throw new UnsupportedOperationException("Unable to access specified method " + methodName, e); + } catch (IllegalArgumentException e) { + throw new UnsupportedOperationException("Illegal arguments supplied for method " + methodName, + e); + } catch (InvocationTargetException e) { + throw new UnsupportedOperationException("Method threw an exception for " + methodName, e); + } + } + + private static Class[] getParameterTypes(Object[] params) { + Class[] parameterTypes = new Class[params.length]; + for (int i = 0; i < params.length; i++) { + parameterTypes[i] = params[i].getClass(); + } + return parameterTypes; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java index 754ea651ee0..e55916f88a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java @@ -41,16 +41,20 @@ import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.ReflectionUtils; import static org.apache.hadoop.hbase.HConstants.HBASE_DIR; @@ -67,6 +71,7 @@ public class HFileSystem extends FilterFileSystem { private final FileSystem noChecksumFs; // read hfile data from storage private final boolean useHBaseChecksum; + private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE; /** * Create a FileSystem object for HBase regionservers. @@ -180,7 +185,7 @@ public class HFileSystem extends FilterFileSystem { Class clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); if (clazz != null) { // This will be true for Hadoop 1.0, or 0.20. - fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); + fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); } else { // For Hadoop 2.0, we have to go through FileSystem for the filesystem @@ -421,4 +426,72 @@ public class HFileSystem extends FilterFileSystem { return fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, progress); } + + /** + * Set the source path (directory/file) to the specified storage policy. + * @param path The source path (directory/file). + * @param policyName The name of the storage policy: 'HOT', 'COLD', etc. 
+ * See see hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g + * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. + */ + public void setStoragePolicy(Path path, String policyName) { + FSUtils.setStoragePolicy(this.fs, path, policyName); + } + + /** + * Get the storage policy of the source path (directory/file). + * @param path The source path (directory/file). + * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or + * exception thrown when trying to get policy + */ + public String getStoragePolicyName(Path path) { + try { + Object blockStoragePolicySpi = + ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path); + return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); + } catch (Exception e) { + // Maybe fail because of using old HDFS version, try the old way + if (LOG.isTraceEnabled()) { + LOG.trace("Failed to get policy directly", e); + } + return getStoragePolicyForOldHDFSVersion(path); + } + } + + /** + * Before Hadoop 2.8.0, there's no getStoragePolicy method for FileSystem interface, and we need + * to keep compatible with it. See HADOOP-12161 for more details. + * @param path Path to get storage policy against + * @return the storage policy name + */ + private String getStoragePolicyForOldHDFSVersion(Path path) { + try { + if (this.fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) this.fs; + HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); + if (null != status) { + if (unspecifiedStoragePolicyId < 0) { + // Get the unspecified id field through reflection to avoid compilation error. + // In later version BlockStoragePolicySuite#ID_UNSPECIFIED is moved to + // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED + Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); + unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class); + } + byte storagePolicyId = status.getStoragePolicy(); + if (storagePolicyId != unspecifiedStoragePolicyId) { + BlockStoragePolicy[] policies = dfs.getStoragePolicies(); + for (BlockStoragePolicy policy : policies) { + if (policy.getId() == storagePolicyId) { + return policy.getName(); + } + } + } + } + } + } catch (Throwable e) { + LOG.warn("failed to get block storage policy of [" + path + "]", e); + } + + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 492b5d1e36e..31e7e5a7700 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -124,6 +125,9 @@ public class HFileOutputFormat2 private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; + public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY; + public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY 
+ "."; + @Override public RecordWriter getRecordWriter( final TaskAttemptContext context) throws IOException, InterruptedException { @@ -190,7 +194,9 @@ public class HFileOutputFormat2 // If this is a new column family, verify that the directory exists if (wl == null) { - fs.mkdirs(new Path(outputdir, Bytes.toString(family))); + Path cfPath = new Path(outputdir, Bytes.toString(family)); + fs.mkdirs(cfPath); + configureStoragePolicy(conf, fs, family, cfPath); } // If any of the HFiles for the column families has reached @@ -351,6 +357,21 @@ public class HFileOutputFormat2 }; } + /** + * Configure block storage policy for CF after the directory is created. + */ + static void configureStoragePolicy(final Configuration conf, final FileSystem fs, + byte[] family, Path cfPath) { + if (null == conf || null == fs || null == family || null == cfPath) { + return; + } + String policy = + conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family), + conf.get(STORAGE_POLICY_PROPERTY)); + + FSUtils.setStoragePolicy(fs, cfPath, policy); + } + /* * Data structure to hold a Writer and amount of data written on it. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index d7fc2e5db8e..f8bbc65250d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -48,6 +48,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.CoordinatedStateException; @@ -1304,7 +1305,19 @@ public class HMaster extends HRegionServer implements MasterServices, Server { final Path walDir = new Path(FSUtils.getWALRootDir(this.conf), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); - procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, + final FileSystem walFs = walDir.getFileSystem(conf); + + // Create the log directory for the procedure store + if (!walFs.exists(walDir)) { + if (!walFs.mkdirs(walDir)) { + throw new IOException("Unable to mkdir " + walDir); + } + } + // Now that it exists, set the log policy + FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY, + HConstants.DEFAULT_WAL_STORAGE_POLICY); + + procedureStore = new WALProcedureStore(conf, walFs, walDir, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 884485cec02..7c36511dade 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -178,6 +178,36 @@ public class HRegionFileSystem { return storeDir; } + /** + * Set storage policy for a given column family. + *
<p>
+ * If we're running on a version of HDFS that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * for possible list e.g 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. + * + * @param familyName The name of column family. + * @param policyName The name of the storage policy + */ + public void setStoragePolicy(String familyName, String policyName) { + FSUtils.setStoragePolicy(this.fs, getStoreDir(familyName), policyName); + } + + /** + * Get the storage policy of the directory of CF. + * @param familyName The name of column family. + * @return Storage policy name, or {@code null} if not using {@link HFileSystem} or exception + * thrown when trying to get policy + */ + public String getStoragePolicyName(String familyName) { + if (this.fs instanceof HFileSystem) { + Path storeDir = getStoreDir(familyName); + return ((HFileSystem) this.fs).getStoragePolicyName(storeDir); + } + + return null; + } + /** * Returns the store files available for the family. * This methods performs the filtering based on the valid store files. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 730deead748..e0f4042fbef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -124,6 +124,9 @@ public class HStore implements Store { public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY = "hbase.server.compactchecker.interval.multiplier"; public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles"; + public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy"; + // keep in accordance with HDFS default storage policy + public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT"; public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000; public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7; @@ -237,6 +240,13 @@ public class HStore implements Store { .addWritableMap(family.getValues()); this.blocksize = family.getBlocksize(); + // set block storage policy for store directory + String policyName = family.getStoragePolicy(); + if (null == policyName) { + policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY); + } + this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim()); + this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding()); @@ -1078,11 +1088,12 @@ public class HStore implements Store { favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion( region.getRegionInfo().getEncodedName()); } + Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString()); HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag, cryptoContext); StoreFile.WriterBuilder builder = new StoreFile.WriterBuilder(conf, writerCacheConf, this.getFileSystem()) - .withFilePath(fs.createTempName()) + .withOutputDir(familyTempDir) .withComparator(comparator) .withBloomType(family.getBloomFilterType()) .withMaxKeyCount(maxKeyCount) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 
803bfb32519..1e7491117e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.KeyValue; @@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.WritableUtils; import com.google.common.annotations.VisibleForTesting; @@ -726,6 +728,14 @@ public class StoreFile { HRegionFileSystem.mkdirs(fs, conf, dir); } + // set block storage policy for temp path + String policyName = this.conf.get(HColumnDescriptor.STORAGE_POLICY); + if (null == policyName) { + policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY, + HStore.DEFAULT_BLOCK_STORAGE_POLICY); + } + FSUtils.setStoragePolicy(this.fs, dir, policyName); + if (filePath == null) { filePath = getUniqueFile(fs, dir); if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 5d850b01cd1..f3e2c63810c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -141,59 +141,122 @@ public abstract class FSUtils { public static void setStoragePolicy(final FileSystem fs, final Configuration conf, final Path path, final String policyKey, final String defaultPolicy) { String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT); - if (storagePolicy.equals(defaultPolicy)) { + setStoragePolicy(fs, path, storagePolicy); + } + + private static final Map warningMap = + new ConcurrentHashMap(); + + /** + * Sets storage policy for given path. + *
<p>
+ * If the passed path is a directory, we'll set the storage policy for all files + * created in the future in said directory. Note that this change in storage + * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle. + * If we're running on a version of HDFS that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * for possible list e.g 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. + * + * @param fs We only do anything if an instance of DistributedFileSystem + * @param path the Path whose storage policy is to be set + * @param storagePolicy Policy to set on path + */ + public static void setStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) { + if (storagePolicy == null) { if (LOG.isTraceEnabled()) { - LOG.trace("default policy of " + defaultPolicy + " requested, exiting early."); + LOG.trace("We were passed a null storagePolicy, exiting early."); } return; } - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem)fs; - // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection. - Class dfsClass = dfs.getClass(); - Method m = null; - try { - m = dfsClass.getDeclaredMethod("setStoragePolicy", - new Class[] { Path.class, String.class }); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem doesn't support" - + " setStoragePolicy; --HDFS-6584 not available"); - } catch (SecurityException e) { - LOG.info("Doesn't have access to setStoragePolicy on " - + "FileSystems --HDFS-6584 not available", e); - m = null; // could happen on setAccessible() + final String trimmedStoragePolicy = storagePolicy.trim(); + if (trimmedStoragePolicy.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("We were passed an empty storagePolicy, exiting early."); } - if (m != null) { - try { - m.invoke(dfs, path, storagePolicy); - LOG.info("set " + storagePolicy + " for " + path); - } catch (Exception e) { - // check for lack of HDFS-7228 - boolean probablyBadPolicy = false; - if (e instanceof InvocationTargetException) { - final Throwable exception = e.getCause(); - if (exception instanceof RemoteException && - HadoopIllegalArgumentException.class.getName().equals( - ((RemoteException)exception).getClassName())) { - LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " + - "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + - "trying to use SSD related policies then you're likely missing HDFS-7228. For " + - "more information see the 'ArchivalStorage' docs for your Hadoop release."); - LOG.debug("More information about the invalid storage policy.", exception); - probablyBadPolicy = true; + return; + } + boolean distributed = false; + try { + distributed = isDistributedFileSystem(fs); + } catch (IOException ioe) { + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " + + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy + + " on path=" + path); + } else if (LOG.isDebugEnabled()) { + LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " + + "support setStoragePolicy. 
Unable to set storagePolicy=" + trimmedStoragePolicy + + " on path=" + path); + } + return; + } + if (distributed) { + invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); + } + } + + /* + * All args have been checked and are good. Run the setStoragePolicy invocation. + */ + private static void invokeSetStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) { + Method m = null; + try { + m = fs.getClass().getDeclaredMethod("setStoragePolicy", + new Class[] { Path.class, String.class }); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available"; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; + } catch (SecurityException e) { + final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available"; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; // could happen on setAccessible() + } + if (m != null) { + try { + m.invoke(fs, path, storagePolicy); + if (LOG.isDebugEnabled()) { + LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path); + } + } catch (Exception e) { + // This swallows FNFE, should we be throwing it? seems more likely to indicate dev + // misuse than a runtime problem with HDFS. + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); + } + // check for lack of HDFS-7228 + if (e instanceof InvocationTargetException) { + final Throwable exception = e.getCause(); + if (exception instanceof RemoteException && + HadoopIllegalArgumentException.class.getName().equals( + ((RemoteException)exception).getClassName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + + "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + + "trying to use SSD related policies then you're likely missing HDFS-7228. For " + + "more information see the 'ArchivalStorage' docs for your Hadoop release."); } } - if (!probablyBadPolicy) { - // This swallows FNFE, should we be throwing it? seems more likely to indicate dev - // misuse than a runtime problem with HDFS. - LOG.warn("Unable to set " + storagePolicy + " for " + path, e); - } } } - } else { - LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " + - "support setStoragePolicy."); } } @@ -210,7 +273,7 @@ public abstract class FSUtils { } return fileSystem instanceof DistributedFileSystem; } - + /** * Compare of path component. Does not consider schema; i.e. 
if schemas * different but path starts with rootPath, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index b1f94fb752f..b9d1dd38694 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -186,10 +186,13 @@ public class TestCompaction { Store s = r.stores.get(COLUMN_FAMILY); assertEquals(compactionThreshold, s.getStorefilesCount()); assertTrue(s.getStorefilesSize() > 15*1000); - // and no new store files persisted past compactStores() + // only one empty dir exists in temp dir FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); + assertEquals(1, ls.length); + Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); + assertTrue(r.getFilesystem().exists(storeTempDir)); + ls = r.getFilesystem().listStatus(storeTempDir); assertEquals(0, ls.length); - } finally { // don't mess up future tests r.writestate.writesEnabled = true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 09d0b4f1c57..e93fcf08f36 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -912,7 +912,7 @@ public class TestHRegion { assertEquals(3, region.getStore(family).getStorefilesCount()); // now find the compacted file, and manually add it to the recovered edits - Path tmpDir = region.getRegionFileSystem().getTempDir(); + Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family)); FileStatus[] files = FSUtils.listStatus(fs, tmpDir); String errorMsg = "Expected to find 1 file in the region temp directory " + "from the compaction, could not find any"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java index dfb20daba98..dfef63f2ab5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java @@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; import java.util.Collection; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,9 +40,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.HBaseTestingUtility; +import 
org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.Progressable; @@ -50,6 +60,126 @@ import org.junit.experimental.categories.Category; public class TestHRegionFileSystem { private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class); + private static final byte[][] FAMILIES = { + Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), + Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; + private static final TableName TABLE_NAME = TableName.valueOf("TestTable"); + + @Test + public void testBlockStoragePolicy() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + Configuration conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniCluster(); + HTable table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES); + assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table)); + HRegionFileSystem regionFs = getHRegionFS(table, conf); + // the original block storage policy would be HOT + String spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + String spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertEquals("HOT", spA); + assertEquals("HOT", spB); + + // Recreate table and make sure storage policy could be set through configuration + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "WARM"); + TEST_UTIL.startMiniCluster(); + table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES); + regionFs = getHRegionFS(table, conf); + + try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { + spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertEquals("WARM", spA); + assertEquals("WARM", spB); + + // alter table cf schema to change storage policies + // and make sure it could override settings in conf + HColumnDescriptor hcdA = new HColumnDescriptor(Bytes.toString(FAMILIES[0])); + // alter through setting HStore#BLOCK_STORAGE_POLICY_KEY in HColumnDescriptor + hcdA.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ONE_SSD"); + admin.modifyColumn(TABLE_NAME, hcdA); + while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .isRegionsInTransition()) { + Thread.sleep(200); + LOG.debug("Waiting on table to finish schema altering"); + } + // alter through HColumnDescriptor#setStoragePolicy + HColumnDescriptor hcdB = new HColumnDescriptor(Bytes.toString(FAMILIES[1])); + hcdB.setStoragePolicy("ALL_SSD"); + admin.modifyColumn(TABLE_NAME, hcdB); + while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .isRegionsInTransition()) { + Thread.sleep(200); + LOG.debug("Waiting on table to finish schema altering"); + } + spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertNotNull(spA); + assertEquals("ONE_SSD", spA); + assertNotNull(spB); + 
assertEquals("ALL_SSD", spB); + + // flush memstore snapshot into 3 files + for (long i = 0; i < 3; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.addColumn(FAMILIES[0], Bytes.toBytes(i), Bytes.toBytes(i)); + table.put(put); + admin.flush(TABLE_NAME); + } + // there should be 3 files in store dir + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path storePath = regionFs.getStoreDir(Bytes.toString(FAMILIES[0])); + FileStatus[] storeFiles = FSUtils.listStatus(fs, storePath); + assertNotNull(storeFiles); + assertEquals(3, storeFiles.length); + // store temp dir still exists but empty + Path storeTempDir = new Path(regionFs.getTempDir(), Bytes.toString(FAMILIES[0])); + assertTrue(fs.exists(storeTempDir)); + FileStatus[] tempFiles = FSUtils.listStatus(fs, storeTempDir); + assertNull(tempFiles); + // storage policy of cf temp dir and 3 store files should be ONE_SSD + assertEquals("ONE_SSD", + ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(storeTempDir)); + for (FileStatus status : storeFiles) { + assertEquals("ONE_SSD", + ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(status.getPath())); + } + + // change storage policies by calling raw api directly + regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD"); + regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD"); + spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertNotNull(spA); + assertEquals("ALL_SSD", spA); + assertNotNull(spB); + assertEquals("ONE_SSD", spB); + } finally { + table.close(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + } + + private HRegionFileSystem getHRegionFS(HTable table, Configuration conf) throws IOException { + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path tableDir = FSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName()); + List regionDirs = FSUtils.getRegionDirs(fs, tableDir); + assertEquals(1, regionDirs.size()); + List familyDirs = FSUtils.getFamilyDirs(fs, regionDirs.get(0)); + assertEquals(2, familyDirs.size()); + HRegionInfo hri = table.getRegionLocator().getAllRegionLocations().get(0).getRegionInfo(); + HRegionFileSystem regionFs = new HRegionFileSystem(conf, new HFileSystem(fs), tableDir, hri); + return regionFs; + } @Test public void testOnDiskRegionCreation() throws IOException { diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 60ae155fa5e..2dd69f81f49 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -859,6 +859,10 @@ module Hbase algorithm)) end end + if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY) + storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase + family.setStoragePolicy(storage_policy) + end set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]