From 21dfb61bbefeef3783150d97b54a7d219795070f Mon Sep 17 00:00:00 2001 From: chenheng Date: Mon, 17 Aug 2015 15:12:42 +0800 Subject: [PATCH] HBASE-14230 replace reflection in FSHlog with HdfsDataOutputStream#getCurrentBlockReplication() Signed-off-by: stack --- .../hadoop/hbase/regionserver/wal/FSHLog.java | 96 +++---------------- 1 file changed, 12 insertions(+), 84 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index c421f5c7206..c551a94901e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; @@ -79,6 +78,8 @@ import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.NullScope; @@ -276,9 +277,6 @@ public class FSHLog implements WAL { // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered private final int minTolerableReplication; - // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection. - private final Method getNumCurrentReplicas; - private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine private final int slowSyncNs; private final static Object [] NO_ARGS = new Object []{}; @@ -529,10 +527,6 @@ public class FSHLog implements WAL { this.slowSyncNs = 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS); - // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with - // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection. - this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); - this.getPipeLine = getGetPipeline(this.hdfs_out); // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. @@ -1423,34 +1417,6 @@ public class FSHLog implements WAL { return len; } - /** - * Find the 'getNumCurrentReplicas' on the passed os stream. - * This is used for getting current replicas of a file being written. - * @return Method or null. - */ - private Method getGetNumCurrentReplicas(final FSDataOutputStream os) { - // TODO: Remove all this and use the now publically available - // HdfsDataOutputStream#getCurrentBlockReplication() - Method m = null; - if (os != null) { - Class wrappedStreamClass = os.getWrappedStream().getClass(); - try { - m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class[] {}); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " + - "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName()); - } catch (SecurityException e) { - LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " + - "not available; fsOut=" + wrappedStreamClass.getName(), e); - m = null; // could happen on setAccessible() - } - } - if (m != null) { - if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas"); - } - return m; - } /** * This method gets the datanode replication count for the current WAL. @@ -1465,16 +1431,12 @@ public class FSHLog implements WAL { * @throws Exception */ @VisibleForTesting - int getLogReplication() - throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { - final OutputStream stream = getOutputStream(); - if (this.getNumCurrentReplicas != null && stream != null) { - Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS); - if (repl instanceof Integer) { - return ((Integer)repl).intValue(); - } + int getLogReplication() { + try { + return ((HdfsDataOutputStream)this.hdfs_out).getCurrentBlockReplication(); + } catch (IOException e) { + return 0; } - return 0; } @Override @@ -1999,51 +1961,17 @@ public class FSHLog implements WAL { System.exit(-1); } } - - /** - * Find the 'getPipeline' on the passed os stream. - * @return Method or null. - */ - private Method getGetPipeline(final FSDataOutputStream os) { - Method m = null; - if (os != null) { - Class wrappedStreamClass = os.getWrappedStream() - .getClass(); - try { - m = wrappedStreamClass.getDeclaredMethod("getPipeline", - new Class[] {}); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem's output stream doesn't support" - + " getPipeline; not available; fsOut=" - + wrappedStreamClass.getName()); - } catch (SecurityException e) { - LOG.info( - "Doesn't have access to getPipeline on " - + "FileSystems's output stream ; fsOut=" - + wrappedStreamClass.getName(), e); - m = null; // could happen on setAccessible() - } - } - return m; - } /** * This method gets the pipeline for the current WAL. */ @VisibleForTesting DatanodeInfo[] getPipeLine() { - if (this.getPipeLine != null && this.hdfs_out != null) { - Object repl; - try { - repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS); - if (repl instanceof DatanodeInfo[]) { - return ((DatanodeInfo[]) repl); - } - } catch (Exception e) { - LOG.info("Get pipeline failed", e); - } + if (this.hdfs_out != null) { + return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline(); + } else { + return new DatanodeInfo[0]; } - return new DatanodeInfo[0]; + } } \ No newline at end of file