HBASE-14061 Support CF-level Storage Policy (addendum)

Addendum to resolve a compatibility issue with Hadoop 2.8.0+ / 3.0.0-alpha1+; also adds a util
method to ReflectionUtils for invoking methods through reflection
Yu Li 2017-01-11 10:14:55 +08:00
parent ac3b1c9aa9
commit 36eeb2c569
7 changed files with 126 additions and 36 deletions
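
The heart of the fix: the original patch called setStoragePolicy through a DistributedFileSystem cast, which ties compilation and behavior to specific Hadoop versions, and the FileSystem interface itself only gains the storage-policy methods in Hadoop 2.8.0+ (see the HADOOP-12161 note in the HFileSystem change below). Resolving the method reflectively at runtime lets one HBase binary work across Hadoop versions. A minimal sketch of the idea, not the committed code:

import java.lang.reflect.Method;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StoragePolicyCompat {
  // fs.setStoragePolicy(path, policy) would not compile against Hadoop < 2.8.0,
  // since FileSystem gained the method only in 2.8.0. Resolving it reflectively
  // defers the lookup to runtime, so one binary works across Hadoop versions.
  public static void trySetStoragePolicy(FileSystem fs, Path path, String policy) {
    try {
      Method m = fs.getClass().getMethod("setStoragePolicy", Path.class, String.class);
      m.invoke(fs, path, policy);
    } catch (Exception e) {
      // Method absent or invocation failed: the runtime Hadoop is too old, or this
      // FileSystem does not support storage policies. Degrade gracefully.
    }
  }
}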

View File

@@ -26,12 +26,15 @@ import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import edu.umd.cs.findbugs.annotations.NonNull;
@InterfaceAudience.Private
public class ReflectionUtils {
@SuppressWarnings("unchecked")
@@ -188,4 +191,38 @@ public class ReflectionUtils {
return id + " (" + name + ")";
}
/**
* Get and invoke the target method from the given object with the given parameters
* @param obj the object to get and invoke the method from
* @param methodName the name of the method to invoke
* @param params the parameters for the method to invoke
* @return the return value of the method invocation
* @throws UnsupportedOperationException if the method cannot be found or invoked
*/
@NonNull
public static Object invokeMethod(Object obj, String methodName, Object... params) {
Method m;
try {
m = obj.getClass().getMethod(methodName, getParameterTypes(params));
m.setAccessible(true);
return m.invoke(obj, params);
} catch (NoSuchMethodException e) {
throw new UnsupportedOperationException("Cannot find specified method " + methodName, e);
} catch (IllegalAccessException e) {
throw new UnsupportedOperationException("Unable to access specified method " + methodName, e);
} catch (IllegalArgumentException e) {
throw new UnsupportedOperationException("Illegal arguments supplied for method " + methodName,
e);
} catch (InvocationTargetException e) {
throw new UnsupportedOperationException("Method threw an exception for " + methodName, e);
}
}
private static Class<?>[] getParameterTypes(Object[] params) {
Class<?>[] parameterTypes = new Class<?>[params.length];
for (int i = 0; i < params.length; i++) {
parameterTypes[i] = params[i].getClass();
}
return parameterTypes;
}
}
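
A usage sketch for the new helper, with a hypothetical Greeter class for illustration. Because parameter types are taken from each argument's runtime class, only a method whose signature matches those exact classes will resolve; a primitive, null, or interface-typed parameter would end in UnsupportedOperationException:

import org.apache.hadoop.hbase.util.ReflectionUtils;

public class InvokeMethodExample {
  public static class Greeter {  // hypothetical target class
    public String greet(String name) {
      return "Hello, " + name;
    }
  }

  public static void main(String[] args) {
    // Resolves Greeter#greet(String) because "HBase".getClass() is exactly String.
    Object result = ReflectionUtils.invokeMethod(new Greeter(), "greet", "HBase");
    System.out.println(result);  // prints: Hello, HBase
  }
}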

View File

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
+ import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
- import org.apache.hadoop.util.ReflectionUtils;
import edu.umd.cs.findbugs.annotations.Nullable;
@@ -68,6 +68,7 @@ public class HFileSystem extends FilterFileSystem {
private final FileSystem noChecksumFs; // read hfile data from storage
private final boolean useHBaseChecksum;
private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
/**
* Create a FileSystem object for HBase regionservers.
@@ -157,11 +158,9 @@ public class HFileSystem extends FilterFileSystem {
*/
public void setStoragePolicy(Path path, String policyName) {
try {
- if (this.fs instanceof DistributedFileSystem) {
- ((DistributedFileSystem) this.fs).setStoragePolicy(path, policyName);
- }
- } catch (Throwable e) {
- LOG.warn("failed to set block storage policy of [" + path + "] to [" + policyName + "]", e);
+ ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", path, policyName);
+ } catch (Exception e) {
+ LOG.warn("Failed to set storage policy of [" + path + "] to [" + policyName + "]", e);
}
}
@@ -172,14 +171,41 @@
* exception thrown when trying to get policy
*/
@Nullable
- public String getStoragePolicy(Path path) {
+ public String getStoragePolicyName(Path path) {
try {
Object blockStoragePolicySpi =
ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
} catch (Exception e) {
// May fail when the runtime Hadoop lacks the getStoragePolicy API; fall back to the old way
if (LOG.isTraceEnabled()) {
LOG.trace("Failed to get policy directly", e);
}
return getStoragePolicyForOldHDFSVersion(path);
}
}
/**
* Before Hadoop 2.8.0 there is no getStoragePolicy method on the FileSystem interface, so we
* need to stay compatible with older versions. See HADOOP-12161 for more details.
* @param path Path to get the storage policy for
* @return the storage policy name
*/
private String getStoragePolicyForOldHDFSVersion(Path path) {
try {
if (this.fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
if (null != status) {
if (unspecifiedStoragePolicyId < 0) {
// Get the unspecified id field through reflection to avoid a compilation error.
// In later versions BlockStoragePolicySuite#ID_UNSPECIFIED was moved to
// HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED.
Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
}
byte storagePolicyId = status.getStoragePolicy();
- if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+ if (storagePolicyId != unspecifiedStoragePolicyId) {
BlockStoragePolicy[] policies = dfs.getStoragePolicies();
for (BlockStoragePolicy policy : policies) {
if (policy.getId() == storagePolicyId) {

View File

@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import com.google.common.collect.Lists;
@@ -189,9 +192,11 @@
* @param policyName The name of the storage policy.
*/
public void setStoragePolicy(String familyName, String policyName) {
- if (this.fs instanceof HFileSystem) {
- Path storeDir = getStoreDir(familyName);
- ((HFileSystem) this.fs).setStoragePolicy(storeDir, policyName);
+ Path storeDir = getStoreDir(familyName);
+ try {
+ ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", storeDir, policyName);
+ } catch (Exception e) {
+ LOG.warn("Failed to set storage policy of [" + storeDir + "] to [" + policyName + "]", e);
}
}
@@ -202,10 +207,10 @@
* thrown when trying to get policy
*/
@Nullable
- public String getStoragePolicy(String familyName) {
+ public String getStoragePolicyName(String familyName) {
if (this.fs instanceof HFileSystem) {
Path storeDir = getStoreDir(familyName);
- return ((HFileSystem) this.fs).getStoragePolicy(storeDir);
+ return ((HFileSystem) this.fs).getStoragePolicyName(storeDir);
}
return null;

View File

@@ -117,6 +117,8 @@ public class HStore implements Store {
"hbase.server.compactchecker.interval.multiplier";
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
// Keep in sync with the HDFS default storage policy
public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
@@ -231,7 +233,7 @@ public class HStore implements Store {
// set block storage policy for store directory
String policyName = family.getStoragePolicy();
if (null == policyName) {
- policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY);
+ policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
}
if (null != policyName && !policyName.trim().isEmpty()) {
if (LOG.isTraceEnabled()) {
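
With this change the effective policy resolves in order: the column family's own setting if present, else hbase.hstore.block.storage.policy from the configuration, else "HOT". A sketch of the two knobs, assuming the HColumnDescriptor#setStoragePolicy accessor from the parent HBASE-14061 patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.regionserver.HStore;

public class PolicyConfigExample {
  public static void main(String[] args) {
    // 1) Cluster-wide default for all stores, via configuration:
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.BLOCK_STORAGE_POLICY_KEY, "ALL_SSD");

    // 2) Per column family, which takes precedence over the configuration:
    HColumnDescriptor hcd = new HColumnDescriptor("cf");
    hcd.setStoragePolicy("ONE_SSD");
  }
}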

View File

@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.io.WritableUtils;
@@ -480,8 +481,10 @@ public class StoreFileWriter implements CellSink, ShipperListener {
LOG.trace("set block storage policy of [" + dir + "] to [" + policyName + "]");
}
- if (this.fs instanceof HFileSystem) {
- ((HFileSystem) this.fs).setStoragePolicy(dir, policyName.trim());
+ try {
+ ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", dir, policyName.trim());
+ } catch (Exception e) {
+ LOG.warn("Failed to set storage policy of [" + dir + "] to [" + policyName + "]", e);
}
}

View File

@@ -28,6 +28,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
@@ -91,6 +92,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -1312,13 +1314,13 @@ public class TestHFileOutputFormat2 {
fs.mkdirs(cf1Dir);
fs.mkdirs(cf2Dir);
- // the original block storage policy would be NULL
+ // the original block storage policy would be HOT
String spA = getStoragePolicyName(fs, cf1Dir);
String spB = getStoragePolicyName(fs, cf2Dir);
LOG.debug("Storage policy of cf 0: [" + spA + "].");
LOG.debug("Storage policy of cf 1: [" + spB + "].");
- assertNull(spA);
- assertNull(spB);
+ assertEquals("HOT", spA);
+ assertEquals("HOT", spB);
// alter table cf schema to change storage policies
HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
@@ -1339,13 +1341,28 @@
}
private String getStoragePolicyName(FileSystem fs, Path path) {
try {
Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
} catch (Exception e) {
// May fail when the runtime Hadoop lacks the getStoragePolicy API; fall back to the old way
if (LOG.isTraceEnabled()) {
LOG.trace("Failed to get policy directly", e);
}
String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
return policy == null ? "HOT" : policy; // HOT by default
}
}
private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
try {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
if (null != status) {
byte storagePolicyId = status.getStoragePolicy();
- if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+ Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
+ if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
BlockStoragePolicy[] policies = dfs.getStoragePolicies();
for (BlockStoragePolicy policy : policies) {
if (policy.getId() == storagePolicyId) {

View File

@@ -73,28 +73,28 @@ public class TestHRegionFileSystem {
HTable table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table));
HRegionFileSystem regionFs = getHRegionFS(table, conf);
- // the original block storage policy would be NULL
- String spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
- String spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
+ // the original block storage policy would be HOT
+ String spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+ String spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
LOG.debug("Storage policy of cf 0: [" + spA + "].");
LOG.debug("Storage policy of cf 1: [" + spB + "].");
- assertNull(spA);
- assertNull(spB);
+ assertEquals("HOT", spA);
+ assertEquals("HOT", spB);
// Recreate table and make sure storage policy could be set through configuration
TEST_UTIL.shutdownMiniCluster();
- TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "HOT");
+ TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "WARM");
TEST_UTIL.startMiniCluster();
table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
regionFs = getHRegionFS(table, conf);
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
- spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
- spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
+ spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+ spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
LOG.debug("Storage policy of cf 0: [" + spA + "].");
LOG.debug("Storage policy of cf 1: [" + spB + "].");
assertEquals("HOT", spA);
assertEquals("HOT", spB);
assertEquals("WARM", spA);
assertEquals("WARM", spB);
// alter table cf schema to change storage policies
// and make sure it could override settings in conf
@@ -116,8 +116,8 @@ public class TestHRegionFileSystem {
Thread.sleep(200);
LOG.debug("Waiting on table to finish schema altering");
}
- spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
- spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
+ spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+ spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
LOG.debug("Storage policy of cf 0: [" + spA + "].");
LOG.debug("Storage policy of cf 1: [" + spB + "].");
assertNotNull(spA);
@@ -145,17 +145,17 @@ public class TestHRegionFileSystem {
assertNull(tempFiles);
// storage policy of cf temp dir and 3 store files should be ONE_SSD
assertEquals("ONE_SSD",
- ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(storeTempDir));
+ ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(storeTempDir));
for (FileStatus status : storeFiles) {
assertEquals("ONE_SSD",
- ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(status.getPath()));
+ ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(status.getPath()));
}
// change storage policies by calling raw api directly
regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD");
regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD");
- spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
- spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
+ spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+ spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
LOG.debug("Storage policy of cf 0: [" + spA + "].");
LOG.debug("Storage policy of cf 1: [" + spB + "].");
assertNotNull(spA);
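
For completeness, a sketch of the schema-alter path this test exercises: changing a column family's storage policy on a live table through the Admin API. Table and family names are illustrative, and HColumnDescriptor#setStoragePolicy is assumed from the parent HBASE-14061 patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class AlterStoragePolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      TableName table = TableName.valueOf("t1");    // illustrative table name
      HTableDescriptor htd = admin.getTableDescriptor(table);
      HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes("cf"));
      hcd.setStoragePolicy("ALL_SSD");              // CF-level setting wins over config
      admin.modifyTable(table, htd);                // asynchronous schema update
    }
  }
}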