Revert "HBASE-14230 replace reflection in FSHlog with HdfsDataOutputStream#getCurrentBlockReplication()"
This reverts commit 21dfb61bbe
.
Also reintroduces the NO_ARGS instance that was removed in HBASE-14401
This commit is contained in:
parent
d81fba59cf
commit
8cdf4a8e03
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -78,8 +79,6 @@ import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
|
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.htrace.NullScope;
|
import org.apache.htrace.NullScope;
|
||||||
|
@ -277,8 +276,13 @@ public class FSHLog implements WAL {
|
||||||
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
|
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
|
||||||
private final int minTolerableReplication;
|
private final int minTolerableReplication;
|
||||||
|
|
||||||
|
// DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
|
||||||
|
private final Method getNumCurrentReplicas;
|
||||||
|
private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine
|
||||||
private final int slowSyncNs;
|
private final int slowSyncNs;
|
||||||
|
|
||||||
|
private final static Object [] NO_ARGS = new Object []{};
|
||||||
|
|
||||||
// If live datanode count is lower than the default replicas value,
|
// If live datanode count is lower than the default replicas value,
|
||||||
// RollWriter will be triggered in each sync(So the RollWriter will be
|
// RollWriter will be triggered in each sync(So the RollWriter will be
|
||||||
// triggered one by one in a short time). Using it as a workaround to slow
|
// triggered one by one in a short time). Using it as a workaround to slow
|
||||||
|
@ -525,6 +529,10 @@ public class FSHLog implements WAL {
|
||||||
this.slowSyncNs =
|
this.slowSyncNs =
|
||||||
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
|
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
|
||||||
DEFAULT_SLOW_SYNC_TIME_MS);
|
DEFAULT_SLOW_SYNC_TIME_MS);
|
||||||
|
// handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
|
||||||
|
// HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
|
||||||
|
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
|
||||||
|
this.getPipeLine = getGetPipeline(this.hdfs_out);
|
||||||
|
|
||||||
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
|
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
|
||||||
// put on the ring buffer.
|
// put on the ring buffer.
|
||||||
|
@ -1414,6 +1422,34 @@ public class FSHLog implements WAL {
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
|
||||||
|
* This is used for getting current replicas of a file being written.
|
||||||
|
* @return Method or null.
|
||||||
|
*/
|
||||||
|
private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
|
||||||
|
// TODO: Remove all this and use the now publically available
|
||||||
|
// HdfsDataOutputStream#getCurrentBlockReplication()
|
||||||
|
Method m = null;
|
||||||
|
if (os != null) {
|
||||||
|
Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
|
||||||
|
try {
|
||||||
|
m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
|
||||||
|
m.setAccessible(true);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
|
||||||
|
"HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
|
||||||
|
"not available; fsOut=" + wrappedStreamClass.getName(), e);
|
||||||
|
m = null; // could happen on setAccessible()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (m != null) {
|
||||||
|
if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
|
||||||
|
}
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method gets the datanode replication count for the current WAL.
|
* This method gets the datanode replication count for the current WAL.
|
||||||
|
@ -1428,13 +1464,17 @@ public class FSHLog implements WAL {
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
int getLogReplication() {
|
int getLogReplication()
|
||||||
try {
|
throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
|
||||||
return ((HdfsDataOutputStream)this.hdfs_out).getCurrentBlockReplication();
|
final OutputStream stream = getOutputStream();
|
||||||
} catch (IOException e) {
|
if (this.getNumCurrentReplicas != null && stream != null) {
|
||||||
return 0;
|
Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
|
||||||
|
if (repl instanceof Integer) {
|
||||||
|
return ((Integer)repl).intValue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sync() throws IOException {
|
public void sync() throws IOException {
|
||||||
|
@ -1967,16 +2007,50 @@ public class FSHLog implements WAL {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find the 'getPipeline' on the passed <code>os</code> stream.
|
||||||
|
* @return Method or null.
|
||||||
|
*/
|
||||||
|
private Method getGetPipeline(final FSDataOutputStream os) {
|
||||||
|
Method m = null;
|
||||||
|
if (os != null) {
|
||||||
|
Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
|
||||||
|
.getClass();
|
||||||
|
try {
|
||||||
|
m = wrappedStreamClass.getDeclaredMethod("getPipeline",
|
||||||
|
new Class<?>[] {});
|
||||||
|
m.setAccessible(true);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
LOG.info("FileSystem's output stream doesn't support"
|
||||||
|
+ " getPipeline; not available; fsOut="
|
||||||
|
+ wrappedStreamClass.getName());
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
LOG.info(
|
||||||
|
"Doesn't have access to getPipeline on "
|
||||||
|
+ "FileSystems's output stream ; fsOut="
|
||||||
|
+ wrappedStreamClass.getName(), e);
|
||||||
|
m = null; // could happen on setAccessible()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method gets the pipeline for the current WAL.
|
* This method gets the pipeline for the current WAL.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
DatanodeInfo[] getPipeLine() {
|
DatanodeInfo[] getPipeLine() {
|
||||||
if (this.hdfs_out != null) {
|
if (this.getPipeLine != null && this.hdfs_out != null) {
|
||||||
return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
|
Object repl;
|
||||||
} else {
|
try {
|
||||||
|
repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
|
||||||
|
if (repl instanceof DatanodeInfo[]) {
|
||||||
|
return ((DatanodeInfo[]) repl);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Get pipeline failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
return new DatanodeInfo[0];
|
return new DatanodeInfo[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue