diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 22a73c97380..6987bf71463 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -125,6 +126,9 @@ public class HFileOutputFormat2 private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; + public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy"; + public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; + @Override public RecordWriter getRecordWriter( final TaskAttemptContext context) throws IOException, InterruptedException { @@ -230,7 +234,9 @@ public class HFileOutputFormat2 // If this is a new column family, verify that the directory exists if (wl == null) { - fs.mkdirs(new Path(outputDir, Bytes.toString(family))); + Path cfPath = new Path(outputDir, Bytes.toString(family)); + fs.mkdirs(cfPath); + configureStoragePolicy(conf, fs, family, cfPath); } // If any of the HFiles for the column families has reached @@ -382,6 +388,29 @@ public class HFileOutputFormat2 } } + /** + * Configure block storage policy for CF after the directory is created. + */ + static void configureStoragePolicy(final Configuration conf, final FileSystem fs, + byte[] family, Path cfPath) { + if (null == conf || null == fs || null == family || null == cfPath) { + return; + } + + String policy = + conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family), + conf.get(STORAGE_POLICY_PROPERTY)); + if (null != policy && !policy.trim().isEmpty()) { + try { + if (fs instanceof DistributedFileSystem) { + ((DistributedFileSystem) fs).setStoragePolicy(cfPath, policy.trim()); + } + } catch (Throwable e) { + LOG.warn("failed to set block storage policy of [" + cfPath + "] to [" + policy + "]", e); + } + } + } + /* * Data structure to hold a Writer and amount of data written on it. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 486c961e115..21a39d407d2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -92,6 +93,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; @@ -1292,5 +1297,70 @@ public class TestHFileOutputFormat2 { } } + @Test + public void testBlockStoragePolicy() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD"); + conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]), + "ONE_SSD"); + Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0])); + Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1])); + util.startMiniDFSCluster(3); + FileSystem fs = util.getDFSCluster().getFileSystem(); + try { + fs.mkdirs(cf1Dir); + fs.mkdirs(cf2Dir); + + // the original block storage policy would be NULL + String spA = getStoragePolicyName(fs, cf1Dir); + String spB = getStoragePolicyName(fs, cf2Dir); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertNull(spA); + assertNull(spB); + + // alter table cf schema to change storage policies + HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir); + HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir); + spA = getStoragePolicyName(fs, cf1Dir); + spB = getStoragePolicyName(fs, cf2Dir); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertNotNull(spA); + assertEquals("ONE_SSD", spA); + assertNotNull(spB); + assertEquals("ALL_SSD", spB); + } finally { + fs.delete(cf1Dir, true); + fs.delete(cf2Dir, true); + util.shutdownMiniDFSCluster(); + } + } + + private String getStoragePolicyName(FileSystem fs, Path path) { + try { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); + if (null != status) { + byte storagePolicyId = status.getStoragePolicy(); + if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) { + BlockStoragePolicy[] policies = dfs.getStoragePolicies(); + for (BlockStoragePolicy policy : policies) { + if (policy.getId() == storagePolicyId) { + return policy.getName(); + } + } + } + } + } + } catch (Throwable e) { + LOG.warn("failed to get block storage policy of [" + path + "]", e); + } + + return null; + } + }