HBASE-15172 Support setting storage policy in bulkload

Yu Li 2017-01-06 18:35:38 +08:00
parent e02ae7724d
commit 629b04f44f
2 changed files with 100 additions and 1 deletion

HFileOutputFormat2.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -125,6 +126,9 @@ public class HFileOutputFormat2
   private static final String OUTPUT_TABLE_NAME_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.table.name";
 
+  public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
+  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -230,7 +234,9 @@ public class HFileOutputFormat2
 
       // If this is a new column family, verify that the directory exists
       if (wl == null) {
-        fs.mkdirs(new Path(outputDir, Bytes.toString(family)));
+        Path cfPath = new Path(outputDir, Bytes.toString(family));
+        fs.mkdirs(cfPath);
+        configureStoragePolicy(conf, fs, family, cfPath);
       }
 
       // If any of the HFiles for the column families has reached
@@ -382,6 +388,29 @@ public class HFileOutputFormat2
     }
   }
 
+  /**
+   * Configure block storage policy for CF after the directory is created.
+   */
+  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+      byte[] family, Path cfPath) {
+    if (null == conf || null == fs || null == family || null == cfPath) {
+      return;
+    }
+
+    String policy =
+        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+          conf.get(STORAGE_POLICY_PROPERTY));
+    if (null != policy && !policy.trim().isEmpty()) {
+      try {
+        if (fs instanceof DistributedFileSystem) {
+          ((DistributedFileSystem) fs).setStoragePolicy(cfPath, policy.trim());
+        }
+      } catch (Throwable e) {
+        LOG.warn("failed to set block storage policy of [" + cfPath + "] to [" + policy + "]", e);
+      }
+    }
+  }
+
   /*
    * Data structure to hold a Writer and amount of data written on it.
    */
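
With the two new keys in place, a bulkload job can pin its HFile output to an HDFS storage policy before any data is written: the plain key sets a default for every column family, and the "hbase.hstore.storagepolicy.<family>" form overrides it per family. A minimal job-side sketch (the "cf-cold" family name and the policy choices are hypothetical, for illustration only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

public class BulkloadStoragePolicyExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Default policy for every column family directory the job writes.
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
    // Per-family override: keep the hypothetical "cf-cold" family on disk.
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "cf-cold", "HOT");
    // ... then continue with the usual HFileOutputFormat2.configureIncrementalLoad(...) setup.
  }
}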

TestHFileOutputFormat2.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -92,6 +93,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -1292,5 +1297,70 @@ public class TestHFileOutputFormat2 {
     }
   }
 
+  @Test
+  public void testBlockStoragePolicy() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
+    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]),
+      "ONE_SSD");
+    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
+    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
+    util.startMiniDFSCluster(3);
+    FileSystem fs = util.getDFSCluster().getFileSystem();
+    try {
+      fs.mkdirs(cf1Dir);
+      fs.mkdirs(cf2Dir);
+
+      // the original block storage policy would be NULL
+      String spA = getStoragePolicyName(fs, cf1Dir);
+      String spB = getStoragePolicyName(fs, cf2Dir);
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNull(spA);
+      assertNull(spB);
+
+      // alter table cf schema to change storage policies
+      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
+      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir);
+      spA = getStoragePolicyName(fs, cf1Dir);
+      spB = getStoragePolicyName(fs, cf2Dir);
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNotNull(spA);
+      assertEquals("ONE_SSD", spA);
+      assertNotNull(spB);
+      assertEquals("ALL_SSD", spB);
+    } finally {
+      fs.delete(cf1Dir, true);
+      fs.delete(cf2Dir, true);
+      util.shutdownMiniDFSCluster();
+    }
+  }
+
+  private String getStoragePolicyName(FileSystem fs, Path path) {
+    try {
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) fs;
+        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+        if (null != status) {
+          byte storagePolicyId = status.getStoragePolicy();
+          if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+            for (BlockStoragePolicy policy : policies) {
+              if (policy.getId() == storagePolicyId) {
+                return policy.getName();
+              }
+            }
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("failed to get block storage policy of [" + path + "]", e);
+    }
+    return null;
+  }
 }
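
The getStoragePolicyName helper above reaches into DFSClient internals because older Hadoop lines exposed no public read-back API. As a hedged alternative, on Hadoop 2.8 and later the same check can go through the public FileSystem interface (the output path here is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckStoragePolicy {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical bulkload output directory for one column family.
    Path cfDir = new Path("/bulkload/output/cf-hot");
    // FileSystem#getStoragePolicy is available from Hadoop 2.8 on;
    // on HDFS it reports the policy in effect for the path.
    BlockStoragePolicySpi policy = fs.getStoragePolicy(cfDir);
    System.out.println(policy == null ? "unspecified" : policy.getName());
  }
}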