From 8cdf4a8e03d348908883e1829b86dbe9e1b30907 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Fri, 18 Sep 2015 13:41:17 -0700 Subject: [PATCH] Revert "HBASE-14230 replace reflection in FSHlog with HdfsDataOutputStream#getCurrentBlockReplication()" This reverts commit 21dfb61bbefeef3783150d97b54a7d219795070f. Also reintroduces the NO_ARGS instance that was removed in HBASE-14401 --- .../hadoop/hbase/regionserver/wal/FSHLog.java | 98 ++++++++++++++++--- 1 file changed, 86 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 5f9e3cd4645..1c298273cd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; @@ -78,8 +79,6 @@ import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; -import org.apache.hadoop.hdfs.DFSOutputStream; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.NullScope; @@ -277,8 +276,13 @@ public class FSHLog implements WAL { // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered private final int minTolerableReplication; + // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection. + private final Method getNumCurrentReplicas; + private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine private final int slowSyncNs; + private final static Object [] NO_ARGS = new Object []{}; + // If live datanode count is lower than the default replicas value, // RollWriter will be triggered in each sync(So the RollWriter will be // triggered one by one in a short time). Using it as a workaround to slow @@ -525,6 +529,10 @@ public class FSHLog implements WAL { this.slowSyncNs = 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS); + // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with + // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection. + this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); + this.getPipeLine = getGetPipeline(this.hdfs_out); // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. @@ -1414,6 +1422,34 @@ public class FSHLog implements WAL { return len; } + /** + * Find the 'getNumCurrentReplicas' on the passed os stream. + * This is used for getting current replicas of a file being written. + * @return Method or null. + */ + private Method getGetNumCurrentReplicas(final FSDataOutputStream os) { + // TODO: Remove all this and use the now publically available + // HdfsDataOutputStream#getCurrentBlockReplication() + Method m = null; + if (os != null) { + Class wrappedStreamClass = os.getWrappedStream().getClass(); + try { + m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class[] {}); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " + + "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName()); + } catch (SecurityException e) { + LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " + + "not available; fsOut=" + wrappedStreamClass.getName(), e); + m = null; // could happen on setAccessible() + } + } + if (m != null) { + if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas"); + } + return m; + } /** * This method gets the datanode replication count for the current WAL. @@ -1428,12 +1464,16 @@ public class FSHLog implements WAL { * @throws Exception */ @VisibleForTesting - int getLogReplication() { - try { - return ((HdfsDataOutputStream)this.hdfs_out).getCurrentBlockReplication(); - } catch (IOException e) { - return 0; + int getLogReplication() + throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { + final OutputStream stream = getOutputStream(); + if (this.getNumCurrentReplicas != null && stream != null) { + Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS); + if (repl instanceof Integer) { + return ((Integer)repl).intValue(); + } } + return 0; } @Override @@ -1966,17 +2006,51 @@ public class FSHLog implements WAL { System.exit(-1); } } + + /** + * Find the 'getPipeline' on the passed os stream. + * @return Method or null. + */ + private Method getGetPipeline(final FSDataOutputStream os) { + Method m = null; + if (os != null) { + Class wrappedStreamClass = os.getWrappedStream() + .getClass(); + try { + m = wrappedStreamClass.getDeclaredMethod("getPipeline", + new Class[] {}); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem's output stream doesn't support" + + " getPipeline; not available; fsOut=" + + wrappedStreamClass.getName()); + } catch (SecurityException e) { + LOG.info( + "Doesn't have access to getPipeline on " + + "FileSystems's output stream ; fsOut=" + + wrappedStreamClass.getName(), e); + m = null; // could happen on setAccessible() + } + } + return m; + } /** * This method gets the pipeline for the current WAL. */ @VisibleForTesting DatanodeInfo[] getPipeLine() { - if (this.hdfs_out != null) { - return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline(); - } else { - return new DatanodeInfo[0]; + if (this.getPipeLine != null && this.hdfs_out != null) { + Object repl; + try { + repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS); + if (repl instanceof DatanodeInfo[]) { + return ((DatanodeInfo[]) repl); + } + } catch (Exception e) { + LOG.info("Get pipeline failed", e); + } } - + return new DatanodeInfo[0]; } } \ No newline at end of file