From 36eeb2c569c574b299f8628bed6b8dd20fb900e2 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Wed, 11 Jan 2017 10:14:55 +0800 Subject: [PATCH] HBASE-14061 Support CF-level Storage Policy (addendum) Addendum to resolve compatible issue with Hadoop 2.8.0+ / 3.0.0-alpha1+, meantime added a util method in ReflectionUtils for invoking method with reflection --- .../hadoop/hbase/util/ReflectionUtils.java | 37 ++++++++++++++++ .../apache/hadoop/hbase/fs/HFileSystem.java | 42 +++++++++++++++---- .../hbase/regionserver/HRegionFileSystem.java | 15 ++++--- .../hadoop/hbase/regionserver/HStore.java | 4 +- .../hbase/regionserver/StoreFileWriter.java | 7 +++- .../mapreduce/TestHFileOutputFormat2.java | 25 +++++++++-- .../regionserver/TestHRegionFileSystem.java | 32 +++++++------- 7 files changed, 126 insertions(+), 36 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java index 740f9ee420d..63c6b9c4a6a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java @@ -26,12 +26,15 @@ import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.charset.Charset; import org.apache.commons.logging.Log; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import edu.umd.cs.findbugs.annotations.NonNull; + @InterfaceAudience.Private public class ReflectionUtils { @SuppressWarnings("unchecked") @@ -188,4 +191,38 @@ public class ReflectionUtils { return id + " (" + name + ")"; } + /** + * Get and invoke the target method from the given object with given parameters + * @param obj the object to get and invoke method from + * @param methodName the name of the method to invoke + * @param params the parameters for the method to invoke + * @return the return value of the method invocation + */ + @NonNull + public static Object invokeMethod(Object obj, String methodName, Object... params) { + Method m; + try { + m = obj.getClass().getMethod(methodName, getParameterTypes(params)); + m.setAccessible(true); + return m.invoke(obj, params); + } catch (NoSuchMethodException e) { + throw new UnsupportedOperationException("Cannot find specified method " + methodName, e); + } catch (IllegalAccessException e) { + throw new UnsupportedOperationException("Unable to access specified method " + methodName, e); + } catch (IllegalArgumentException e) { + throw new UnsupportedOperationException("Illegal arguments supplied for method " + methodName, + e); + } catch (InvocationTargetException e) { + throw new UnsupportedOperationException("Method threw an exception for " + methodName, e); + } + } + + private static Class[] getParameterTypes(Object[] params) { + Class[] parameterTypes = new Class[params.length]; + for (int i = 0; i < params.length; i++) { + parameterTypes[i] = params[i].getClass(); + } + return parameterTypes; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java index 1a5408bcb58..fe8b3b23189 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -52,7 +53,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.ReflectionUtils; import edu.umd.cs.findbugs.annotations.Nullable; @@ -68,6 +68,7 @@ public class HFileSystem extends FilterFileSystem { private final FileSystem noChecksumFs; // read hfile data from storage private final boolean useHBaseChecksum; + private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE; /** * Create a FileSystem object for HBase regionservers. @@ -157,11 +158,9 @@ public class HFileSystem extends FilterFileSystem { */ public void setStoragePolicy(Path path, String policyName) { try { - if (this.fs instanceof DistributedFileSystem) { - ((DistributedFileSystem) this.fs).setStoragePolicy(path, policyName); - } - } catch (Throwable e) { - LOG.warn("failed to set block storage policy of [" + path + "] to [" + policyName + "]", e); + ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", path, policyName); + } catch (Exception e) { + LOG.warn("Failed to set storage policy of [" + path + "] to [" + policyName + "]", e); } } @@ -172,14 +171,41 @@ public class HFileSystem extends FilterFileSystem { * exception thrown when trying to get policy */ @Nullable - public String getStoragePolicy(Path path) { + public String getStoragePolicyName(Path path) { + try { + Object blockStoragePolicySpi = + ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path); + return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); + } catch (Exception e) { + // Maybe fail because of using old HDFS version, try the old way + if (LOG.isTraceEnabled()) { + LOG.trace("Failed to get policy directly", e); + } + return getStoragePolicyForOldHDFSVersion(path); + } + } + + /** + * Before Hadoop 2.8.0, there's no getStoragePolicy method for FileSystem interface, and we need + * to keep compatible with it. See HADOOP-12161 for more details. + * @param path Path to get storage policy against + * @return the storage policy name + */ + private String getStoragePolicyForOldHDFSVersion(Path path) { try { if (this.fs instanceof DistributedFileSystem) { DistributedFileSystem dfs = (DistributedFileSystem) this.fs; HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); if (null != status) { + if (unspecifiedStoragePolicyId < 0) { + // Get the unspecified id field through reflection to avoid compilation error. + // In later version BlockStoragePolicySuite#ID_UNSPECIFIED is moved to + // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED + Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); + unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class); + } byte storagePolicyId = status.getStoragePolicy(); - if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) { + if (storagePolicyId != unspecifiedStoragePolicyId) { BlockStoragePolicy[] policies = dfs.getStoragePolicies(); for (BlockStoragePolicy policy : policies) { if (policy.getId() == storagePolicyId) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 4fcea64a9c9..6fd17644ef3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import com.google.common.collect.Lists; @@ -189,9 +192,11 @@ public class HRegionFileSystem { * @param policyName The name of the storage policy. */ public void setStoragePolicy(String familyName, String policyName) { - if (this.fs instanceof HFileSystem) { - Path storeDir = getStoreDir(familyName); - ((HFileSystem) this.fs).setStoragePolicy(storeDir, policyName); + Path storeDir = getStoreDir(familyName); + try { + ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", storeDir, policyName); + } catch (Exception e) { + LOG.warn("Failed to set storage policy of [" + storeDir + "] to [" + policyName + "]", e); } } @@ -202,10 +207,10 @@ public class HRegionFileSystem { * thrown when trying to get policy */ @Nullable - public String getStoragePolicy(String familyName) { + public String getStoragePolicyName(String familyName) { if (this.fs instanceof HFileSystem) { Path storeDir = getStoreDir(familyName); - return ((HFileSystem) this.fs).getStoragePolicy(storeDir); + return ((HFileSystem) this.fs).getStoragePolicyName(storeDir); } return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 09d1a66f88d..2a93b7052ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -117,6 +117,8 @@ public class HStore implements Store { "hbase.server.compactchecker.interval.multiplier"; public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles"; public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy"; + // keep in accordance with HDFS default storage policy + public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT"; public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000; public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7; @@ -231,7 +233,7 @@ public class HStore implements Store { // set block storage policy for store directory String policyName = family.getStoragePolicy(); if (null == policyName) { - policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY); + policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY); } if (null != policyName && !policyName.trim().isEmpty()) { if (LOG.isTraceEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index deba6b2b0fb..786d58a227e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.BloomContext; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.RowBloomContext; import org.apache.hadoop.hbase.util.RowColBloomContext; import org.apache.hadoop.io.WritableUtils; @@ -480,8 +481,10 @@ public class StoreFileWriter implements CellSink, ShipperListener { LOG.trace("set block storage policy of [" + dir + "] to [" + policyName + "]"); } - if (this.fs instanceof HFileSystem) { - ((HFileSystem) this.fs).setStoragePolicy(dir, policyName.trim()); + try { + ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", dir, policyName.trim()); + } catch (Exception e) { + LOG.warn("Failed to set storage policy of [" + dir + "] to [" + policyName + "]", e); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 21a39d407d2..52b2901310f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -28,6 +28,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.ArrayList; import java.util.HashMap; @@ -91,6 +92,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -1312,13 +1314,13 @@ public class TestHFileOutputFormat2 { fs.mkdirs(cf1Dir); fs.mkdirs(cf2Dir); - // the original block storage policy would be NULL + // the original block storage policy would be HOT String spA = getStoragePolicyName(fs, cf1Dir); String spB = getStoragePolicyName(fs, cf2Dir); LOG.debug("Storage policy of cf 0: [" + spA + "]."); LOG.debug("Storage policy of cf 1: [" + spB + "]."); - assertNull(spA); - assertNull(spB); + assertEquals("HOT", spA); + assertEquals("HOT", spB); // alter table cf schema to change storage policies HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir); @@ -1339,13 +1341,28 @@ public class TestHFileOutputFormat2 { } private String getStoragePolicyName(FileSystem fs, Path path) { + try { + Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path); + return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName"); + } catch (Exception e) { + // Maybe fail because of using old HDFS version, try the old way + if (LOG.isTraceEnabled()) { + LOG.trace("Failed to get policy directly", e); + } + String policy = getStoragePolicyNameForOldHDFSVersion(fs, path); + return policy == null ? "HOT" : policy;// HOT by default + } + } + + private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { try { if (fs instanceof DistributedFileSystem) { DistributedFileSystem dfs = (DistributedFileSystem) fs; HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); if (null != status) { byte storagePolicyId = status.getStoragePolicy(); - if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) { + Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); + if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { BlockStoragePolicy[] policies = dfs.getStoragePolicies(); for (BlockStoragePolicy policy : policies) { if (policy.getId() == storagePolicyId) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java index a2772ecf06f..7875e6822a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java @@ -73,28 +73,28 @@ public class TestHRegionFileSystem { HTable table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES); assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table)); HRegionFileSystem regionFs = getHRegionFS(table, conf); - // the original block storage policy would be NULL - String spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0])); - String spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1])); + // the original block storage policy would be HOT + String spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + String spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); LOG.debug("Storage policy of cf 0: [" + spA + "]."); LOG.debug("Storage policy of cf 1: [" + spB + "]."); - assertNull(spA); - assertNull(spB); + assertEquals("HOT", spA); + assertEquals("HOT", spB); // Recreate table and make sure storage policy could be set through configuration TEST_UTIL.shutdownMiniCluster(); - TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "HOT"); + TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "WARM"); TEST_UTIL.startMiniCluster(); table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES); regionFs = getHRegionFS(table, conf); try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { - spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0])); - spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1])); + spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); LOG.debug("Storage policy of cf 0: [" + spA + "]."); LOG.debug("Storage policy of cf 1: [" + spB + "]."); - assertEquals("HOT", spA); - assertEquals("HOT", spB); + assertEquals("WARM", spA); + assertEquals("WARM", spB); // alter table cf schema to change storage policies // and make sure it could override settings in conf @@ -116,8 +116,8 @@ public class TestHRegionFileSystem { Thread.sleep(200); LOG.debug("Waiting on table to finish schema altering"); } - spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0])); - spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1])); + spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); LOG.debug("Storage policy of cf 0: [" + spA + "]."); LOG.debug("Storage policy of cf 1: [" + spB + "]."); assertNotNull(spA); @@ -145,17 +145,17 @@ public class TestHRegionFileSystem { assertNull(tempFiles); // storage policy of cf temp dir and 3 store files should be ONE_SSD assertEquals("ONE_SSD", - ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(storeTempDir)); + ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(storeTempDir)); for (FileStatus status : storeFiles) { assertEquals("ONE_SSD", - ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(status.getPath())); + ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(status.getPath())); } // change storage policies by calling raw api directly regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD"); regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD"); - spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0])); - spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1])); + spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1])); LOG.debug("Storage policy of cf 0: [" + spA + "]."); LOG.debug("Storage policy of cf 1: [" + spB + "]."); assertNotNull(spA);