HBASE-11240 Print hdfs pipeline when hlog's sync is slow (Original patch + ADDENDUM)
This commit is contained in:
parent
2a20143f72
commit
a6d271201f
|
@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.HasThread;
|
import org.apache.hadoop.hbase.util.HasThread;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.htrace.NullScope;
|
import org.htrace.NullScope;
|
||||||
import org.htrace.Span;
|
import org.htrace.Span;
|
||||||
|
@ -139,6 +140,8 @@ class FSHLog implements HLog, Syncable {
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(FSHLog.class);
|
static final Log LOG = LogFactory.getLog(FSHLog.class);
|
||||||
|
|
||||||
|
private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
|
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
|
||||||
* Appends and syncs are each put on the ring which means handlers need to
|
* Appends and syncs are each put on the ring which means handlers need to
|
||||||
|
@ -203,6 +206,8 @@ class FSHLog implements HLog, Syncable {
|
||||||
|
|
||||||
// DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
|
// DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
|
||||||
private final Method getNumCurrentReplicas;
|
private final Method getNumCurrentReplicas;
|
||||||
|
private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine
|
||||||
|
private final int slowSyncNs;
|
||||||
|
|
||||||
private final static Object [] NO_ARGS = new Object []{};
|
private final static Object [] NO_ARGS = new Object []{};
|
||||||
|
|
||||||
|
@ -466,9 +471,13 @@ class FSHLog implements HLog, Syncable {
|
||||||
// rollWriter sets this.hdfs_out if it can.
|
// rollWriter sets this.hdfs_out if it can.
|
||||||
rollWriter();
|
rollWriter();
|
||||||
|
|
||||||
|
this.slowSyncNs =
|
||||||
|
1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
|
||||||
|
DEFAULT_SLOW_SYNC_TIME_MS);
|
||||||
// handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
|
// handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
|
||||||
// HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
|
// HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
|
||||||
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
|
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
|
||||||
|
this.getPipeLine = getGetPipeline(this.hdfs_out);
|
||||||
|
|
||||||
this.coprocessorHost = new WALCoprocessorHost(this, conf);
|
this.coprocessorHost = new WALCoprocessorHost(this, conf);
|
||||||
this.metrics = new MetricsWAL();
|
this.metrics = new MetricsWAL();
|
||||||
|
@ -1431,6 +1440,14 @@ class FSHLog implements HLog, Syncable {
|
||||||
public void postSync(final long timeInNanos, final int handlerSyncs) {
|
public void postSync(final long timeInNanos, final int handlerSyncs) {
|
||||||
// TODO: Add metric for handler syncs done at a time.
|
// TODO: Add metric for handler syncs done at a time.
|
||||||
if (this.metrics != null) metrics.finishSync(timeInNanos/1000000);
|
if (this.metrics != null) metrics.finishSync(timeInNanos/1000000);
|
||||||
|
if (timeInNanos > this.slowSyncNs) {
|
||||||
|
String msg =
|
||||||
|
new StringBuilder().append("Slow sync cost: ")
|
||||||
|
.append(timeInNanos / 1000000).append(" ms, current pipeline: ")
|
||||||
|
.append(Arrays.toString(getPipeLine())).toString();
|
||||||
|
Trace.addTimelineAnnotation(msg);
|
||||||
|
LOG.info(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2032,4 +2049,51 @@ class FSHLog implements HLog, Syncable {
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find the 'getPipeline' on the passed <code>os</code> stream.
|
||||||
|
* @return Method or null.
|
||||||
|
*/
|
||||||
|
private Method getGetPipeline(final FSDataOutputStream os) {
|
||||||
|
Method m = null;
|
||||||
|
if (os != null) {
|
||||||
|
Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
|
||||||
|
.getClass();
|
||||||
|
try {
|
||||||
|
m = wrappedStreamClass.getDeclaredMethod("getPipeline",
|
||||||
|
new Class<?>[] {});
|
||||||
|
m.setAccessible(true);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
LOG.info("FileSystem's output stream doesn't support"
|
||||||
|
+ " getPipeline; not available; fsOut="
|
||||||
|
+ wrappedStreamClass.getName());
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
LOG.info(
|
||||||
|
"Doesn't have access to getPipeline on "
|
||||||
|
+ "FileSystems's output stream ; fsOut="
|
||||||
|
+ wrappedStreamClass.getName(), e);
|
||||||
|
m = null; // could happen on setAccessible()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method gets the pipeline for the current HLog.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
DatanodeInfo[] getPipeLine() {
|
||||||
|
if (this.getPipeLine != null && this.hdfs_out != null) {
|
||||||
|
Object repl;
|
||||||
|
try {
|
||||||
|
repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
|
||||||
|
if (repl instanceof DatanodeInfo[]) {
|
||||||
|
return ((DatanodeInfo[]) repl);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Get pipeline failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new DatanodeInfo[0];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue