From 24370547c500df0026a71944d8be88cd5b51b23e Mon Sep 17 00:00:00 2001 From: chenheng Date: Tue, 22 Sep 2015 15:13:09 +0800 Subject: [PATCH] HBASE-14230 replace reflection in FSHlog with HdfsDataOutputStream#getCurrentBlockReplication() --- .../hadoop/hbase/regionserver/wal/FSHLog.java | 92 +++---------------- 1 file changed, 12 insertions(+), 80 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 0e4a585dade..928f4b61c0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; @@ -77,6 +76,8 @@ import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.NullScope; @@ -275,9 +276,6 @@ public class FSHLog implements WAL { // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered private final int minTolerableReplication; - // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection. - private final Method getNumCurrentReplicas; - private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine private final int slowSyncNs; private final static Object [] NO_ARGS = new Object []{}; @@ -528,10 +526,6 @@ public class FSHLog implements WAL { this.slowSyncNs = 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS); - // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with - // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection. - this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); - this.getPipeLine = getGetPipeline(this.hdfs_out); // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. @@ -1405,34 +1399,6 @@ public class FSHLog implements WAL { return len; } - /** - * Find the 'getNumCurrentReplicas' on the passed os stream. - * This is used for getting current replicas of a file being written. - * @return Method or null. - */ - private Method getGetNumCurrentReplicas(final FSDataOutputStream os) { - // TODO: Remove all this and use the now publically available - // HdfsDataOutputStream#getCurrentBlockReplication() - Method m = null; - if (os != null) { - Class wrappedStreamClass = os.getWrappedStream().getClass(); - try { - m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class[] {}); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " + - "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName()); - } catch (SecurityException e) { - LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " + - "not available; fsOut=" + wrappedStreamClass.getName(), e); - m = null; // could happen on setAccessible() - } - } - if (m != null) { - if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas"); - } - return m; - } /** * This method gets the datanode replication count for the current WAL. @@ -1447,14 +1413,14 @@ public class FSHLog implements WAL { * @throws Exception */ @VisibleForTesting - int getLogReplication() - throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { - final OutputStream stream = getOutputStream(); - if (this.getNumCurrentReplicas != null && stream != null) { - Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS); - if (repl instanceof Integer) { - return ((Integer)repl).intValue(); + int getLogReplication() { + try { + //in standalone mode, it will return 0 + if (this.hdfs_out instanceof HdfsDataOutputStream) { + return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication(); } + } catch (IOException e) { + LOG.info("", e); } return 0; } @@ -1988,49 +1954,15 @@ public class FSHLog implements WAL { System.exit(-1); } } - - /** - * Find the 'getPipeline' on the passed os stream. - * @return Method or null. - */ - private Method getGetPipeline(final FSDataOutputStream os) { - Method m = null; - if (os != null) { - Class wrappedStreamClass = os.getWrappedStream() - .getClass(); - try { - m = wrappedStreamClass.getDeclaredMethod("getPipeline", - new Class[] {}); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem's output stream doesn't support" - + " getPipeline; not available; fsOut=" - + wrappedStreamClass.getName()); - } catch (SecurityException e) { - LOG.info( - "Doesn't have access to getPipeline on " - + "FileSystems's output stream ; fsOut=" - + wrappedStreamClass.getName(), e); - m = null; // could happen on setAccessible() - } - } - return m; - } /** * This method gets the pipeline for the current WAL. */ @VisibleForTesting DatanodeInfo[] getPipeLine() { - if (this.getPipeLine != null && this.hdfs_out != null) { - Object repl; - try { - repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS); - if (repl instanceof DatanodeInfo[]) { - return ((DatanodeInfo[]) repl); - } - } catch (Exception e) { - LOG.info("Get pipeline failed", e); + if (this.hdfs_out != null) { + if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) { + return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline(); } } return new DatanodeInfo[0];